miden_node_store/
state.rs

//! Abstraction to synchronize state modifications.
//!
//! The [State] provides data access and modification methods. Its main purpose is to ensure that
//! data is written atomically and that reads are consistent.
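//!
//! A minimal usage sketch (hypothetical data path; error handling elided, for illustration only):
//!
//! ```ignore
//! let state = State::load(Path::new("/path/to/data")).await?;
//! let chain_tip = state.latest_block_num().await;
//! ```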

use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::RangeInclusive;
use std::path::Path;
use std::sync::Arc;

use miden_node_proto::domain::account::{
    AccountDetailRequest,
    AccountDetails,
    AccountInfo,
    AccountProofRequest,
    AccountProofResponse,
    AccountStorageDetails,
    AccountStorageMapDetails,
    AccountVaultDetails,
    NetworkAccountPrefix,
    StorageMapRequest,
};
use miden_node_proto::domain::batch::BatchInputs;
use miden_node_utils::ErrorReport;
use miden_node_utils::formatting::format_array;
use miden_objects::account::{AccountHeader, AccountId, StorageSlot};
use miden_objects::block::account_tree::{AccountTree, account_id_to_smt_key};
use miden_objects::block::{
    AccountWitness,
    BlockHeader,
    BlockInputs,
    BlockNumber,
    Blockchain,
    NullifierTree,
    NullifierWitness,
    ProvenBlock,
};
use miden_objects::crypto::merkle::{
    Forest,
    LargeSmt,
    MemoryStorage,
    Mmr,
    MmrDelta,
    MmrPeaks,
    MmrProof,
    PartialMmr,
    SmtProof,
    SmtStorage,
};
use miden_objects::note::{NoteDetails, NoteId, NoteScript, Nullifier};
use miden_objects::transaction::{OutputNote, PartialBlockchain};
use miden_objects::utils::Serializable;
use miden_objects::{AccountError, Word};
use tokio::sync::{Mutex, RwLock, oneshot};
use tracing::{Instrument, info, info_span, instrument};

use crate::blocks::BlockStore;
use crate::db::models::Page;
use crate::db::models::queries::StorageMapValuesPage;
use crate::db::{
    AccountVaultValue,
    Db,
    NoteRecord,
    NoteSyncUpdate,
    NullifierInfo,
    StateSyncUpdate,
};
use crate::errors::{
    ApplyBlockError,
    DatabaseError,
    GetBatchInputsError,
    GetBlockHeaderError,
    GetBlockInputsError,
    GetCurrentBlockchainDataError,
    InvalidBlockError,
    NoteSyncError,
    StateInitializationError,
    StateSyncError,
};
use crate::{AccountTreeWithHistory, COMPONENT, DataDirectory, InMemoryAccountTree};

// STRUCTURES
// ================================================================================================

#[derive(Debug, Default)]
pub struct TransactionInputs {
    pub account_commitment: Word,
    pub nullifiers: Vec<NullifierInfo>,
    pub found_unauthenticated_notes: HashSet<Word>,
    pub new_account_id_prefix_is_unique: Option<bool>,
}

/// Container for state that needs to be updated atomically.
struct InnerState<S = MemoryStorage>
where
    S: SmtStorage,
{
    nullifier_tree: NullifierTree,
    blockchain: Blockchain,
    account_tree: AccountTreeWithHistory<AccountTree<LargeSmt<S>>>,
}

impl<S> InnerState<S>
where
    S: SmtStorage,
{
    /// Returns the latest block number.
    fn latest_block_num(&self) -> BlockNumber {
        self.blockchain
            .chain_tip()
            .expect("chain should always have at least the genesis block")
    }
}

/// The rollup state
pub struct State {
    /// The database which stores block headers, nullifiers, notes, and the latest states of
    /// accounts.
    db: Arc<Db>,

    /// The block store which stores full block contents for all blocks.
    block_store: Arc<BlockStore>,

    /// Read-write lock used to prevent writing to a structure while it is being used.
    ///
    /// The lock is writer-preferring, meaning the writer won't be starved.
    inner: RwLock<InnerState>,

    /// To allow readers to access the tree data while an update is being performed, and to
    /// prevent TOCTOU issues, there must be no concurrent writers. This lock serializes writers.
    writer: Mutex<()>,
}

impl State {
    /// Loads the state from the `db`.
    #[instrument(target = COMPONENT, skip_all)]
    pub async fn load(data_path: &Path) -> Result<Self, StateInitializationError> {
        let data_directory = DataDirectory::load(data_path.to_path_buf())
            .map_err(StateInitializationError::DataDirectoryLoadError)?;

        let block_store = Arc::new(
            BlockStore::load(data_directory.block_store_dir())
                .map_err(StateInitializationError::BlockStoreLoadError)?,
        );

        let database_filepath = data_directory.database_path();
        let mut db = Db::load(database_filepath.clone())
            .await
            .map_err(StateInitializationError::DatabaseLoadError)?;

        let chain_mmr = load_mmr(&mut db).await?;
        let block_headers = db.select_all_block_headers().await?;
        // TODO: Account tree loading synchronization
        // Currently `load_account_tree` loads all account commitments from the DB. This could
        // potentially lead to inconsistency if the DB contains account states from blocks beyond
        // `latest_block_num`, though in practice the DB writes are transactional and this
        // should not occur.
        let latest_block_num = block_headers
            .last()
            .map_or(BlockNumber::GENESIS, miden_objects::block::BlockHeader::block_num);
        let account_tree = load_account_tree(&mut db, latest_block_num).await?;
        let nullifier_tree = load_nullifier_tree(&mut db).await?;

        let inner = RwLock::new(InnerState {
            nullifier_tree,
            // SAFETY: We assume the loaded MMR is valid and does not have more than u32::MAX
            // entries.
            blockchain: Blockchain::from_mmr_unchecked(chain_mmr),
            account_tree,
        });

        let writer = Mutex::new(());
        let db = Arc::new(db);

        Ok(Self { db, block_store, inner, writer })
    }

    /// Applies changes of a new block to the DB and in-memory data structures.
    ///
    /// ## Note on state consistency
    ///
    /// The server contains in-memory representations of the existing trees. These must be kept
    /// consistent with the committed data in order to provide consistent results for all
    /// endpoints. To achieve consistency, the following steps are used (a simplified sketch of
    /// the commit handshake follows this list):
    ///
    /// - the request data is validated prior to starting any modifications.
    /// - the block is saved into the block store in parallel with updating the DB, but before
    ///   committing. This block is considered a candidate and is not yet available for reading
    ///   because the latest block pointer has not been updated yet.
    /// - a transaction is opened in the DB and the writes are started.
    /// - while the transaction is not committed, concurrent reads are allowed, both to the DB and
    ///   the in-memory representations, which are consistent at this stage.
    /// - prior to committing the changes to the DB, an exclusive lock on the in-memory data is
    ///   acquired, preventing concurrent reads of the in-memory data, since it would be
    ///   out-of-sync w.r.t. the DB.
    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
    ///   use the fresh data.
    /// - the in-memory structures are updated, including the latest block pointer, and the lock
    ///   is released.
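    ///
    /// A minimal, hypothetical sketch of the commit handshake between the DB update task and the
    /// in-memory update (names and error handling are illustrative only and do not mirror the
    /// actual implementation):
    ///
    /// ```ignore
    /// let (allow_acquire, acquired_allowed) = tokio::sync::oneshot::channel::<()>();
    /// let (inform_acquire_done, acquire_done) = tokio::sync::oneshot::channel::<()>();
    /// let db_task = tokio::spawn(async move {
    ///     // ... stage the DB transaction ...
    ///     let _ = allow_acquire.send(());        // 1. staged, ready to commit
    ///     let _ = acquire_done.await;            // 3. wait until the write lock is held
    ///     // ... commit the DB transaction ...
    /// });
    /// acquired_allowed.await?;                   // 2. the DB transaction is staged
    /// let mut inner = in_memory_state.write().await; // acquire the in-memory write lock
    /// let _ = inform_acquire_done.send(());      // 4. allow the DB task to commit
    /// db_task.await??;                           // 5. after a successful commit, update trees
    /// ```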
    // TODO: This span is logged in a root span, we should connect it to the parent span.
    #[allow(clippy::too_many_lines)]
    #[instrument(target = COMPONENT, skip_all, err)]
    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;

        let header = block.header();

        let tx_commitment = block.transactions().commitment();

        if header.tx_commitment() != tx_commitment {
            return Err(InvalidBlockError::InvalidBlockTxCommitment {
                expected: tx_commitment,
                actual: header.tx_commitment(),
            }
            .into());
        }

        let block_num = header.block_num();
        let block_commitment = block.commitment();

        // ensures the right block header is being processed
        let prev_block = self
            .db
            .select_block_header_by_block_num(None)
            .await?
            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;

        let expected_block_num = prev_block.block_num().child();
        if block_num != expected_block_num {
            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
                expected: expected_block_num,
                submitted: block_num,
            }
            .into());
        }
        if header.prev_block_commitment() != prev_block.commitment() {
            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
        }

        let block_data = block.to_bytes();

        // Save the block to the block store. In the case of a rolled-back DB transaction, the
        // in-memory state will be unchanged, but the block might still be written into the
        // block store. Thus, such blocks should be considered block candidates, not finalized
        // blocks, and we must check against the latest block number when reading a block from
        // the store.
        let store = Arc::clone(&self.block_store);
        let block_save_task = tokio::spawn(
            async move { store.save_block(block_num, &block_data).await }.in_current_span(),
        );

        // scope to read in-memory data, compute mutations required for updating account
        // and nullifier trees, and validate the request
        let (
            nullifier_tree_old_root,
            nullifier_tree_update,
            account_tree_old_root,
            account_tree_update,
        ) = {
            let inner = self.inner.read().await;

            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();

            // nullifiers can be produced only once
            let duplicate_nullifiers: Vec<_> = block
                .created_nullifiers()
                .iter()
                .filter(|&n| inner.nullifier_tree.get_block_num(n).is_some())
                .copied()
                .collect();
            if !duplicate_nullifiers.is_empty() {
                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
            }

            // compute updates for the in-memory data structures

            // new_block.chain_root must be equal to the chain MMR root prior to the update
            let peaks = inner.blockchain.peaks();
            if peaks.hash_peaks() != header.chain_commitment() {
                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
            }

            // compute update for nullifier tree
            let nullifier_tree_update = inner
                .nullifier_tree
                .compute_mutations(
                    block.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
                )
                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;

            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
            }

            // compute update for account tree
            let account_tree_update = inner
                .account_tree
                .compute_mutations(
                    block
                        .updated_accounts()
                        .iter()
                        .map(|update| (update.account_id(), update.final_state_commitment())),
                )
                .map_err(|e| match e {
                    crate::HistoricalError::AccountTreeError(err) => {
                        InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
                    },
                    crate::HistoricalError::MerkleError(_) => {
                        panic!("Unexpected MerkleError during account tree mutation computation")
                    },
                })?;

            if account_tree_update.as_mutation_set().root() != header.account_root() {
                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
            }

            (
                inner.nullifier_tree.root(),
                nullifier_tree_update,
                inner.account_tree.root_latest(),
                account_tree_update,
            )
        };

        // build note tree
        let note_tree = block.build_output_note_tree();
        if note_tree.root() != header.note_root() {
            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
        }

        let notes = block
            .output_notes()
            .map(|(note_index, note)| {
                let (details, nullifier) = match note {
                    OutputNote::Full(note) => {
                        (Some(NoteDetails::from(note)), Some(note.nullifier()))
                    },
                    OutputNote::Header(_) => (None, None),
                    note @ OutputNote::Partial(_) => {
                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
                            note.clone(),
                        )));
                    },
                };

                let inclusion_path = note_tree.open(note_index);

                let note_record = NoteRecord {
                    block_num,
                    note_index,
                    note_id: note.id().into(),
                    note_commitment: note.commitment(),
                    metadata: *note.metadata(),
                    details,
                    inclusion_path,
                };

                Ok((note_record, nullifier))
            })
            .collect::<Result<Vec<_>, InvalidBlockError>>()?;

        // Signals the transaction is ready to be committed, and the write lock can be acquired
        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
        // Signals the write lock has been acquired, and the transaction can be committed
        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();

        // The DB and in-memory state updates need to be synchronized and are partially
        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
        // spawned.
        let db = Arc::clone(&self.db);
        let db_update_task = tokio::spawn(
            async move { db.apply_block(allow_acquire, acquire_done, block, notes).await }
                .in_current_span(),
        );

        // Wait for the message from the DB update task signalling that we are ready to commit the
        // DB transaction.
        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;

        // Wait for the block saving task to complete without errors.
        block_save_task.await??;

        // Scope to update the in-memory data
        async move {
            // We need to hold the write lock here to prevent inconsistency between the in-memory
            // state and the DB state. Thus, we need to wait for the DB update task to complete
            // successfully.
            let mut inner = self.inner.write().await;

            // We need to check that neither the nullifier tree nor the account tree has changed
            // while we were waiting for the DB preparation task to complete. If either of them
            // did change, we do not proceed with in-memory and database updates, since it may
            // lead to an inconsistent state.
            if inner.nullifier_tree.root() != nullifier_tree_old_root
                || inner.account_tree.root_latest() != account_tree_old_root
            {
                return Err(ApplyBlockError::ConcurrentWrite);
            }

            // Notify the DB update task that the write lock has been acquired, so it can commit
            // the DB transaction
            inform_acquire_done
                .send(())
                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;

            // TODO: shutdown #91
            // Wait for the DB transaction to be committed successfully. If the commit fails, we
            // must not change the in-memory state, so we return a block application error and
            // don't proceed with in-memory updates.
            db_update_task
                .in_current_span()
                .await?
                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;

            // Update the in-memory data structures after successful commit of the DB transaction
            inner
                .nullifier_tree
                .apply_mutations(nullifier_tree_update)
                .expect("Unreachable: old nullifier tree root must be checked before this step");
            inner
                .account_tree
                .apply_mutations(account_tree_update)
                .expect("Unreachable: old account tree root must be checked before this step");
            inner.blockchain.push(block_commitment);

            Ok(())
        }
        .instrument(info_span!("update trees"))
        .await
    }

    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
    ///
    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
    /// returned.
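    ///
    /// A minimal usage sketch (hypothetical caller; error handling elided):
    ///
    /// ```ignore
    /// // Latest block header together with its MMR inclusion proof.
    /// let (header, mmr_proof) = state.get_block_header(None, true).await?;
    /// ```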
    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn get_block_header(
        &self,
        block_num: Option<BlockNumber>,
        include_mmr_proof: bool,
    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
        if let Some(header) = block_header {
            let mmr_proof = if include_mmr_proof {
                let inner = self.inner.read().await;
                let mmr_proof = inner.blockchain.open(header.block_num())?;
                Some(mmr_proof)
            } else {
                None
            };
            Ok((Some(header), mmr_proof))
        } else {
            Ok((None, None))
        }
    }

    pub async fn sync_nullifiers(
        &self,
        prefix_len: u32,
        nullifier_prefixes: Vec<u32>,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(Vec<NullifierInfo>, BlockNumber), DatabaseError> {
        self.db
            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_range)
            .await
    }

    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
    /// tree.
    ///
    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
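    ///
    /// A minimal usage sketch (hypothetical caller; `nullifier` is assumed to be in scope):
    ///
    /// ```ignore
    /// let proofs = state.check_nullifiers(&[nullifier]).await;
    /// assert_eq!(proofs.len(), 1);
    /// ```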
    #[instrument(level = "debug", target = COMPONENT, skip_all, ret)]
    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
        let inner = self.inner.read().await;
        nullifiers
            .iter()
            .map(|n| inner.nullifier_tree.open(n))
            .map(NullifierWitness::into_proof)
            .collect()
    }

    /// Queries a list of notes from the database.
    ///
    /// If the provided list of [`NoteId`]s is empty, or no note matches any of the provided
    /// [`NoteId`]s, an empty list is returned.
    pub async fn get_notes_by_id(
        &self,
        note_ids: Vec<NoteId>,
    ) -> Result<Vec<NoteRecord>, DatabaseError> {
        self.db.select_notes_by_id(note_ids).await
    }

    /// If the input block number is the current chain tip, `None` is returned.
    /// Otherwise, returns the current chain tip's block header with its corresponding MMR peaks.
    pub async fn get_current_blockchain_data(
        &self,
        block_num: Option<BlockNumber>,
    ) -> Result<Option<(BlockHeader, MmrPeaks)>, GetCurrentBlockchainDataError> {
        let blockchain = &self.inner.read().await.blockchain;
        if let Some(number) = block_num
            && number == self.latest_block_num().await
        {
            return Ok(None);
        }

        // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)`
        // when `None` is passed
        let block_header: BlockHeader = self
            .db
            .select_block_header_by_block_num(None)
            .await
            .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)?
            .unwrap();
        let peaks = blockchain
            .peaks_at(block_header.block_num())
            .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?;

        Ok(Some((block_header, peaks)))
    }

    /// Fetches the inputs for a transaction batch from the database.
    ///
    /// ## Inputs
    ///
    /// The function takes as input:
    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
    /// - The unauthenticated note commitments are the set of commitments of unauthenticated notes
    ///   consumed by all transactions in the batch. For these notes, we attempt to find inclusion
    ///   proofs. Not all notes will necessarily exist in the DB, as some notes can be created and
    ///   consumed within the same batch.

    /// ## Outputs
    ///
    /// The function will return:
    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
    ///   referenced by a note inclusion proof.
    /// - Note inclusion proofs for all notes that were found in the DB.
    /// - The block header that the batch should reference, i.e. the latest known block.
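    ///
    /// A minimal usage sketch (hypothetical inputs; error handling elided):
    ///
    /// ```ignore
    /// let reference_blocks = BTreeSet::from([BlockNumber::from(5u32)]);
    /// let note_commitments = BTreeSet::new();
    /// let batch_inputs = state.get_batch_inputs(reference_blocks, note_commitments).await?;
    /// ```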
    pub async fn get_batch_inputs(
        &self,
        tx_reference_blocks: BTreeSet<BlockNumber>,
        unauthenticated_note_commitments: BTreeSet<Word>,
    ) -> Result<BatchInputs, GetBatchInputsError> {
        if tx_reference_blocks.is_empty() {
            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
        }

        // First we grab note inclusion proofs for the known notes. These proofs only
        // prove that the note was included in a given block. We then also need to prove that
        // each of those blocks is included in the chain.
        let note_proofs = self
            .db
            .select_note_inclusion_proofs(unauthenticated_note_commitments)
            .await
            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;

        // The set of blocks that the notes are included in.
        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());

        // Collect all blocks we need to query without duplicates, which is:
        // - all blocks for which we need to prove note inclusion.
        // - all blocks referenced by transactions in the batch.
        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
        blocks.extend(note_blocks);

        // Scoped block to automatically drop the read lock guard as soon as we're done.
        // We also avoid accessing the db in the block as this would delay dropping the guard.
        let (batch_reference_block, partial_mmr) = {
            let inner_state = self.inner.read().await;

            let latest_block_num = inner_state.latest_block_num();

            let highest_block_num =
                *blocks.last().expect("we should have checked for empty block references");
            if highest_block_num > latest_block_num {
                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
                    highest_block_num,
                    latest_block_num,
                });
            }

            // Remove the latest block from the to-be-tracked blocks as it will be the reference
            // block for the batch itself and thus added to the MMR within the batch kernel, so
            // there is no need to prove its inclusion.
            blocks.remove(&latest_block_num);

            // SAFETY:
            // - The latest block num was retrieved from the inner blockchain from which we will
            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
            // - We have checked that no block number in the blocks set is greater than latest block
            //   number *and* latest block num was removed from the set. Therefore only block
            //   numbers smaller than latest block num remain in the set. Therefore all the block
            //   numbers are guaranteed to exist in the chain state at latest block num.
            let partial_mmr = inner_state
                .blockchain
                .partial_mmr_from_blocks(&blocks, latest_block_num)
                .expect("latest block num should exist and all blocks in set should be < than latest block");

            (latest_block_num, partial_mmr)
        };

        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
        // up in a separate DB access.
        let mut headers = self
            .db
            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
            .await
            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;

        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
        let header_index = headers
            .iter()
            .enumerate()
            .find_map(|(index, header)| {
                (header.block_num() == batch_reference_block).then_some(index)
            })
            .expect("DB should have returned the header of the batch reference block");

        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
        let batch_reference_block_header = headers.swap_remove(header_index);

        // SAFETY: This should not error because:
        // - we're passing exactly the block headers that we've added to the partial MMR,
        // - so none of the block headers' block numbers should exceed the chain length of the
        //   partial MMR,
        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
        //
        // We construct headers and partial MMR in concert, so they are consistent. This is why we
        // can call the unchecked constructor.
        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
            .expect("partial mmr and block headers should be consistent");

        Ok(BatchInputs {
            batch_reference_block_header,
            note_proofs,
            partial_block_chain,
        })
    }

    /// Loads data to synchronize a client.
    ///
    /// The client's request contains a list of tag prefixes. This method returns the first block
    /// with a matching tag, or the chain tip. All the other values are filtered based on this
    /// block range.
    ///
    /// # Arguments
    ///
    /// - `block_num`: The last block *known* by the client; updates start from the next block.
    /// - `account_ids`: Include an account's commitment if its _last change_ was in the result's
    ///   block range.
    /// - `note_tags`: The tags the client is interested in; the result is restricted to the first
    ///   block with any matching tags.
    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn sync_state(
        &self,
        block_num: BlockNumber,
        account_ids: Vec<AccountId>,
        note_tags: Vec<u32>,
    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
        let inner = self.inner.read().await;

        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;

        let delta = if block_num == state_sync.block_header.block_num() {
            // The client is in sync with the chain tip.
            MmrDelta {
                forest: Forest::new(block_num.as_usize()),
                data: vec![],
            }
        } else {
            // Important notes about the boundary conditions:
            //
            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
            //   contained in the block header always lags behind by one block, because the Mmr
            //   leaves are hashes of block headers and we can't have self-referential hashes.
            //   These two points cancel out and don't require adjusting.
            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined
            //   to be exclusive, so the from_forest has to be adjusted with a +1.
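            //
            // Worked example (hypothetical numbers, for illustration only): if the client's last
            // known block is 5 and the chain tip is block 8, then `from_forest` = 6 and
            // `to_forest` = 8, and the delta contributes the header leaves for blocks 6 and 7.
            // The tip header itself is returned separately in the sync response and, due to the
            // one-block lag described above, is not yet a leaf of the MMR.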
            let from_forest = (block_num + 1).as_usize();
            let to_forest = state_sync.block_header.block_num().as_usize();
            inner
                .blockchain
                .as_mmr()
                .get_delta(Forest::new(from_forest), Forest::new(to_forest))
                .map_err(StateSyncError::FailedToBuildMmrDelta)?
        };

        Ok((state_sync, delta))
    }

    /// Loads data to synchronize a client's notes.
    ///
    /// The client's request contains a list of tags. This method returns the first block with a
    /// matching tag, or the chain tip. All the other values are filtered based on this block
    /// range.
    ///
    /// # Arguments
    ///
    /// - `note_tags`: The tags the client is interested in; the resulting notes are restricted to
    ///   the first block containing a matching note.
    /// - `block_range`: The range of blocks from which to synchronize notes.
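    ///
    /// A minimal usage sketch (hypothetical caller and tag; error handling elided):
    ///
    /// ```ignore
    /// let range = BlockNumber::GENESIS..=state.latest_block_num().await;
    /// let (note_sync, mmr_proof, last_block) = state.sync_notes(vec![note_tag], range).await?;
    /// ```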
    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn sync_notes(
        &self,
        note_tags: Vec<u32>,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> {
        let inner = self.inner.read().await;

        let (note_sync, last_included_block) =
            self.db.get_note_sync(block_range, note_tags).await?;

        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;

        Ok((note_sync, mmr_proof, last_included_block))
    }

    /// Returns data needed by the block producer to construct and prove the next block.
    pub async fn get_block_inputs(
        &self,
        account_ids: Vec<AccountId>,
        nullifiers: Vec<Nullifier>,
        unauthenticated_note_commitments: BTreeSet<Word>,
        reference_blocks: BTreeSet<BlockNumber>,
    ) -> Result<BlockInputs, GetBlockInputsError> {
        // Get the note inclusion proofs from the DB.
        // We do this first so we have to acquire the lock on the state just once: there we need
        // the reference blocks of the note proofs to get their authentication paths in the chain
        // MMR.
        let unauthenticated_note_proofs = self
            .db
            .select_note_inclusion_proofs(unauthenticated_note_commitments)
            .await
            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;

        // The set of blocks that the notes are included in.
        let note_proof_reference_blocks =
            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());

        // Collect all blocks we need to prove inclusion for, without duplicates.
        let mut blocks = reference_blocks;
        blocks.extend(note_proof_reference_blocks);

        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;

        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
        // be used as the previous block header of the block being built.
        let mut headers = self
            .db
            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
            .await
            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;

        // Find and remove the latest block as we must not add it to the chain MMR, since it is
        // not yet in the chain.
        let latest_block_header_index = headers
            .iter()
            .enumerate()
            .find_map(|(index, header)| {
                (header.block_num() == latest_block_number).then_some(index)
            })
            .expect("DB should have returned the header of the latest block header");

        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
        let latest_block_header = headers.swap_remove(latest_block_header_index);

        // SAFETY: This should not error because:
        // - we're passing exactly the block headers that we've added to the partial MMR,
        // - so none of the block headers' block numbers should exceed the chain length of the
        //   partial MMR,
        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
        //
        // We construct headers and partial MMR in concert, so they are consistent. This is why we
        // can call the unchecked constructor.
        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
            .expect("partial mmr and block headers should be consistent");

        Ok(BlockInputs::new(
            latest_block_header,
            partial_block_chain,
            account_witnesses,
            nullifier_witnesses,
            unauthenticated_note_proofs,
        ))
    }

    /// Gets account and nullifier witnesses for the requested account IDs and nullifiers, as well
    /// as the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block; its
    /// number is removed from `blocks` and returned separately.
    ///
    /// This method acquires the lock on the inner state and does not access the DB, so the lock
    /// is released as soon as possible.
    async fn get_block_inputs_witnesses(
        &self,
        blocks: &mut BTreeSet<BlockNumber>,
        account_ids: Vec<AccountId>,
        nullifiers: Vec<Nullifier>,
    ) -> Result<
        (
            BlockNumber,
            BTreeMap<AccountId, AccountWitness>,
            BTreeMap<Nullifier, NullifierWitness>,
            PartialMmr,
        ),
        GetBlockInputsError,
    > {
        let inner = self.inner.read().await;

        let latest_block_number = inner.latest_block_num();

        // If `blocks` is empty, use the latest block number which will never trigger the error.
        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
        if highest_block_number > latest_block_number {
            return Err(GetBlockInputsError::UnknownBatchBlockReference {
                highest_block_number,
                latest_block_number,
            });
        }

        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
        // inclusion in the chain.
        blocks.remove(&latest_block_number);

        // Fetch the partial MMR at the state of the latest block with authentication paths for the
        // provided set of blocks.
        //
        // SAFETY:
        // - The latest block num was retrieved from the inner blockchain from which we will also
        //   retrieve the proofs, so it is guaranteed to exist in that chain.
        // - We have checked that no block number in the blocks set is greater than latest block
        //   number *and* latest block num was removed from the set. Therefore only block numbers
        //   smaller than latest block num remain in the set. Therefore all the block numbers are
        //   guaranteed to exist in the chain state at latest block num.
        let partial_mmr =
            inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
                "latest block num should exist and all blocks in set should be < than latest block",
            );

        // Fetch witnesses for all accounts.
        let account_witnesses = account_ids
            .iter()
            .copied()
            .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
            .collect::<BTreeMap<AccountId, AccountWitness>>();

        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
        // not as this is done as part of proposing the block.
        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
            .iter()
            .copied()
            .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
            .collect();

        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
    }

    /// Returns data needed by the block producer to verify transaction validity.
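    ///
    /// A minimal usage sketch (hypothetical caller; error handling elided):
    ///
    /// ```ignore
    /// let tx_inputs = state.get_transaction_inputs(account_id, &nullifiers, vec![]).await?;
    /// if tx_inputs.new_account_id_prefix_is_unique == Some(false) {
    ///     // the prefix of the new account's ID is already taken; reject the transaction
    /// }
    /// ```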
    #[instrument(target = COMPONENT, skip_all, ret)]
    pub async fn get_transaction_inputs(
        &self,
        account_id: AccountId,
        nullifiers: &[Nullifier],
        unauthenticated_note_commitments: Vec<Word>,
    ) -> Result<TransactionInputs, DatabaseError> {
        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));

        let inner = self.inner.read().await;

        let account_commitment = inner.account_tree.get_latest_commitment(account_id);

        let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
            Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
        } else {
            None
        };

        // Non-unique account Id prefixes for new accounts are not allowed.
        if let Some(false) = new_account_id_prefix_is_unique {
            return Ok(TransactionInputs {
                new_account_id_prefix_is_unique,
                ..Default::default()
            });
        }

        let nullifiers = nullifiers
            .iter()
            .map(|nullifier| NullifierInfo {
                nullifier: *nullifier,
                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
            })
            .collect();

        let found_unauthenticated_notes = self
            .db
            .select_notes_by_commitment(unauthenticated_note_commitments)
            .await?
            .into_iter()
            .map(|note| note.note_commitment)
            .collect();

        Ok(TransactionInputs {
            account_commitment,
            nullifiers,
            found_unauthenticated_notes,
            new_account_id_prefix_is_unique,
        })
    }

    /// Returns details for a public (on-chain) account.
    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
        self.db.select_account(id).await
    }

    /// Returns details for a public (on-chain) network account matching the given ID prefix.
    pub async fn get_network_account_details_by_prefix(
        &self,
        id_prefix: u32,
    ) -> Result<Option<AccountInfo>, DatabaseError> {
        self.db.select_network_account_by_prefix(id_prefix).await
    }

    /// Returns the respective account proof with optional details, such as asset and storage
    /// entries.
    ///
    /// Note: The `block_num` parameter in the request is currently ignored; the proof is always
    /// returned against the current state. Historical block support will be implemented in a
    /// future update.
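    ///
    /// A minimal request sketch (hypothetical values; error handling elided):
    ///
    /// ```ignore
    /// let response = state
    ///     .get_account_proof(AccountProofRequest {
    ///         block_num: None,
    ///         account_id,
    ///         details: Some(AccountDetailRequest {
    ///             code_commitment: None,
    ///             asset_vault_commitment: None,
    ///             storage_requests: vec![],
    ///         }),
    ///     })
    ///     .await?;
    /// ```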
    #[allow(clippy::too_many_lines)]
    pub async fn get_account_proof(
        &self,
        account_request: AccountProofRequest,
    ) -> Result<AccountProofResponse, DatabaseError> {
        let AccountProofRequest { block_num, account_id, details } = account_request;
        let _ = block_num.ok_or_else(|| {
            DatabaseError::NotImplemented(
                "Handling of historical/past block numbers is not implemented yet".to_owned(),
            )
        });

        // Lock inner state for the whole operation. We need to hold this lock to prevent the
        // database, account tree and latest block number from changing during the operation,
        // because changing one of them would lead to inconsistent state.
        let inner_state = self.inner.read().await;

        let block_num = inner_state.account_tree.block_number_latest();
        let witness = inner_state.account_tree.open_latest(account_id);

        let account_details = if let Some(AccountDetailRequest {
            code_commitment,
            asset_vault_commitment,
            storage_requests,
        }) = details
        {
            let account_info = self.db.select_account(account_id).await?;

            // if we get a query for a _private_ account _with_ details requested, we'll error out
            let Some(account) = account_info.details else {
                return Err(DatabaseError::AccountNotPublic(account_id));
            };

            let storage_header = account.storage().to_header();

            let mut storage_map_details =
                Vec::<AccountStorageMapDetails>::with_capacity(storage_requests.len());

            for StorageMapRequest { slot_index, slot_data } in storage_requests {
                let Some(StorageSlot::Map(storage_map)) =
                    account.storage().slots().get(slot_index as usize)
                else {
                    return Err(AccountError::StorageSlotNotMap(slot_index).into());
                };
                let details = AccountStorageMapDetails::new(slot_index, slot_data, storage_map);
                storage_map_details.push(details);
            }

            // Only include the account code blob if it is unknown to the client, i.e. if the
            // requested code commitment does not match the account's code commitment. If `None`
            // was requested, don't return any.
            let account_code = code_commitment
                .is_some_and(|code_commitment| code_commitment != account.code().commitment())
                .then(|| account.code().to_bytes());

            // storage details
            let storage_details = AccountStorageDetails {
                header: storage_header,
                map_details: storage_map_details,
            };

            // Handle vault details based on the `asset_vault_commitment`.
            // Similar to `code_commitment`, if the provided commitment matches the vault root, we
            // don't return vault data; if it doesn't match, we return the vault data. If no
            // commitment is provided at all, no vault data is returned. If the number of assets
            // contained in the vault exceeds a limit, we signal this back in the response and the
            // user must handle that in a follow-up request.
            let vault_details = match asset_vault_commitment {
                Some(commitment) if commitment == account.vault().root() => {
                    // The client already has the correct vault data
                    AccountVaultDetails::empty()
                },
                Some(_) => {
                    // The commitment doesn't match, so return vault data
                    AccountVaultDetails::new(account.vault())
                },
                None => {
                    // No commitment provided, so don't return vault data
                    AccountVaultDetails::empty()
                },
            };

            Some(AccountDetails {
                account_header: AccountHeader::from(account),
                account_code,
                vault_details,
                storage_details,
            })
        } else {
            None
        };

        let response = AccountProofResponse {
            block_num,
            witness,
            details: account_details,
        };

        Ok(response)
    }

    /// Returns storage map values for syncing within a block range.
    pub(crate) async fn get_storage_map_sync_values(
        &self,
        account_id: AccountId,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<StorageMapValuesPage, DatabaseError> {
        self.db.select_storage_map_sync_values(account_id, block_range).await
    }

    /// Loads a block from the block store. Returns `Ok(None)` if the block is not found.
    pub async fn load_block(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<Vec<u8>>, DatabaseError> {
        if block_num > self.latest_block_num().await {
            return Ok(None);
        }
        self.block_store.load_block(block_num).await.map_err(Into::into)
    }

    /// Returns the latest block number.
    pub async fn latest_block_num(&self) -> BlockNumber {
        self.inner.read().await.latest_block_num()
    }

    /// Runs database optimization.
    pub async fn optimize_db(&self) -> Result<(), DatabaseError> {
        self.db.optimize().await
    }

    /// Returns account vault updates for the specified account within a block range.
    pub async fn sync_account_vault(
        &self,
        account_id: AccountId,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
        self.db.get_account_vault_sync(account_id, block_range).await
    }

    /// Returns the unprocessed network notes, along with the next pagination token.
    pub async fn get_unconsumed_network_notes(
        &self,
        page: Page,
    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
        self.db.select_unconsumed_network_notes(page).await
    }

    /// Returns the network notes for an account that are unconsumed by a specified block number,
    /// along with the next pagination token.
    pub async fn get_unconsumed_network_notes_for_account(
        &self,
        network_account_id_prefix: NetworkAccountPrefix,
        block_num: BlockNumber,
        page: Page,
    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
        self.db
            .select_unconsumed_network_notes_for_account(network_account_id_prefix, block_num, page)
            .await
    }

    /// Returns the script for a note by its root.
    pub async fn get_note_script_by_root(
        &self,
        root: Word,
    ) -> Result<Option<NoteScript>, DatabaseError> {
        self.db.select_note_script_by_root(root).await
    }

    /// Returns the complete transaction records for the specified accounts within the specified
    /// block range, including state commitments and note IDs.
    pub async fn sync_transactions(
        &self,
        account_ids: Vec<AccountId>,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
        self.db.select_transactions_records(account_ids, block_range).await
    }
}

// UTILITIES
// ================================================================================================

#[instrument(level = "info", target = COMPONENT, skip_all)]
async fn load_nullifier_tree(db: &mut Db) -> Result<NullifierTree, StateInitializationError> {
    let nullifiers = db.select_all_nullifiers().await?;

    NullifierTree::with_entries(nullifiers.into_iter().map(|info| (info.nullifier, info.block_num)))
        .map_err(StateInitializationError::FailedToCreateNullifierTree)
}

#[instrument(level = "info", target = COMPONENT, skip_all)]
async fn load_mmr(db: &mut Db) -> Result<Mmr, StateInitializationError> {
    let block_commitments: Vec<Word> = db
        .select_all_block_headers()
        .await?
        .iter()
        .map(BlockHeader::commitment)
        .collect();

    Ok(block_commitments.into())
}

#[instrument(level = "info", target = COMPONENT, skip_all)]
async fn load_account_tree(
    db: &mut Db,
    block_number: BlockNumber,
) -> Result<AccountTreeWithHistory<InMemoryAccountTree>, StateInitializationError> {
    let account_data = db.select_all_account_commitments().await?.into_iter().collect::<Vec<_>>();

    // Convert account_data to use account_id_to_smt_key
    let smt_entries = account_data
        .into_iter()
        .map(|(id, commitment)| (account_id_to_smt_key(id), commitment));

    let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries)
        .expect("Failed to create LargeSmt from database account data");

    let account_tree = AccountTree::new(smt).expect("Failed to create AccountTree");
    Ok(AccountTreeWithHistory::new(account_tree, block_number))
}