miden_node_store/
state.rs

1//! Abstraction to synchronize state modifications.
2//!
3//! The [State] provides data access and modification methods. Its main purpose is to ensure that
4//! data is written atomically, and that reads are consistent.
5
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::ops::RangeInclusive;
8use std::path::Path;
9use std::sync::Arc;
10
11use miden_node_proto::domain::account::{
12    AccountDetailRequest,
13    AccountDetails,
14    AccountInfo,
15    AccountProofRequest,
16    AccountProofResponse,
17    AccountStorageDetails,
18    AccountStorageMapDetails,
19    AccountVaultDetails,
20    NetworkAccountPrefix,
21    StorageMapRequest,
22};
23use miden_node_proto::domain::batch::BatchInputs;
24use miden_node_utils::ErrorReport;
25use miden_node_utils::formatting::format_array;
26use miden_objects::account::{AccountHeader, AccountId, StorageSlot};
27use miden_objects::block::account_tree::{AccountTree, account_id_to_smt_key};
28use miden_objects::block::{
29    AccountWitness,
30    BlockHeader,
31    BlockInputs,
32    BlockNumber,
33    Blockchain,
34    NullifierTree,
35    NullifierWitness,
36    ProvenBlock,
37};
38use miden_objects::crypto::merkle::{
39    Forest,
40    LargeSmt,
41    MemoryStorage,
42    Mmr,
43    MmrDelta,
44    MmrPeaks,
45    MmrProof,
46    PartialMmr,
47    SmtProof,
48    SmtStorage,
49};
50use miden_objects::note::{NoteDetails, NoteId, NoteScript, Nullifier};
51use miden_objects::transaction::{OutputNote, PartialBlockchain};
52use miden_objects::utils::Serializable;
53use miden_objects::{AccountError, Word};
54use tokio::sync::{Mutex, RwLock, oneshot};
55use tracing::{Instrument, info, info_span, instrument};
56
57use crate::blocks::BlockStore;
58use crate::db::models::Page;
59use crate::db::models::queries::StorageMapValuesPage;
60use crate::db::{
61    AccountVaultValue,
62    Db,
63    NoteRecord,
64    NoteSyncUpdate,
65    NullifierInfo,
66    StateSyncUpdate,
67};
68use crate::errors::{
69    ApplyBlockError,
70    DatabaseError,
71    GetBatchInputsError,
72    GetBlockHeaderError,
73    GetBlockInputsError,
74    GetCurrentBlockchainDataError,
75    InvalidBlockError,
76    NoteSyncError,
77    StateInitializationError,
78    StateSyncError,
79};
80use crate::{AccountTreeWithHistory, COMPONENT, DataDirectory, InMemoryAccountTree};
81
82// STRUCTURES
83// ================================================================================================
84
/// Plain data container with public fields. It derives `Default`, so a value can be obtained
/// from `TransactionInputs::default()`.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this type, so the
/// per-field notes below are inferred from names and types only; confirm against the callers.
85#[derive(Debug, Default)]
86pub struct TransactionInputs {
    /// Presumably the commitment to the current state of the queried account (TODO confirm).
87    pub account_commitment: Word,
    /// Nullifier records associated with the request; presumably the subset already known to the
    /// store (TODO confirm).
88    pub nullifiers: Vec<NullifierInfo>,
    /// Presumably the commitments of unauthenticated notes that were found in the store
    /// (TODO confirm).
89    pub found_unauthenticated_notes: HashSet<Word>,
    /// `None` when no answer is available; otherwise presumably whether the ID prefix of a new
    /// account is not yet in use (TODO confirm when this is populated).
90    pub new_account_id_prefix_is_unique: Option<bool>,
91}
92
93/// Container for state that needs to be updated atomically.
94struct InnerState<S = MemoryStorage>
95where
96    S: SmtStorage,
97{
98    nullifier_tree: NullifierTree,
99    blockchain: Blockchain,
100    account_tree: AccountTreeWithHistory<AccountTree<LargeSmt<S>>>,
101}
102
103impl<S> InnerState<S>
104where
105    S: SmtStorage,
106{
107    /// Returns the latest block number.
108    fn latest_block_num(&self) -> BlockNumber {
109        self.blockchain
110            .chain_tip()
111            .expect("chain should always have at least the genesis block")
112    }
113}
114
115/// The rollup state
///
/// Bundles the database, the block store and the in-memory trees together with the two locks
/// (`inner` and `writer`) that coordinate readers and the block-applying writer.
116pub struct State {
117    /// The database which stores block headers, nullifiers, notes, and the latest states of
118    /// accounts.
119    db: Arc<Db>,
120
121    /// The block store which stores full block contents for all blocks.
122    block_store: Arc<BlockStore>,
123
124    /// Read-write lock used to prevent writing to a structure while it is being used.
125    ///
126    /// The lock is writer-preferring, meaning the writer won't be starved.
127    inner: RwLock<InnerState>,
128
129    /// To allow readers to access the tree data while an update is being performed, and prevent
130    /// TOCTOU issues, there must be no concurrent writers. This lock serializes the writers.
131    writer: Mutex<()>,
132}
133
134impl State {
135    /// Loads the state from the `db`.
136    #[instrument(target = COMPONENT, skip_all)]
137    pub async fn load(data_path: &Path) -> Result<Self, StateInitializationError> {
138        let data_directory = DataDirectory::load(data_path.to_path_buf())
139            .map_err(StateInitializationError::DataDirectoryLoadError)?;
140
141        let block_store = Arc::new(
142            BlockStore::load(data_directory.block_store_dir())
143                .map_err(StateInitializationError::BlockStoreLoadError)?,
144        );
145
146        let database_filepath = data_directory.database_path();
147        let mut db = Db::load(database_filepath.clone())
148            .await
149            .map_err(StateInitializationError::DatabaseLoadError)?;
150
151        let chain_mmr = load_mmr(&mut db).await?;
152        let block_headers = db.select_all_block_headers().await?;
153        // TODO: Account tree loading synchronization
154        // Currently `load_account_tree` loads all account commitments from the DB. This could
155        // potentially lead to inconsistency if the DB contains account states from blocks beyond
156        // `latest_block_num`, though in practice the DB writes are transactional and this
157        // should not occur.
158        let latest_block_num = block_headers
159            .last()
160            .map_or(BlockNumber::GENESIS, miden_objects::block::BlockHeader::block_num);
161        let account_tree = load_account_tree(&mut db, latest_block_num).await?;
162        let nullifier_tree = load_nullifier_tree(&mut db).await?;
163
164        let inner = RwLock::new(InnerState {
165            nullifier_tree,
166            // SAFETY: We assume the loaded MMR is valid and does not have more than u32::MAX
167            // entries.
168            blockchain: Blockchain::from_mmr_unchecked(chain_mmr),
169            account_tree,
170        });
171
172        let writer = Mutex::new(());
173        let db = Arc::new(db);
174
175        Ok(Self { db, block_store, inner, writer })
176    }
177
178    /// Apply changes of a new block to the DB and in-memory data structures.
179    ///
180    /// ## Note on state consistency
181    ///
182    /// The server keeps in-memory representations of the existing trees. The in-memory
183    /// representation must be kept consistent with the committed data; this is necessary to
184    /// provide consistent results for all endpoints. In order to achieve consistency, the
185    /// following steps are used:
186    ///
187    /// - the request data is validated, prior to starting any modifications.
188    /// - block is being saved into the store in parallel with updating the DB, but before
189    ///   committing. This block is considered as candidate and not yet available for reading
190    ///   because the latest block pointer is not updated yet.
191    /// - a transaction is open in the DB and the writes are started.
192    /// - while the transaction is not committed, concurrent reads are allowed, both the DB and the
193    ///   in-memory representations, which are consistent at this stage.
194    /// - prior to committing the changes to the DB, an exclusive lock to the in-memory data is
195    ///   acquired, preventing concurrent reads to the in-memory data, since that will be
196    ///   out-of-sync w.r.t. the DB.
197    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
198    ///   use the fresh data.
199    /// - the in-memory structures are updated, including the latest block pointer and the lock is
200    ///   released.
    ///
    /// ## Errors
    ///
    /// - [`ApplyBlockError::ConcurrentWrite`] if the writer lock is already held by another call,
    ///   or if the nullifier or account tree root changed before the write lock was acquired.
    /// - An [`InvalidBlockError`] (converted into [`ApplyBlockError`]) if the block fails one of
    ///   the validation checks: transaction commitment, block number, previous block commitment,
    ///   chain commitment, nullifier/account/note roots, duplicated nullifiers, duplicate account
    ///   ID prefix, or an output note of the `Partial` kind.
    /// - [`ApplyBlockError::DbBlockHeaderEmpty`] if the DB returns no latest block header.
    /// - [`ApplyBlockError::ClosedChannel`] if the DB update task drops its end of the channel
    ///   before signalling that it is ready to commit.
    /// - [`ApplyBlockError::DbUpdateTaskFailed`] if the DB update task cannot be notified or
    ///   reports an error.
    /// - Failures of the block-saving task or of joining the spawned tasks are propagated via `?`.
    ///
    /// ## Panics
    ///
    /// Panics if computing the account tree mutations yields a `MerkleError`, or if applying the
    /// previously computed mutations to the in-memory trees fails.
201    // TODO: This span is logged in a root span, we should connect it to the parent span.
202    #[allow(clippy::too_many_lines)]
203    #[instrument(target = COMPONENT, skip_all, err)]
204    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
        // Fail fast instead of queueing behind another writer: `try_lock` does not wait, and the
        // guard is held until this function returns.
205        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;
206
207        let header = block.header();
208
        // The header must commit to the transactions carried in the block body.
209        let tx_commitment = block.transactions().commitment();
210
211        if header.tx_commitment() != tx_commitment {
212            return Err(InvalidBlockError::InvalidBlockTxCommitment {
213                expected: tx_commitment,
214                actual: header.tx_commitment(),
215            }
216            .into());
217        }
218
219        let block_num = header.block_num();
220        let block_commitment = block.commitment();
221
222        // ensures the right block header is being processed
        // (`None` selects the latest block header stored in the DB)
223        let prev_block = self
224            .db
225            .select_block_header_by_block_num(None)
226            .await?
227            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
228
229        let expected_block_num = prev_block.block_num().child();
230        if block_num != expected_block_num {
231            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
232                expected: expected_block_num,
233                submitted: block_num,
234            }
235            .into());
236        }
237        if header.prev_block_commitment() != prev_block.commitment() {
238            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
239        }
240
241        let block_data = block.to_bytes();
242
243        // Save the block to the block store. In a case of a rolled-back DB transaction, the
244        // in-memory state will be unchanged, but the block might still be written into the
245        // block store. Thus, such block should be considered as block candidates, but not
246        // finalized blocks. So we should check for the latest block when getting block from
247        // the store.
248        let store = Arc::clone(&self.block_store);
249        let block_save_task = tokio::spawn(
250            async move { store.save_block(block_num, &block_data).await }.in_current_span(),
251        );
252
253        // scope to read in-memory data, compute mutations required for updating account
254        // and nullifier trees, and validate the request
255        let (
256            nullifier_tree_old_root,
257            nullifier_tree_update,
258            account_tree_old_root,
259            account_tree_update,
260        ) = {
261            let inner = self.inner.read().await;
262
            // No `.await` occurs in the remainder of this scope, so the entered span guard is
            // never held across a suspension point.
263            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();
264
265            // nullifiers can be produced only once
266            let duplicate_nullifiers: Vec<_> = block
267                .created_nullifiers()
268                .iter()
269                .filter(|&n| inner.nullifier_tree.get_block_num(n).is_some())
270                .copied()
271                .collect();
272            if !duplicate_nullifiers.is_empty() {
273                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
274            }
275
276            // compute updates for the in-memory data structures
277
278            // new_block.chain_root must be equal to the chain MMR root prior to the update
279            let peaks = inner.blockchain.peaks();
280            if peaks.hash_peaks() != header.chain_commitment() {
281                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
282            }
283
284            // compute update for nullifier tree
285            let nullifier_tree_update = inner
286                .nullifier_tree
287                .compute_mutations(
288                    block.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
289                )
290                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
291
            // The root that results from the mutations must match the root claimed by the header.
292            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
293                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
294            }
295
296            // compute update for account tree
297            let account_tree_update = inner
298                .account_tree
299                .compute_mutations(
300                    block
301                        .updated_accounts()
302                        .iter()
303                        .map(|update| (update.account_id(), update.final_state_commitment())),
304                )
305                .map_err(|e| match e {
306                    crate::HistoricalError::AccountTreeError(err) => {
307                        InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err)
308                    },
309                    crate::HistoricalError::MerkleError(_) => {
310                        panic!("Unexpected MerkleError during account tree mutation computation")
311                    },
312                })?;
313
314            if account_tree_update.as_mutation_set().root() != header.account_root() {
315                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
316            }
317
            // The current roots are returned as well, so that a change to either tree can be
            // detected once the write lock is taken further below.
318            (
319                inner.nullifier_tree.root(),
320                nullifier_tree_update,
321                inner.account_tree.root_latest(),
322                account_tree_update,
323            )
324        };
325
326        // build note tree
327        let note_tree = block.build_output_note_tree();
328        if note_tree.root() != header.note_root() {
329            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
330        }
331
        // Build one `NoteRecord` per output note, paired with its nullifier when the note is
        // `Full`. `Header` notes carry neither details nor nullifier; `Partial` notes are
        // rejected.
332        let notes = block
333            .output_notes()
334            .map(|(note_index, note)| {
335                let (details, nullifier) = match note {
336                    OutputNote::Full(note) => {
337                        (Some(NoteDetails::from(note)), Some(note.nullifier()))
338                    },
339                    OutputNote::Header(_) => (None, None),
340                    note @ OutputNote::Partial(_) => {
341                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
342                            note.clone(),
343                        )));
344                    },
345                };
346
347                let inclusion_path = note_tree.open(note_index);
348
349                let note_record = NoteRecord {
350                    block_num,
351                    note_index,
352                    note_id: note.id().into(),
353                    note_commitment: note.commitment(),
354                    metadata: *note.metadata(),
355                    details,
356                    inclusion_path,
357                };
358
359                Ok((note_record, nullifier))
360            })
361            .collect::<Result<Vec<_>, InvalidBlockError>>()?;
362
363        // Signals the transaction is ready to be committed, and the write lock can be acquired
364        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
365        // Signals the write lock has been acquired, and the transaction can be committed
366        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
367
368        // The DB and in-memory state updates need to be synchronized and are partially
369        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
370        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
371        // spawned.
372        let db = Arc::clone(&self.db);
373        let db_update_task = tokio::spawn(
374            async move { db.apply_block(allow_acquire, acquire_done, block, notes).await }
375                .in_current_span(),
376        );
377
378        // Wait for the message from the DB update task that it is ready to commit the DB transaction
379        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;
380
381        // Awaiting the block saving task to complete without errors
        // (the first `?` handles the join result, the second the result of the save itself)
382        block_save_task.await??;
383
384        // Scope to update the in-memory data
385        async move {
386            // We need to hold the write lock here to prevent inconsistency between the in-memory
387            // state and the DB state. Thus, we need to wait for the DB update task to complete
388            // successfully.
389            let mut inner = self.inner.write().await;
390
391            // We need to check that neither the nullifier tree nor the account tree have changed
392            // while we were waiting for the DB preparation task to complete. If either of them
393            // did change, we do not proceed with in-memory and database updates, since it may
394            // lead to an inconsistent state.
395            if inner.nullifier_tree.root() != nullifier_tree_old_root
396                || inner.account_tree.root_latest() != account_tree_old_root
397            {
398                return Err(ApplyBlockError::ConcurrentWrite);
399            }
400
401            // Notify the DB update task that the write lock has been acquired, so it can commit
402            // the DB transaction
403            inform_acquire_done
404                .send(())
405                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;
406
407            // TODO: shutdown #91
408            // Await for successful commit of the DB transaction. If the commit fails, we mustn't
409            // change in-memory state, so we return a block applying error and don't proceed with
410            // in-memory updates.
411            db_update_task
412                .await?
413                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;
414
415            // Update the in-memory data structures after successful commit of the DB transaction
416            inner
417                .nullifier_tree
418                .apply_mutations(nullifier_tree_update)
419                .expect("Unreachable: old nullifier tree root must be checked before this step");
420            inner
421                .account_tree
422                .apply_mutations(account_tree_update)
423                .expect("Unreachable: old account tree root must be checked before this step");
            // Appending the block commitment to the chain advances the in-memory chain tip.
424            inner.blockchain.push(block_commitment);
425
426            Ok(())
427        }
428        .instrument(info_span!("update trees"))
429        .await
430    }
431
432    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
433    ///
434    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
435    /// returned.
436    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
437    pub async fn get_block_header(
438        &self,
439        block_num: Option<BlockNumber>,
440        include_mmr_proof: bool,
441    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
442        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
443        if let Some(header) = block_header {
444            let mmr_proof = if include_mmr_proof {
445                let inner = self.inner.read().await;
446                let mmr_proof = inner.blockchain.open(header.block_num())?;
447                Some(mmr_proof)
448            } else {
449                None
450            };
451            Ok((Some(header), mmr_proof))
452        } else {
453            Ok((None, None))
454        }
455    }
456
457    pub async fn sync_nullifiers(
458        &self,
459        prefix_len: u32,
460        nullifier_prefixes: Vec<u32>,
461        block_range: RangeInclusive<BlockNumber>,
462    ) -> Result<(Vec<NullifierInfo>, BlockNumber), DatabaseError> {
463        self.db
464            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_range)
465            .await
466    }
467
468    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
469    /// tree.
470    ///
471    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
472    #[instrument(level = "debug", target = COMPONENT, skip_all, ret)]
473    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
474        let inner = self.inner.read().await;
475        nullifiers
476            .iter()
477            .map(|n| inner.nullifier_tree.open(n))
478            .map(NullifierWitness::into_proof)
479            .collect()
480    }
481
482    /// Queries a list of notes from the database.
483    ///
484    /// If the provided list of [`NoteId`] given is empty or no note matches the provided
485    /// [`NoteId`] an empty list is returned.
486    pub async fn get_notes_by_id(
487        &self,
488        note_ids: Vec<NoteId>,
489    ) -> Result<Vec<NoteRecord>, DatabaseError> {
490        self.db.select_notes_by_id(note_ids).await
491    }
492
493    /// If the input block number is the current chain tip, `None` is returned.
494    /// Otherwise, gets the current chain tip's block header with its corresponding MMR peaks.
495    pub async fn get_current_blockchain_data(
496        &self,
497        block_num: Option<BlockNumber>,
498    ) -> Result<Option<(BlockHeader, MmrPeaks)>, GetCurrentBlockchainDataError> {
499        let blockchain = &self.inner.read().await.blockchain;
500        if let Some(number) = block_num
501            && number == self.latest_block_num().await
502        {
503            return Ok(None);
504        }
505
506        // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)`
507        // when `None` is passed
508        let block_header: BlockHeader = self
509            .db
510            .select_block_header_by_block_num(None)
511            .await
512            .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)?
513            .unwrap();
514        let peaks = blockchain
515            .peaks_at(block_header.block_num())
516            .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?;
517
518        Ok(Some((block_header, peaks)))
519    }
520
521    /// Fetches the inputs for a transaction batch from the database.
522    ///
523    /// ## Inputs
524    ///
525    /// The function takes as input:
526    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
527    /// - The unauthenticated note commitments are the set of commitments of unauthenticated notes
528    ///   consumed by all transactions in the batch. For these notes, we attempt to find inclusion
529    ///   proofs. Not all notes will exist in the DB necessarily, as some notes can be created and
530    ///   consumed within the same batch.
531    ///
532    /// ## Outputs
533    ///
534    /// The function will return:
535    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
536    ///   referenced by a note inclusion proof.
537    /// - Note inclusion proofs for all notes that were found in the DB.
538    /// - The block header that the batch should reference, i.e. the latest known block.
539    pub async fn get_batch_inputs(
540        &self,
541        tx_reference_blocks: BTreeSet<BlockNumber>,
542        unauthenticated_note_commitments: BTreeSet<Word>,
543    ) -> Result<BatchInputs, GetBatchInputsError> {
544        if tx_reference_blocks.is_empty() {
545            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
546        }
547
548        // First we grab note inclusion proofs for the known notes. These proofs only
549        // prove that the note was included in a given block. We then also need to prove that
550        // each of those blocks is included in the chain.
551        let note_proofs = self
552            .db
553            .select_note_inclusion_proofs(unauthenticated_note_commitments)
554            .await
555            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
556
557        // The set of blocks that the notes are included in.
558        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
559
560        // Collect all blocks we need to query without duplicates, which is:
561        // - all blocks for which we need to prove note inclusion.
562        // - all blocks referenced by transactions in the batch.
563        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
564        blocks.extend(note_blocks);
565
566        // Scoped block to automatically drop the read lock guard as soon as we're done.
567        // We also avoid accessing the db in the block as this would delay dropping the guard.
568        let (batch_reference_block, partial_mmr) = {
569            let inner_state = self.inner.read().await;
570
571            let latest_block_num = inner_state.latest_block_num();
572
573            let highest_block_num =
574                *blocks.last().expect("we should have checked for empty block references");
575            if highest_block_num > latest_block_num {
576                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
577                    highest_block_num,
578                    latest_block_num,
579                });
580            }
581
582            // Remove the latest block from the to-be-tracked blocks as it will be the reference
583            // block for the batch itself and thus added to the MMR within the batch kernel, so
584            // there is no need to prove its inclusion.
585            blocks.remove(&latest_block_num);
586
587            // SAFETY:
588            // - The latest block num was retrieved from the inner blockchain from which we will
589            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
590            // - We have checked that no block number in the blocks set is greater than latest block
591            //   number *and* latest block num was removed from the set. Therefore only block
592            //   numbers smaller than latest block num remain in the set. Therefore all the block
593            //   numbers are guaranteed to exist in the chain state at latest block num.
594            let partial_mmr = inner_state
595                .blockchain
596                .partial_mmr_from_blocks(&blocks, latest_block_num)
597                .expect("latest block num should exist and all blocks in set should be < than latest block");
598
599            (latest_block_num, partial_mmr)
600        };
601
602        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
603        // up in a separate DB access.
604        let mut headers = self
605            .db
606            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
607            .await
608            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
609
610        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
611        let header_index = headers
612            .iter()
613            .enumerate()
614            .find_map(|(index, header)| {
615                (header.block_num() == batch_reference_block).then_some(index)
616            })
617            .expect("DB should have returned the header of the batch reference block");
618
619        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
620        let batch_reference_block_header = headers.swap_remove(header_index);
621
622        // SAFETY: This should not error because:
623        // - we're passing exactly the block headers that we've added to the partial MMR,
624        // - so none of the block headers block numbers should exceed the chain length of the
625        //   partial MMR,
626        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
627        //
628        // We construct headers and partial MMR in concert, so they are consistent. This is why we
629        // can call the unchecked constructor.
630        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
631            .expect("partial mmr and block headers should be consistent");
632
633        Ok(BatchInputs {
634            batch_reference_block_header,
635            note_proofs,
636            partial_block_chain,
637        })
638    }
639
640    /// Loads data to synchronize a client.
641    ///
642    /// The client's request contains a list of tag prefixes, this method will return the first
643    /// block with a matching tag, or the chain tip. All the other values are filter based on this
644    /// block range.
645    ///
646    /// # Arguments
647    ///
648    /// - `block_num`: The last block *known* by the client, updates start from the next block.
649    /// - `account_ids`: Include the account's commitment if their _last change_ was in the result's
650    ///   block range.
651    /// - `note_tags`: The tags the client is interested in, result is restricted to the first block
652    ///   with any matches tags.
653    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
654    pub async fn sync_state(
655        &self,
656        block_num: BlockNumber,
657        account_ids: Vec<AccountId>,
658        note_tags: Vec<u32>,
659    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
660        let inner = self.inner.read().await;
661
662        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;
663
664        let delta = if block_num == state_sync.block_header.block_num() {
665            // The client is in sync with the chain tip.
666            MmrDelta {
667                forest: Forest::new(block_num.as_usize()),
668                data: vec![],
669            }
670        } else {
671            // Important notes about the boundary conditions:
672            //
673            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
674            // contained in the block header always lag behind by one block, this is because the Mmr
675            // leaves are hashes of block headers, and we can't have self-referential hashes. These
676            // two points cancel out and don't require adjusting.
677            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined to
678            //   be
679            // exclusive, so the from_forest has to be adjusted with a +1
680            let from_forest = (block_num + 1).as_usize();
681            let to_forest = state_sync.block_header.block_num().as_usize();
682            inner
683                .blockchain
684                .as_mmr()
685                .get_delta(Forest::new(from_forest), Forest::new(to_forest))
686                .map_err(StateSyncError::FailedToBuildMmrDelta)?
687        };
688
689        Ok((state_sync, delta))
690    }
691
692    /// Loads data to synchronize a client's notes.
693    ///
694    /// The client's request contains a list of tags, this method will return the first
695    /// block with a matching tag, or the chain tip. All the other values are filter based on this
696    /// block range.
697    ///
698    /// # Arguments
699    ///
700    /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
701    ///   first block containing a matching note.
702    /// - `block_range`: The range of blocks from which to synchronize notes.
703    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
704    pub async fn sync_notes(
705        &self,
706        note_tags: Vec<u32>,
707        block_range: RangeInclusive<BlockNumber>,
708    ) -> Result<(NoteSyncUpdate, MmrProof, BlockNumber), NoteSyncError> {
709        let inner = self.inner.read().await;
710
711        let (note_sync, last_included_block) =
712            self.db.get_note_sync(block_range, note_tags).await?;
713
714        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;
715
716        Ok((note_sync, mmr_proof, last_included_block))
717    }
718
719    /// Returns data needed by the block producer to construct and prove the next block.
720    pub async fn get_block_inputs(
721        &self,
722        account_ids: Vec<AccountId>,
723        nullifiers: Vec<Nullifier>,
724        unauthenticated_note_commitments: BTreeSet<Word>,
725        reference_blocks: BTreeSet<BlockNumber>,
726    ) -> Result<BlockInputs, GetBlockInputsError> {
727        // Get the note inclusion proofs from the DB.
728        // We do this first so we have to acquire the lock to the state just once. There we need the
729        // reference blocks of the note proofs to get their authentication paths in the chain MMR.
730        let unauthenticated_note_proofs = self
731            .db
732            .select_note_inclusion_proofs(unauthenticated_note_commitments)
733            .await
734            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
735
736        // The set of blocks that the notes are included in.
737        let note_proof_reference_blocks =
738            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
739
740        // Collect all blocks we need to prove inclusion for, without duplicates.
741        let mut blocks = reference_blocks;
742        blocks.extend(note_proof_reference_blocks);
743
744        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
745            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;
746
747        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
748        // be used as the previous block header of the block being built.
749        let mut headers = self
750            .db
751            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
752            .await
753            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
754
755        // Find and remove the latest block as we must not add it to the chain MMR, since it is
756        // not yet in the chain.
757        let latest_block_header_index = headers
758            .iter()
759            .enumerate()
760            .find_map(|(index, header)| {
761                (header.block_num() == latest_block_number).then_some(index)
762            })
763            .expect("DB should have returned the header of the latest block header");
764
765        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
766        let latest_block_header = headers.swap_remove(latest_block_header_index);
767
768        // SAFETY: This should not error because:
769        // - we're passing exactly the block headers that we've added to the partial MMR,
770        // - so none of the block header's block numbers should exceed the chain length of the
771        //   partial MMR,
772        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
773        //
774        // We construct headers and partial MMR in concert, so they are consistent. This is why we
775        // can call the unchecked constructor.
776        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
777            .expect("partial mmr and block headers should be consistent");
778
779        Ok(BlockInputs::new(
780            latest_block_header,
781            partial_block_chain,
782            account_witnesses,
783            nullifier_witnesses,
784            unauthenticated_note_proofs,
785        ))
786    }
787
788    /// Get account and nullifier witnesses for the requested account IDs and nullifier as well as
789    /// the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block and its
790    /// number is removed from `blocks` and returned separately.
791    ///
792    /// This method acquires the lock to the inner state and does not access the DB so we release
793    /// the lock asap.
794    async fn get_block_inputs_witnesses(
795        &self,
796        blocks: &mut BTreeSet<BlockNumber>,
797        account_ids: Vec<AccountId>,
798        nullifiers: Vec<Nullifier>,
799    ) -> Result<
800        (
801            BlockNumber,
802            BTreeMap<AccountId, AccountWitness>,
803            BTreeMap<Nullifier, NullifierWitness>,
804            PartialMmr,
805        ),
806        GetBlockInputsError,
807    > {
808        let inner = self.inner.read().await;
809
810        let latest_block_number = inner.latest_block_num();
811
812        // If `blocks` is empty, use the latest block number which will never trigger the error.
813        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
814        if highest_block_number > latest_block_number {
815            return Err(GetBlockInputsError::UnknownBatchBlockReference {
816                highest_block_number,
817                latest_block_number,
818            });
819        }
820
821        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
822        // inclusion in the chain.
823        blocks.remove(&latest_block_number);
824
825        // Fetch the partial MMR at the state of the latest block with authentication paths for the
826        // provided set of blocks.
827        //
828        // SAFETY:
829        // - The latest block num was retrieved from the inner blockchain from which we will also
830        //   retrieve the proofs, so it is guaranteed to exist in that chain.
831        // - We have checked that no block number in the blocks set is greater than latest block
832        //   number *and* latest block num was removed from the set. Therefore only block numbers
833        //   smaller than latest block num remain in the set. Therefore all the block numbers are
834        //   guaranteed to exist in the chain state at latest block num.
835        let partial_mmr =
836            inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
837                "latest block num should exist and all blocks in set should be < than latest block",
838            );
839
840        // Fetch witnesses for all accounts.
841        let account_witnesses = account_ids
842            .iter()
843            .copied()
844            .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
845            .collect::<BTreeMap<AccountId, AccountWitness>>();
846
847        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
848        // not as this is done as part of proposing the block.
849        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
850            .iter()
851            .copied()
852            .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
853            .collect();
854
855        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
856    }
857
858    /// Returns data needed by the block producer to verify transactions validity.
859    #[instrument(target = COMPONENT, skip_all, ret)]
860    pub async fn get_transaction_inputs(
861        &self,
862        account_id: AccountId,
863        nullifiers: &[Nullifier],
864        unauthenticated_note_commitments: Vec<Word>,
865    ) -> Result<TransactionInputs, DatabaseError> {
866        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
867
868        let inner = self.inner.read().await;
869
870        let account_commitment = inner.account_tree.get_latest_commitment(account_id);
871
872        let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
873            Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
874        } else {
875            None
876        };
877
878        // Non-unique account Id prefixes for new accounts are not allowed.
879        if let Some(false) = new_account_id_prefix_is_unique {
880            return Ok(TransactionInputs {
881                new_account_id_prefix_is_unique,
882                ..Default::default()
883            });
884        }
885
886        let nullifiers = nullifiers
887            .iter()
888            .map(|nullifier| NullifierInfo {
889                nullifier: *nullifier,
890                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
891            })
892            .collect();
893
894        let found_unauthenticated_notes = self
895            .db
896            .select_notes_by_commitment(unauthenticated_note_commitments)
897            .await?
898            .into_iter()
899            .map(|note| note.note_commitment)
900            .collect();
901
902        Ok(TransactionInputs {
903            account_commitment,
904            nullifiers,
905            found_unauthenticated_notes,
906            new_account_id_prefix_is_unique,
907        })
908    }
909
    /// Returns the [`AccountInfo`] stored in the database for the account `id`.
    ///
    /// This is a direct pass-through to the database's `select_account` query; the in-memory
    /// state is not consulted.
    ///
    /// NOTE(review): full details are presumably only available for public (on-chain) accounts;
    /// `get_account_proof` in this file treats a missing `details` field as a private account.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
        self.db.select_account(id).await
    }
914
    /// Returns details for the public (on-chain) network account identified by `id_prefix`.
    ///
    /// This is a direct pass-through to the database's `select_network_account_by_prefix` query.
    /// The result is optional; presumably `Ok(None)` means no network account matches the prefix
    /// (TODO confirm against the query implementation).
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn get_network_account_details_by_prefix(
        &self,
        id_prefix: u32,
    ) -> Result<Option<AccountInfo>, DatabaseError> {
        self.db.select_network_account_by_prefix(id_prefix).await
    }
922
923    /// Returns the respective account proof with optional details, such as asset and storage
924    /// entries.
925    ///
926    /// Note: The `block_num` parameter in the request is currently ignored and will always
927    /// return the current state. Historical block support will be implemented in a future update.
928    #[allow(clippy::too_many_lines)]
929    pub async fn get_account_proof(
930        &self,
931        account_request: AccountProofRequest,
932    ) -> Result<AccountProofResponse, DatabaseError> {
933        let AccountProofRequest { block_num, account_id, details } = account_request;
934        let _ = block_num.ok_or_else(|| {
935            DatabaseError::NotImplemented(
936                "Handling of historical/past block numbers is not implemented yet".to_owned(),
937            )
938        });
939
940        // Lock inner state for the whole operation. We need to hold this lock to prevent the
941        // database, account tree and latest block number from changing during the operation,
942        // because changing one of them would lead to inconsistent state.
943        let inner_state = self.inner.read().await;
944
945        let block_num = inner_state.account_tree.block_number_latest();
946        let witness = inner_state.account_tree.open_latest(account_id);
947
948        let account_details = if let Some(AccountDetailRequest {
949            code_commitment,
950            asset_vault_commitment,
951            storage_requests,
952        }) = details
953        {
954            let account_info = self.db.select_account(account_id).await?;
955
956            // if we get a query for a _private_ account _with_ details requested, we'll error out
957            let Some(account) = account_info.details else {
958                return Err(DatabaseError::AccountNotPublic(account_id));
959            };
960
961            let storage_header = account.storage().to_header();
962
963            let mut storage_map_details =
964                Vec::<AccountStorageMapDetails>::with_capacity(storage_requests.len());
965
966            for StorageMapRequest { slot_index, slot_data } in storage_requests {
967                let Some(StorageSlot::Map(storage_map)) =
968                    account.storage().slots().get(slot_index as usize)
969                else {
970                    return Err(AccountError::StorageSlotNotMap(slot_index).into());
971                };
972                let details = AccountStorageMapDetails::new(slot_index, slot_data, storage_map);
973                storage_map_details.push(details);
974            }
975
976            // Only include unknown account code blobs, which is equal to a account code digest
977            // mismatch. If `None` was requested, don't return any.
978            let account_code = code_commitment
979                .is_some_and(|code_commitment| code_commitment != account.code().commitment())
980                .then(|| account.code().to_bytes());
981
982            // storage details
983            let storage_details = AccountStorageDetails {
984                header: storage_header,
985                map_details: storage_map_details,
986            };
987
988            // Handle vault details based on the `asset_vault_commitment`.
989            // Similar to `code_commitment`, if the provided commitment matches, we don't return
990            // vault data. If no commitment is provided or it doesn't match, we return
991            // the vault data. If the number of vault contained assets are exceeding a
992            // limit, we signal this back in the response and the user must handle that
993            // in follow-up request.
994            let vault_details = match asset_vault_commitment {
995                Some(commitment) if commitment == account.vault().root() => {
996                    // The client already has the correct vault data
997                    AccountVaultDetails::empty()
998                },
999                Some(_) => {
1000                    // The commitment doesn't match, so return vault data
1001                    AccountVaultDetails::new(account.vault())
1002                },
1003                None => {
1004                    // No commitment provided, so don't return vault data
1005                    AccountVaultDetails::empty()
1006                },
1007            };
1008
1009            Some(AccountDetails {
1010                account_header: AccountHeader::from(account),
1011                account_code,
1012                vault_details,
1013                storage_details,
1014            })
1015        } else {
1016            None
1017        };
1018
1019        let response = AccountProofResponse {
1020            block_num,
1021            witness,
1022            details: account_details,
1023        };
1024
1025        Ok(response)
1026    }
1027
    /// Returns storage map values of `account_id` for syncing within `block_range`.
    ///
    /// This is a direct pass-through to the database's `select_storage_map_sync_values` query,
    /// which yields a [`StorageMapValuesPage`].
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub(crate) async fn get_storage_map_sync_values(
        &self,
        account_id: AccountId,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<StorageMapValuesPage, DatabaseError> {
        self.db.select_storage_map_sync_values(account_id, block_range).await
    }
1036
1037    /// Loads a block from the block store. Return `Ok(None)` if the block is not found.
1038    pub async fn load_block(
1039        &self,
1040        block_num: BlockNumber,
1041    ) -> Result<Option<Vec<u8>>, DatabaseError> {
1042        if block_num > self.latest_block_num().await {
1043            return Ok(None);
1044        }
1045        self.block_store.load_block(block_num).await.map_err(Into::into)
1046    }
1047
    /// Returns the latest block number.
    ///
    /// The value is read from the in-memory inner state; the read guard is only held for the
    /// duration of this call.
    pub async fn latest_block_num(&self) -> BlockNumber {
        self.inner.read().await.latest_block_num()
    }
1052
    /// Runs database optimization.
    ///
    /// This is a direct pass-through to the database's `optimize` operation.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the operation, if any.
    pub async fn optimize_db(&self) -> Result<(), DatabaseError> {
        self.db.optimize().await
    }
1057
    /// Emits metrics for each database table's size.
    ///
    /// This is a direct pass-through to the database's `analyze_table_sizes` operation; how and
    /// where the metrics are emitted is decided there.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the operation, if any.
    pub async fn analyze_table_sizes(&self) -> Result<(), DatabaseError> {
        self.db.analyze_table_sizes().await
    }
1062
    /// Returns account vault updates for the specified account within a block range.
    ///
    /// This is a direct pass-through to the database's `get_account_vault_sync` query. The result
    /// pairs a block number with the vault values.
    ///
    /// NOTE(review): the block number is presumably the last block covered by the returned
    /// values; confirm against the query implementation.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn sync_account_vault(
        &self,
        account_id: AccountId,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
        self.db.get_account_vault_sync(account_id, block_range).await
    }
1071
    /// Returns the unconsumed network notes for the requested `page`, along with the next
    /// pagination token.
    ///
    /// This is a direct pass-through to the database's `select_unconsumed_network_notes` query.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn get_unconsumed_network_notes(
        &self,
        page: Page,
    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
        self.db.select_unconsumed_network_notes(page).await
    }
1079
    /// Returns the network notes for an account that are unconsumed by a specified block number,
    /// along with the next pagination token.
    ///
    /// This is a direct pass-through to the database's
    /// `select_unconsumed_network_notes_for_account` query. The account is identified by
    /// `network_account_id_prefix`, and `page` selects which page of results to return.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn get_unconsumed_network_notes_for_account(
        &self,
        network_account_id_prefix: NetworkAccountPrefix,
        block_num: BlockNumber,
        page: Page,
    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
        self.db
            .select_unconsumed_network_notes_for_account(network_account_id_prefix, block_num, page)
            .await
    }
1092
    /// Returns the script for a note by its root.
    ///
    /// This is a direct pass-through to the database's `select_note_script_by_root` query. The
    /// result is optional; presumably `Ok(None)` means no script with this root is stored (TODO
    /// confirm against the query implementation).
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn get_note_script_by_root(
        &self,
        root: Word,
    ) -> Result<Option<NoteScript>, DatabaseError> {
        self.db.select_note_script_by_root(root).await
    }
1100
    /// Returns the complete transaction records for the specified accounts within the specified
    /// block range, including state commitments and note IDs.
    ///
    /// This is a direct pass-through to the database's `select_transactions_records` query. The
    /// result pairs a block number with the transaction records.
    ///
    /// NOTE(review): the block number is presumably the last block covered by the returned
    /// records; confirm against the query implementation.
    ///
    /// # Errors
    ///
    /// Returns the [`DatabaseError`] produced by the query, if any.
    pub async fn sync_transactions(
        &self,
        account_ids: Vec<AccountId>,
        block_range: RangeInclusive<BlockNumber>,
    ) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
        self.db.select_transactions_records(account_ids, block_range).await
    }
1110}
1111
1112// UTILITIES
1113// ================================================================================================
1114
1115#[instrument(level = "info", target = COMPONENT, skip_all)]
1116async fn load_nullifier_tree(db: &mut Db) -> Result<NullifierTree, StateInitializationError> {
1117    let nullifiers = db.select_all_nullifiers().await?;
1118
1119    NullifierTree::with_entries(nullifiers.into_iter().map(|info| (info.nullifier, info.block_num)))
1120        .map_err(StateInitializationError::FailedToCreateNullifierTree)
1121}
1122
1123#[instrument(level = "info", target = COMPONENT, skip_all)]
1124async fn load_mmr(db: &mut Db) -> Result<Mmr, StateInitializationError> {
1125    let block_commitments: Vec<Word> = db
1126        .select_all_block_headers()
1127        .await?
1128        .iter()
1129        .map(BlockHeader::commitment)
1130        .collect();
1131
1132    Ok(block_commitments.into())
1133}
1134
1135#[instrument(level = "info", target = COMPONENT, skip_all)]
1136async fn load_account_tree(
1137    db: &mut Db,
1138    block_number: BlockNumber,
1139) -> Result<AccountTreeWithHistory<InMemoryAccountTree>, StateInitializationError> {
1140    let account_data = db.select_all_account_commitments().await?.into_iter().collect::<Vec<_>>();
1141
1142    // Convert account_data to use account_id_to_smt_key
1143    let smt_entries = account_data
1144        .into_iter()
1145        .map(|(id, commitment)| (account_id_to_smt_key(id), commitment));
1146
1147    let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries)
1148        .expect("Failed to create LargeSmt from database account data");
1149
1150    let account_tree = AccountTree::new(smt).expect("Failed to create AccountTree");
1151    Ok(AccountTreeWithHistory::new(account_tree, block_number))
1152}