miden_node_store/state/mod.rs

1//! Abstraction to synchronize state modifications.
2//!
3//! The [State] provides data access and modification methods; its main purpose is to ensure that
4//! data is written atomically and that reads are consistent.
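//!
//! A minimal usage sketch (illustrative only; assumes a Tokio runtime, an existing data
//! directory, and a channel for fatal-error notifications):
//!
//! ```ignore
//! use std::path::Path;
//!
//! // Channel over which the state can ask the process to terminate on fatal errors.
//! let (termination_tx, _termination_rx) = tokio::sync::mpsc::channel(1);
//!
//! // Load the database, block store, and in-memory trees from the data directory.
//! let state = State::load(Path::new("./data"), termination_tx).await?;
//!
//! // Read-only queries take a read lock on the in-memory structures.
//! let tip = state.latest_block_num().await;
//! ```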
5
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::ops::RangeInclusive;
8use std::path::Path;
9use std::sync::Arc;
10
11use miden_node_proto::domain::account::{
12    AccountDetailRequest,
13    AccountDetails,
14    AccountInfo,
15    AccountRequest,
16    AccountResponse,
17    AccountStorageDetails,
18    AccountStorageMapDetails,
19    AccountVaultDetails,
20    SlotData,
21    StorageMapRequest,
22};
23use miden_node_proto::domain::batch::BatchInputs;
24use miden_node_utils::ErrorReport;
25use miden_node_utils::formatting::format_array;
26use miden_protocol::Word;
27use miden_protocol::account::delta::AccountUpdateDetails;
28use miden_protocol::account::{AccountId, StorageMapWitness, StorageSlotName};
29use miden_protocol::asset::{AssetVaultKey, AssetWitness};
30use miden_protocol::block::account_tree::AccountWitness;
31use miden_protocol::block::nullifier_tree::{NullifierTree, NullifierWitness};
32use miden_protocol::block::{BlockHeader, BlockInputs, BlockNumber, Blockchain, ProvenBlock};
33use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrPeaks, MmrProof, PartialMmr};
34use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtProof, SmtStorage};
35use miden_protocol::note::{NoteDetails, NoteId, NoteScript, Nullifier};
36use miden_protocol::transaction::{OutputNote, PartialBlockchain};
37use miden_protocol::utils::Serializable;
38use tokio::sync::{Mutex, RwLock, oneshot};
39use tracing::{Instrument, info, info_span, instrument};
40
41use crate::accounts::{AccountTreeWithHistory, HistoricalError};
42use crate::blocks::BlockStore;
43use crate::db::models::Page;
44use crate::db::models::queries::StorageMapValuesPage;
45use crate::db::{
46    AccountVaultValue,
47    Db,
48    NoteRecord,
49    NoteSyncUpdate,
50    NullifierInfo,
51    StateSyncUpdate,
52};
53use crate::errors::{
54    ApplyBlockError,
55    DatabaseError,
56    GetBatchInputsError,
57    GetBlockHeaderError,
58    GetBlockInputsError,
59    GetCurrentBlockchainDataError,
60    InvalidBlockError,
61    NoteSyncError,
62    StateInitializationError,
63    StateSyncError,
64};
65use crate::inner_forest::{InnerForest, WitnessError};
66use crate::{COMPONENT, DataDirectory};
67
68mod loader;
69
70pub use loader::{
71    ACCOUNT_TREE_STORAGE_DIR,
72    NULLIFIER_TREE_STORAGE_DIR,
73    StorageLoader,
74    TreeStorage,
75};
76use loader::{load_mmr, load_smt_forest, verify_tree_consistency};
77
78// STRUCTURES
79// ================================================================================================
80
81#[derive(Debug, Default)]
82pub struct TransactionInputs {
83    pub account_commitment: Word,
84    pub nullifiers: Vec<NullifierInfo>,
85    pub found_unauthenticated_notes: HashSet<Word>,
86    pub new_account_id_prefix_is_unique: Option<bool>,
87}
88
89/// Container for state that needs to be updated atomically.
90struct InnerState<S>
91where
92    S: SmtStorage,
93{
94    nullifier_tree: NullifierTree<LargeSmt<S>>,
95    blockchain: Blockchain,
96    account_tree: AccountTreeWithHistory<S>,
97}
98
99impl<S: SmtStorage> InnerState<S> {
100    /// Returns the latest block number.
101    fn latest_block_num(&self) -> BlockNumber {
102        self.blockchain
103            .chain_tip()
104            .expect("chain should always have at least the genesis block")
105    }
106}
107
108// CHAIN STATE
109// ================================================================================================
110
111/// The rollup state.
112pub struct State {
113    /// The database which stores block headers, nullifiers, notes, and the latest states of
114    /// accounts.
115    db: Arc<Db>,
116
117    /// The block store which stores full block contents for all blocks.
118    block_store: Arc<BlockStore>,
119
120    /// Read-write lock used to prevent writing to a structure while it is being used.
121    ///
122    /// The lock is writer-preferring, meaning the writer won't be starved.
123    inner: RwLock<InnerState<TreeStorage>>,
124
125    /// Forest-related state `(SmtForest, storage_map_roots, vault_roots)` with its own lock.
126    forest: RwLock<InnerForest>,
127
128    /// To allow readers to access the tree data while an update is being performed, and to
129    /// prevent TOCTOU issues, there must be no concurrent writers. This lock serializes the writers.
130    writer: Mutex<()>,
131
132    /// Request termination of the process due to a fatal internal state error.
133    termination_ask: tokio::sync::mpsc::Sender<ApplyBlockError>,
134}
135
136impl State {
137    // CONSTRUCTOR
138    // --------------------------------------------------------------------------------------------
139
140    /// Loads the state from the data directory.
141    #[instrument(target = COMPONENT, skip_all)]
142    pub async fn load(
143        data_path: &Path,
144        termination_ask: tokio::sync::mpsc::Sender<ApplyBlockError>,
145    ) -> Result<Self, StateInitializationError> {
146        let data_directory = DataDirectory::load(data_path.to_path_buf())
147            .map_err(StateInitializationError::DataDirectoryLoadError)?;
148
149        let block_store = Arc::new(
150            BlockStore::load(data_directory.block_store_dir())
151                .map_err(StateInitializationError::BlockStoreLoadError)?,
152        );
153
154        let database_filepath = data_directory.database_path();
155        let mut db = Db::load(database_filepath.clone())
156            .await
157            .map_err(StateInitializationError::DatabaseLoadError)?;
158
159        let blockchain = load_mmr(&mut db).await?;
160        let latest_block_num = blockchain.chain_tip().unwrap_or(BlockNumber::GENESIS);
161
162        let account_storage = TreeStorage::create(data_path, ACCOUNT_TREE_STORAGE_DIR)?;
163        let account_tree = account_storage.load_account_tree(&mut db).await?;
164
165        let nullifier_storage = TreeStorage::create(data_path, NULLIFIER_TREE_STORAGE_DIR)?;
166        let nullifier_tree = nullifier_storage.load_nullifier_tree(&mut db).await?;
167
168        // Verify that tree roots match the expected roots from the database.
169        // This catches any divergence between persistent storage and the database caused by
170        // corruption or incomplete shutdown.
171        verify_tree_consistency(account_tree.root(), nullifier_tree.root(), &mut db).await?;
172
173        let account_tree = AccountTreeWithHistory::new(account_tree, latest_block_num);
174
175        let forest = load_smt_forest(&mut db, latest_block_num).await?;
176
177        let inner = RwLock::new(InnerState { nullifier_tree, blockchain, account_tree });
178
179        let forest = RwLock::new(forest);
180        let writer = Mutex::new(());
181        let db = Arc::new(db);
182
183        Ok(Self {
184            db,
185            block_store,
186            inner,
187            forest,
188            writer,
189            termination_ask,
190        })
191    }
192
193    // STATE MUTATOR
194    // --------------------------------------------------------------------------------------------
195
196    /// Apply changes of a new block to the DB and in-memory data structures.
197    ///
198    /// ## Note on state consistency
199    ///
200    /// The server keeps in-memory representations of the existing trees. These in-memory
201    /// representations must be kept consistent with the committed data so that all endpoints
202    /// return consistent results. To achieve this consistency, the following steps are used
203    /// (see the sketch below):
204    ///
205    /// - the request data is validated prior to starting any modifications.
206    /// - the block is saved into the block store in parallel with updating the DB, but before
207    ///   committing. Such a block is considered a candidate and is not yet available for reading
208    ///   because the latest block pointer has not been updated yet.
209    /// - a transaction is opened in the DB and the writes are started.
210    /// - while the transaction is not committed, concurrent reads are allowed, both to the DB and
211    ///   to the in-memory representations, which are consistent at this stage.
212    /// - prior to committing the changes to the DB, an exclusive lock on the in-memory data is
213    ///   acquired, preventing concurrent reads of the in-memory data, since it would be
214    ///   out of sync w.r.t. the DB.
215    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
216    ///   use the fresh data.
217    /// - the in-memory structures are updated, including the latest block pointer, and the lock
218    ///   is released.
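    ///
    /// In simplified form, the commit handshake between this method and the DB update task looks
    /// roughly as follows (illustrative sketch only; it mirrors the implementation below with
    /// error handling elided):
    ///
    /// ```ignore
    /// let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
    /// let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
    ///
    /// // DB task: prepare all writes, then wait for the lock holder before committing.
    /// let db_task = tokio::spawn(async move {
    ///     db.apply_block(allow_acquire, acquire_done, block, notes).await
    /// });
    ///
    /// // This task: wait until the DB writes are prepared, take the in-memory write lock,
    /// // let the DB task commit, and only then apply the in-memory mutations.
    /// acquired_allowed.await?;
    /// let mut inner = self.inner.write().await;
    /// inform_acquire_done.send(()).ok();
    /// db_task.await??;
    /// // ... apply nullifier/account tree mutations and push the block commitment ...
    /// ```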
219    // TODO: This span is logged in a root span, we should connect it to the parent span.
220    #[allow(clippy::too_many_lines)]
221    #[instrument(target = COMPONENT, skip_all, err)]
222    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
223        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;
224
225        let header = block.header();
226
227        let tx_commitment = block.body().transactions().commitment();
228
229        if header.tx_commitment() != tx_commitment {
230            return Err(InvalidBlockError::InvalidBlockTxCommitment {
231                expected: tx_commitment,
232                actual: header.tx_commitment(),
233            }
234            .into());
235        }
236
237        let block_num = header.block_num();
238        let block_commitment = header.commitment();
239
240        // ensures the right block header is being processed
241        let prev_block = self
242            .db
243            .select_block_header_by_block_num(None)
244            .await?
245            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
246
247        let expected_block_num = prev_block.block_num().child();
248        if block_num != expected_block_num {
249            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
250                expected: expected_block_num,
251                submitted: block_num,
252            }
253            .into());
254        }
255        if header.prev_block_commitment() != prev_block.commitment() {
256            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
257        }
258
259        let block_data = block.to_bytes();
260
261        // Save the block to the block store. In the case of a rolled-back DB transaction, the
262        // in-memory state will be unchanged, but the block might still have been written into the
263        // block store. Thus, such blocks should be considered block candidates, but not
264        // finalized blocks. So we should check against the latest block number when reading a
265        // block from the store.
266        let store = Arc::clone(&self.block_store);
267        let block_save_task = tokio::spawn(
268            async move { store.save_block(block_num, &block_data).await }.in_current_span(),
269        );
270
271        // scope to read in-memory data, compute mutations required for updating account
272        // and nullifier trees, and validate the request
273        let (
274            nullifier_tree_old_root,
275            nullifier_tree_update,
276            account_tree_old_root,
277            account_tree_update,
278        ) = {
279            let inner = self.inner.read().await;
280
281            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();
282
283            // nullifiers can be produced only once
284            let duplicate_nullifiers: Vec<_> = block
285                .body()
286                .created_nullifiers()
287                .iter()
288                .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some())
289                .copied()
290                .collect();
291            if !duplicate_nullifiers.is_empty() {
292                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
293            }
294
295            // compute updates for the in-memory data structures
296
297            // new_block.chain_root must be equal to the chain MMR root prior to the update
298            let peaks = inner.blockchain.peaks();
299            if peaks.hash_peaks() != header.chain_commitment() {
300                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
301            }
302
303            // compute update for nullifier tree
304            let nullifier_tree_update = inner
305                .nullifier_tree
306                .compute_mutations(
307                    block
308                        .body()
309                        .created_nullifiers()
310                        .iter()
311                        .map(|nullifier| (*nullifier, block_num)),
312                )
313                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
314
315            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
316                // We do our best here to notify the serve routine; if it doesn't care (dropped the
317                // receiver), we can't do much.
318                let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError(
319                    InvalidBlockError::NewBlockInvalidNullifierRoot,
320                ));
321                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
322            }
323
324            // compute update for account tree
325            let account_tree_update = inner
326                .account_tree
327                .compute_mutations(
328                    block
329                        .body()
330                        .updated_accounts()
331                        .iter()
332                        .map(|update| (update.account_id(), update.final_state_commitment())),
333                )
334                .map_err(|e| match e {
335                    HistoricalError::AccountTreeError(err) => {
336                        InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
337                    },
338                    HistoricalError::MerkleError(_) => {
339                        panic!("Unexpected MerkleError during account tree mutation computation")
340                    },
341                })?;
342
343            if account_tree_update.as_mutation_set().root() != header.account_root() {
344                let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError(
345                    InvalidBlockError::NewBlockInvalidAccountRoot,
346                ));
347                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
348            }
349
350            (
351                inner.nullifier_tree.root(),
352                nullifier_tree_update,
353                inner.account_tree.root_latest(),
354                account_tree_update,
355            )
356        };
357
358        // build note tree
359        let note_tree = block.body().compute_block_note_tree();
360        if note_tree.root() != header.note_root() {
361            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
362        }
363
364        let notes = block
365            .body()
366            .output_notes()
367            .map(|(note_index, note)| {
368                let (details, nullifier) = match note {
369                    OutputNote::Full(note) => {
370                        (Some(NoteDetails::from(note)), Some(note.nullifier()))
371                    },
372                    OutputNote::Header(_) => (None, None),
373                    note @ OutputNote::Partial(_) => {
374                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
375                            note.clone(),
376                        )));
377                    },
378                };
379
380                let inclusion_path = note_tree.open(note_index);
381
382                let note_record = NoteRecord {
383                    block_num,
384                    note_index,
385                    note_id: note.id().as_word(),
386                    note_commitment: note.commitment(),
387                    metadata: note.metadata().clone(),
388                    details,
389                    inclusion_path,
390                };
391
392                Ok((note_record, nullifier))
393            })
394            .collect::<Result<Vec<_>, InvalidBlockError>>()?;
395
396        // Signals the transaction is ready to be committed, and the write lock can be acquired
397        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
398        // Signals the write lock has been acquired, and the transaction can be committed
399        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
400
401        // Extract public account updates with deltas before the block is moved into the async task.
402        // Private accounts are filtered out since they don't expose their state changes.
403        let account_deltas =
404            Vec::from_iter(block.body().updated_accounts().iter().filter_map(|update| {
405                match update.details() {
406                    AccountUpdateDetails::Delta(delta) => Some(delta.clone()),
407                    AccountUpdateDetails::Private => None,
408                }
409            }));
410
411        // The DB and in-memory state updates need to be synchronized and are partially
412        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
413        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
414        // spawned.
415        let db = Arc::clone(&self.db);
416        let db_update_task = tokio::spawn(
417            async move { db.apply_block(allow_acquire, acquire_done, block, notes).await }
418                .in_current_span(),
419        );
420
421        // Wait for the message from the DB update task signaling that the DB transaction is ready to be committed
422        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;
423
424        // Wait for the block saving task to complete without errors
425        block_save_task.await??;
426
427        // Scope to update the in-memory data
428        async move {
429            // We need to hold the write lock here to prevent inconsistency between the in-memory
430            // state and the DB state. Thus, we need to wait for the DB update task to complete
431            // successfully.
432            let mut inner = self.inner.write().await;
433
434            // We need to check that neither the nullifier tree nor the account tree have changed
435            // while we were waiting for the DB preparation task to complete. If either of them
436            // did change, we do not proceed with in-memory and database updates, since it may
437            // lead to an inconsistent state.
438            if inner.nullifier_tree.root() != nullifier_tree_old_root
439                || inner.account_tree.root_latest() != account_tree_old_root
440            {
441                return Err(ApplyBlockError::ConcurrentWrite);
442            }
443
444            // Notify the DB update task that the write lock has been acquired, so it can commit
445            // the DB transaction
446            inform_acquire_done
447                .send(())
448                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;
449
450            // TODO: shutdown #91
451            // Wait for a successful commit of the DB transaction. If the commit fails, we must not
452            // change the in-memory state, so we return a block application error and don't proceed
453            // with in-memory updates.
454            db_update_task
455                .await?
456                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;
457
458            // Update the in-memory data structures after successful commit of the DB transaction
459            inner
460                .nullifier_tree
461                .apply_mutations(nullifier_tree_update)
462                .expect("Unreachable: old nullifier tree root must be checked before this step");
463            inner
464                .account_tree
465                .apply_mutations(account_tree_update)
466                .expect("Unreachable: old account tree root must be checked before this step");
467            inner.blockchain.push(block_commitment);
468
469            Ok(())
470        }
471        .in_current_span()
472        .await?;
473
474        self.forest.write().await.apply_block_updates(block_num, account_deltas)?;
475
476        info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");
477
478        Ok(())
479    }
480
481    // STATE ACCESSORS
482    // --------------------------------------------------------------------------------------------
483
484    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
485    ///
486    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
487    /// returned.
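    ///
    /// For example (illustrative sketch only):
    ///
    /// ```ignore
    /// // Fetch the latest block header together with its MMR inclusion proof.
    /// let (header, mmr_proof) = state.get_block_header(None, true).await?;
    /// ```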
488    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
489    pub async fn get_block_header(
490        &self,
491        block_num: Option<BlockNumber>,
492        include_mmr_proof: bool,
493    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
494        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
495        if let Some(header) = block_header {
496            let mmr_proof = if include_mmr_proof {
497                let inner = self.inner.read().await;
498                let mmr_proof = inner.blockchain.open(header.block_num())?;
499                Some(mmr_proof)
500            } else {
501                None
502            };
503            Ok((Some(header), mmr_proof))
504        } else {
505            Ok((None, None))
506        }
507    }
508
509    pub async fn sync_nullifiers(
510        &self,
511        prefix_len: u32,
512        nullifier_prefixes: Vec<u32>,
513        block_range: RangeInclusive<BlockNumber>,
514    ) -> Result<(Vec<NullifierInfo>, BlockNumber), DatabaseError> {
515        self.db
516            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_range)
517            .await
518    }
519
520    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
521    /// tree.
522    ///
523    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
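    ///
    /// For example (illustrative sketch only; `nullifiers` is any slice of nullifiers):
    ///
    /// ```ignore
    /// let proofs: Vec<SmtProof> = state.check_nullifiers(&nullifiers).await;
    /// ```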
524    #[instrument(level = "debug", target = COMPONENT, skip_all, ret)]
525    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
526        let inner = self.inner.read().await;
527        nullifiers
528            .iter()
529            .map(|n| inner.nullifier_tree.open(n))
530            .map(NullifierWitness::into_proof)
531            .collect()
532    }
533
534    /// Queries a list of notes from the database.
535    ///
536    /// If the provided list of [`NoteId`]s is empty or no note matches any of the provided
537    /// [`NoteId`]s, an empty list is returned.
538    pub async fn get_notes_by_id(
539        &self,
540        note_ids: Vec<NoteId>,
541    ) -> Result<Vec<NoteRecord>, DatabaseError> {
542        self.db.select_notes_by_id(note_ids).await
543    }
544
545    /// If the input block number is the current chain tip, `None` is returned.
546    /// Otherwise, returns the current chain tip's block header with its corresponding MMR peaks.
547    pub async fn get_current_blockchain_data(
548        &self,
549        block_num: Option<BlockNumber>,
550    ) -> Result<Option<(BlockHeader, MmrPeaks)>, GetCurrentBlockchainDataError> {
551        let blockchain = &self.inner.read().await.blockchain;
552        if let Some(number) = block_num
553            && number == self.latest_block_num().await
554        {
555            return Ok(None);
556        }
557
558        // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)`
559        // when `None` is passed
560        let block_header: BlockHeader = self
561            .db
562            .select_block_header_by_block_num(None)
563            .await
564            .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)?
565            .unwrap();
566        let peaks = blockchain
567            .peaks_at(block_header.block_num())
568            .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?;
569
570        Ok(Some((block_header, peaks)))
571    }
572
573    /// Fetches the inputs for a transaction batch from the database.
574    ///
575    /// ## Inputs
576    ///
577    /// The function takes as input:
578    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
579    /// - The unauthenticated note commitments are the set of commitments of unauthenticated notes
580    ///   consumed by all transactions in the batch. For these notes, we attempt to find inclusion
581    ///   proofs. Not all notes will necessarily exist in the DB, as some notes can be created and
582    ///   consumed within the same batch.
583    ///
584    /// ## Outputs
585    ///
586    /// The function will return:
587    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
588    ///   referenced by a note inclusion proof.
589    /// - Note inclusion proofs for all notes that were found in the DB.
590    /// - The block header that the batch should reference, i.e. the latest known block.
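    ///
    /// For example (illustrative sketch only):
    ///
    /// ```ignore
    /// // Reference the latest block and pass the batch's unauthenticated note commitments.
    /// let tx_reference_blocks = BTreeSet::from([state.latest_block_num().await]);
    /// let note_commitments: BTreeSet<Word> = BTreeSet::new();
    /// let batch_inputs = state.get_batch_inputs(tx_reference_blocks, note_commitments).await?;
    /// ```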
591    pub async fn get_batch_inputs(
592        &self,
593        tx_reference_blocks: BTreeSet<BlockNumber>,
594        unauthenticated_note_commitments: BTreeSet<Word>,
595    ) -> Result<BatchInputs, GetBatchInputsError> {
596        if tx_reference_blocks.is_empty() {
597            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
598        }
599
600        // First we grab note inclusion proofs for the known notes. These proofs only
601        // prove that the note was included in a given block. We then also need to prove that
602        // each of those blocks is included in the chain.
603        let note_proofs = self
604            .db
605            .select_note_inclusion_proofs(unauthenticated_note_commitments)
606            .await
607            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
608
609        // The set of blocks that the notes are included in.
610        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
611
612        // Collect all blocks we need to query without duplicates, which is:
613        // - all blocks for which we need to prove note inclusion.
614        // - all blocks referenced by transactions in the batch.
615        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
616        blocks.extend(note_blocks);
617
618        // Scoped block to automatically drop the read lock guard as soon as we're done.
619        // We also avoid accessing the db in the block as this would delay dropping the guard.
620        let (batch_reference_block, partial_mmr) = {
621            let inner_state = self.inner.read().await;
622
623            let latest_block_num = inner_state.latest_block_num();
624
625            let highest_block_num =
626                *blocks.last().expect("we should have checked for empty block references");
627            if highest_block_num > latest_block_num {
628                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
629                    highest_block_num,
630                    latest_block_num,
631                });
632            }
633
634            // Remove the latest block from the to-be-tracked blocks as it will be the reference
635            // block for the batch itself and thus added to the MMR within the batch kernel, so
636            // there is no need to prove its inclusion.
637            blocks.remove(&latest_block_num);
638
639            // SAFETY:
640            // - The latest block num was retrieved from the inner blockchain from which we will
641            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
642            // - We have checked that no block number in the blocks set is greater than latest block
643            //   number *and* latest block num was removed from the set. Therefore only block
644            //   numbers smaller than latest block num remain in the set. Therefore all the block
645            //   numbers are guaranteed to exist in the chain state at latest block num.
646            let partial_mmr = inner_state
647                .blockchain
648                .partial_mmr_from_blocks(&blocks, latest_block_num)
649                .expect("latest block num should exist and all blocks in set should be < than latest block");
650
651            (latest_block_num, partial_mmr)
652        };
653
654        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
655        // up in a separate DB access.
656        let mut headers = self
657            .db
658            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
659            .await
660            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
661
662        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
663        let header_index = headers
664            .iter()
665            .enumerate()
666            .find_map(|(index, header)| {
667                (header.block_num() == batch_reference_block).then_some(index)
668            })
669            .expect("DB should have returned the header of the batch reference block");
670
671        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
672        let batch_reference_block_header = headers.swap_remove(header_index);
673
674        // SAFETY: This should not error because:
675        // - we're passing exactly the block headers that we've added to the partial MMR,
676        // - so none of the block headers block numbers should exceed the chain length of the
677        //   partial MMR,
678        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
679        //
680        // We construct headers and partial MMR in concert, so they are consistent. This is why we
681        // can call the unchecked constructor.
682        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
683            .expect("partial mmr and block headers should be consistent");
684
685        Ok(BatchInputs {
686            batch_reference_block_header,
687            note_proofs,
688            partial_block_chain,
689        })
690    }
691
692    /// Loads data to synchronize a client.
693    ///
694    /// The client's request contains a list of note tags. This method will return the first
695    /// block with a matching tag, or the chain tip. All the other values are filtered based on
696    /// this block range.
697    ///
698    /// # Arguments
699    ///
700    /// - `block_num`: The last block *known* by the client; updates start from the next block.
701    /// - `account_ids`: Include an account's commitment if its _last change_ was in the result's
702    ///   block range.
703    /// - `note_tags`: The tags the client is interested in; the result is restricted to the first
704    ///   block with any matching tags.
705    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
706    pub async fn sync_state(
707        &self,
708        block_num: BlockNumber,
709        account_ids: Vec<AccountId>,
710        note_tags: Vec<u32>,
711    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
712        let inner = self.inner.read().await;
713
714        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;
715
716        let delta = if block_num == state_sync.block_header.block_num() {
717            // The client is in sync with the chain tip.
718            MmrDelta {
719                forest: Forest::new(block_num.as_usize()),
720                data: vec![],
721            }
722        } else {
723            // Important notes about the boundary conditions:
724            //
725            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
726            //   contained in the block header always lags behind by one block; this is because the
727            //   Mmr leaves are hashes of block headers, and we can't have self-referential hashes.
728            //   These two points cancel out and don't require adjusting.
729            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined to
730            //   be exclusive, so the from_forest has to be adjusted with a +1 (see the worked
731            //   example below).
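            //
            // For example (illustrative numbers only): if the client last knew block 5 and the
            // response header is for block 9, then `from_forest = 5 + 1 = 6` and `to_forest = 9`.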
732            let from_forest = (block_num + 1).as_usize();
733            let to_forest = state_sync.block_header.block_num().as_usize();
734            inner
735                .blockchain
736                .as_mmr()
737                .get_delta(Forest::new(from_forest), Forest::new(to_forest))
738                .map_err(StateSyncError::FailedToBuildMmrDelta)?
739        };
740
741        Ok((state_sync, delta))
742    }
743
744    /// Loads data to synchronize a client's notes.
745    ///
746    /// The client's request contains a list of tags. This method will return the first
747    /// block with a matching tag, or the chain tip. All the other values are filtered based on
748    /// this block range.
749    ///
750    /// # Arguments
751    ///
752    /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
753    ///   first block containing a matching note.
754    /// - `block_range`: The range of blocks from which to synchronize notes.
755    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
756    pub async fn sync_notes(
757        &self,
758        note_tags: Vec<u32>,
759        block_range: RangeInclusive<BlockNumber>,
760    ) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> {
761        let inner = self.inner.read().await;
762
763        let (note_sync, last_included_block) =
764            self.db.get_note_sync(block_range, note_tags).await?;
765
766        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;
767
768        Ok((note_sync, mmr_proof, last_included_block))
769    }
770
771    /// Returns data needed by the block producer to construct and prove the next block.
772    pub async fn get_block_inputs(
773        &self,
774        account_ids: Vec<AccountId>,
775        nullifiers: Vec<Nullifier>,
776        unauthenticated_note_commitments: BTreeSet<Word>,
777        reference_blocks: BTreeSet<BlockNumber>,
778    ) -> Result<BlockInputs, GetBlockInputsError> {
779        // Get the note inclusion proofs from the DB.
780        // We do this first so that we only have to acquire the state lock once. While holding it, we
781        // need the reference blocks of the note proofs to get their authentication paths in the chain MMR.
782        let unauthenticated_note_proofs = self
783            .db
784            .select_note_inclusion_proofs(unauthenticated_note_commitments)
785            .await
786            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
787
788        // The set of blocks that the notes are included in.
789        let note_proof_reference_blocks =
790            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
791
792        // Collect all blocks we need to prove inclusion for, without duplicates.
793        let mut blocks = reference_blocks;
794        blocks.extend(note_proof_reference_blocks);
795
796        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
797            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;
798
799        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
800        // be used as the previous block header of the block being built.
801        let mut headers = self
802            .db
803            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
804            .await
805            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
806
807        // Find and remove the latest block as we must not add it to the chain MMR, since it is
808        // not yet in the chain.
809        let latest_block_header_index = headers
810            .iter()
811            .enumerate()
812            .find_map(|(index, header)| {
813                (header.block_num() == latest_block_number).then_some(index)
814            })
815            .expect("DB should have returned the header of the latest block");
816
817        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
818        let latest_block_header = headers.swap_remove(latest_block_header_index);
819
820        // SAFETY: This should not error because:
821        // - we're passing exactly the block headers that we've added to the partial MMR,
822        // - so none of the block header's block numbers should exceed the chain length of the
823        //   partial MMR,
824        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
825        //
826        // We construct headers and partial MMR in concert, so they are consistent. This is why we
827        // can call the unchecked constructor.
828        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
829            .expect("partial mmr and block headers should be consistent");
830
831        Ok(BlockInputs::new(
832            latest_block_header,
833            partial_block_chain,
834            account_witnesses,
835            nullifier_witnesses,
836            unauthenticated_note_proofs,
837        ))
838    }
839
840    /// Get account and nullifier witnesses for the requested account IDs and nullifiers, as well as
841    /// the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block; its
842    /// number is removed from `blocks` and returned separately.
843    ///
844    /// This method acquires the lock on the inner state and does not access the DB, so that the
845    /// lock is released as soon as possible.
846    async fn get_block_inputs_witnesses(
847        &self,
848        blocks: &mut BTreeSet<BlockNumber>,
849        account_ids: Vec<AccountId>,
850        nullifiers: Vec<Nullifier>,
851    ) -> Result<
852        (
853            BlockNumber,
854            BTreeMap<AccountId, AccountWitness>,
855            BTreeMap<Nullifier, NullifierWitness>,
856            PartialMmr,
857        ),
858        GetBlockInputsError,
859    > {
860        let inner = self.inner.read().await;
861
862        let latest_block_number = inner.latest_block_num();
863
864        // If `blocks` is empty, use the latest block number which will never trigger the error.
865        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
866        if highest_block_number > latest_block_number {
867            return Err(GetBlockInputsError::UnknownBatchBlockReference {
868                highest_block_number,
869                latest_block_number,
870            });
871        }
872
873        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
874        // inclusion in the chain.
875        blocks.remove(&latest_block_number);
876
877        // Fetch the partial MMR at the state of the latest block with authentication paths for the
878        // provided set of blocks.
879        //
880        // SAFETY:
881        // - The latest block num was retrieved from the inner blockchain from which we will also
882        //   retrieve the proofs, so it is guaranteed to exist in that chain.
883        // - We have checked that no block number in the blocks set is greater than latest block
884        //   number *and* latest block num was removed from the set. Therefore only block numbers
885        //   smaller than latest block num remain in the set. Therefore all the block numbers are
886        //   guaranteed to exist in the chain state at latest block num.
887        let partial_mmr =
888            inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
889                "latest block num should exist and all blocks in set should be < than latest block",
890            );
891
892        // Fetch witnesses for all accounts.
893        let account_witnesses = account_ids
894            .iter()
895            .copied()
896            .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
897            .collect::<BTreeMap<AccountId, AccountWitness>>();
898
899        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
900        // not as this is done as part of proposing the block.
901        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
902            .iter()
903            .copied()
904            .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
905            .collect();
906
907        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
908    }
909
910    /// Returns data needed by the block producer to verify transaction validity.
911    #[instrument(target = COMPONENT, skip_all, ret)]
912    pub async fn get_transaction_inputs(
913        &self,
914        account_id: AccountId,
915        nullifiers: &[Nullifier],
916        unauthenticated_note_commitments: Vec<Word>,
917    ) -> Result<TransactionInputs, DatabaseError> {
918        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
919
920        let inner = self.inner.read().await;
921
922        let account_commitment = inner.account_tree.get_latest_commitment(account_id);
923
924        let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
925            Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
926        } else {
927            None
928        };
929
930        // Non-unique account Id prefixes for new accounts are not allowed.
931        if let Some(false) = new_account_id_prefix_is_unique {
932            return Ok(TransactionInputs {
933                new_account_id_prefix_is_unique,
934                ..Default::default()
935            });
936        }
937
938        let nullifiers = nullifiers
939            .iter()
940            .map(|nullifier| NullifierInfo {
941                nullifier: *nullifier,
942                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
943            })
944            .collect();
945
946        let found_unauthenticated_notes = self
947            .db
948            .select_existing_note_commitments(unauthenticated_note_commitments)
949            .await?;
950
951        Ok(TransactionInputs {
952            account_commitment,
953            nullifiers,
954            found_unauthenticated_notes,
955            new_account_id_prefix_is_unique,
956        })
957    }
958
959    /// Returns details for a public (on-chain) account.
960    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
961        self.db.select_account(id).await
962    }
963
964    /// Returns details for public (on-chain) network accounts by full account ID.
965    pub async fn get_network_account_details_by_id(
966        &self,
967        account_id: AccountId,
968    ) -> Result<Option<AccountInfo>, DatabaseError> {
969        self.db.select_network_account_by_id(account_id).await
970    }
971
972    /// Returns network account IDs within the specified block range (based on account creation
973    /// block).
974    ///
975    /// The function may return fewer accounts than exist in the range if the result would exceed
976    /// `MAX_RESPONSE_PAYLOAD_BYTES / AccountId::SERIALIZED_SIZE` rows. In this case, the result is
977    /// truncated at a block boundary to ensure all accounts from included blocks are returned.
978    ///
979    /// The response includes the last block number that was fully included in the result.
980    pub async fn get_all_network_accounts(
981        &self,
982        block_range: RangeInclusive<BlockNumber>,
983    ) -> Result<(Vec<AccountId>, BlockNumber), DatabaseError> {
984        self.db.select_all_network_account_ids(block_range).await
985    }
986
987    /// Returns an account witness and optionally account details at a specific block.
988    ///
989    /// The witness is a Merkle proof of inclusion in the account tree, proving the account's
990    /// state commitment. If `details` is requested, the method also returns the account's code,
991    /// vault assets, and storage data. Account details are only available for public accounts.
992    ///
993    /// If `block_num` is provided, returns the state at that historical block; otherwise, returns
994    /// the latest state. Note that historical states are only available for recent blocks close
995    /// to the chain tip.
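    ///
    /// For example (illustrative sketch only):
    ///
    /// ```ignore
    /// // Fetch the latest witness for an account without requesting code/vault/storage details.
    /// let response = state
    ///     .get_account(AccountRequest { account_id, block_num: None, details: None })
    ///     .await?;
    /// ```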
996    pub async fn get_account(
997        &self,
998        account_request: AccountRequest,
999    ) -> Result<AccountResponse, DatabaseError> {
1000        let AccountRequest { block_num, account_id, details } = account_request;
1001
1002        if details.is_some() && !account_id.has_public_state() {
1003            return Err(DatabaseError::AccountNotPublic(account_id));
1004        }
1005
1006        let (block_num, witness) = self.get_account_witness(block_num, account_id).await?;
1007
1008        let details = if let Some(request) = details {
1009            Some(self.fetch_public_account_details(account_id, block_num, request).await?)
1010        } else {
1011            None
1012        };
1013
1014        Ok(AccountResponse { block_num, witness, details })
1015    }
1016
1017    /// Returns an account witness (Merkle proof of inclusion in the account tree).
1018    ///
1019    /// If `block_num` is provided, returns the witness at that historical block;
1020    /// otherwise, returns the witness at the latest block.
1021    async fn get_account_witness(
1022        &self,
1023        block_num: Option<BlockNumber>,
1024        account_id: AccountId,
1025    ) -> Result<(BlockNumber, AccountWitness), DatabaseError> {
1026        let inner_state = self.inner.read().await;
1027
1028        // Determine which block to query
1029        let (block_num, witness) = if let Some(requested_block) = block_num {
1030            // Historical query: use the account tree with history
1031            let witness = inner_state
1032                .account_tree
1033                .open_at(account_id, requested_block)
1034                .ok_or_else(|| DatabaseError::HistoricalBlockNotAvailable {
1035                    block_num: requested_block,
1036                    reason: "Block is either in the future or has been pruned from history"
1037                        .to_string(),
1038                })?;
1039            (requested_block, witness)
1040        } else {
1041            // Latest query: use the latest state
1042            let block_num = inner_state.account_tree.block_number_latest();
1043            let witness = inner_state.account_tree.open_latest(account_id);
1044            (block_num, witness)
1045        };
1046
1047        Ok((block_num, witness))
1048    }
1049
1050    /// Fetches the account details (code, vault, storage) for a public account at the specified
1051    /// block.
1052    ///
1053    /// This method queries the database to fetch the account state and processes the detail
1054    /// request to return only the requested information.
1055    ///
1056    /// For specific key queries (`SlotData::MapKeys`), the forest is used to provide SMT proofs.
1057    /// Returns an error if the forest doesn't have data for the requested slot.
1058    /// All-entries queries (`SlotData::All`) use the forest to return all entries.
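    ///
    /// For example (illustrative sketch only), a request for all entries of a single named slot
    /// could be expressed as:
    ///
    /// ```ignore
    /// let storage_requests = vec![StorageMapRequest { slot_name, slot_data: SlotData::All }];
    /// ```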
1059    async fn fetch_public_account_details(
1060        &self,
1061        account_id: AccountId,
1062        block_num: BlockNumber,
1063        detail_request: AccountDetailRequest,
1064    ) -> Result<AccountDetails, DatabaseError> {
1065        let AccountDetailRequest {
1066            code_commitment,
1067            asset_vault_commitment,
1068            storage_requests,
1069        } = detail_request;
1070
1071        if !account_id.has_public_state() {
1072            return Err(DatabaseError::AccountNotPublic(account_id));
1073        }
1074
1075        // Validate block exists in the blockchain before querying the database
1076        self.validate_block_exists(block_num).await?;
1077
1078        // Query account header and storage header together in a single DB call
1079        let (account_header, storage_header) = self
1080            .db
1081            .select_account_header_with_storage_header_at_block(account_id, block_num)
1082            .await?
1083            .ok_or(DatabaseError::AccountAtBlockHeightNotFoundInDb(account_id, block_num))?;
1084
1085        let account_code = match code_commitment {
1086            Some(commitment) if commitment == account_header.code_commitment() => None,
1087            Some(_) => {
1088                self.db
1089                    .select_account_code_by_commitment(account_header.code_commitment())
1090                    .await?
1091            },
1092            None => None,
1093        };
1094
1095        let vault_details = match asset_vault_commitment {
1096            Some(commitment) if commitment == account_header.vault_root() => {
1097                AccountVaultDetails::empty()
1098            },
1099            Some(_) => {
1100                let vault_assets =
1101                    self.db.select_account_vault_at_block(account_id, block_num).await?;
1102                AccountVaultDetails::from_assets(vault_assets)
1103            },
1104            None => AccountVaultDetails::empty(),
1105        };
1106
1107        let mut storage_map_details =
1108            Vec::<AccountStorageMapDetails>::with_capacity(storage_requests.len());
1109
1110        // Use forest for storage map queries
1111        let forest_guard = self.forest.read().await;
1112
1113        for StorageMapRequest { slot_name, slot_data } in storage_requests {
1114            let details = match &slot_data {
1115                SlotData::MapKeys(keys) => forest_guard
1116                    .open_storage_map(account_id, slot_name.clone(), block_num, keys)
1117                    .ok_or_else(|| DatabaseError::StorageRootNotFound {
1118                        account_id,
1119                        slot_name: slot_name.to_string(),
1120                        block_num,
1121                    })?
1122                    .map_err(DatabaseError::MerkleError)?,
1123                SlotData::All => forest_guard
1124                    .storage_map_entries(account_id, slot_name.clone(), block_num)
1125                    .ok_or_else(|| DatabaseError::StorageRootNotFound {
1126                        account_id,
1127                        slot_name: slot_name.to_string(),
1128                        block_num,
1129                    })?,
1130            };
1131
1132            storage_map_details.push(details);
1133        }
1134
1135        Ok(AccountDetails {
1136            account_header,
1137            account_code,
1138            vault_details,
1139            storage_details: AccountStorageDetails {
1140                header: storage_header,
1141                map_details: storage_map_details,
1142            },
1143        })
1144    }
1145
1146    /// Returns storage map values for syncing within a block range.
1147    pub(crate) async fn get_storage_map_sync_values(
1148        &self,
1149        account_id: AccountId,
1150        block_range: RangeInclusive<BlockNumber>,
1151    ) -> Result<StorageMapValuesPage, DatabaseError> {
1152        self.db.select_storage_map_sync_values(account_id, block_range).await
1153    }
1154
1155    /// Loads a block from the block store. Returns `Ok(None)` if the block is not found.
1156    pub async fn load_block(
1157        &self,
1158        block_num: BlockNumber,
1159    ) -> Result<Option<Vec<u8>>, DatabaseError> {
1160        if block_num > self.latest_block_num().await {
1161            return Ok(None);
1162        }
1163        self.block_store.load_block(block_num).await.map_err(Into::into)
1164    }
1165
1166    /// Returns the latest block number.
1167    pub async fn latest_block_num(&self) -> BlockNumber {
1168        self.inner.read().await.latest_block_num()
1169    }
1170
1171    /// Validates that a block exists in the blockchain.
1172    ///
1173    /// # Attention
1174    ///
1175    /// Acquires a **read lock** on `self.inner`.
1176    ///
1177    /// # Errors
1178    ///
1179    /// Returns `DatabaseError::BlockNotFound` if the block doesn't exist in the blockchain.
1180    async fn validate_block_exists(&self, block_num: BlockNumber) -> Result<(), DatabaseError> {
1181        let inner = self.inner.read().await;
1182        let latest_block_num = inner.latest_block_num();
1183
1184        if block_num > latest_block_num {
1185            return Err(DatabaseError::BlockNotFound(block_num));
1186        }
1187
1188        Ok(())
1189    }
1190
1191    /// Emits metrics for each database table's size.
1192    pub async fn analyze_table_sizes(&self) -> Result<(), DatabaseError> {
1193        self.db.analyze_table_sizes().await
1194    }
1195
1196    /// Returns account vault updates for the specified account within a block range.
1197    pub async fn sync_account_vault(
1198        &self,
1199        account_id: AccountId,
1200        block_range: RangeInclusive<BlockNumber>,
1201    ) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
1202        self.db.get_account_vault_sync(account_id, block_range).await
1203    }
1204    /// Returns the network notes for an account that are unconsumed by a specified block number,
1205    /// along with the next pagination token.
1206    pub async fn get_unconsumed_network_notes_for_account(
1207        &self,
1208        account_id: AccountId,
1209        block_num: BlockNumber,
1210        page: Page,
1211    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
1212        self.db.select_unconsumed_network_notes(account_id, block_num, page).await
1213    }
1214
1215    /// Returns the script for a note by its root.
1216    pub async fn get_note_script_by_root(
1217        &self,
1218        root: Word,
1219    ) -> Result<Option<NoteScript>, DatabaseError> {
1220        self.db.select_note_script_by_root(root).await
1221    }
1222
1223    /// Returns the complete transaction records for the specified accounts within the specified
1224    /// block range, including state commitments and note IDs.
1225    pub async fn sync_transactions(
1226        &self,
1227        account_ids: Vec<AccountId>,
1228        block_range: RangeInclusive<BlockNumber>,
1229    ) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
1230        self.db.select_transactions_records(account_ids, block_range).await
1231    }
1232
1233    /// Returns vault asset witnesses for the specified account and block number.
1234    pub async fn get_vault_asset_witnesses(
1235        &self,
1236        account_id: AccountId,
1237        block_num: BlockNumber,
1238        vault_keys: BTreeSet<AssetVaultKey>,
1239    ) -> Result<Vec<AssetWitness>, WitnessError> {
1240        let witnesses = self
1241            .forest
1242            .read()
1243            .await
1244            .get_vault_asset_witnesses(account_id, block_num, vault_keys)?;
1245        Ok(witnesses)
1246    }
1247
1248    /// Returns a storage map witness for the specified account and storage entry at the block
1249    /// number.
1250    ///
1251    /// Note that the `raw_key` is the raw, user-provided key that needs to be hashed in order to
1252    /// get the actual key in the storage map.
1253    pub async fn get_storage_map_witness(
1254        &self,
1255        account_id: AccountId,
1256        slot_name: &StorageSlotName,
1257        block_num: BlockNumber,
1258        raw_key: Word,
1259    ) -> Result<StorageMapWitness, WitnessError> {
1260        let witness = self
1261            .forest
1262            .read()
1263            .await
1264            .get_storage_map_witness(account_id, slot_name, block_num, raw_key)?;
1265        Ok(witness)
1266    }
1267}