Skip to main content

miden_node_store/state/
mod.rs

1//! Abstraction to synchronize state modifications.
2//!
//! The [State] provides data access and modification methods. Its main purpose is to ensure that
//! data is written atomically and that reads are consistent.
5
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::num::NonZeroUsize;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use miden_node_proto::domain::batch::BatchInputs;
12use miden_node_utils::clap::StorageOptions;
13use miden_node_utils::formatting::format_array;
14use miden_protocol::Word;
15use miden_protocol::account::AccountId;
16use miden_protocol::block::account_tree::AccountWitness;
17use miden_protocol::block::nullifier_tree::{NullifierTree, NullifierWitness};
18use miden_protocol::block::{BlockHeader, BlockInputs, BlockNumber, Blockchain};
19use miden_protocol::crypto::merkle::mmr::{MmrProof, PartialMmr};
20use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtStorage};
21use miden_protocol::note::{NoteId, NoteScript, Nullifier};
22use miden_protocol::transaction::PartialBlockchain;
23use tokio::sync::{Mutex, RwLock, watch};
24use tracing::{Instrument, Span, info, instrument};
25
26use crate::account_state_forest::{AccountStateForest, AccountStateForestBackend};
27use crate::accounts::AccountTreeWithHistory;
28use crate::blocks::BlockStore;
29use crate::db::{Db, NoteRecord, NullifierInfo};
30use crate::errors::{
31    DatabaseError,
32    GetBatchInputsError,
33    GetBlockHeaderError,
34    GetBlockInputsError,
35    StateInitializationError,
36};
37use crate::proven_tip::ProvenTipWriter;
38use crate::{COMPONENT, DataDirectory, DatabaseOptions};
39
/// Number of recent committed blocks held in the in-memory cache for replica subscriptions.
///
/// Passed to `BlockCache::new` when the [`State`] is loaded.
const BLOCK_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(512).unwrap();

/// Number of recent block proofs held in the in-memory cache for replica subscriptions.
///
/// Passed to `ProofCache::new` when the [`State`] is loaded.
const PROOF_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(512).unwrap();
45
46mod loader;
47use loader::{
48    ACCOUNT_STATE_FOREST_STORAGE_DIR,
49    ACCOUNT_TREE_STORAGE_DIR,
50    AccountForestLoader,
51    NULLIFIER_TREE_STORAGE_DIR,
52    TreeStorage,
53    TreeStorageLoader,
54    load_mmr,
55    verify_account_state_forest_consistency,
56    verify_tree_consistency,
57};
58
59mod replica;
60pub use replica::{BlockCache, BlockNotification, ProofCache, ProofNotification};
61
62mod account;
63
64mod subscription;
65pub use subscription::{
66    BlockSubscriptionEvent,
67    BlockSubscriptionStream,
68    ProofSubscriptionEvent,
69    ProofSubscriptionStream,
70    StateSubscriptionError,
71};
72
73mod apply_block;
74mod apply_proof;
75mod bootstrap;
76mod disk_monitor;
77mod sync_state;
78
79// FINALITY
80// ================================================================================================
81
/// The finality level for chain tip queries.
///
/// Selects which tip [`State::chain_tip`] reports.
#[derive(Debug, Clone, Copy)]
pub enum Finality {
    /// The latest committed (but not necessarily proven) block.
    Committed,
    /// The latest block that has been proven in an unbroken sequence from genesis.
    Proven,
}
90
91// STRUCTURES
92// ================================================================================================
93
/// Result of [`State::get_transaction_inputs`].
#[derive(Debug, Default)]
pub struct TransactionInputs {
    /// Latest commitment of the queried account, as read from the account tree.
    pub account_commitment: Word,
    /// One entry per requested nullifier, paired with the block number the nullifier tree reports
    /// for it. When the tree reports no block number, the default block number is used.
    pub nullifiers: Vec<NullifierInfo>,
    /// Commitments returned by the database lookup of the requested unauthenticated note
    /// commitments (presumably the subset that already exists in the DB).
    pub found_unauthenticated_notes: HashSet<Word>,
    /// Only populated when the account commitment is empty: `Some(true)` if the latest account
    /// tree contains no account with the same ID prefix, `Some(false)` otherwise. `None` when the
    /// account commitment is non-empty.
    pub new_account_id_prefix_is_unique: Option<bool>,
}
101
/// Tuple returned by `State::get_block_inputs_witnesses`, in order:
/// the latest block number, the account witnesses keyed by account ID, the nullifier witnesses
/// keyed by nullifier, and the partial MMR for the requested blocks.
type BlockInputWitnesses = (
    BlockNumber,
    BTreeMap<AccountId, AccountWitness>,
    BTreeMap<Nullifier, NullifierWitness>,
    PartialMmr,
);
108
/// Container for state that needs to be updated atomically.
///
/// Held behind a single `RwLock` in [`State`], so all three structures are observed and modified
/// together.
struct InnerState<S>
where
    S: SmtStorage,
{
    /// Nullifier tree; opened to produce nullifier witnesses and queried for the block number
    /// recorded for a nullifier.
    nullifier_tree: NullifierTree<LargeSmt<S>>,
    /// In-memory chain; source of the chain tip, MMR proofs and partial MMRs.
    blockchain: Blockchain,
    /// Account tree; opened to produce account witnesses and queried for latest commitments.
    account_tree: AccountTreeWithHistory<S>,
}
118
119impl<S: SmtStorage> InnerState<S> {
120    /// Returns the latest block number.
121    fn latest_block_num(&self) -> BlockNumber {
122        self.blockchain
123            .chain_tip()
124            .expect("chain should always have at least the genesis block")
125    }
126}
127
128// CHAIN STATE
129// ================================================================================================
130
/// The rollup state.
pub struct State {
    /// Root directory containing the store's on-disk data.
    data_directory: PathBuf,

    /// The database which stores block headers, nullifiers, notes, and the latest states of
    /// accounts.
    db: Arc<Db>,

    /// The block store which stores full block contents for all blocks.
    block_store: Arc<BlockStore>,

    /// Read-write lock used to prevent writing to a structure while it is being used.
    ///
    /// The lock is writer-preferring, meaning the writer won't be starved.
    inner: RwLock<InnerState<TreeStorage>>,

    /// Forest-related state `(SmtForest, storage_map_roots, vault_roots)` with its own lock.
    forest: RwLock<AccountStateForest<AccountStateForestBackend>>,

    /// To allow readers to access the tree data while an update is being performed, and to prevent
    /// TOCTOU issues, there must be no concurrent writers. This lock serializes the writers.
    writer: Mutex<()>,

    /// The latest proven-in-sequence block number, updated by the proof scheduler or `apply_proof`.
    proven_tip: ProvenTipWriter,

    /// Watch sender fired after each block is committed. Replicas subscribe via
    /// `subscribe_committed_tip()` to be woken when new blocks arrive.
    committed_tip_tx: watch::Sender<BlockNumber>,

    /// FIFO cache of recent committed blocks for replica subscriptions. When a subscriber needs a
    /// block that has been evicted, it falls back to loading from the block store.
    pub(crate) block_cache: BlockCache,

    /// FIFO cache of recent block proofs for replica subscriptions. When a subscriber needs a proof
    /// that has been evicted, it falls back to loading from the block store.
    pub(crate) proof_cache: ProofCache,
}
170
171impl State {
172    // CONSTRUCTOR
173    // --------------------------------------------------------------------------------------------
174
175    /// Loads the state from the data directory.
176    ///
177    /// The loaded state owns all store data structures and exposes subscription methods for
178    /// sequencer and replica tasks.
179    #[instrument(target = COMPONENT, skip_all)]
180    pub async fn load(
181        data_path: &Path,
182        storage_options: StorageOptions,
183    ) -> Result<Self, StateInitializationError> {
184        Self::load_with_database_options(data_path, storage_options, DatabaseOptions::default())
185            .await
186    }
187
    /// Loads the state from the data directory using explicit database options.
    ///
    /// The loaded state owns all store data structures and exposes subscription methods for
    /// sequencer and replica tasks.
    ///
    /// Loading proceeds in this order: data directory, block store, database, chain MMR, account
    /// tree, nullifier tree (followed by a root consistency check against the database), account
    /// state forest (followed by its own consistency check), and finally the proven tip and the
    /// committed-tip watch channel.
    ///
    /// # Errors
    ///
    /// Returns a [`StateInitializationError`] if any of the loading or consistency verification
    /// steps fails.
    #[instrument(target = COMPONENT, skip_all)]
    pub async fn load_with_database_options(
        data_path: &Path,
        storage_options: StorageOptions,
        database_options: DatabaseOptions,
    ) -> Result<Self, StateInitializationError> {
        let data_directory = DataDirectory::load(data_path.to_path_buf())
            .map_err(StateInitializationError::DataDirectoryLoadError)?;

        let block_store = Arc::new(
            BlockStore::load(data_directory.block_store_dir())
                .map_err(StateInitializationError::BlockStoreLoadError)?,
        );

        let database_filepath = data_directory.database_path();
        let mut db = Db::load_with_pool_size(
            database_filepath.clone(),
            database_options.connection_pool_size,
        )
        .await
        .map_err(StateInitializationError::DatabaseLoadError)?;

        let blockchain = load_mmr(&mut db).await?;
        // Fall back to the genesis block number when the loaded chain reports no tip.
        let latest_block_num = blockchain.chain_tip().unwrap_or(BlockNumber::GENESIS);

        // With the `rocksdb` feature the per-tree storage configs are derived from the storage
        // options; without it the configs are unit values and the options are unused.
        #[cfg(feature = "rocksdb")]
        let (account_storage_config, nullifier_storage_config, forest_storage_config) = (
            storage_options.account_tree.into(),
            storage_options.nullifier_tree.into(),
            storage_options.account_state_forest.into(),
        );
        #[cfg(not(feature = "rocksdb"))]
        let (account_storage_config, nullifier_storage_config, forest_storage_config) = {
            // Touch the binding so it is not reported as unused in this configuration.
            let _ = &storage_options;
            ((), (), ())
        };
        let account_storage =
            TreeStorage::create(data_path, &account_storage_config, ACCOUNT_TREE_STORAGE_DIR)?;
        let account_tree = account_storage.load_account_tree(&mut db).await?;

        let nullifier_storage =
            TreeStorage::create(data_path, &nullifier_storage_config, NULLIFIER_TREE_STORAGE_DIR)?;
        let nullifier_tree = nullifier_storage.load_nullifier_tree(&mut db).await?;

        // Verify that tree roots match the expected roots from the database. This catches any
        // divergence between persistent storage and the database caused by corruption or incomplete
        // shutdown.
        verify_tree_consistency(account_tree.root(), nullifier_tree.root(), &mut db).await?;

        // Wrap the verified account tree together with the block number it corresponds to.
        let account_tree = AccountTreeWithHistory::new(account_tree, latest_block_num);

        let forest_backend = AccountStateForestBackend::create(
            data_path,
            &forest_storage_config,
            ACCOUNT_STATE_FOREST_STORAGE_DIR,
        )?;
        let forest = forest_backend.load_account_state_forest(&mut db, latest_block_num).await?;
        verify_account_state_forest_consistency(&forest, &mut db).await?;

        let inner = RwLock::new(InnerState { nullifier_tree, blockchain, account_tree });

        let forest = RwLock::new(forest);
        let writer = Mutex::new(());
        // Mutable access to the database is no longer needed past this point, so share it.
        let db = Arc::new(db);

        // Initialize the proven tip from the block store.
        // The initial receiver is not kept; subscribers obtain their own via
        // `subscribe_proven_tip`.
        let proven_tip_init = block_store
            .load_proven_tip()
            .map_err(StateInitializationError::ProvenTipLoadError)?;
        let (proven_tip, _rx) = ProvenTipWriter::new(proven_tip_init);

        // Committed-tip watch: fires after each successful apply_block.
        // The initial receiver is not kept; subscribers obtain their own via
        // `subscribe_committed_tip`.
        let (committed_tip_tx, _rx) = watch::channel(latest_block_num);

        Ok(Self {
            // Note: this stores the raw `data_path`, not the `DataDirectory` loaded above.
            data_directory: data_path.to_path_buf(),
            db,
            block_store,
            inner,
            forest,
            writer,
            proven_tip,
            committed_tip_tx,
            block_cache: BlockCache::new(BLOCK_CACHE_CAPACITY),
            proof_cache: ProofCache::new(PROOF_CACHE_CAPACITY),
        })
    }
279
    /// Returns a watch receiver that wakes every time a new block is committed.
    ///
    /// Each call creates a fresh receiver from the committed-tip sender. The channel is
    /// initialised with the chain tip observed when the state was loaded.
    pub fn subscribe_committed_tip(&self) -> watch::Receiver<BlockNumber> {
        self.committed_tip_tx.subscribe()
    }
284
    /// Loads serialized block proving inputs from the block store.
    ///
    /// This delegates directly to the block store and returns its result unchanged, including any
    /// I/O error.
    ///
    /// NOTE(review): `Ok(None)` presumably means no proving inputs are stored for `block_num` —
    /// confirm against `BlockStore::load_proving_inputs`.
    pub async fn load_proving_inputs(
        &self,
        block_num: BlockNumber,
    ) -> std::io::Result<Option<Vec<u8>>> {
        self.block_store.load_proving_inputs(block_num).await
    }
292
    /// Returns a watch receiver that wakes every time the proven-in-sequence tip advances.
    ///
    /// Each call creates a fresh receiver from the proven-tip writer.
    pub(crate) fn subscribe_proven_tip(&self) -> watch::Receiver<BlockNumber> {
        self.proven_tip.subscribe()
    }
297
298    // HELPER FUNCTIONS TO AVOID BLOCKING CALLS IN ASYNC CONTEXT
299    // --------------------------------------------------------------------------------------------
300
301    /// Runs a synchronous read-only operation over the inner state on Tokio's blocking path.
302    ///
303    /// The account and nullifier trees may be backed by `RocksDB`, so tree access must not run on
304    /// an async worker thread directly. This helper preserves the current tracing span while
305    /// moving the blocking lock acquisition and closure body into `block_in_place`.
306    fn with_inner_read_blocking<R>(&self, f: impl FnOnce(&InnerState<TreeStorage>) -> R) -> R {
307        let span = Span::current();
308        tokio::task::block_in_place(|| {
309            span.in_scope(|| {
310                let inner = self.inner.blocking_read();
311                f(&inner)
312            })
313        })
314    }
315
316    /// Runs a synchronous mutable operation over the inner state on Tokio's blocking path.
317    ///
318    /// See [`Self::with_inner_read_blocking`] for why this uses `block_in_place`.
319    fn with_inner_write_blocking<R>(&self, f: impl FnOnce(&mut InnerState<TreeStorage>) -> R) -> R {
320        let span = Span::current();
321        tokio::task::block_in_place(|| {
322            span.in_scope(|| {
323                let mut inner = self.inner.blocking_write();
324                f(&mut inner)
325            })
326        })
327    }
328
329    /// Runs a synchronous read-only operation over the account state forest on Tokio's blocking
330    /// path.
331    ///
332    /// The forest may be backed by `RocksDB`, so accesses to the underlying `LargeSmtForest` must
333    /// not run directly on an async worker thread.
334    fn with_forest_read_blocking<R>(
335        &self,
336        f: impl FnOnce(&AccountStateForest<AccountStateForestBackend>) -> R,
337    ) -> R {
338        let span = Span::current();
339        tokio::task::block_in_place(|| {
340            span.in_scope(|| {
341                let forest = self.forest.blocking_read();
342                f(&forest)
343            })
344        })
345    }
346
347    /// Runs a synchronous mutable operation over the account state forest on Tokio's blocking path.
348    ///
349    /// See [`Self::with_forest_read_blocking`] for why this uses `block_in_place`.
350    fn with_forest_write_blocking<R>(
351        &self,
352        f: impl FnOnce(&mut AccountStateForest<AccountStateForestBackend>) -> R,
353    ) -> R {
354        let span = Span::current();
355        tokio::task::block_in_place(|| {
356            span.in_scope(|| {
357                let mut forest = self.forest.blocking_write();
358                f(&mut forest)
359            })
360        })
361    }
362
363    // STATE ACCESSORS
364    // --------------------------------------------------------------------------------------------
365
366    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
367    ///
368    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
369    /// returned.
370    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
371    pub async fn get_block_header(
372        &self,
373        block_num: Option<BlockNumber>,
374        include_mmr_proof: bool,
375    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
376        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
377        if let Some(header) = block_header {
378            let mmr_proof = if include_mmr_proof {
379                let inner = self.inner.read().await;
380                let mmr_proof = inner.blockchain.open(header.block_num())?;
381                Some(mmr_proof)
382            } else {
383                None
384            };
385            Ok((Some(header), mmr_proof))
386        } else {
387            Ok((None, None))
388        }
389    }
390
    /// Queries a list of notes from the database.
    ///
    /// If the provided list of [`NoteId`]s is empty, or no note matches any of the provided
    /// [`NoteId`]s, an empty list is returned.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the underlying database query.
    pub async fn get_notes_by_id(
        &self,
        note_ids: Vec<NoteId>,
    ) -> Result<Vec<NoteRecord>, DatabaseError> {
        self.db.select_notes_by_id(note_ids).await
    }
401
402    /// Fetches the inputs for a transaction batch from the database.
403    ///
404    /// ## Inputs
405    ///
406    /// The function takes as input:
407    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
408    /// - The unauthenticated note commitments are the set of commitments of unauthenticated notes
409    ///   consumed by all transactions in the batch. For these notes, we attempt to find inclusion
410    ///   proofs. Not all notes will exist in the DB necessarily, as some notes can be created and
411    ///   consumed within the same batch.
412    ///
413    /// ## Outputs
414    ///
415    /// The function will return:
416    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
417    ///   referenced by a note inclusion proof.
418    /// - Note inclusion proofs for all notes that were found in the DB.
419    /// - The block header that the batch should reference, i.e. the latest known block.
420    pub async fn get_batch_inputs(
421        &self,
422        tx_reference_blocks: BTreeSet<BlockNumber>,
423        unauthenticated_note_commitments: BTreeSet<Word>,
424    ) -> Result<BatchInputs, GetBatchInputsError> {
425        if tx_reference_blocks.is_empty() {
426            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
427        }
428
429        // First we grab note inclusion proofs for the known notes. These proofs only prove that the
430        // note was included in a given block. We then also need to prove that each of those blocks
431        // is included in the chain.
432        let note_proofs = self
433            .db
434            .select_note_inclusion_proofs(unauthenticated_note_commitments)
435            .await
436            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
437
438        // The set of blocks that the notes are included in.
439        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
440
441        // Collect all blocks we need to query without duplicates, which is:
442        // - all blocks for which we need to prove note inclusion.
443        // - all blocks referenced by transactions in the batch.
444        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
445        blocks.extend(note_blocks);
446
447        // Scoped block to automatically drop the read lock guard as soon as we're done. We also
448        // avoid accessing the db in the block as this would delay dropping the guard.
449        let (batch_reference_block, partial_mmr) = {
450            let inner_state = self.inner.read().await;
451
452            let latest_block_num = inner_state.latest_block_num();
453
454            let highest_block_num =
455                *blocks.last().expect("we should have checked for empty block references");
456            if highest_block_num > latest_block_num {
457                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
458                    highest_block_num,
459                    latest_block_num,
460                });
461            }
462
463            // Remove the latest block from the to-be-tracked blocks as it will be the reference
464            // block for the batch itself and thus added to the MMR within the batch kernel, so
465            // there is no need to prove its inclusion.
466            blocks.remove(&latest_block_num);
467
468            // SAFETY:
469            // - The latest block num was retrieved from the inner blockchain from which we will
470            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
471            // - We have checked that no block number in the blocks set is greater than latest block
472            //   number *and* latest block num was removed from the set. Therefore only block
473            //   numbers smaller than latest block num remain in the set. Therefore all the block
474            //   numbers are guaranteed to exist in the chain state at latest block num.
475            let partial_mmr = inner_state
476                .blockchain
477                .partial_mmr_from_blocks(&blocks, latest_block_num)
478                .expect("latest block num should exist and all blocks in set should be < than latest block");
479
480            (latest_block_num, partial_mmr)
481        };
482
483        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
484        // up in a separate DB access.
485        let mut headers = self
486            .db
487            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
488            .await
489            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
490
491        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
492        let header_index = headers
493            .iter()
494            .enumerate()
495            .find_map(|(index, header)| {
496                (header.block_num() == batch_reference_block).then_some(index)
497            })
498            .expect("DB should have returned the header of the batch reference block");
499
500        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
501        let batch_reference_block_header = headers.swap_remove(header_index);
502
503        // SAFETY: This should not error because:
504        // - we're passing exactly the block headers that we've added to the partial MMR,
505        // - so none of the block headers block numbers should exceed the chain length of the
506        //   partial MMR,
507        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
508        //
509        // We construct headers and partial MMR in concert, so they are consistent. This is why we
510        // can call the unchecked constructor.
511        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
512            .expect("partial mmr and block headers should be consistent");
513
514        Ok(BatchInputs {
515            batch_reference_block_header,
516            note_proofs,
517            partial_block_chain,
518        })
519    }
520
521    /// Returns data needed by the block producer to construct and prove the next block.
522    pub async fn get_block_inputs(
523        &self,
524        account_ids: Vec<AccountId>,
525        nullifiers: Vec<Nullifier>,
526        unauthenticated_note_commitments: BTreeSet<Word>,
527        reference_blocks: BTreeSet<BlockNumber>,
528    ) -> Result<BlockInputs, GetBlockInputsError> {
529        // Get the note inclusion proofs from the DB. We do this first so we have to acquire the
530        // lock to the state just once. There we need the reference blocks of the note proofs to get
531        // their authentication paths in the chain MMR.
532        let unauthenticated_note_proofs = self
533            .db
534            .select_note_inclusion_proofs(unauthenticated_note_commitments)
535            .await
536            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
537
538        // The set of blocks that the notes are included in.
539        let note_proof_reference_blocks =
540            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
541
542        // Collect all blocks we need to prove inclusion for, without duplicates.
543        let mut blocks = reference_blocks;
544        blocks.extend(note_proof_reference_blocks);
545
546        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
547            self.get_block_inputs_witnesses(&mut blocks, &account_ids, &nullifiers)?;
548
549        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
550        // be used as the previous block header of the block being built.
551        let mut headers = self
552            .db
553            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
554            .await
555            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
556
557        // Find and remove the latest block as we must not add it to the chain MMR, since it is not
558        // yet in the chain.
559        let latest_block_header_index = headers
560            .iter()
561            .enumerate()
562            .find_map(|(index, header)| {
563                (header.block_num() == latest_block_number).then_some(index)
564            })
565            .expect("DB should have returned the header of the latest block header");
566
567        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
568        let latest_block_header = headers.swap_remove(latest_block_header_index);
569
570        // SAFETY: This should not error because:
571        // - we're passing exactly the block headers that we've added to the partial MMR,
572        // - so none of the block header's block numbers should exceed the chain length of the
573        //   partial MMR,
574        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
575        //
576        // We construct headers and partial MMR in concert, so they are consistent. This is why we
577        // can call the unchecked constructor.
578        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
579            .expect("partial mmr and block headers should be consistent");
580
581        Ok(BlockInputs::new(
582            latest_block_header,
583            partial_block_chain,
584            account_witnesses,
585            nullifier_witnesses,
586            unauthenticated_note_proofs,
587        ))
588    }
589
590    /// Get account and nullifier witnesses for the requested account IDs and nullifier as well as
591    /// the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block and its
592    /// number is removed from `blocks` and returned separately.
593    ///
594    /// This method acquires the lock to the inner state and does not access the DB so we release
595    /// the lock asap.
596    fn get_block_inputs_witnesses(
597        &self,
598        blocks: &mut BTreeSet<BlockNumber>,
599        account_ids: &[AccountId],
600        nullifiers: &[Nullifier],
601    ) -> Result<BlockInputWitnesses, GetBlockInputsError> {
602        self.with_inner_read_blocking(|inner| {
603            let latest_block_number = inner.latest_block_num();
604
605            // If `blocks` is empty, use the latest block number which will never trigger the error.
606            let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
607            if highest_block_number > latest_block_number {
608                return Err(GetBlockInputsError::UnknownBatchBlockReference {
609                    highest_block_number,
610                    latest_block_number,
611                });
612            }
613
614            // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove
615            // its inclusion in the chain.
616            blocks.remove(&latest_block_number);
617
618            // Fetch the partial MMR at the state of the latest block with authentication paths for
619            // the provided set of blocks.
620            //
621            // SAFETY:
622            // - The latest block num was retrieved from the inner blockchain from which we will
623            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
624            // - We have checked that no block number in the blocks set is greater than latest block
625            //   number *and* latest block num was removed from the set. Therefore only block
626            //   numbers smaller than latest block num remain in the set. Therefore all the block
627            //   numbers are guaranteed to exist in the chain state at latest block num.
628            let partial_mmr =
629                inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
630                    "latest block num should exist and all blocks in set should be < than latest block",
631                );
632
633            // Fetch witnesses for all accounts.
634            let account_witnesses = account_ids
635                .iter()
636                .copied()
637                .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
638                .collect::<BTreeMap<AccountId, AccountWitness>>();
639
640            // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent
641            // or not as this is done as part of proposing the block.
642            let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
643                .iter()
644                .copied()
645                .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
646                .collect();
647
648            Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
649        })
650    }
651
652    /// Returns data needed by the block producer to verify transactions validity.
653    #[instrument(target = COMPONENT, skip_all, ret)]
654    pub async fn get_transaction_inputs(
655        &self,
656        account_id: AccountId,
657        nullifiers: &[Nullifier],
658        unauthenticated_note_commitments: Vec<Word>,
659    ) -> Result<TransactionInputs, DatabaseError> {
660        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
661
662        let tree_inputs = self.with_inner_read_blocking(|inner| {
663            let account_commitment = inner.account_tree.get_latest_commitment(account_id);
664
665            let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
666                Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
667            } else {
668                None
669            };
670
671            // Non-unique account Id prefixes for new accounts are not allowed.
672            if let Some(false) = new_account_id_prefix_is_unique {
673                return Err(TransactionInputs {
674                    new_account_id_prefix_is_unique,
675                    ..Default::default()
676                });
677            }
678
679            let nullifiers = nullifiers
680                .iter()
681                .map(|nullifier| NullifierInfo {
682                    nullifier: *nullifier,
683                    block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
684                })
685                .collect();
686
687            Ok((account_commitment, nullifiers, new_account_id_prefix_is_unique))
688        });
689        let (account_commitment, nullifiers, new_account_id_prefix_is_unique) = match tree_inputs {
690            Ok(inputs) => inputs,
691            Err(inputs) => return Ok(inputs),
692        };
693
694        let found_unauthenticated_notes = self
695            .db
696            .select_existing_note_commitments(unauthenticated_note_commitments)
697            .await?;
698
699        Ok(TransactionInputs {
700            account_commitment,
701            nullifiers,
702            found_unauthenticated_notes,
703            new_account_id_prefix_is_unique,
704        })
705    }
706
    /// Filters `account_ids` down to the subset classified as network accounts.
    ///
    /// The slice is copied into an owned `Vec` and handed to the database, which performs the
    /// classification.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the underlying database query.
    pub async fn filter_network_accounts(
        &self,
        account_ids: &[AccountId],
    ) -> Result<HashSet<AccountId>, DatabaseError> {
        self.db.select_network_accounts_subset(account_ids.to_vec()).await
    }
714
715    /// Returns the effective chain tip for the given finality level.
716    ///
717    /// - [`Finality::Committed`]: returns the latest committed block number (from in-memory MMR).
718    /// - [`Finality::Proven`]: returns the latest proven-in-sequence block number (cached via watch
719    ///   channel, updated by the proof scheduler).
720    pub async fn chain_tip(&self, finality: Finality) -> BlockNumber {
721        match finality {
722            Finality::Committed => self
723                .inner
724                .read()
725                .instrument(tracing::info_span!("acquire_inner"))
726                .await
727                .latest_block_num(),
728            Finality::Proven => self.proven_tip.read(),
729        }
730    }
731
732    /// Loads a block from the block store. Return `Ok(None)` if the block is not found.
733    pub async fn load_block(
734        &self,
735        block_num: BlockNumber,
736    ) -> Result<Option<Vec<u8>>, DatabaseError> {
737        if block_num > self.chain_tip(Finality::Committed).await {
738            return Ok(None);
739        }
740        self.block_store.load_block(block_num).await.map_err(Into::into)
741    }
742
743    /// Loads a block proof from the block store. Returns `Ok(None)` if the proof is not found.
744    pub async fn load_proof(
745        &self,
746        block_num: BlockNumber,
747    ) -> Result<Option<Vec<u8>>, DatabaseError> {
748        if block_num > self.chain_tip(Finality::Proven).await {
749            return Ok(None);
750        }
751        self.block_store.load_proof(block_num).await.map_err(Into::into)
752    }
753
    /// Returns the script for a note by its root.
    ///
    /// The lookup is delegated to the database and its result is returned unchanged.
    ///
    /// NOTE(review): `Ok(None)` presumably means no script with this root is stored — confirm
    /// against `Db::select_note_script_by_root`.
    pub async fn get_note_script_by_root(
        &self,
        root: Word,
    ) -> Result<Option<NoteScript>, DatabaseError> {
        self.db.select_note_script_by_root(root).await
    }
761}