Skip to main content

zebra_state/service/
non_finalized_state.rs

1//! Non-finalized chain state management as defined by [RFC0005]
2//!
3//! [RFC0005]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html
4
5use std::{
6    collections::{BTreeSet, HashMap},
7    mem,
8    path::{Path, PathBuf},
9    sync::Arc,
10};
11
12use indexmap::IndexMap;
13use tokio::sync::watch;
14use zebra_chain::{
15    block::{self, Block, Hash, Height},
16    parameters::Network,
17    sprout::{self},
18    transparent,
19};
20
21use crate::{
22    constants::{MAX_INVALIDATED_BLOCKS, MAX_NON_FINALIZED_CHAIN_FORKS},
23    error::ReconsiderError,
24    request::{ContextuallyVerifiedBlock, FinalizableBlock},
25    service::{
26        check,
27        finalized_state::{calculate_deferred_pool_balance_change, ZebraDb},
28        InvalidateError,
29    },
30    SemanticallyVerifiedBlock, ValidateContextError, WatchReceiver,
31};
32
33mod backup;
34mod chain;
35
36#[cfg(test)]
37pub(crate) use backup::MIN_DURATION_BETWEEN_BACKUP_UPDATES;
38
39#[cfg(test)]
40mod tests;
41
42pub(crate) use chain::{Chain, SpendingTransactionId};
43
44/// The state of the chains in memory, including queued blocks.
45///
46/// Clones of the non-finalized state contain independent copies of the chains.
47/// This is different from `FinalizedState::clone()`,
48/// which returns a shared reference to the database.
49///
50/// Most chain data is clone-on-write using [`Arc`].
51pub struct NonFinalizedState {
52    // Chain Data
53    //
54    /// Verified, non-finalized chains, in ascending work order.
55    ///
56    /// The best chain is [`NonFinalizedState::best_chain()`], or `chain_iter().next()`.
57    /// Using `chain_set.last()` or `chain_set.iter().next_back()` is deprecated,
58    /// callers should migrate to `chain_iter().next()`.
59    chain_set: BTreeSet<Arc<Chain>>,
60
61    /// Blocks that have been invalidated in, and removed from, the non finalized
62    /// state.
63    invalidated_blocks: IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>>,
64
65    // Configuration
66    //
67    /// The configured Zcash network.
68    pub network: Network,
69
70    // Diagnostics
71    //
72    /// Configures the non-finalized state to count metrics.
73    ///
74    /// Used for skipping metrics and progress bars when testing block proposals
75    /// with a commit to a cloned non-finalized state.
76    //
77    // TODO: make this field private and set it via an argument to NonFinalizedState::new()
78    should_count_metrics: bool,
79
80    /// Number of chain forks transmitter.
81    #[cfg(feature = "progress-bar")]
82    chain_count_bar: Option<howudoin::Tx>,
83
84    /// A chain fork length transmitter for each [`Chain`] in [`chain_set`](Self.chain_set).
85    ///
86    /// Because `chain_set` contains `Arc<Chain>`s, it is difficult to update the metrics state
87    /// on each chain. ([`Arc`]s are read-only, and we don't want to clone them just for metrics.)
88    #[cfg(feature = "progress-bar")]
89    chain_fork_length_bars: Vec<howudoin::Tx>,
90}
91
92impl std::fmt::Debug for NonFinalizedState {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        let mut f = f.debug_struct("NonFinalizedState");
95
96        f.field("chain_set", &self.chain_set)
97            .field("network", &self.network);
98
99        f.field("should_count_metrics", &self.should_count_metrics);
100
101        f.finish()
102    }
103}
104
105impl Clone for NonFinalizedState {
106    fn clone(&self) -> Self {
107        Self {
108            chain_set: self.chain_set.clone(),
109            network: self.network.clone(),
110            invalidated_blocks: self.invalidated_blocks.clone(),
111            should_count_metrics: self.should_count_metrics,
112            // Don't track progress in clones.
113            #[cfg(feature = "progress-bar")]
114            chain_count_bar: None,
115            #[cfg(feature = "progress-bar")]
116            chain_fork_length_bars: Vec::new(),
117        }
118    }
119}
120
121impl NonFinalizedState {
122    /// Returns a new non-finalized state for `network`.
123    pub fn new(network: &Network) -> NonFinalizedState {
124        NonFinalizedState {
125            chain_set: Default::default(),
126            network: network.clone(),
127            invalidated_blocks: Default::default(),
128            should_count_metrics: true,
129            #[cfg(feature = "progress-bar")]
130            chain_count_bar: None,
131            #[cfg(feature = "progress-bar")]
132            chain_fork_length_bars: Vec::new(),
133        }
134    }
135
136    /// Writes the current non-finalized state to the backup directory at `backup_dir_path`.
137    ///
138    /// Reads the existing backup directory contents, writes any blocks that are in the
139    /// non-finalized state but missing from the backup, and deletes any backup files that
140    /// are no longer present in the non-finalized state.
141    ///
142    /// This method performs blocking I/O and should only be called from a blocking context.
143    pub(crate) fn write_to_backup(&self, backup_dir_path: &Path) {
144        let backup_blocks: HashMap<block::Hash, PathBuf> =
145            backup::list_backup_dir_entries(backup_dir_path).collect();
146        backup::update_non_finalized_state_backup(backup_dir_path, self, backup_blocks);
147    }
148
149    /// Accepts an optional path to the non-finalized state backup directory and a handle to the database.
150    ///
151    /// If a backup directory path is provided:
152    /// - Creates a new backup directory at the provided path if none exists,
153    /// - Restores non-finalized blocks from the backup directory, if any, and
154    /// - Unless `skip_backup_task` is true, spawns a task that updates the non-finalized
155    ///   backup cache with the latest non-finalized state sent to the returned watch channel.
156    ///
157    /// Returns the non-finalized state with a watch channel sender and receiver.
158    pub async fn with_backup(
159        self,
160        backup_dir_path: Option<PathBuf>,
161        finalized_state: &ZebraDb,
162        should_restore_backup: bool,
163        skip_backup_task: bool,
164    ) -> (
165        Self,
166        watch::Sender<NonFinalizedState>,
167        WatchReceiver<NonFinalizedState>,
168    ) {
169        let with_watch_channel = |non_finalized_state: NonFinalizedState| {
170            let (sender, receiver) = watch::channel(non_finalized_state.clone());
171            (non_finalized_state, sender, WatchReceiver::new(receiver))
172        };
173
174        let Some(backup_dir_path) = backup_dir_path else {
175            return with_watch_channel(self);
176        };
177
178        if skip_backup_task {
179            tracing::info!(
180                ?backup_dir_path,
181                "restoring non-finalized blocks from backup (sync write mode, backup task skipped)"
182            );
183        } else {
184            tracing::info!(
185                ?backup_dir_path,
186                "restoring non-finalized blocks from backup and spawning backup task"
187            );
188        }
189
190        let non_finalized_state = {
191            let backup_dir_path = backup_dir_path.clone();
192            let finalized_state = finalized_state.clone();
193            tokio::task::spawn_blocking(move || {
194                // Create a new backup directory if none exists
195                std::fs::create_dir_all(&backup_dir_path)
196                    .expect("failed to create non-finalized state backup directory");
197
198                if should_restore_backup {
199                    backup::restore_backup(self, &backup_dir_path, &finalized_state)
200                } else {
201                    self
202                }
203            })
204            .await
205            .expect("failed to join blocking task")
206        };
207
208        let (non_finalized_state, sender, receiver) = with_watch_channel(non_finalized_state);
209
210        if !skip_backup_task {
211            tokio::spawn(backup::run_backup_task(receiver.clone(), backup_dir_path));
212        }
213
214        if !non_finalized_state.is_chain_set_empty() {
215            let num_blocks_restored = non_finalized_state
216                .best_chain()
217                .expect("must have best chain if chain set is not empty")
218                .len();
219
220            tracing::info!(
221                ?num_blocks_restored,
222                "restored blocks from non-finalized backup cache"
223            );
224        }
225
226        (non_finalized_state, sender, receiver)
227    }
228
229    /// Is the internal state of `self` the same as `other`?
230    ///
231    /// [`Chain`] has a custom [`Eq`] implementation based on proof of work,
232    /// which is used to select the best chain. So we can't derive [`Eq`] for [`NonFinalizedState`].
233    ///
234    /// Unlike the custom trait impl, this method returns `true` if the entire internal state
235    /// of two non-finalized states is equal.
236    ///
237    /// If the internal states are different, it returns `false`,
238    /// even if the chains and blocks are equal.
239    #[cfg(any(test, feature = "proptest-impl"))]
240    #[allow(dead_code)]
241    pub fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
242        // this method must be updated every time a consensus-critical field is added to NonFinalizedState
243        // (diagnostic fields can be ignored)
244
245        self.chain_set.len() == other.chain_set.len()
246            && self
247                .chain_set
248                .iter()
249                .zip(other.chain_set.iter())
250                .all(|(self_chain, other_chain)| self_chain.eq_internal_state(other_chain))
251            && self.network == other.network
252    }
253
254    /// Returns an iterator over the non-finalized chains, with the best chain first.
255    //
256    // TODO: replace chain_set.iter().rev() with this method
257    pub fn chain_iter(&self) -> impl Iterator<Item = &Arc<Chain>> {
258        self.chain_set.iter().rev()
259    }
260
261    /// Insert `chain` into `self.chain_set`, apply `chain_filter` to the chains,
262    /// then limit the number of tracked chains.
263    fn insert_with<F>(&mut self, chain: Arc<Chain>, chain_filter: F)
264    where
265        F: FnOnce(&mut BTreeSet<Arc<Chain>>),
266    {
267        self.chain_set.insert(chain);
268
269        chain_filter(&mut self.chain_set);
270
271        while self.chain_set.len() > MAX_NON_FINALIZED_CHAIN_FORKS {
272            // The first chain is the chain with the lowest work.
273            self.chain_set.pop_first();
274        }
275
276        self.update_metrics_bars();
277    }
278
279    /// Insert `chain` into `self.chain_set`, then limit the number of tracked chains.
280    fn insert(&mut self, chain: Arc<Chain>) {
281        self.insert_with(chain, |_ignored_chain| { /* no filter */ })
282    }
283
284    /// Finalize the lowest height block in the non-finalized portion of the best
285    /// chain and update all side-chains to match.
286    pub fn finalize(&mut self) -> FinalizableBlock {
287        // Chain::cmp uses the partial cumulative work, and the hash of the tip block.
288        // Neither of these fields has interior mutability.
289        // (And when the tip block is dropped for a chain, the chain is also dropped.)
290        #[allow(clippy::mutable_key_type)]
291        let chains = mem::take(&mut self.chain_set);
292        let mut chains = chains.into_iter();
293
294        // extract best chain
295        let mut best_chain = chains.next_back().expect("there's at least one chain");
296
297        // clone if required
298        let mut_best_chain = Arc::make_mut(&mut best_chain);
299
300        // extract the rest into side_chains so they can be mutated
301        let side_chains = chains;
302
303        // Pop the lowest height block from the best chain to be finalized, and
304        // also obtain its associated treestate.
305        let (best_chain_root, root_treestate) = mut_best_chain.pop_root();
306
307        // add best_chain back to `self.chain_set`
308        if !best_chain.is_empty() {
309            self.insert(best_chain);
310        }
311
312        // for each remaining chain in side_chains
313        for mut side_chain in side_chains.rev() {
314            if side_chain.non_finalized_root_hash() != best_chain_root.hash {
315                // If we popped the root, the chain would be empty or orphaned,
316                // so just drop it now.
317                drop(side_chain);
318
319                continue;
320            }
321
322            // otherwise, the popped root block is the same as the finalizing block
323
324            // clone if required
325            let mut_side_chain = Arc::make_mut(&mut side_chain);
326
327            // remove the first block from `chain`
328            let (side_chain_root, _treestate) = mut_side_chain.pop_root();
329            assert_eq!(side_chain_root.hash, best_chain_root.hash);
330
331            // add the chain back to `self.chain_set`
332            self.insert(side_chain);
333        }
334
335        // Remove all invalidated_blocks at or below the finalized height
336        self.invalidated_blocks
337            .retain(|height, _blocks| *height >= best_chain_root.height);
338
339        self.update_metrics_for_chains();
340
341        // Add the treestate to the finalized block.
342        FinalizableBlock::new(best_chain_root, root_treestate)
343    }
344
345    /// Commit block to the non-finalized state, on top of:
346    /// - an existing chain's tip, or
347    /// - a newly forked chain.
348    #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
349    pub fn commit_block(
350        &mut self,
351        prepared: SemanticallyVerifiedBlock,
352        finalized_state: &ZebraDb,
353    ) -> Result<(), ValidateContextError> {
354        let parent_hash = prepared.block.header.previous_block_hash;
355        let (height, hash) = (prepared.height, prepared.hash);
356
357        let parent_chain = self.parent_chain(parent_hash)?;
358
359        // If the block is invalid, return the error,
360        // and drop the cloned parent Arc, or newly created chain fork.
361        let modified_chain = self.validate_and_commit(parent_chain, prepared, finalized_state)?;
362
363        // If the block is valid:
364        // - add the new chain fork or updated chain to the set of recent chains
365        // - remove the parent chain, if it was in the chain set
366        //   (if it was a newly created fork, it won't be in the chain set)
367        self.insert_with(modified_chain, |chain_set| {
368            chain_set.retain(|chain| chain.non_finalized_tip_hash() != parent_hash)
369        });
370
371        self.update_metrics_for_committed_block(height, hash);
372
373        Ok(())
374    }
375
376    /// Invalidate block with hash `block_hash` and all descendants from the non-finalized state. Insert
377    /// the new chain into the chain_set and discard the previous.
378    #[allow(clippy::unwrap_in_result)]
379    pub fn invalidate_block(&mut self, block_hash: Hash) -> Result<block::Hash, InvalidateError> {
380        let Some(chain) = self.find_chain(|chain| chain.contains_block_hash(block_hash)) else {
381            return Err(InvalidateError::BlockNotFound(block_hash));
382        };
383
384        let invalidated_blocks = if chain.non_finalized_root_hash() == block_hash {
385            self.chain_set.remove(&chain);
386            chain.blocks.values().cloned().collect()
387        } else {
388            let (new_chain, invalidated_blocks) = chain
389                .invalidate_block(block_hash)
390                .expect("already checked that chain contains hash");
391
392            // Add the new chain fork or updated chain to the set of recent chains, and
393            // remove the chain containing the hash of the block from chain set
394            self.insert_with(Arc::new(new_chain.clone()), |chain_set| {
395                chain_set.retain(|c| !c.contains_block_hash(block_hash))
396            });
397
398            invalidated_blocks
399        };
400
401        // TODO: Allow for invalidating multiple block hashes at a given height (#9552).
402        self.invalidated_blocks.insert(
403            invalidated_blocks
404                .first()
405                .expect("should not be empty")
406                .clone()
407                .height,
408            Arc::new(invalidated_blocks),
409        );
410
411        while self.invalidated_blocks.len() > MAX_INVALIDATED_BLOCKS {
412            self.invalidated_blocks.shift_remove_index(0);
413        }
414
415        self.update_metrics_for_chains();
416        self.update_metrics_bars();
417
418        Ok(block_hash)
419    }
420
421    /// Reconsiders a previously invalidated block and its descendants into the non-finalized state
422    /// based on a block_hash. Reconsidered blocks are inserted into the previous chain and re-inserted
423    /// into the chain_set.
424    #[allow(clippy::unwrap_in_result)]
425    pub fn reconsider_block(
426        &mut self,
427        block_hash: block::Hash,
428        finalized_state: &ZebraDb,
429    ) -> Result<Vec<block::Hash>, ReconsiderError> {
430        // Get the invalidated blocks that were invalidated by the given block_hash
431        let height = self
432            .invalidated_blocks
433            .iter()
434            .find_map(|(height, blocks)| {
435                if blocks.first()?.hash == block_hash {
436                    Some(height)
437                } else {
438                    None
439                }
440            })
441            .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?;
442
443        let invalidated_blocks = Arc::unwrap_or_clone(
444            self.invalidated_blocks
445                .clone()
446                .shift_remove(height)
447                .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?,
448        );
449
450        let invalidated_block_hashes = invalidated_blocks
451            .iter()
452            .map(|block| block.hash)
453            .collect::<Vec<_>>();
454
455        // Find and fork the parent chain of the invalidated_root. Update the parent chain
456        // with the invalidated_descendants
457        let invalidated_root = invalidated_blocks
458            .first()
459            .ok_or(ReconsiderError::InvalidatedBlocksEmpty)?;
460
461        let root_parent_hash = invalidated_root.block.header.previous_block_hash;
462
463        // If the parent is the tip of the finalized_state we create a new chain and insert it
464        // into the non finalized state
465        let chain_result = if root_parent_hash == finalized_state.finalized_tip_hash() {
466            let chain = Chain::new(
467                &self.network,
468                finalized_state
469                    .finalized_tip_height()
470                    .ok_or(ReconsiderError::ParentChainNotFound(block_hash))?,
471                finalized_state.sprout_tree_for_tip(),
472                finalized_state.sapling_tree_for_tip(),
473                finalized_state.orchard_tree_for_tip(),
474                finalized_state.history_tree(),
475                finalized_state.finalized_value_pool(),
476            );
477            Arc::new(chain)
478        } else {
479            // The parent is not the finalized_tip and still exist in the NonFinalizedState
480            // or else we return an error due to the parent not existing in the NonFinalizedState
481            self.parent_chain(root_parent_hash)
482                .map_err(|_| ReconsiderError::ParentChainNotFound(block_hash))?
483        };
484
485        let mut modified_chain = Arc::unwrap_or_clone(chain_result);
486        for block in invalidated_blocks {
487            modified_chain = modified_chain
488                .push(block)
489                .expect("previously invalidated block should be valid for chain");
490        }
491
492        let (height, hash) = modified_chain.non_finalized_tip();
493
494        // Only track invalidated_blocks that are not yet finalized. Once blocks are finalized (below the best_chain_root_height)
495        // we can discard the block.
496        if let Some(best_chain_root_height) = finalized_state.finalized_tip_height() {
497            self.invalidated_blocks
498                .retain(|height, _blocks| *height >= best_chain_root_height);
499        }
500
501        self.insert_with(Arc::new(modified_chain), |chain_set| {
502            chain_set.retain(|chain| chain.non_finalized_tip_hash() != root_parent_hash)
503        });
504
505        self.update_metrics_for_committed_block(height, hash);
506
507        Ok(invalidated_block_hashes)
508    }
509
510    /// Commit block to the non-finalized state as a new chain where its parent
511    /// is the finalized tip.
512    #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
513    #[allow(clippy::unwrap_in_result)]
514    pub fn commit_new_chain(
515        &mut self,
516        prepared: SemanticallyVerifiedBlock,
517        finalized_state: &ZebraDb,
518    ) -> Result<(), ValidateContextError> {
519        let finalized_tip_height = finalized_state.finalized_tip_height();
520
521        // TODO: fix tests that don't initialize the finalized state
522        #[cfg(not(test))]
523        let finalized_tip_height = finalized_tip_height.expect("finalized state contains blocks");
524        #[cfg(test)]
525        let finalized_tip_height = finalized_tip_height.unwrap_or(zebra_chain::block::Height(0));
526
527        let chain = Chain::new(
528            &self.network,
529            finalized_tip_height,
530            finalized_state.sprout_tree_for_tip(),
531            finalized_state.sapling_tree_for_tip(),
532            finalized_state.orchard_tree_for_tip(),
533            finalized_state.history_tree(),
534            finalized_state.finalized_value_pool(),
535        );
536
537        let (height, hash) = (prepared.height, prepared.hash);
538
539        // If the block is invalid, return the error, and drop the newly created chain fork
540        let chain = self.validate_and_commit(Arc::new(chain), prepared, finalized_state)?;
541
542        // If the block is valid, add the new chain fork to the set of recent chains.
543        self.insert(chain);
544        self.update_metrics_for_committed_block(height, hash);
545
546        Ok(())
547    }
548
549    /// Contextually validate `prepared` using `finalized_state`.
550    /// If validation succeeds, push `prepared` onto `new_chain`.
551    ///
552    /// `new_chain` should start as a clone of the parent chain fork,
553    /// or the finalized tip.
554    #[tracing::instrument(level = "debug", skip(self, finalized_state, new_chain))]
555    fn validate_and_commit(
556        &self,
557        new_chain: Arc<Chain>,
558        prepared: SemanticallyVerifiedBlock,
559        finalized_state: &ZebraDb,
560    ) -> Result<Arc<Chain>, ValidateContextError> {
561        if self
562            .invalidated_blocks
563            .values()
564            .any(|blocks| blocks.iter().any(|block| block.hash == prepared.hash))
565        {
566            return Err(ValidateContextError::BlockPreviouslyInvalidated {
567                block_hash: prepared.hash,
568            });
569        }
570
571        // Reads from disk
572        //
573        // TODO: if these disk reads show up in profiles, run them in parallel, using std::thread::spawn()
574        let spent_utxos = check::utxo::transparent_spend(
575            &prepared,
576            &new_chain.unspent_utxos(),
577            &new_chain.spent_utxos,
578            finalized_state,
579        )?;
580
581        // Reads from disk
582        check::anchors::block_sapling_orchard_anchors_refer_to_final_treestates(
583            finalized_state,
584            &new_chain,
585            &prepared,
586        )?;
587
588        // Reads from disk
589        let sprout_final_treestates = check::anchors::block_fetch_sprout_final_treestates(
590            finalized_state,
591            &new_chain,
592            &prepared,
593        );
594
595        // Quick check that doesn't read from disk
596        let contextual = ContextuallyVerifiedBlock::with_block_and_spent_utxos(
597            prepared.clone(),
598            spent_utxos.clone(),
599            calculate_deferred_pool_balance_change(prepared.height, &self.network),
600        )
601        .map_err(|value_balance_error| {
602            ValidateContextError::CalculateBlockChainValueChange {
603                value_balance_error,
604                height: prepared.height,
605                block_hash: prepared.hash,
606                transaction_count: prepared.block.transactions.len(),
607                spent_utxo_count: spent_utxos.len(),
608            }
609        })?;
610
611        Self::validate_and_update_parallel(new_chain, contextual, sprout_final_treestates)
612    }
613
614    /// Validate `contextual` and update `new_chain`, doing CPU-intensive work in parallel batches.
615    #[allow(clippy::unwrap_in_result)]
616    #[tracing::instrument(skip(new_chain, sprout_final_treestates))]
617    fn validate_and_update_parallel(
618        new_chain: Arc<Chain>,
619        contextual: ContextuallyVerifiedBlock,
620        sprout_final_treestates: HashMap<sprout::tree::Root, Arc<sprout::tree::NoteCommitmentTree>>,
621    ) -> Result<Arc<Chain>, ValidateContextError> {
622        let mut block_commitment_result = None;
623        let mut sprout_anchor_result = None;
624        let mut chain_push_result = None;
625
626        // Clone function arguments for different threads
627        let block = contextual.block.clone();
628        let network = new_chain.network();
629        let history_tree = new_chain.history_block_commitment_tree();
630
631        let block2 = contextual.block.clone();
632        let height = contextual.height;
633        let transaction_hashes = contextual.transaction_hashes.clone();
634
635        rayon::in_place_scope_fifo(|scope| {
636            scope.spawn_fifo(|_scope| {
637                block_commitment_result = Some(check::block_commitment_is_valid_for_chain_history(
638                    block,
639                    &network,
640                    &history_tree,
641                ));
642            });
643
644            scope.spawn_fifo(|_scope| {
645                sprout_anchor_result =
646                    Some(check::anchors::block_sprout_anchors_refer_to_treestates(
647                        sprout_final_treestates,
648                        block2,
649                        transaction_hashes,
650                        height,
651                    ));
652            });
653
654            // We're pretty sure the new block is valid,
655            // so clone the inner chain if needed, then add the new block.
656            //
657            // Pushing a block onto a Chain can launch additional parallel batches.
658            // TODO: should we pass _scope into Chain::push()?
659            scope.spawn_fifo(|_scope| {
660                // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
661                // https://github.com/rust-lang/rust/issues/93610
662                let new_chain = Arc::try_unwrap(new_chain)
663                    .unwrap_or_else(|shared_chain| (*shared_chain).clone());
664                chain_push_result = Some(new_chain.push(contextual).map(Arc::new));
665            });
666        });
667
668        // Don't return the updated Chain unless all the parallel results were Ok
669        block_commitment_result.expect("scope has finished")?;
670        sprout_anchor_result.expect("scope has finished")?;
671
672        chain_push_result.expect("scope has finished")
673    }
674
675    /// Returns the length of the non-finalized portion of the current best chain
676    /// or `None` if the best chain has no blocks.
677    pub fn best_chain_len(&self) -> Option<u32> {
678        // This `as` can't overflow because the number of blocks in the chain is limited to i32::MAX,
679        // and the non-finalized chain is further limited by the rollback window
680        // (`MAX_BLOCK_REORG_HEIGHT`, currently 1000 blocks).
681        Some(self.best_chain()?.blocks.len() as u32)
682    }
683
684    /// Returns the root height of the non-finalized state, if the non-finalized state is not empty.
685    pub fn root_height(&self) -> Option<block::Height> {
686        self.best_chain()
687            .map(|chain| chain.non_finalized_root_height())
688    }
689
690    /// Returns `true` if `hash` is contained in the non-finalized portion of any
691    /// known chain.
692    #[allow(dead_code)]
693    pub fn any_chain_contains(&self, hash: &block::Hash) -> bool {
694        self.chain_set
695            .iter()
696            .rev()
697            .any(|chain| chain.height_by_hash.contains_key(hash))
698    }
699
700    /// Returns the first chain satisfying the given predicate.
701    ///
702    /// If multiple chains satisfy the predicate, returns the chain with the highest difficulty.
703    /// (Using the tip block hash tie-breaker.)
704    pub fn find_chain<P>(&self, mut predicate: P) -> Option<Arc<Chain>>
705    where
706        P: FnMut(&Chain) -> bool,
707    {
708        // Reverse the iteration order, to find highest difficulty chains first.
709        self.chain_set
710            .iter()
711            .rev()
712            .find(|chain| predicate(chain))
713            .cloned()
714    }
715
716    /// Returns the [`transparent::Utxo`] pointed to by the given
717    /// [`transparent::OutPoint`] if it is present in any chain.
718    ///
719    /// UTXOs are returned regardless of whether they have been spent.
720    pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
721        self.chain_set
722            .iter()
723            .rev()
724            .find_map(|chain| chain.created_utxo(outpoint))
725    }
726
727    /// Returns the `block` with the given hash in any chain.
728    #[allow(dead_code)]
729    pub fn any_block_by_hash(&self, hash: block::Hash) -> Option<Arc<Block>> {
730        // This performs efficiently because the number of chains is limited to 10.
731        for chain in self.chain_set.iter().rev() {
732            if let Some(prepared) = chain
733                .height_by_hash
734                .get(&hash)
735                .and_then(|height| chain.blocks.get(height))
736            {
737                return Some(prepared.block.clone());
738            }
739        }
740
741        None
742    }
743
744    /// Returns the previous block hash for the given block hash in any chain.
745    #[allow(dead_code)]
746    pub fn any_prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
747        // This performs efficiently because the blocks are in memory.
748        self.any_block_by_hash(hash)
749            .map(|block| block.header.previous_block_hash)
750    }
751
752    /// Returns the hash for a given `block::Height` if it is present in the best chain.
753    #[allow(dead_code)]
754    pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
755        self.best_chain()?
756            .blocks
757            .get(&height)
758            .map(|prepared| prepared.hash)
759    }
760
761    /// Returns the tip of the best chain.
762    #[allow(dead_code)]
763    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
764        let best_chain = self.best_chain()?;
765        let height = best_chain.non_finalized_tip_height();
766        let hash = best_chain.non_finalized_tip_hash();
767
768        Some((height, hash))
769    }
770
771    /// Returns the block at the tip of the best chain.
772    #[allow(dead_code)]
773    pub fn best_tip_block(&self) -> Option<&ContextuallyVerifiedBlock> {
774        let best_chain = self.best_chain()?;
775
776        best_chain.tip_block()
777    }
778
779    /// Returns the height of `hash` in the best chain.
780    #[allow(dead_code)]
781    pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
782        let best_chain = self.best_chain()?;
783        let height = *best_chain.height_by_hash.get(&hash)?;
784        Some(height)
785    }
786
787    /// Returns the height of `hash` in any chain.
788    #[allow(dead_code)]
789    pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
790        for chain in self.chain_set.iter().rev() {
791            if let Some(height) = chain.height_by_hash.get(&hash) {
792                return Some(*height);
793            }
794        }
795
796        None
797    }
798
799    /// Returns `true` if the best chain contains `sprout_nullifier`.
800    #[cfg(any(test, feature = "proptest-impl"))]
801    #[allow(dead_code)]
802    pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {
803        self.best_chain()
804            .map(|best_chain| best_chain.sprout_nullifiers.contains_key(sprout_nullifier))
805            .unwrap_or(false)
806    }
807
808    /// Returns `true` if the best chain contains `sapling_nullifier`.
809    #[cfg(any(test, feature = "proptest-impl"))]
810    #[allow(dead_code)]
811    pub fn best_contains_sapling_nullifier(
812        &self,
813        sapling_nullifier: &zebra_chain::sapling::Nullifier,
814    ) -> bool {
815        self.best_chain()
816            .map(|best_chain| {
817                best_chain
818                    .sapling_nullifiers
819                    .contains_key(sapling_nullifier)
820            })
821            .unwrap_or(false)
822    }
823
824    /// Returns `true` if the best chain contains `orchard_nullifier`.
825    #[cfg(any(test, feature = "proptest-impl"))]
826    #[allow(dead_code)]
827    pub fn best_contains_orchard_nullifier(
828        &self,
829        orchard_nullifier: &zebra_chain::orchard::Nullifier,
830    ) -> bool {
831        self.best_chain()
832            .map(|best_chain| {
833                best_chain
834                    .orchard_nullifiers
835                    .contains_key(orchard_nullifier)
836            })
837            .unwrap_or(false)
838    }
839
840    /// Return the non-finalized portion of the current best chain.
841    pub fn best_chain(&self) -> Option<&Arc<Chain>> {
842        self.chain_iter().next()
843    }
844
845    /// Return the number of chains.
846    pub fn chain_count(&self) -> usize {
847        self.chain_set.len()
848    }
849
850    /// Returns true if this [`NonFinalizedState`] contains no chains.
851    pub fn is_chain_set_empty(&self) -> bool {
852        self.chain_count() == 0
853    }
854
855    /// Return the invalidated blocks.
856    pub fn invalidated_blocks(&self) -> IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>> {
857        self.invalidated_blocks.clone()
858    }
859
860    /// Return the chain whose tip block hash is `parent_hash`.
861    ///
862    /// The chain can be an existing chain in the non-finalized state, or a freshly
863    /// created fork.
864    fn parent_chain(&self, parent_hash: block::Hash) -> Result<Arc<Chain>, ValidateContextError> {
865        match self.find_chain(|chain| chain.non_finalized_tip_hash() == parent_hash) {
866            // Clone the existing Arc<Chain> in the non-finalized state
867            Some(chain) => Ok(chain.clone()),
868            // Create a new fork
869            None => {
870                // Check the lowest difficulty chains first,
871                // because the fork could be closer to their tip.
872                let fork_chain = self
873                    .chain_set
874                    .iter()
875                    .rev()
876                    .find_map(|chain| chain.fork(parent_hash))
877                    .ok_or(ValidateContextError::NotReadyToBeCommitted)?;
878
879                Ok(Arc::new(fork_chain))
880            }
881        }
882    }
883
884    /// Should this `NonFinalizedState` instance track metrics and progress bars?
885    fn should_count_metrics(&self) -> bool {
886        self.should_count_metrics
887    }
888
889    /// Update the metrics after `block` is committed
890    fn update_metrics_for_committed_block(&self, height: block::Height, hash: block::Hash) {
891        if !self.should_count_metrics() {
892            return;
893        }
894
895        metrics::counter!("state.memory.committed.block.count").increment(1);
896        metrics::gauge!("state.memory.committed.block.height").set(height.0 as f64);
897
898        if self
899            .best_chain()
900            .expect("metrics are only updated after initialization")
901            .non_finalized_tip_hash()
902            == hash
903        {
904            metrics::counter!("state.memory.best.committed.block.count").increment(1);
905            metrics::gauge!("state.memory.best.committed.block.height").set(height.0 as f64);
906        }
907
908        self.update_metrics_for_chains();
909    }
910
911    /// Update the metrics after `self.chain_set` is modified
912    fn update_metrics_for_chains(&self) {
913        if !self.should_count_metrics() {
914            return;
915        }
916
917        metrics::gauge!("state.memory.chain.count").set(self.chain_set.len() as f64);
918        metrics::gauge!("state.memory.best.chain.length",)
919            .set(self.best_chain_len().unwrap_or_default() as f64);
920    }
921
922    /// Update the progress bars after any chain is modified.
923    /// This includes both chain forks and committed blocks.
924    fn update_metrics_bars(&mut self) {
925        // TODO: make chain_count_bar interior mutable, move to update_metrics_for_committed_block()
926
927        if !self.should_count_metrics() {
928            #[allow(clippy::needless_return)]
929            return;
930        }
931
932        #[cfg(feature = "progress-bar")]
933        {
934            use std::cmp::Ordering::*;
935
936            if matches!(howudoin::cancelled(), Some(true)) {
937                self.disable_metrics();
938                return;
939            }
940
941            // Update the chain count bar
942            if self.chain_count_bar.is_none() {
943                self.chain_count_bar = Some(howudoin::new_root().label("Chain Forks"));
944            }
945
946            let chain_count_bar = self
947                .chain_count_bar
948                .as_ref()
949                .expect("just initialized if missing");
950            let finalized_tip_height = self
951                .best_chain()
952                .map(|chain| chain.non_finalized_root_height().0 - 1);
953
954            chain_count_bar.set_pos(u64::try_from(self.chain_count()).expect("fits in u64"));
955            // .set_len(u64::try_from(MAX_NON_FINALIZED_CHAIN_FORKS).expect("fits in u64"));
956
957            if let Some(finalized_tip_height) = finalized_tip_height {
958                chain_count_bar.desc(format!("Finalized Root {finalized_tip_height}"));
959            }
960
961            // Update each chain length bar, creating or deleting bars as needed
962            let prev_length_bars = self.chain_fork_length_bars.len();
963
964            match self.chain_count().cmp(&prev_length_bars) {
965                Greater => self
966                    .chain_fork_length_bars
967                    .resize_with(self.chain_count(), || {
968                        howudoin::new_with_parent(chain_count_bar.id())
969                    }),
970                Less => {
971                    let redundant_bars = self.chain_fork_length_bars.split_off(self.chain_count());
972                    for bar in redundant_bars {
973                        bar.close();
974                    }
975                }
976                Equal => {}
977            }
978
979            // It doesn't matter what chain the bar was previously used for,
980            // because we update everything based on the latest chain in that position.
981            for (chain_length_bar, chain) in
982                std::iter::zip(self.chain_fork_length_bars.iter(), self.chain_iter())
983            {
984                let fork_height = chain
985                    .last_fork_height
986                    .unwrap_or_else(|| chain.non_finalized_tip_height())
987                    .0;
988
989                // We need to initialize and set all the values of the bar here, because:
990                // - the bar might have been newly created, or
991                // - the chain this bar was previously assigned to might have changed position.
992                chain_length_bar
993                    .label(format!("Fork {fork_height}"))
994                    .set_pos(u64::try_from(chain.len()).expect("fits in u64"));
995                // TODO: should this be MAX_BLOCK_REORG_HEIGHT?
996                // .set_len(u64::from(
997                //     zebra_chain::transparent::MIN_TRANSPARENT_COINBASE_MATURITY,
998                // ));
999
1000                // TODO: store work in the finalized state for each height (#7109),
1001                //       and show the full chain work here, like `zcashd` (#7110)
1002                //
1003                // For now, we don't show any work here, see the deleted code in PR #7087.
1004                let mut desc = String::new();
1005
1006                if let Some(recent_fork_height) = chain.recent_fork_height() {
1007                    let recent_fork_length = chain
1008                        .recent_fork_length()
1009                        .expect("just checked recent fork height");
1010
1011                    let mut plural = "s";
1012                    if recent_fork_length == 1 {
1013                        plural = "";
1014                    }
1015
1016                    desc.push_str(&format!(
1017                        " at {recent_fork_height:?} + {recent_fork_length} block{plural}"
1018                    ));
1019                }
1020
1021                chain_length_bar.desc(desc);
1022            }
1023        }
1024    }
1025
1026    /// Stop tracking metrics for this non-finalized state and all its chains.
1027    pub fn disable_metrics(&mut self) {
1028        self.should_count_metrics = false;
1029
1030        #[cfg(feature = "progress-bar")]
1031        {
1032            let count_bar = self.chain_count_bar.take().into_iter();
1033            let fork_bars = self.chain_fork_length_bars.drain(..);
1034            count_bar.chain(fork_bars).for_each(howudoin::Tx::close);
1035        }
1036    }
1037}
1038
1039impl Drop for NonFinalizedState {
1040    fn drop(&mut self) {
1041        self.disable_metrics();
1042    }
1043}