miden_node_store/
state.rs

1//! Abstraction to synchronize state modifications.
2//!
3//! The [State] provides data access and modification methods; its main purpose is to ensure that
4//! data is written atomically and that reads are consistent.
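//!
//! Concretely, the in-memory chain state (`InnerState`) is guarded by a writer-preferring
//! [`tokio::sync::RwLock`], and a separate [`tokio::sync::Mutex`] serializes writers. When a block
//! is applied, the database transaction commit is coordinated with the in-memory update (see
//! [`State::apply_block`]) so readers observe either the old state or the new one, never a mix.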
5
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::ops::RangeInclusive;
8use std::path::Path;
9use std::sync::Arc;
10
11use miden_node_proto::domain::account::{
12    AccountDetailRequest,
13    AccountDetails,
14    AccountInfo,
15    AccountProofRequest,
16    AccountProofResponse,
17    AccountStorageDetails,
18    AccountStorageMapDetails,
19    AccountVaultDetails,
20    NetworkAccountPrefix,
21    StorageMapRequest,
22};
23use miden_node_proto::domain::batch::BatchInputs;
24use miden_node_utils::ErrorReport;
25use miden_node_utils::formatting::format_array;
26use miden_objects::account::{AccountHeader, AccountId, StorageSlot};
27use miden_objects::block::account_tree::{AccountTree, account_id_to_smt_key};
28use miden_objects::block::{
29    AccountWitness,
30    BlockHeader,
31    BlockInputs,
32    BlockNumber,
33    Blockchain,
34    NullifierTree,
35    NullifierWitness,
36    ProvenBlock,
37};
38use miden_objects::crypto::merkle::{
39    Forest,
40    LargeSmt,
41    MemoryStorage,
42    Mmr,
43    MmrDelta,
44    MmrPeaks,
45    MmrProof,
46    PartialMmr,
47    SmtProof,
48    SmtStorage,
49};
50use miden_objects::note::{NoteDetails, NoteId, NoteScript, Nullifier};
51use miden_objects::transaction::{OutputNote, PartialBlockchain};
52use miden_objects::utils::Serializable;
53use miden_objects::{AccountError, Word};
54use tokio::sync::{Mutex, RwLock, oneshot};
55use tracing::{info, info_span, instrument};
56
57use crate::blocks::BlockStore;
58use crate::db::models::Page;
59use crate::db::models::queries::StorageMapValuesPage;
60use crate::db::{
61    AccountVaultValue,
62    Db,
63    NoteRecord,
64    NoteSyncUpdate,
65    NullifierInfo,
66    StateSyncUpdate,
67};
68use crate::errors::{
69    ApplyBlockError,
70    DatabaseError,
71    GetBatchInputsError,
72    GetBlockHeaderError,
73    GetBlockInputsError,
74    GetCurrentBlockchainDataError,
75    InvalidBlockError,
76    NoteSyncError,
77    StateInitializationError,
78    StateSyncError,
79};
80use crate::{AccountTreeWithHistory, COMPONENT, DataDirectory, InMemoryAccountTree};
81
82// STRUCTURES
83// ================================================================================================
84
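/// Transaction-related state returned to the block producer when validating a transaction.
///
/// Populated by [`State::get_transaction_inputs`]: the account's latest state commitment, the
/// block in which each requested nullifier was consumed (a default value if unspent), the
/// unauthenticated notes found in the store, and, for new accounts only, whether the account ID
/// prefix is unique.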
85#[derive(Debug, Default)]
86pub struct TransactionInputs {
87    pub account_commitment: Word,
88    pub nullifiers: Vec<NullifierInfo>,
89    pub found_unauthenticated_notes: HashSet<Word>,
90    pub new_account_id_prefix_is_unique: Option<bool>,
91}
92
93/// Container for state that needs to be updated atomically.
94struct InnerState<S = MemoryStorage>
95where
96    S: SmtStorage,
97{
98    nullifier_tree: NullifierTree,
99    blockchain: Blockchain,
100    account_tree: AccountTreeWithHistory<AccountTree<LargeSmt<S>>>,
101}
102
103impl<S> InnerState<S>
104where
105    S: SmtStorage,
106{
107    /// Returns the latest block number.
108    fn latest_block_num(&self) -> BlockNumber {
109        self.blockchain
110            .chain_tip()
111            .expect("chain should always have at least the genesis block")
112    }
113}
114
115/// The rollup state
116pub struct State {
117    /// The database which stores block headers, nullifiers, notes, and the latest states of
118    /// accounts.
119    db: Arc<Db>,
120
121    /// The block store which stores full block contents for all blocks.
122    block_store: Arc<BlockStore>,
123
124    /// Read-write lock used to prevent writing to a structure while it is being used.
125    ///
126    /// The lock is writer-preferring, meaning the writer won't be starved.
127    inner: RwLock<InnerState>,
128
129    /// To allow readers to access the tree data while an update is being performed, and to prevent
130    /// TOCTOU issues, there must be no concurrent writers. This lock serializes the writers.
131    writer: Mutex<()>,
132}
133
134impl State {
135    /// Loads the state from the data directory at `data_path`.
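    ///
    /// Illustrative usage (hypothetical path, error handling elided):
    ///
    /// ```ignore
    /// let state = State::load(Path::new("data/")).await?;
    /// let tip = state.latest_block_num().await;
    /// ```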
136    #[instrument(target = COMPONENT, skip_all)]
137    pub async fn load(data_path: &Path) -> Result<Self, StateInitializationError> {
138        let data_directory = DataDirectory::load(data_path.to_path_buf())
139            .map_err(StateInitializationError::DataDirectoryLoadError)?;
140
141        let block_store = Arc::new(
142            BlockStore::load(data_directory.block_store_dir())
143                .map_err(StateInitializationError::BlockStoreLoadError)?,
144        );
145
146        let database_filepath = data_directory.database_path();
147        let mut db = Db::load(database_filepath.clone())
148            .await
149            .map_err(StateInitializationError::DatabaseLoadError)?;
150
151        let chain_mmr = load_mmr(&mut db).await?;
152        let block_headers = db.select_all_block_headers().await?;
153        // TODO: Account tree loading synchronization
154        // Currently `load_account_tree` loads all account commitments from the DB. This could
155        // potentially lead to inconsistency if the DB contains account states from blocks beyond
156        // `latest_block_num`, though in practice the DB writes are transactional and this
157        // should not occur.
158        let latest_block_num = block_headers
159            .last()
160            .map_or(BlockNumber::GENESIS, miden_objects::block::BlockHeader::block_num);
161        let account_tree = load_account_tree(&mut db, latest_block_num).await?;
162        let nullifier_tree = load_nullifier_tree(&mut db).await?;
163
164        let inner = RwLock::new(InnerState {
165            nullifier_tree,
166            // SAFETY: We assume the loaded MMR is valid and does not have more than u32::MAX
167            // entries.
168            blockchain: Blockchain::from_mmr_unchecked(chain_mmr),
169            account_tree,
170        });
171
172        let writer = Mutex::new(());
173        let db = Arc::new(db);
174
175        Ok(Self { db, block_store, inner, writer })
176    }
177
178    /// Apply changes of a new block to the DB and in-memory data structures.
179    ///
180    /// ## Note on state consistency
181    ///
182    /// The server contains in-memory representations of the existing trees. The in-memory
183    /// representations must be kept consistent with the committed data; this is necessary to
184    /// provide consistent results for all endpoints. In order to achieve consistency, the
185    /// following steps are used:
186    ///
187    /// - the request data is validated prior to starting any modifications.
188    /// - the block is saved into the block store in parallel with updating the DB, but before
189    ///   committing. This block is considered a candidate and is not yet available for reading
190    ///   because the latest block pointer has not been updated yet.
191    /// - a transaction is opened in the DB and the writes are started.
192    /// - while the transaction is not committed, concurrent reads are allowed, both from the DB and
193    ///   from the in-memory representations, which are consistent at this stage.
194    /// - prior to committing the changes to the DB, an exclusive lock on the in-memory data is
195    ///   acquired, preventing concurrent reads of the in-memory data, since it would be
196    ///   out-of-sync w.r.t. the DB.
197    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
198    ///   use the fresh data.
199    /// - the in-memory structures are updated, including the latest block pointer and the lock is
200    ///   released.
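    ///
    /// A rough timeline of the above (see the implementation for details):
    ///
    /// ```text
    /// this task:   validate block -> compute tree mutations -> spawn DB task -> wait for
    ///              "ready to commit" -> take write lock -> signal DB task -> wait for commit ->
    ///              update in-memory trees
    /// block store: save block candidate (spawned early, awaited before taking the write lock)
    /// DB task:     open transaction -> write block data -> signal "ready to commit" -> wait for
    ///              the write-lock signal -> commit
    /// ```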
201    // TODO: This span is logged in a root span, we should connect it to the parent span.
202    #[allow(clippy::too_many_lines)]
203    #[instrument(target = COMPONENT, skip_all, err)]
204    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
205        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;
206
207        let header = block.header();
208
209        let tx_commitment = block.transactions().commitment();
210
211        if header.tx_commitment() != tx_commitment {
212            return Err(InvalidBlockError::InvalidBlockTxCommitment {
213                expected: tx_commitment,
214                actual: header.tx_commitment(),
215            }
216            .into());
217        }
218
219        let block_num = header.block_num();
220        let block_commitment = block.commitment();
221
222        // ensures the right block header is being processed
223        let prev_block = self
224            .db
225            .select_block_header_by_block_num(None)
226            .await?
227            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
228
229        let expected_block_num = prev_block.block_num().child();
230        if block_num != expected_block_num {
231            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
232                expected: expected_block_num,
233                submitted: block_num,
234            }
235            .into());
236        }
237        if header.prev_block_commitment() != prev_block.commitment() {
238            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
239        }
240
241        let block_data = block.to_bytes();
242
243        // Save the block to the block store. In case of a rolled-back DB transaction, the
244        // in-memory state will be unchanged, but the block might still be written into the
245        // block store. Thus, such blocks should be considered block candidates, but not
246        // finalized blocks. So we should check against the latest block number when getting a
247        // block from the store.
248        let store = Arc::clone(&self.block_store);
249        let block_save_task =
250            tokio::spawn(async move { store.save_block(block_num, &block_data).await });
251
252        // scope to read in-memory data, compute mutations required for updating account
253        // and nullifier trees, and validate the request
254        let (
255            nullifier_tree_old_root,
256            nullifier_tree_update,
257            account_tree_old_root,
258            account_tree_update,
259        ) = {
260            let inner = self.inner.read().await;
261
262            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();
263
264            // nullifiers can be produced only once
265            let duplicate_nullifiers: Vec<_> = block
266                .created_nullifiers()
267                .iter()
268                .filter(|&n| inner.nullifier_tree.get_block_num(n).is_some())
269                .copied()
270                .collect();
271            if !duplicate_nullifiers.is_empty() {
272                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
273            }
274
275            // compute updates for the in-memory data structures
276
277            // new_block.chain_root must be equal to the chain MMR root prior to the update
278            let peaks = inner.blockchain.peaks();
279            if peaks.hash_peaks() != header.chain_commitment() {
280                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
281            }
282
283            // compute update for nullifier tree
284            let nullifier_tree_update = inner
285                .nullifier_tree
286                .compute_mutations(
287                    block.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
288                )
289                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
290
291            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
292                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
293            }
294
295            // compute update for account tree
296            let account_tree_update = inner
297                .account_tree
298                .compute_mutations(
299                    block
300                        .updated_accounts()
301                        .iter()
302                        .map(|update| (update.account_id(), update.final_state_commitment())),
303                )
304                .map_err(|e| match e {
305                    crate::HistoricalError::AccountTreeError(err) => {
306                        InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
307                    },
308                    crate::HistoricalError::MerkleError(_) => {
309                        panic!("Unexpected MerkleError during account tree mutation computation")
310                    },
311                })?;
312
313            if account_tree_update.as_mutation_set().root() != header.account_root() {
314                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
315            }
316
317            (
318                inner.nullifier_tree.root(),
319                nullifier_tree_update,
320                inner.account_tree.root_latest(),
321                account_tree_update,
322            )
323        };
324
325        // build note tree
326        let note_tree = block.build_output_note_tree();
327        if note_tree.root() != header.note_root() {
328            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
329        }
330
331        let notes = block
332            .output_notes()
333            .map(|(note_index, note)| {
334                let (details, nullifier) = match note {
335                    OutputNote::Full(note) => {
336                        (Some(NoteDetails::from(note)), Some(note.nullifier()))
337                    },
338                    OutputNote::Header(_) => (None, None),
339                    note @ OutputNote::Partial(_) => {
340                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
341                            note.clone(),
342                        )));
343                    },
344                };
345
346                let inclusion_path = note_tree.open(note_index);
347
348                let note_record = NoteRecord {
349                    block_num,
350                    note_index,
351                    note_id: note.id().into(),
352                    note_commitment: note.commitment(),
353                    metadata: *note.metadata(),
354                    details,
355                    inclusion_path,
356                };
357
358                Ok((note_record, nullifier))
359            })
360            .collect::<Result<Vec<_>, InvalidBlockError>>()?;
361
362        // Signals the transaction is ready to be committed, and the write lock can be acquired
363        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
364        // Signals the write lock has been acquired, and the transaction can be committed
365        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
366
367        // The DB and in-memory state updates need to be synchronized and are partially
368        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
369        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
370        // spawned.
371        let db = Arc::clone(&self.db);
372        let db_update_task =
373            tokio::spawn(
374                async move { db.apply_block(allow_acquire, acquire_done, block, notes).await },
375            );
376
377        // Wait for the message from the DB update task that we are ready to commit the DB transaction
378        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;
379
380        // Wait for the block saving task to complete without errors
381        block_save_task.await??;
382
383        // Scope to update the in-memory data
384        {
385            // We need to hold the write lock here to prevent inconsistency between the in-memory
386            // state and the DB state. Thus, we need to wait for the DB update task to complete
387            // successfully.
388            let mut inner = self.inner.write().await;
389
390            // We need to check that neither the nullifier tree nor the account tree have changed
391            // while we were waiting for the DB preparation task to complete. If either of them
392            // did change, we do not proceed with in-memory and database updates, since it may
393            // lead to an inconsistent state.
394            if inner.nullifier_tree.root() != nullifier_tree_old_root
395                || inner.account_tree.root_latest() != account_tree_old_root
396            {
397                return Err(ApplyBlockError::ConcurrentWrite);
398            }
399
400            // Notify the DB update task that the write lock has been acquired, so it can commit
401            // the DB transaction
402            inform_acquire_done
403                .send(())
404                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;
405
406            // TODO: shutdown #91
407        // Wait for the successful commit of the DB transaction. If the commit fails, we mustn't
408            // change in-memory state, so we return a block applying error and don't proceed with
409            // in-memory updates.
410            db_update_task
411                .await?
412                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;
413
414            // Update the in-memory data structures after successful commit of the DB transaction
415            inner
416                .nullifier_tree
417                .apply_mutations(nullifier_tree_update)
418                .expect("Unreachable: old nullifier tree root must be checked before this step");
419            inner
420                .account_tree
421                .apply_mutations(account_tree_update)
422                .expect("Unreachable: old account tree root must be checked before this step");
423            inner.blockchain.push(block_commitment);
424        }
425
426        info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");
427
428        Ok(())
429    }
430
431    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
432    ///
433    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
434    /// returned.
435    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
436    pub async fn get_block_header(
437        &self,
438        block_num: Option<BlockNumber>,
439        include_mmr_proof: bool,
440    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
441        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
442        if let Some(header) = block_header {
443            let mmr_proof = if include_mmr_proof {
444                let inner = self.inner.read().await;
445                let mmr_proof = inner.blockchain.open(header.block_num())?;
446                Some(mmr_proof)
447            } else {
448                None
449            };
450            Ok((Some(header), mmr_proof))
451        } else {
452            Ok((None, None))
453        }
454    }
455
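    /// Returns info on the nullifiers created within `block_range` whose `prefix_len`-bit prefix
    /// matches one of the given `nullifier_prefixes`.
    ///
    /// This is a thin wrapper around [`Db::select_nullifiers_by_prefix`].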
456    pub async fn sync_nullifiers(
457        &self,
458        prefix_len: u32,
459        nullifier_prefixes: Vec<u32>,
460        block_range: RangeInclusive<BlockNumber>,
461    ) -> Result<(Vec<NullifierInfo>, BlockNumber), DatabaseError> {
462        self.db
463            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_range)
464            .await
465    }
466
467    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
468    /// tree.
469    ///
470    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
471    #[instrument(level = "debug", target = COMPONENT, skip_all, ret)]
472    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
473        let inner = self.inner.read().await;
474        nullifiers
475            .iter()
476            .map(|n| inner.nullifier_tree.open(n))
477            .map(NullifierWitness::into_proof)
478            .collect()
479    }
480
481    /// Queries a list of notes from the database.
482    ///
483    /// If the provided list of [`NoteId`]s is empty or no note matches the provided
484    /// [`NoteId`]s, an empty list is returned.
485    pub async fn get_notes_by_id(
486        &self,
487        note_ids: Vec<NoteId>,
488    ) -> Result<Vec<NoteRecord>, DatabaseError> {
489        self.db.select_notes_by_id(note_ids).await
490    }
491
492    /// If the input block number is the current chain tip, `None` is returned.
493    /// Otherwise, returns the current chain tip's block header with its corresponding MMR peaks.
494    pub async fn get_current_blockchain_data(
495        &self,
496        block_num: Option<BlockNumber>,
497    ) -> Result<Option<(BlockHeader, MmrPeaks)>, GetCurrentBlockchainDataError> {
498        let blockchain = &self.inner.read().await.blockchain;
499        if let Some(number) = block_num
500            && number == self.latest_block_num().await
501        {
502            return Ok(None);
503        }
504
505        // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)`
506        // when `None` is passed
507        let block_header: BlockHeader = self
508            .db
509            .select_block_header_by_block_num(None)
510            .await
511            .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)?
512            .unwrap();
513        let peaks = blockchain
514            .peaks_at(block_header.block_num())
515            .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?;
516
517        Ok(Some((block_header, peaks)))
518    }
519
520    /// Fetches the inputs for a transaction batch from the database.
521    ///
522    /// ## Inputs
523    ///
524    /// The function takes as input:
525    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
526    /// - The unauthenticated note commitments are the set of commitments of unauthenticated notes
527    ///   consumed by all transactions in the batch. For these notes, we attempt to find inclusion
528    ///   proofs. Not all notes will necessarily exist in the DB, as some notes can be created and
529    ///   consumed within the same batch.
530    ///
531    /// ## Outputs
532    ///
533    /// The function will return:
534    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
535    ///   referenced by a note inclusion proof.
536    /// - Note inclusion proofs for all notes that were found in the DB.
537    /// - The block header that the batch should reference, i.e. the latest known block.
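    ///
    /// For example (hypothetical numbers): with tx reference blocks `{4, 7}`, note inclusion
    /// proofs found at blocks `{2, 7}`, and a chain tip at block 7, the batch reference block is
    /// block 7 and the returned partial blockchain tracks blocks `{2, 4}` only, since the
    /// reference block itself needs no inclusion proof.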
538    pub async fn get_batch_inputs(
539        &self,
540        tx_reference_blocks: BTreeSet<BlockNumber>,
541        unauthenticated_note_commitments: BTreeSet<Word>,
542    ) -> Result<BatchInputs, GetBatchInputsError> {
543        if tx_reference_blocks.is_empty() {
544            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
545        }
546
547        // First we grab note inclusion proofs for the known notes. These proofs only
548        // prove that the note was included in a given block. We then also need to prove that
549        // each of those blocks is included in the chain.
550        let note_proofs = self
551            .db
552            .select_note_inclusion_proofs(unauthenticated_note_commitments)
553            .await
554            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
555
556        // The set of blocks that the notes are included in.
557        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
558
559        // Collect all blocks we need to query without duplicates, which is:
560        // - all blocks for which we need to prove note inclusion.
561        // - all blocks referenced by transactions in the batch.
562        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
563        blocks.extend(note_blocks);
564
565        // Scoped block to automatically drop the read lock guard as soon as we're done.
566        // We also avoid accessing the db in the block as this would delay dropping the guard.
567        let (batch_reference_block, partial_mmr) = {
568            let inner_state = self.inner.read().await;
569
570            let latest_block_num = inner_state.latest_block_num();
571
572            let highest_block_num =
573                *blocks.last().expect("we should have checked for empty block references");
574            if highest_block_num > latest_block_num {
575                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
576                    highest_block_num,
577                    latest_block_num,
578                });
579            }
580
581            // Remove the latest block from the to-be-tracked blocks as it will be the reference
582            // block for the batch itself and thus added to the MMR within the batch kernel, so
583            // there is no need to prove its inclusion.
584            blocks.remove(&latest_block_num);
585
586            // SAFETY:
587            // - The latest block num was retrieved from the inner blockchain from which we will
588            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
589            // - We have checked that no block number in the blocks set is greater than latest block
590            //   number *and* latest block num was removed from the set. Therefore only block
591            //   numbers smaller than latest block num remain in the set. Therefore all the block
592            //   numbers are guaranteed to exist in the chain state at latest block num.
593            let partial_mmr = inner_state
594                .blockchain
595                .partial_mmr_from_blocks(&blocks, latest_block_num)
596                .expect("latest block num should exist and all blocks in set should be < than latest block");
597
598            (latest_block_num, partial_mmr)
599        };
600
601        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
602        // up in a separate DB access.
603        let mut headers = self
604            .db
605            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
606            .await
607            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
608
609        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
610        let header_index = headers
611            .iter()
612            .enumerate()
613            .find_map(|(index, header)| {
614                (header.block_num() == batch_reference_block).then_some(index)
615            })
616            .expect("DB should have returned the header of the batch reference block");
617
618        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
619        let batch_reference_block_header = headers.swap_remove(header_index);
620
621        // SAFETY: This should not error because:
622        // - we're passing exactly the block headers that we've added to the partial MMR,
623        // - so none of the block headers block numbers should exceed the chain length of the
624        //   partial MMR,
625        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
626        //
627        // We construct headers and partial MMR in concert, so they are consistent. This is why we
628        // can call the unchecked constructor.
629        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
630            .expect("partial mmr and block headers should be consistent");
631
632        Ok(BatchInputs {
633            batch_reference_block_header,
634            note_proofs,
635            partial_block_chain,
636        })
637    }
638
639    /// Loads data to synchronize a client.
640    ///
641    /// The client's request contains a list of tag prefixes; this method will return the first
642    /// block with a matching tag, or the chain tip. All the other values are filtered based on this
643    /// block range.
644    ///
645    /// # Arguments
646    ///
647    /// - `block_num`: The last block *known* by the client; updates start from the next block.
648    /// - `account_ids`: Include an account's commitment if its _last change_ was in the result's
649    ///   block range.
650    /// - `note_tags`: The tags the client is interested in; the result is restricted to the first
651    ///   block with any matching tags.
652    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
653    pub async fn sync_state(
654        &self,
655        block_num: BlockNumber,
656        account_ids: Vec<AccountId>,
657        note_tags: Vec<u32>,
658    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
659        let inner = self.inner.read().await;
660
661        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;
662
663        let delta = if block_num == state_sync.block_header.block_num() {
664            // The client is in sync with the chain tip.
665            MmrDelta {
666                forest: Forest::new(block_num.as_usize()),
667                data: vec![],
668            }
669        } else {
670            // Important notes about the boundary conditions:
671            //
672            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
673            //   contained in the block header always lags behind by one block, because the Mmr
674            //   leaves are hashes of block headers, and we can't have self-referential hashes.
675            //   These two points cancel out and don't require adjusting.
676            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined to
677            //   be exclusive, so the from_forest has to be adjusted with a +1.
678            //
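            // For example (hypothetical numbers): if the client last saw block 5 and the matching
            // block header is at block 9, then from_forest = 6 (an MMR covering blocks 0..=5) and
            // to_forest = 9 (covering blocks 0..=8, i.e. what block 9's chain commitment refers
            // to), so the delta contains the MMR additions for blocks 6, 7 and 8.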
679            let from_forest = (block_num + 1).as_usize();
680            let to_forest = state_sync.block_header.block_num().as_usize();
681            inner
682                .blockchain
683                .as_mmr()
684                .get_delta(Forest::new(from_forest), Forest::new(to_forest))
685                .map_err(StateSyncError::FailedToBuildMmrDelta)?
686        };
687
688        Ok((state_sync, delta))
689    }
690
691    /// Loads data to synchronize a client's notes.
692    ///
693    /// The client's request contains a list of tags; this method will return the first
694    /// block with a matching tag, or the chain tip. All the other values are filtered based on this
695    /// block range.
696    ///
697    /// # Arguments
698    ///
699    /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
700    ///   first block containing a matching note.
701    /// - `block_range`: The range of blocks from which to synchronize notes.
702    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
703    pub async fn sync_notes(
704        &self,
705        note_tags: Vec<u32>,
706        block_range: RangeInclusive<BlockNumber>,
707    ) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> {
708        let inner = self.inner.read().await;
709
710        let (note_sync, last_included_block) =
711            self.db.get_note_sync(block_range, note_tags).await?;
712
713        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;
714
715        Ok((note_sync, mmr_proof, last_included_block))
716    }
717
718    /// Returns data needed by the block producer to construct and prove the next block.
719    pub async fn get_block_inputs(
720        &self,
721        account_ids: Vec<AccountId>,
722        nullifiers: Vec<Nullifier>,
723        unauthenticated_note_commitments: BTreeSet<Word>,
724        reference_blocks: BTreeSet<BlockNumber>,
725    ) -> Result<BlockInputs, GetBlockInputsError> {
726        // Get the note inclusion proofs from the DB.
727        // We do this first so we only have to acquire the lock on the state once; at that point we
728        // need the note proofs' reference blocks to get their authentication paths in the chain MMR.
729        let unauthenticated_note_proofs = self
730            .db
731            .select_note_inclusion_proofs(unauthenticated_note_commitments)
732            .await
733            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
734
735        // The set of blocks that the notes are included in.
736        let note_proof_reference_blocks =
737            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
738
739        // Collect all blocks we need to prove inclusion for, without duplicates.
740        let mut blocks = reference_blocks;
741        blocks.extend(note_proof_reference_blocks);
742
743        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
744            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;
745
746        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
747        // be used as the previous block header of the block being built.
748        let mut headers = self
749            .db
750            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
751            .await
752            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
753
754        // Find and remove the latest block as we must not add it to the chain MMR, since it is
755        // not yet in the chain.
756        let latest_block_header_index = headers
757            .iter()
758            .enumerate()
759            .find_map(|(index, header)| {
760                (header.block_num() == latest_block_number).then_some(index)
761            })
762            .expect("DB should have returned the header of the latest block header");
763
764        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
765        let latest_block_header = headers.swap_remove(latest_block_header_index);
766
767        // SAFETY: This should not error because:
768        // - we're passing exactly the block headers that we've added to the partial MMR,
769        // - so none of the block header's block numbers should exceed the chain length of the
770        //   partial MMR,
771        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
772        //
773        // We construct headers and partial MMR in concert, so they are consistent. This is why we
774        // can call the unchecked constructor.
775        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
776            .expect("partial mmr and block headers should be consistent");
777
778        Ok(BlockInputs::new(
779            latest_block_header,
780            partial_block_chain,
781            account_witnesses,
782            nullifier_witnesses,
783            unauthenticated_note_proofs,
784        ))
785    }
786
787    /// Get account and nullifier witnesses for the requested account IDs and nullifiers as well as
788    /// the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block; its
789    /// number is removed from `blocks` and returned separately.
790    ///
791    /// This method acquires the lock on the inner state and does not access the DB, so the lock is
792    /// released as soon as possible.
793    async fn get_block_inputs_witnesses(
794        &self,
795        blocks: &mut BTreeSet<BlockNumber>,
796        account_ids: Vec<AccountId>,
797        nullifiers: Vec<Nullifier>,
798    ) -> Result<
799        (
800            BlockNumber,
801            BTreeMap<AccountId, AccountWitness>,
802            BTreeMap<Nullifier, NullifierWitness>,
803            PartialMmr,
804        ),
805        GetBlockInputsError,
806    > {
807        let inner = self.inner.read().await;
808
809        let latest_block_number = inner.latest_block_num();
810
811        // If `blocks` is empty, use the latest block number which will never trigger the error.
812        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
813        if highest_block_number > latest_block_number {
814            return Err(GetBlockInputsError::UnknownBatchBlockReference {
815                highest_block_number,
816                latest_block_number,
817            });
818        }
819
820        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
821        // inclusion in the chain.
822        blocks.remove(&latest_block_number);
823
824        // Fetch the partial MMR at the state of the latest block with authentication paths for the
825        // provided set of blocks.
826        //
827        // SAFETY:
828        // - The latest block num was retrieved from the inner blockchain from which we will also
829        //   retrieve the proofs, so it is guaranteed to exist in that chain.
830        // - We have checked that no block number in the blocks set is greater than latest block
831        //   number *and* latest block num was removed from the set. Therefore only block numbers
832        //   smaller than latest block num remain in the set. Therefore all the block numbers are
833        //   guaranteed to exist in the chain state at latest block num.
834        let partial_mmr =
835            inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
836                "latest block num should exist and all blocks in set should be < than latest block",
837            );
838
839        // Fetch witnesses for all accounts.
840        let account_witnesses = account_ids
841            .iter()
842            .copied()
843            .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
844            .collect::<BTreeMap<AccountId, AccountWitness>>();
845
846        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
847        // not as this is done as part of proposing the block.
848        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
849            .iter()
850            .copied()
851            .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
852            .collect();
853
854        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
855    }
856
857    /// Returns data needed by the block producer to verify the validity of transactions.
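    ///
    /// For new accounts (i.e. accounts without a recorded state commitment), the result also
    /// reports whether the account ID prefix is unique in the latest account tree; if it is not,
    /// the remaining fields are left at their defaults so the transaction can be rejected early.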
858    #[instrument(target = COMPONENT, skip_all, ret)]
859    pub async fn get_transaction_inputs(
860        &self,
861        account_id: AccountId,
862        nullifiers: &[Nullifier],
863        unauthenticated_note_commitments: Vec<Word>,
864    ) -> Result<TransactionInputs, DatabaseError> {
865        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
866
867        let inner = self.inner.read().await;
868
869        let account_commitment = inner.account_tree.get_latest_commitment(account_id);
870
871        let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
872            Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
873        } else {
874            None
875        };
876
877        // Non-unique account ID prefixes for new accounts are not allowed.
878        if let Some(false) = new_account_id_prefix_is_unique {
879            return Ok(TransactionInputs {
880                new_account_id_prefix_is_unique,
881                ..Default::default()
882            });
883        }
884
885        let nullifiers = nullifiers
886            .iter()
887            .map(|nullifier| NullifierInfo {
888                nullifier: *nullifier,
889                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
890            })
891            .collect();
892
893        let found_unauthenticated_notes = self
894            .db
895            .select_notes_by_commitment(unauthenticated_note_commitments)
896            .await?
897            .into_iter()
898            .map(|note| note.note_commitment)
899            .collect();
900
901        Ok(TransactionInputs {
902            account_commitment,
903            nullifiers,
904            found_unauthenticated_notes,
905            new_account_id_prefix_is_unique,
906        })
907    }
908
909    /// Returns details for a public (on-chain) account.
910    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
911        self.db.select_account(id).await
912    }
913
914    /// Returns details for a public (on-chain) network account matching the given ID prefix.
915    pub async fn get_network_account_details_by_prefix(
916        &self,
917        id_prefix: u32,
918    ) -> Result<Option<AccountInfo>, DatabaseError> {
919        self.db.select_network_account_by_prefix(id_prefix).await
920    }
921
922    /// Returns the requested account's proof with optional details, such as asset and storage
923    /// entries.
924    ///
925    /// Note: The `block_num` parameter in the request is currently ignored; the current state is
926    /// always returned. Historical block support will be implemented in a future update.
927    #[allow(clippy::too_many_lines)]
928    pub async fn get_account_proof(
929        &self,
930        account_request: AccountProofRequest,
931    ) -> Result<AccountProofResponse, DatabaseError> {
932        let AccountProofRequest { block_num, account_id, details } = account_request;
933        let _ = block_num.ok_or_else(|| {
934            DatabaseError::NotImplemented(
935                "Handling of historical/past block numbers is not implemented yet".to_owned(),
936            )
937        });
938
939        // Lock inner state for the whole operation. We need to hold this lock to prevent the
940        // database, account tree and latest block number from changing during the operation,
941        // because a change to any of them would lead to an inconsistent state.
942        let inner_state = self.inner.read().await;
943
944        let block_num = inner_state.account_tree.block_number_latest();
945        let witness = inner_state.account_tree.open_latest(account_id);
946
947        let account_details = if let Some(AccountDetailRequest {
948            code_commitment,
949            asset_vault_commitment,
950            storage_requests,
951        }) = details
952        {
953            let account_info = self.db.select_account(account_id).await?;
954
955            // if we get a query for a _private_ account _with_ details requested, we'll error out
956            let Some(account) = account_info.details else {
957                return Err(DatabaseError::AccountNotPublic(account_id));
958            };
959
960            let storage_header = account.storage().to_header();
961
962            let mut storage_map_details =
963                Vec::<AccountStorageMapDetails>::with_capacity(storage_requests.len());
964
965            for StorageMapRequest { slot_index, slot_data } in storage_requests {
966                let Some(StorageSlot::Map(storage_map)) =
967                    account.storage().slots().get(slot_index as usize)
968                else {
969                    return Err(AccountError::StorageSlotNotMap(slot_index).into());
970                };
971                let details = AccountStorageMapDetails::new(slot_index, slot_data, storage_map);
972                storage_map_details.push(details);
973            }
974
975            // Only include the account code blob if it is unknown to the client, i.e. on an account
976            // code digest mismatch. If `None` was requested, don't return any.
977            let account_code = code_commitment
978                .is_some_and(|code_commitment| code_commitment != account.code().commitment())
979                .then(|| account.code().to_bytes());
980
981            // storage details
982            let storage_details = AccountStorageDetails {
983                header: storage_header,
984                map_details: storage_map_details,
985            };
986
987            // Handle vault details based on the `asset_vault_commitment`.
988            // Similar to `code_commitment`, if the provided commitment matches, we don't return
989            // vault data. Vault data is returned only if a commitment is provided and it does
990            // not match. If the number of assets contained in the vault exceeds a
991            // limit, we signal this back in the response and the user must handle that
992            // in a follow-up request.
993            let vault_details = match asset_vault_commitment {
994                Some(commitment) if commitment == account.vault().root() => {
995                    // The client already has the correct vault data
996                    AccountVaultDetails::empty()
997                },
998                Some(_) => {
999                    // The commitment doesn't match, so return vault data
1000                    AccountVaultDetails::new(account.vault())
1001                },
1002                None => {
1003                    // No commitment provided, so don't return vault data
1004                    AccountVaultDetails::empty()
1005                },
1006            };
1007
1008            Some(AccountDetails {
1009                account_header: AccountHeader::from(account),
1010                account_code,
1011                vault_details,
1012                storage_details,
1013            })
1014        } else {
1015            None
1016        };
1017
1018        let response = AccountProofResponse {
1019            block_num,
1020            witness,
1021            details: account_details,
1022        };
1023
1024        Ok(response)
1025    }
1026
1027    /// Returns storage map values for syncing within a block range.
1028    pub(crate) async fn get_storage_map_sync_values(
1029        &self,
1030        account_id: AccountId,
1031        block_range: RangeInclusive<BlockNumber>,
1032    ) -> Result<StorageMapValuesPage, DatabaseError> {
1033        self.db.select_storage_map_sync_values(account_id, block_range).await
1034    }
1035
1036    /// Loads a block from the block store. Returns `Ok(None)` if the block is not found.
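    ///
    /// Blocks past the latest committed block number may exist in the block store only as
    /// candidates (see [`Self::apply_block`]), so requests for them also return `Ok(None)`.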
1037    pub async fn load_block(
1038        &self,
1039        block_num: BlockNumber,
1040    ) -> Result<Option<Vec<u8>>, DatabaseError> {
1041        if block_num > self.latest_block_num().await {
1042            return Ok(None);
1043        }
1044        self.block_store.load_block(block_num).await.map_err(Into::into)
1045    }
1046
1047    /// Returns the latest block number.
1048    pub async fn latest_block_num(&self) -> BlockNumber {
1049        self.inner.read().await.latest_block_num()
1050    }
1051
1052    /// Runs database optimization.
1053    pub async fn optimize_db(&self) -> Result<(), DatabaseError> {
1054        self.db.optimize().await
1055    }
1056
1057    /// Returns account vault updates for the specified account within a block range.
1058    pub async fn sync_account_vault(
1059        &self,
1060        account_id: AccountId,
1061        block_range: RangeInclusive<BlockNumber>,
1062    ) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
1063        self.db.get_account_vault_sync(account_id, block_range).await
1064    }
1065
1066    /// Returns the unconsumed network notes, along with the next pagination token.
1067    pub async fn get_unconsumed_network_notes(
1068        &self,
1069        page: Page,
1070    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
1071        self.db.select_unconsumed_network_notes(page).await
1072    }
1073
1074    /// Returns the network notes for an account that are unconsumed by a specified block number,
1075    /// along with the next pagination token.
1076    pub async fn get_unconsumed_network_notes_for_account(
1077        &self,
1078        network_account_id_prefix: NetworkAccountPrefix,
1079        block_num: BlockNumber,
1080        page: Page,
1081    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
1082        self.db
1083            .select_unconsumed_network_notes_for_account(network_account_id_prefix, block_num, page)
1084            .await
1085    }
1086
1087    /// Returns the script for a note by its root.
1088    pub async fn get_note_script_by_root(
1089        &self,
1090        root: Word,
1091    ) -> Result<Option<NoteScript>, DatabaseError> {
1092        self.db.select_note_script_by_root(root).await
1093    }
1094
1095    /// Returns the complete transaction records for the specified accounts within the specified
1096    /// block range, including state commitments and note IDs.
1097    pub async fn sync_transactions(
1098        &self,
1099        account_ids: Vec<AccountId>,
1100        block_range: RangeInclusive<BlockNumber>,
1101    ) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
1102        self.db.select_transactions_records(account_ids, block_range).await
1103    }
1104}
1105
1106// UTILITIES
1107// ================================================================================================
1108
1109#[instrument(level = "info", target = COMPONENT, skip_all)]
1110async fn load_nullifier_tree(db: &mut Db) -> Result<NullifierTree, StateInitializationError> {
1111    let nullifiers = db.select_all_nullifiers().await?;
1112
1113    NullifierTree::with_entries(nullifiers.into_iter().map(|info| (info.nullifier, info.block_num)))
1114        .map_err(StateInitializationError::FailedToCreateNullifierTree)
1115}
1116
1117#[instrument(level = "info", target = COMPONENT, skip_all)]
1118async fn load_mmr(db: &mut Db) -> Result<Mmr, StateInitializationError> {
1119    let block_commitments: Vec<Word> = db
1120        .select_all_block_headers()
1121        .await?
1122        .iter()
1123        .map(BlockHeader::commitment)
1124        .collect();
1125
1126    Ok(block_commitments.into())
1127}
1128
1129#[instrument(level = "info", target = COMPONENT, skip_all)]
1130async fn load_account_tree(
1131    db: &mut Db,
1132    block_number: BlockNumber,
1133) -> Result<AccountTreeWithHistory<InMemoryAccountTree>, StateInitializationError> {
1134    let account_data = db.select_all_account_commitments().await?.into_iter().collect::<Vec<_>>();
1135
1136    // Convert account_data to use account_id_to_smt_key
1137    let smt_entries = account_data
1138        .into_iter()
1139        .map(|(id, commitment)| (account_id_to_smt_key(id), commitment));
1140
1141    let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries)
1142        .expect("Failed to create LargeSmt from database account data");
1143
1144    let account_tree = AccountTree::new(smt).expect("Failed to create AccountTree");
1145    Ok(AccountTreeWithHistory::new(account_tree, block_number))
1146}