miden_node_store/
state.rs

//! Abstraction to synchronize state modifications.
//!
//! The [State] provides data access and modification methods. Its main purpose is to ensure that
//! data is written atomically and that reads are consistent.

use std::{
    collections::{BTreeMap, BTreeSet},
    ops::Not,
    sync::Arc,
};

use miden_node_proto::{
    domain::{
        account::{AccountInfo, AccountProofRequest, StorageMapKeysProof},
        batch::BatchInputs,
    },
    generated::responses::{AccountProofsResponse, AccountStateHeader, StorageSlotMapProof},
};
use miden_node_utils::formatting::format_array;
use miden_objects::{
    ACCOUNT_TREE_DEPTH, AccountError,
    account::{AccountDelta, AccountHeader, AccountId, StorageSlot},
    block::{AccountWitness, BlockHeader, BlockInputs, BlockNumber, NullifierWitness, ProvenBlock},
    crypto::{
        hash::rpo::RpoDigest,
        merkle::{
            LeafIndex, Mmr, MmrDelta, MmrError, MmrPeaks, MmrProof, PartialMmr, SimpleSmt,
            SmtProof, ValuePath,
        },
    },
    note::{NoteId, Nullifier},
    transaction::{ChainMmr, OutputNote},
    utils::Serializable,
};
use tokio::{
    sync::{Mutex, RwLock, oneshot},
    time::Instant,
};
use tracing::{info, info_span, instrument};

use crate::{
    COMPONENT,
    blocks::BlockStore,
    db::{Db, NoteRecord, NoteSyncUpdate, NullifierInfo, StateSyncUpdate},
    errors::{
        ApplyBlockError, DatabaseError, GetBatchInputsError, GetBlockHeaderError,
        GetBlockInputsError, InvalidBlockError, NoteSyncError, StateInitializationError,
        StateSyncError,
    },
    nullifier_tree::NullifierTree,
};

// STRUCTURES
// ================================================================================================

#[derive(Debug)]
pub struct TransactionInputs {
    pub account_commitment: RpoDigest,
    pub nullifiers: Vec<NullifierInfo>,
    pub found_unauthenticated_notes: BTreeSet<NoteId>,
}

/// A [Merkle Mountain Range](Mmr) defining a chain of blocks.
#[derive(Debug, Clone)]
pub struct Blockchain(Mmr);

impl Blockchain {
    /// Returns a new Blockchain.
    pub fn new(chain_mmr: Mmr) -> Self {
        Self(chain_mmr)
    }

    /// Returns the tip of the chain, i.e. the number of the latest block in the chain.
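    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); the genesis commitment value is
    /// hypothetical:
    ///
    /// ```ignore
    /// // An MMR seeded with only the genesis block commitment has forest 1.
    /// let chain = Blockchain::new(Mmr::from(vec![genesis_commitment]));
    /// assert_eq!(chain.chain_tip(), BlockNumber::from(0u32)); // forest 1 -> tip is block 0
    /// assert_eq!(chain.chain_length(), BlockNumber::from(1u32)); // length = tip + 1
    /// ```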
    pub fn chain_tip(&self) -> BlockNumber {
        let block_number: u32 = (self.0.forest() - 1)
            .try_into()
            .expect("chain_mmr always has, at least, the genesis block");

        block_number.into()
    }

    /// Returns the chain length.
    pub fn chain_length(&self) -> BlockNumber {
        self.chain_tip().child()
    }

    /// Returns the current peaks of the MMR.
    pub fn peaks(&self) -> MmrPeaks {
        self.0.peaks()
    }

    /// Returns the peaks of the MMR at the state specified by `forest`.
    ///
    /// # Errors
    ///
    /// Returns an error if the specified `forest` value is not valid for this MMR.
    pub fn peaks_at(&self, forest: usize) -> Result<MmrPeaks, MmrError> {
        self.0.peaks_at(forest)
    }

    /// Adds a block commitment to the MMR. The caller must ensure that this commitment is the one
    /// for the next block in the chain.
    pub fn push(&mut self, block_commitment: RpoDigest) {
        self.0.add(block_commitment);
    }

    /// Returns an [`MmrProof`] for the leaf at the specified position.
    pub fn open(&self, pos: usize) -> Result<MmrProof, MmrError> {
        self.0.open_at(pos, self.0.forest())
    }

    /// Returns a reference to the underlying [`Mmr`].
    pub fn as_mmr(&self) -> &Mmr {
        &self.0
    }

    /// Creates a [`PartialMmr`] at the state of the latest block (i.e. the block's chain root will
    /// match the hashed peaks of the returned partial MMR). This MMR will include authentication
    /// paths for all blocks in the provided set.
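    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); `chain` and the block numbers are
    /// hypothetical:
    ///
    /// ```ignore
    /// // Chain tip is block 10; we need inclusion proofs for blocks 3 and 7.
    /// let blocks = BTreeSet::from([BlockNumber::from(3u32), BlockNumber::from(7u32)]);
    /// let partial_mmr = chain.partial_mmr_from_blocks(&blocks, BlockNumber::from(10u32));
    /// // The returned partial MMR reflects the chain state just before block 10 was added and
    /// // tracks authentication paths for blocks 3 and 7 only.
    /// ```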
    pub fn partial_mmr_from_blocks(
        &self,
        blocks: &BTreeSet<BlockNumber>,
        latest_block_number: BlockNumber,
    ) -> PartialMmr {
        // Using the latest block number as the target forest means we take the state of the MMR
        // just before the latest block was added. This is because the latest block will be used
        // as the reference block of the batch and will be added to the MMR by the batch kernel.
        let target_forest = latest_block_number.as_usize();
        let peaks = self
            .peaks_at(target_forest)
            .expect("target_forest should be smaller than forest of the chain mmr");
        // Grab the block merkle paths from the inner state.
        let mut partial_mmr = PartialMmr::from_peaks(peaks);

        for block_num in blocks.iter().map(BlockNumber::as_usize) {
            // SAFETY: We have ensured block nums are less than chain length.
            let leaf = self
                .0
                .get(block_num)
                .expect("block num less than chain length should exist in chain mmr");
            let path = self
                .0
                .open_at(block_num, target_forest)
                .expect("block num and target forest should be valid for this mmr")
                .merkle_path;
            // SAFETY: We should be able to fill the partial MMR with data from the chain MMR
            // without errors, otherwise it indicates the chain mmr is invalid.
            partial_mmr
                .track(block_num, leaf, &path)
                .expect("filling partial mmr with data from mmr should succeed");
        }

        partial_mmr
    }
}

/// Container for state that needs to be updated atomically.
struct InnerState {
    nullifier_tree: NullifierTree,
    blockchain: Blockchain,
    account_tree: SimpleSmt<ACCOUNT_TREE_DEPTH>,
}

impl InnerState {
    /// Returns the latest block number.
    fn latest_block_num(&self) -> BlockNumber {
        self.blockchain.chain_tip()
    }
}

/// The rollup state.
pub struct State {
    /// The database which stores block headers, nullifiers, notes, and the latest states of
    /// accounts.
    db: Arc<Db>,

    /// The block store which stores full block contents for all blocks.
    block_store: Arc<BlockStore>,

    /// Read-write lock used to prevent writing to a structure while it is being used.
    ///
    /// The lock is writer-preferring, meaning the writer won't be starved.
    inner: RwLock<InnerState>,

    /// To allow readers to access the tree data while an update is being performed, and to
    /// prevent TOCTOU issues, there must be no concurrent writers. This lock serializes the
    /// writers.
    writer: Mutex<()>,
}

impl State {
    /// Loads the state from the `db`.
    #[instrument(target = COMPONENT, skip_all)]
    pub async fn load(
        mut db: Db,
        block_store: Arc<BlockStore>,
    ) -> Result<Self, StateInitializationError> {
        let nullifier_tree = load_nullifier_tree(&mut db).await?;
        let chain_mmr = load_mmr(&mut db).await?;
        let account_tree = load_accounts(&mut db).await?;

        let inner = RwLock::new(InnerState {
            nullifier_tree,
            blockchain: Blockchain::new(chain_mmr),
            account_tree,
        });

        let writer = Mutex::new(());
        let db = Arc::new(db);

        Ok(Self { db, block_store, inner, writer })
    }

    /// Applies changes of a new block to the DB and in-memory data structures.
    ///
    /// ## Note on state consistency
    ///
    /// The server contains in-memory representations of the existing trees. These in-memory
    /// representations must be kept consistent with the committed data; this is necessary to
    /// provide consistent results for all endpoints. In order to achieve consistency, the
    /// following steps are used:
    ///
    /// - the request data is validated, prior to starting any modifications.
    /// - the block is saved into the block store in parallel with updating the DB, but before
    ///   committing. At this point the block is only a candidate and is not yet available for
    ///   reading, because the latest block pointer has not been updated yet.
    /// - a transaction is opened in the DB and the writes are started.
    /// - while the transaction is not committed, concurrent reads are allowed, both from the DB
    ///   and from the in-memory representations, which are consistent at this stage.
    /// - prior to committing the changes to the DB, an exclusive lock on the in-memory data is
    ///   acquired, preventing concurrent reads of the in-memory data, since it would be
    ///   out-of-sync w.r.t. the DB.
    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
    ///   use the fresh data.
    /// - the in-memory structures are updated, including the latest block pointer, and the lock
    ///   is released.
    // TODO: This span is logged in a root span, we should connect it to the parent span.
    #[instrument(target = COMPONENT, skip_all, err)]
    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;

        let header = block.header();

        let tx_commitment = BlockHeader::compute_tx_commitment(block.transactions());
        if header.tx_commitment() != tx_commitment {
            return Err(InvalidBlockError::InvalidBlockTxCommitment {
                expected: tx_commitment,
                actual: header.tx_commitment(),
            }
            .into());
        }

        let block_num = header.block_num();
        let block_commitment = block.commitment();

        // ensures the right block header is being processed
        let prev_block = self
            .db
            .select_block_header_by_block_num(None)
            .await?
            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;

        if block_num != prev_block.block_num() + 1 {
            return Err(InvalidBlockError::NewBlockInvalidBlockNum.into());
        }
        if header.prev_block_commitment() != prev_block.commitment() {
            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
        }

        let block_data = block.to_bytes();

        // Save the block to the block store. If the DB transaction is rolled back, the
        // in-memory state will be unchanged, but the block might still have been written into
        // the block store. Thus, such blocks should be considered candidates rather than
        // finalized blocks, and the latest block number must be checked when reading a block
        // from the store.
        let store = Arc::clone(&self.block_store);
        let block_save_task =
            tokio::spawn(async move { store.save_block(block_num, &block_data).await });

        // scope to read in-memory data, compute mutations required for updating account
        // and nullifier trees, and validate the request
        let (
            nullifier_tree_old_root,
            nullifier_tree_update,
            account_tree_old_root,
            account_tree_update,
        ) = {
            let inner = self.inner.read().await;

            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();

            // nullifiers can be produced only once
            let duplicate_nullifiers: Vec<_> = block
                .created_nullifiers()
                .iter()
                .filter(|&n| inner.nullifier_tree.get_block_num(n).is_some())
                .copied()
                .collect();
            if !duplicate_nullifiers.is_empty() {
                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
            }

            // compute updates for the in-memory data structures

            // new_block.chain_root must be equal to the chain MMR root prior to the update
            let peaks = inner.blockchain.peaks();
            if peaks.hash_peaks() != header.chain_commitment() {
                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
            }

            // compute update for nullifier tree
            let nullifier_tree_update = inner.nullifier_tree.compute_mutations(
                block.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
            );

            if nullifier_tree_update.root() != header.nullifier_root() {
                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
            }

            // compute update for account tree
            let account_tree_update = inner.account_tree.compute_mutations(
                block.updated_accounts().iter().map(|update| {
                    (
                        LeafIndex::new_max_depth(update.account_id().prefix().into()),
                        update.final_state_commitment().into(),
                    )
                }),
            );

            if account_tree_update.root() != header.account_root() {
                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
            }

            (
                inner.nullifier_tree.root(),
                nullifier_tree_update,
                inner.account_tree.root(),
                account_tree_update,
            )
        };

        // build note tree
        let note_tree = block.build_output_note_tree();
        if note_tree.root() != header.note_root() {
            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
        }

        let notes = block
            .output_notes()
            .map(|(note_index, note)| {
                let (details, nullifier) = match note {
                    OutputNote::Full(note) => (Some(note.to_bytes()), Some(note.nullifier())),
                    OutputNote::Header(_) => (None, None),
                    note @ OutputNote::Partial(_) => {
                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
                            note.clone(),
                        )));
                    },
                };

                let merkle_path = note_tree.get_note_path(note_index);

                let note_record = NoteRecord {
                    block_num,
                    note_index,
                    note_id: note.id().into(),
                    metadata: *note.metadata(),
                    details,
                    merkle_path,
                };

                Ok((note_record, nullifier))
            })
            .collect::<Result<Vec<_>, InvalidBlockError>>()?;

        // Signals the transaction is ready to be committed, and the write lock can be acquired
        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
        // Signals the write lock has been acquired, and the transaction can be committed
        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();

        // The DB and in-memory state updates need to be synchronized and are partially
        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
        // spawned.
        let db = Arc::clone(&self.db);
        let db_update_task =
            tokio::spawn(
                async move { db.apply_block(allow_acquire, acquire_done, block, notes).await },
            );

        // Wait for the DB update task to signal that it is ready to commit the DB transaction.
        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;

        // Wait for the block saving task to complete without errors.
        block_save_task.await??;

        // Scope to update the in-memory data
        {
            // We need to hold the write lock here to prevent inconsistency between the in-memory
            // state and the DB state. Thus, we need to wait for the DB update task to complete
            // successfully.
            let mut inner = self.inner.write().await;

            // We need to check that neither the nullifier tree nor the account tree have changed
            // while we were waiting for the DB preparation task to complete. If either of them
            // did change, we do not proceed with in-memory and database updates, since it may
            // lead to an inconsistent state.
            if inner.nullifier_tree.root() != nullifier_tree_old_root
                || inner.account_tree.root() != account_tree_old_root
            {
                return Err(ApplyBlockError::ConcurrentWrite);
            }

            // Notify the DB update task that the write lock has been acquired, so it can commit
            // the DB transaction
            inform_acquire_done
                .send(())
                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;

            // TODO: shutdown #91
            // Wait for the DB transaction to be committed successfully. If the commit fails, we
            // must not change the in-memory state, so we return a block application error and
            // don't proceed with the in-memory updates.
            db_update_task
                .await?
                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.to_string()))?;

            // Update the in-memory data structures after successful commit of the DB transaction
            inner
                .nullifier_tree
                .apply_mutations(nullifier_tree_update)
                .expect("Unreachable: old nullifier tree root must be checked before this step");
            inner
                .account_tree
                .apply_mutations(account_tree_update)
                .expect("Unreachable: old account tree root must be checked before this step");
            inner.blockchain.push(block_commitment);
        }

        info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");

        Ok(())
    }

    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
    ///
    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
    /// returned.
    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn get_block_header(
        &self,
        block_num: Option<BlockNumber>,
        include_mmr_proof: bool,
    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
        if let Some(header) = block_header {
            let mmr_proof = if include_mmr_proof {
                let inner = self.inner.read().await;
                let mmr_proof = inner.blockchain.open(header.block_num().as_usize())?;
                Some(mmr_proof)
            } else {
                None
            };
            Ok((Some(header), mmr_proof))
        } else {
            Ok((None, None))
        }
    }

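    /// Queries the database for nullifier information for nullifiers matching the provided
    /// `prefix_len`-bit prefixes, restricted by `block_num`; see
    /// [Db::select_nullifiers_by_prefix] for the exact filtering semantics.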
    pub async fn check_nullifiers_by_prefix(
        &self,
        prefix_len: u32,
        nullifier_prefixes: Vec<u32>,
        block_num: BlockNumber,
    ) -> Result<Vec<NullifierInfo>, DatabaseError> {
        self.db
            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_num)
            .await
    }

    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
    /// tree.
    ///
    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
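    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `state` is a loaded [State]:
    ///
    /// ```ignore
    /// let proofs = state.check_nullifiers(&[nullifier]).await;
    /// // Each returned proof opens the nullifier's leaf in the nullifier SMT against the latest
    /// // root; an empty leaf value indicates the nullifier has not been consumed yet.
    /// ```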
    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"))]
    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
        let inner = self.inner.read().await;
        nullifiers.iter().map(|n| inner.nullifier_tree.open(n)).collect()
    }

    /// Queries a list of [`NoteRecord`] from the database.
    ///
    /// If the provided list of [`NoteId`]s is empty, or if no [`NoteRecord`] matches the provided
    /// [`NoteId`]s, an empty list is returned.
    pub async fn get_notes_by_id(
        &self,
        note_ids: Vec<NoteId>,
    ) -> Result<Vec<NoteRecord>, DatabaseError> {
        self.db.select_notes_by_id(note_ids).await
    }

    /// Fetches the inputs for a transaction batch from the database.
    ///
    /// ## Inputs
    ///
    /// The function takes as input:
    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
    /// - The unauthenticated note ids are the set of IDs of unauthenticated notes consumed by all
    ///   transactions in the batch. For these notes, we attempt to find note inclusion proofs. Not
    ///   all notes will necessarily exist in the DB, as some notes can be created and consumed
    ///   within the same batch.
    ///
    /// ## Outputs
    ///
    /// The function will return:
    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
    ///   referenced by a note inclusion proof.
    /// - Note inclusion proofs for all notes that were found in the DB.
    /// - The block header that the batch should reference, i.e. the latest known block.
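    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); the block numbers and `note_id` are
    /// hypothetical:
    ///
    /// ```ignore
    /// // A batch whose transactions reference blocks 40 and 41 and consume one
    /// // unauthenticated note.
    /// let tx_blocks = BTreeSet::from([BlockNumber::from(40u32), BlockNumber::from(41u32)]);
    /// let inputs = state.get_batch_inputs(tx_blocks, BTreeSet::from([note_id])).await?;
    /// // `inputs.batch_reference_block_header` is the latest block header, and
    /// // `inputs.chain_mmr` proves inclusion of blocks 40 and 41 (and any note proof blocks).
    /// ```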
    pub async fn get_batch_inputs(
        &self,
        tx_reference_blocks: BTreeSet<BlockNumber>,
        unauthenticated_note_ids: BTreeSet<NoteId>,
    ) -> Result<BatchInputs, GetBatchInputsError> {
        if tx_reference_blocks.is_empty() {
            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
        }

        // First we grab note inclusion proofs for the known notes. These proofs only
        // prove that the note was included in a given block. We then also need to prove that
        // each of those blocks is included in the chain.
        let note_proofs = self
            .db
            .select_note_inclusion_proofs(unauthenticated_note_ids)
            .await
            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;

        // The set of blocks that the notes are included in.
        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());

        // Collect all blocks we need to query without duplicates, which is:
        // - all blocks for which we need to prove note inclusion.
        // - all blocks referenced by transactions in the batch.
        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
        blocks.extend(note_blocks);

        // Scoped block to automatically drop the read lock guard as soon as we're done.
        // We also avoid accessing the db in the block as this would delay dropping the guard.
        let (batch_reference_block, partial_mmr) = {
            let inner_state = self.inner.read().await;

            let latest_block_num = inner_state.blockchain.chain_tip();

            let highest_block_num =
                *blocks.last().expect("we should have checked for empty block references");
            if highest_block_num > latest_block_num {
                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
                    highest_block_num,
                    latest_block_num,
                });
            }

            // Remove the latest block from the to-be-tracked blocks as it will be the reference
            // block for the batch itself and thus added to the MMR within the batch kernel, so
            // there is no need to prove its inclusion.
            blocks.remove(&latest_block_num);

            (
                latest_block_num,
                inner_state.blockchain.partial_mmr_from_blocks(&blocks, latest_block_num),
            )
        };

        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
        // up in a separate DB access.
        let mut headers = self
            .db
            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
            .await
            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;

        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
        let header_index = headers
            .iter()
            .enumerate()
            .find_map(|(index, header)| {
                (header.block_num() == batch_reference_block).then_some(index)
            })
            .expect("DB should have returned the header of the batch reference block");

        // The order doesn't matter for ChainMmr::new, so swap remove is fine.
        let batch_reference_block_header = headers.swap_remove(header_index);

        // SAFETY: This should not error because:
        // - we're passing exactly the block headers that we've added to the partial MMR,
        // - so none of the block headers' block numbers should exceed the chain length of the
        //   partial MMR,
        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
        let chain_mmr = ChainMmr::new(partial_mmr, headers)
            .expect("partial mmr and block headers should be consistent");

        Ok(BatchInputs {
            batch_reference_block_header,
            note_proofs,
            chain_mmr,
        })
    }

    /// Loads data to synchronize a client.
    ///
    /// The client's request contains a list of tag prefixes; this method will return the first
    /// block with a matching tag, or the chain tip. All the other values are filtered based on
    /// this block range.
    ///
    /// # Arguments
    ///
    /// - `block_num`: The last block *known* by the client; updates start from the next block.
    /// - `account_ids`: Include an account's commitment if its _last change_ was in the result's
    ///   block range.
    /// - `note_tags`: The tags the client is interested in; the result is restricted to the first
    ///   block with any matching tags.
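    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `state` is a loaded [State] and
    /// the argument values are hypothetical:
    ///
    /// ```ignore
    /// let (update, mmr_delta) =
    ///     state.sync_state(client_block_num, account_ids, note_tags).await?;
    /// // `update.block_header` is the first block with a matching note tag (or the chain tip),
    /// // and `mmr_delta` advances the client's view of the chain MMR up to that block.
    /// ```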
    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn sync_state(
        &self,
        block_num: BlockNumber,
        account_ids: Vec<AccountId>,
        note_tags: Vec<u32>,
    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
        let inner = self.inner.read().await;

        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;

        let delta = if block_num == state_sync.block_header.block_num() {
            // The client is in sync with the chain tip.
            MmrDelta {
                forest: block_num.as_usize(),
                data: vec![],
            }
        } else {
            // Important notes about the boundary conditions:
            //
            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
            //   contained in the block header always lags behind by one block; this is because
            //   the Mmr leaves are hashes of block headers, and we can't have self-referential
            //   hashes. These two points cancel out and don't require adjusting.
            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined
            //   to be exclusive, so the from_forest has to be adjusted with a +1.
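            //
            // For example (illustrative numbers, not from a real chain): if the client last knew
            // block 5 and the first block with a matching tag is block 9, then from_forest = 6
            // and to_forest = 9 below.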
            let from_forest = (block_num + 1).as_usize();
            let to_forest = state_sync.block_header.block_num().as_usize();
            inner
                .blockchain
                .as_mmr()
                .get_delta(from_forest, to_forest)
                .map_err(StateSyncError::FailedToBuildMmrDelta)?
        };

        Ok((state_sync, delta))
    }

    /// Loads data to synchronize a client's notes.
    ///
    /// The client's request contains a list of tags; this method will return the first block
    /// with a matching tag, or the chain tip. All the other values are filtered based on this
    /// block range.
    ///
    /// # Arguments
    ///
    /// - `block_num`: The last block *known* by the client; updates start from the next block.
    /// - `note_tags`: The tags the client is interested in; resulting notes are restricted to the
    ///   first block containing a matching note.
    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
    pub async fn sync_notes(
        &self,
        block_num: BlockNumber,
        note_tags: Vec<u32>,
    ) -> Result<(NoteSyncUpdate, MmrProof), NoteSyncError> {
        let inner = self.inner.read().await;

        let note_sync = self.db.get_note_sync(block_num, note_tags).await?;

        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num().as_usize())?;

        Ok((note_sync, mmr_proof))
    }

    /// Returns data needed by the block producer to construct and prove the next block.
    pub async fn get_block_inputs(
        &self,
        account_ids: Vec<AccountId>,
        nullifiers: Vec<Nullifier>,
        unauthenticated_notes: BTreeSet<NoteId>,
        reference_blocks: BTreeSet<BlockNumber>,
    ) -> Result<BlockInputs, GetBlockInputsError> {
        // Get the note inclusion proofs from the DB.
        // We do this first so we only have to acquire the lock on the state once. For that we
        // need the reference blocks of the note proofs to get their authentication paths in the
        // chain MMR.
        let unauthenticated_note_proofs = self
            .db
            .select_note_inclusion_proofs(unauthenticated_notes)
            .await
            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;

        // The set of blocks that the notes are included in.
        let note_proof_reference_blocks =
            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());

        // Collect all blocks we need to prove inclusion for, without duplicates.
        let mut blocks = reference_blocks;
        blocks.extend(note_proof_reference_blocks);

        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;

        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
        // be used as the previous block header of the block being built.
        let mut headers = self
            .db
            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
            .await
            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;

        // Find and remove the latest block as we must not add it to the chain MMR, since it is
        // not yet in the chain.
        let latest_block_header_index = headers
            .iter()
            .enumerate()
            .find_map(|(index, header)| {
                (header.block_num() == latest_block_number).then_some(index)
            })
            .expect("DB should have returned the header of the latest block");

        // The order doesn't matter for ChainMmr::new, so swap remove is fine.
        let latest_block_header = headers.swap_remove(latest_block_header_index);

        // SAFETY: This should not error because:
        // - we're passing exactly the block headers that we've added to the partial MMR,
        // - so none of the block headers' block numbers should exceed the chain length of the
        //   partial MMR,
        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
        let chain_mmr = ChainMmr::new(partial_mmr, headers)
            .expect("partial mmr and block headers should be consistent");

        Ok(BlockInputs::new(
            latest_block_header,
            chain_mmr,
            account_witnesses,
            nullifier_witnesses,
            unauthenticated_note_proofs,
        ))
    }

    /// Gets account and nullifier witnesses for the requested account IDs and nullifiers, as well
    /// as the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block; its
    /// number is removed from `blocks` and returned separately.
    ///
    /// This method acquires the lock on the inner state and does not access the DB, so the lock
    /// is released as soon as possible.
    async fn get_block_inputs_witnesses(
        &self,
        blocks: &mut BTreeSet<BlockNumber>,
        account_ids: Vec<AccountId>,
        nullifiers: Vec<Nullifier>,
    ) -> Result<
        (
            BlockNumber,
            BTreeMap<AccountId, AccountWitness>,
            BTreeMap<Nullifier, NullifierWitness>,
            PartialMmr,
        ),
        GetBlockInputsError,
    > {
        let inner = self.inner.read().await;

        let latest_block_number = inner.latest_block_num();

        // If `blocks` is empty, use the latest block number which will never trigger the error
        // below.
        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
        if highest_block_number > latest_block_number {
            return Err(GetBlockInputsError::UnknownBatchBlockReference {
                highest_block_number,
                latest_block_number,
            });
        }

        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
        // inclusion in the chain.
        blocks.remove(&latest_block_number);

        // Fetch the partial MMR at the state of the latest block with authentication paths for the
        // provided set of blocks.
        let partial_mmr = inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number);

        // Fetch witnesses for all accounts.
        let account_witnesses = account_ids
            .iter()
            .copied()
            .map(|account_id| {
                let ValuePath {
                    value: latest_state_commitment,
                    path: proof,
                } = inner.account_tree.open(&account_id.into());
                (account_id, AccountWitness::new(latest_state_commitment, proof))
            })
            .collect::<BTreeMap<AccountId, AccountWitness>>();

        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
        // not as this is done as part of proposing the block.
        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
            .iter()
            .copied()
            .map(|nullifier| {
                let proof = inner.nullifier_tree.open(&nullifier);
                (nullifier, NullifierWitness::new(proof))
            })
            .collect();

        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
    }

    /// Returns data needed by the block producer to verify the validity of transactions.
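    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc-test); the argument values are hypothetical:
    ///
    /// ```ignore
    /// let tx_inputs = state.get_transaction_inputs(account_id, &nullifiers, note_ids).await?;
    /// // `tx_inputs.account_commitment` is the account's current commitment in the account tree,
    /// // `tx_inputs.nullifiers` reports the block in which each nullifier was consumed (the
    /// // default block number if unspent), and `tx_inputs.found_unauthenticated_notes` lists
    /// // which of the requested notes already exist in the store.
    /// ```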
    #[instrument(target = COMPONENT, skip_all, ret)]
    pub async fn get_transaction_inputs(
        &self,
        account_id: AccountId,
        nullifiers: &[Nullifier],
        unauthenticated_notes: Vec<NoteId>,
    ) -> Result<TransactionInputs, DatabaseError> {
        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));

        let inner = self.inner.read().await;

        let account_commitment = inner
            .account_tree
            .open(&LeafIndex::new_max_depth(account_id.prefix().into()))
            .value;

        let nullifiers = nullifiers
            .iter()
            .map(|nullifier| NullifierInfo {
                nullifier: *nullifier,
                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
            })
            .collect();

        let found_unauthenticated_notes =
            self.db.select_note_ids(unauthenticated_notes.clone()).await?;

        Ok(TransactionInputs {
            account_commitment,
            nullifiers,
            found_unauthenticated_notes,
        })
    }

    /// Returns details for a public (on-chain) account.
    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
        self.db.select_account(id).await
    }

    /// Returns account proofs with optional account and storage headers.
    pub async fn get_account_proofs(
        &self,
        account_requests: Vec<AccountProofRequest>,
        known_code_commitments: BTreeSet<RpoDigest>,
        include_headers: bool,
    ) -> Result<(BlockNumber, Vec<AccountProofsResponse>), DatabaseError> {
        // Lock the inner state for the whole operation. We need to hold this lock to prevent the
        // database, account tree, and latest block number from changing during the operation,
        // because changing one of them would lead to an inconsistent state.
        let inner_state = self.inner.read().await;

        let account_ids: Vec<AccountId> =
            account_requests.iter().map(|req| req.account_id).collect();

        let state_headers = if include_headers.not() {
            BTreeMap::<AccountId, AccountStateHeader>::default()
        } else {
            let infos = self.db.select_accounts_by_ids(account_ids.clone()).await?;
            if account_ids.len() > infos.len() {
                let found_ids = infos.iter().map(|info| info.summary.account_id).collect();
                return Err(DatabaseError::AccountsNotFoundInDb(
                    BTreeSet::from_iter(account_ids).difference(&found_ids).copied().collect(),
                ));
            }

            let mut headers_map = BTreeMap::new();

            // Iterate and build state headers for public accounts
            for request in account_requests {
                let account_info = infos
                    .iter()
                    .find(|info| info.summary.account_id == request.account_id)
                    .expect("retrieved accounts were validated against request");

                if let Some(details) = &account_info.details {
                    let mut storage_slot_map_keys = Vec::new();

                    for StorageMapKeysProof { storage_index, storage_keys } in
                        &request.storage_requests
                    {
                        if let Some(StorageSlot::Map(storage_map)) =
                            details.storage().slots().get(*storage_index as usize)
                        {
                            for map_key in storage_keys {
                                let proof = storage_map.open(map_key);

                                let slot_map_key = StorageSlotMapProof {
                                    storage_slot: u32::from(*storage_index),
                                    smt_proof: proof.to_bytes(),
                                };
                                storage_slot_map_keys.push(slot_map_key);
                            }
                        } else {
                            return Err(AccountError::StorageSlotNotMap(*storage_index).into());
                        }
                    }

                    // Only include unknown account codes
                    let account_code = known_code_commitments
                        .contains(&details.code().commitment())
                        .not()
                        .then(|| details.code().to_bytes());

                    let state_header = AccountStateHeader {
                        header: Some(AccountHeader::from(details).into()),
                        storage_header: details.storage().get_header().to_bytes(),
                        account_code,
                        storage_maps: storage_slot_map_keys,
                    };

                    headers_map.insert(account_info.summary.account_id, state_header);
                }
            }

            headers_map
        };

        let responses = account_ids
            .into_iter()
            .map(|account_id| {
                let acc_leaf_idx = LeafIndex::new_max_depth(account_id.prefix().into());
                let opening = inner_state.account_tree.open(&acc_leaf_idx);
                let state_header = state_headers.get(&account_id).cloned();

                AccountProofsResponse {
                    account_id: Some(account_id.into()),
                    account_commitment: Some(opening.value.into()),
                    account_proof: Some(opening.path.into()),
                    state_header,
                }
            })
            .collect();

        Ok((inner_state.latest_block_num(), responses))
    }

    /// Returns the state delta between `from_block` (exclusive) and `to_block` (inclusive) for the
    /// given account.
    pub(crate) async fn get_account_state_delta(
        &self,
        account_id: AccountId,
        from_block: BlockNumber,
        to_block: BlockNumber,
    ) -> Result<Option<AccountDelta>, DatabaseError> {
        self.db.select_account_state_delta(account_id, from_block, to_block).await
    }

    /// Loads a block from the block store. Returns `Ok(None)` if the block is not found.
    pub async fn load_block(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<Vec<u8>>, DatabaseError> {
        if block_num > self.latest_block_num().await {
            return Ok(None);
        }
        self.block_store.load_block(block_num).await.map_err(Into::into)
    }

    /// Returns the latest block number.
    pub async fn latest_block_num(&self) -> BlockNumber {
        self.inner.read().await.latest_block_num()
    }

    /// Runs database optimization.
    pub async fn optimize_db(&self) -> Result<(), DatabaseError> {
        self.db.optimize().await
    }
}

// UTILITIES
// ================================================================================================

#[instrument(target = COMPONENT, skip_all)]
async fn load_nullifier_tree(db: &mut Db) -> Result<NullifierTree, StateInitializationError> {
    let nullifiers = db.select_all_nullifiers().await?;
    let len = nullifiers.len();

    let now = Instant::now();
    let nullifier_tree = NullifierTree::with_entries(nullifiers)
        .map_err(StateInitializationError::FailedToCreateNullifierTree)?;
    let elapsed = now.elapsed().as_secs();

    info!(
        num_of_leaves = len,
        tree_construction = elapsed,
        COMPONENT,
        "Loaded nullifier tree"
    );
    Ok(nullifier_tree)
}

#[instrument(target = COMPONENT, skip_all)]
async fn load_mmr(db: &mut Db) -> Result<Mmr, StateInitializationError> {
    let block_commitments: Vec<RpoDigest> = db
        .select_all_block_headers()
        .await?
        .iter()
        .map(BlockHeader::commitment)
        .collect();

    Ok(block_commitments.into())
}

#[instrument(target = COMPONENT, skip_all)]
async fn load_accounts(
    db: &mut Db,
) -> Result<SimpleSmt<ACCOUNT_TREE_DEPTH>, StateInitializationError> {
    let account_data: Vec<_> = db
        .select_all_account_commitments()
        .await?
        .into_iter()
        .map(|(id, account_commitment)| (id.prefix().into(), account_commitment.into()))
        .collect();

    SimpleSmt::with_leaves(account_data)
        .map_err(StateInitializationError::FailedToCreateAccountsTree)
}