miden_node_store/
state.rs

1//! Abstraction to synchronize state modifications.
2//!
3//! The [State] provides data access and modifications methods, its main purpose is to ensure that
4//! data is atomically written, and that reads are consistent.
5
6use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::ops::Not;
8use std::path::Path;
9use std::sync::Arc;
10
11use miden_node_proto::domain::account::{
12    AccountInfo,
13    AccountProofRequest,
14    NetworkAccountPrefix,
15    StorageMapKeysProof,
16};
17use miden_node_proto::domain::batch::BatchInputs;
18use miden_node_proto::{AccountWitnessRecord, generated as proto};
19use miden_node_utils::ErrorReport;
20use miden_node_utils::formatting::format_array;
21use miden_objects::account::{AccountHeader, AccountId, StorageSlot};
22use miden_objects::block::{
23    AccountTree,
24    AccountWitness,
25    BlockHeader,
26    BlockInputs,
27    BlockNumber,
28    Blockchain,
29    NullifierTree,
30    NullifierWitness,
31    ProvenBlock,
32};
33use miden_objects::crypto::merkle::{
34    Forest,
35    Mmr,
36    MmrDelta,
37    MmrPeaks,
38    MmrProof,
39    PartialMmr,
40    SmtProof,
41};
42use miden_objects::note::{NoteDetails, NoteId, Nullifier};
43use miden_objects::transaction::{OutputNote, PartialBlockchain};
44use miden_objects::utils::Serializable;
45use miden_objects::{AccountError, Word};
46use tokio::sync::{Mutex, RwLock, oneshot};
47use tracing::{info, info_span, instrument};
48
49use crate::blocks::BlockStore;
50use crate::db::models::Page;
51use crate::db::models::queries::StorageMapValuesPage;
52use crate::db::{
53    AccountVaultValue,
54    Db,
55    NoteRecord,
56    NoteSyncUpdate,
57    NullifierInfo,
58    StateSyncUpdate,
59};
60use crate::errors::{
61    ApplyBlockError,
62    DatabaseError,
63    GetBatchInputsError,
64    GetBlockHeaderError,
65    GetBlockInputsError,
66    GetCurrentBlockchainDataError,
67    InvalidBlockError,
68    NoteSyncError,
69    StateInitializationError,
70    StateSyncError,
71};
72use crate::{COMPONENT, DataDirectory};
73
74// STRUCTURES
75// ================================================================================================
76
77#[derive(Debug, Default)]
78pub struct TransactionInputs {
79    pub account_commitment: Word,
80    pub nullifiers: Vec<NullifierInfo>,
81    pub found_unauthenticated_notes: HashSet<NoteId>,
82    pub new_account_id_prefix_is_unique: Option<bool>,
83}
84
85/// Container for state that needs to be updated atomically.
86struct InnerState {
87    nullifier_tree: NullifierTree,
88    blockchain: Blockchain,
89    account_tree: AccountTree,
90}
91
92impl InnerState {
93    /// Returns the latest block number.
94    fn latest_block_num(&self) -> BlockNumber {
95        self.blockchain
96            .chain_tip()
97            .expect("chain should always have at least the genesis block")
98    }
99}
100
101/// The rollup state
102pub struct State {
103    /// The database which stores block headers, nullifiers, notes, and the latest states of
104    /// accounts.
105    db: Arc<Db>,
106
107    /// The block store which stores full block contents for all blocks.
108    block_store: Arc<BlockStore>,
109
110    /// Read-write lock used to prevent writing to a structure while it is being used.
111    ///
112    /// The lock is writer-preferring, meaning the writer won't be starved.
113    inner: RwLock<InnerState>,
114
115    /// To allow readers to access the tree data while an update in being performed, and prevent
116    /// TOCTOU issues, there must be no concurrent writers. This locks to serialize the writers.
117    writer: Mutex<()>,
118}
119
120impl State {
121    /// Loads the state from the `db`.
122    #[instrument(target = COMPONENT, skip_all)]
123    pub async fn load(data_path: &Path) -> Result<Self, StateInitializationError> {
124        let data_directory = DataDirectory::load(data_path.to_path_buf())
125            .map_err(StateInitializationError::DataDirectoryLoadError)?;
126
127        let block_store = Arc::new(
128            BlockStore::load(data_directory.block_store_dir())
129                .map_err(StateInitializationError::BlockStoreLoadError)?,
130        );
131
132        let database_filepath = data_directory.database_path();
133        let mut db = Db::load(database_filepath.clone())
134            .await
135            .map_err(StateInitializationError::DatabaseLoadError)?;
136
137        let chain_mmr = load_mmr(&mut db).await?;
138        let account_tree = load_account_tree(&mut db).await?;
139        let nullifier_tree = load_nullifier_tree(&mut db).await?;
140
141        let inner = RwLock::new(InnerState {
142            nullifier_tree,
143            // SAFETY: We assume the loaded MMR is valid and does not have more than u32::MAX
144            // entries.
145            blockchain: Blockchain::from_mmr_unchecked(chain_mmr),
146            account_tree,
147        });
148
149        let writer = Mutex::new(());
150        let db = Arc::new(db);
151
152        Ok(Self { db, block_store, inner, writer })
153    }
154
155    /// Apply changes of a new block to the DB and in-memory data structures.
156    ///
157    /// ## Note on state consistency
158    ///
159    /// The server contains in-memory representations of the existing trees, the in-memory
160    /// representation must be kept consistent with the committed data, this is necessary so to
161    /// provide consistent results for all endpoints. In order to achieve consistency, the
162    /// following steps are used:
163    ///
164    /// - the request data is validated, prior to starting any modifications.
165    /// - block is being saved into the store in parallel with updating the DB, but before
166    ///   committing. This block is considered as candidate and not yet available for reading
167    ///   because the latest block pointer is not updated yet.
168    /// - a transaction is open in the DB and the writes are started.
169    /// - while the transaction is not committed, concurrent reads are allowed, both the DB and the
170    ///   in-memory representations, which are consistent at this stage.
171    /// - prior to committing the changes to the DB, an exclusive lock to the in-memory data is
172    ///   acquired, preventing concurrent reads to the in-memory data, since that will be
173    ///   out-of-sync w.r.t. the DB.
174    /// - the DB transaction is committed, and requests that read only from the DB can proceed to
175    ///   use the fresh data.
176    /// - the in-memory structures are updated, including the latest block pointer and the lock is
177    ///   released.
178    // TODO: This span is logged in a root span, we should connect it to the parent span.
179    #[allow(clippy::too_many_lines)]
180    #[instrument(target = COMPONENT, skip_all, err)]
181    pub async fn apply_block(&self, block: ProvenBlock) -> Result<(), ApplyBlockError> {
182        let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?;
183
184        let header = block.header();
185
186        let tx_commitment = block.transactions().commitment();
187
188        if header.tx_commitment() != tx_commitment {
189            return Err(InvalidBlockError::InvalidBlockTxCommitment {
190                expected: tx_commitment,
191                actual: header.tx_commitment(),
192            }
193            .into());
194        }
195
196        let block_num = header.block_num();
197        let block_commitment = block.commitment();
198
199        // ensures the right block header is being processed
200        let prev_block = self
201            .db
202            .select_block_header_by_block_num(None)
203            .await?
204            .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?;
205
206        let expected_block_num = prev_block.block_num().child();
207        if block_num != expected_block_num {
208            return Err(InvalidBlockError::NewBlockInvalidBlockNum {
209                expected: expected_block_num,
210                submitted: block_num,
211            }
212            .into());
213        }
214        if header.prev_block_commitment() != prev_block.commitment() {
215            return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into());
216        }
217
218        let block_data = block.to_bytes();
219
220        // Save the block to the block store. In a case of a rolled-back DB transaction, the
221        // in-memory state will be unchanged, but the block might still be written into the
222        // block store. Thus, such block should be considered as block candidates, but not
223        // finalized blocks. So we should check for the latest block when getting block from
224        // the store.
225        let store = Arc::clone(&self.block_store);
226        let block_save_task =
227            tokio::spawn(async move { store.save_block(block_num, &block_data).await });
228
229        // scope to read in-memory data, compute mutations required for updating account
230        // and nullifier trees, and validate the request
231        let (
232            nullifier_tree_old_root,
233            nullifier_tree_update,
234            account_tree_old_root,
235            account_tree_update,
236        ) = {
237            let inner = self.inner.read().await;
238
239            let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered();
240
241            // nullifiers can be produced only once
242            let duplicate_nullifiers: Vec<_> = block
243                .created_nullifiers()
244                .iter()
245                .filter(|&n| inner.nullifier_tree.get_block_num(n).is_some())
246                .copied()
247                .collect();
248            if !duplicate_nullifiers.is_empty() {
249                return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into());
250            }
251
252            // compute updates for the in-memory data structures
253
254            // new_block.chain_root must be equal to the chain MMR root prior to the update
255            let peaks = inner.blockchain.peaks();
256            if peaks.hash_peaks() != header.chain_commitment() {
257                return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into());
258            }
259
260            // compute update for nullifier tree
261            let nullifier_tree_update = inner
262                .nullifier_tree
263                .compute_mutations(
264                    block.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)),
265                )
266                .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;
267
268            if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
269                return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
270            }
271
272            // compute update for account tree
273            let account_tree_update = inner
274                .account_tree
275                .compute_mutations(
276                    block
277                        .updated_accounts()
278                        .iter()
279                        .map(|update| (update.account_id(), update.final_state_commitment())),
280                )
281                .map_err(InvalidBlockError::NewBlockDuplicateAccountIdPrefix)?;
282
283            if account_tree_update.as_mutation_set().root() != header.account_root() {
284                return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
285            }
286
287            (
288                inner.nullifier_tree.root(),
289                nullifier_tree_update,
290                inner.account_tree.root(),
291                account_tree_update,
292            )
293        };
294
295        // build note tree
296        let note_tree = block.build_output_note_tree();
297        if note_tree.root() != header.note_root() {
298            return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into());
299        }
300
301        let notes = block
302            .output_notes()
303            .map(|(note_index, note)| {
304                let (details, nullifier) = match note {
305                    OutputNote::Full(note) => {
306                        (Some(NoteDetails::from(note)), Some(note.nullifier()))
307                    },
308                    OutputNote::Header(_) => (None, None),
309                    note @ OutputNote::Partial(_) => {
310                        return Err(InvalidBlockError::InvalidOutputNoteType(Box::new(
311                            note.clone(),
312                        )));
313                    },
314                };
315
316                let inclusion_path = note_tree.open(note_index);
317
318                let note_record = NoteRecord {
319                    block_num,
320                    note_index,
321                    note_id: note.id().into(),
322                    metadata: *note.metadata(),
323                    details,
324                    inclusion_path,
325                };
326
327                Ok((note_record, nullifier))
328            })
329            .collect::<Result<Vec<_>, InvalidBlockError>>()?;
330
331        // Signals the transaction is ready to be committed, and the write lock can be acquired
332        let (allow_acquire, acquired_allowed) = oneshot::channel::<()>();
333        // Signals the write lock has been acquired, and the transaction can be committed
334        let (inform_acquire_done, acquire_done) = oneshot::channel::<()>();
335
336        // The DB and in-memory state updates need to be synchronized and are partially
337        // overlapping. Namely, the DB transaction only proceeds after this task acquires the
338        // in-memory write lock. This requires the DB update to run concurrently, so a new task is
339        // spawned.
340        let db = Arc::clone(&self.db);
341        let db_update_task =
342            tokio::spawn(
343                async move { db.apply_block(allow_acquire, acquire_done, block, notes).await },
344            );
345
346        // Wait for the message from the DB update task, that we ready to commit the DB transaction
347        acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?;
348
349        // Awaiting the block saving task to complete without errors
350        block_save_task.await??;
351
352        // Scope to update the in-memory data
353        {
354            // We need to hold the write lock here to prevent inconsistency between the in-memory
355            // state and the DB state. Thus, we need to wait for the DB update task to complete
356            // successfully.
357            let mut inner = self.inner.write().await;
358
359            // We need to check that neither the nullifier tree nor the account tree have changed
360            // while we were waiting for the DB preparation task to complete. If either of them
361            // did change, we do not proceed with in-memory and database updates, since it may
362            // lead to an inconsistent state.
363            if inner.nullifier_tree.root() != nullifier_tree_old_root
364                || inner.account_tree.root() != account_tree_old_root
365            {
366                return Err(ApplyBlockError::ConcurrentWrite);
367            }
368
369            // Notify the DB update task that the write lock has been acquired, so it can commit
370            // the DB transaction
371            inform_acquire_done
372                .send(())
373                .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?;
374
375            // TODO: shutdown #91
376            // Await for successful commit of the DB transaction. If the commit fails, we mustn't
377            // change in-memory state, so we return a block applying error and don't proceed with
378            // in-memory updates.
379            db_update_task
380                .await?
381                .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?;
382
383            // Update the in-memory data structures after successful commit of the DB transaction
384            inner
385                .nullifier_tree
386                .apply_mutations(nullifier_tree_update)
387                .expect("Unreachable: old nullifier tree root must be checked before this step");
388            inner
389                .account_tree
390                .apply_mutations(account_tree_update)
391                .expect("Unreachable: old account tree root must be checked before this step");
392            inner.blockchain.push(block_commitment);
393        }
394
395        info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful");
396
397        Ok(())
398    }
399
400    /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof.
401    ///
402    /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is
403    /// returned.
404    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
405    pub async fn get_block_header(
406        &self,
407        block_num: Option<BlockNumber>,
408        include_mmr_proof: bool,
409    ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
410        let block_header = self.db.select_block_header_by_block_num(block_num).await?;
411        if let Some(header) = block_header {
412            let mmr_proof = if include_mmr_proof {
413                let inner = self.inner.read().await;
414                let mmr_proof = inner.blockchain.open(header.block_num())?;
415                Some(mmr_proof)
416            } else {
417                None
418            };
419            Ok((Some(header), mmr_proof))
420        } else {
421            Ok((None, None))
422        }
423    }
424
425    pub async fn check_nullifiers_by_prefix(
426        &self,
427        prefix_len: u32,
428        nullifier_prefixes: Vec<u32>,
429        block_num: BlockNumber,
430    ) -> Result<Vec<NullifierInfo>, DatabaseError> {
431        self.db
432            .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_num)
433            .await
434    }
435
436    /// Generates membership proofs for each one of the `nullifiers` against the latest nullifier
437    /// tree.
438    ///
439    /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block.
440    #[instrument(level = "debug", target = COMPONENT, skip_all, ret)]
441    pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec<SmtProof> {
442        let inner = self.inner.read().await;
443        nullifiers
444            .iter()
445            .map(|n| inner.nullifier_tree.open(n))
446            .map(NullifierWitness::into_proof)
447            .collect()
448    }
449
450    /// Queries a list of notes from the database.
451    ///
452    /// If the provided list of [`NoteId`] given is empty or no note matches the provided
453    /// [`NoteId`] an empty list is returned.
454    pub async fn get_notes_by_id(
455        &self,
456        note_ids: Vec<NoteId>,
457    ) -> Result<Vec<NoteRecord>, DatabaseError> {
458        self.db.select_notes_by_id(note_ids).await
459    }
460
461    /// If the input block number is the current chain tip, `None` is returned.
462    /// Otherwise, gets the current chain tip's block header with its corresponding MMR peaks.
463    pub async fn get_current_blockchain_data(
464        &self,
465        block_num: Option<BlockNumber>,
466    ) -> Result<Option<(BlockHeader, MmrPeaks)>, GetCurrentBlockchainDataError> {
467        let blockchain = &self.inner.read().await.blockchain;
468        if let Some(number) = block_num
469            && number == self.latest_block_num().await
470        {
471            return Ok(None);
472        }
473
474        // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)`
475        // when `None` is passed
476        let block_header: BlockHeader = self
477            .db
478            .select_block_header_by_block_num(None)
479            .await
480            .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)?
481            .unwrap();
482        let peaks = blockchain
483            .peaks_at(block_header.block_num())
484            .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?;
485
486        Ok(Some((block_header, peaks)))
487    }
488
489    /// Fetches the inputs for a transaction batch from the database.
490    ///
491    /// ## Inputs
492    ///
493    /// The function takes as input:
494    /// - The tx reference blocks are the set of blocks referenced by transactions in the batch.
495    /// - The unauthenticated note ids are the set of IDs of unauthenticated notes consumed by all
496    ///   transactions in the batch. For these notes, we attempt to find note inclusion proofs. Not
497    ///   all notes will exist in the DB necessarily, as some notes can be created and consumed
498    ///   within the same batch.
499    ///
500    /// ## Outputs
501    ///
502    /// The function will return:
503    /// - A block inclusion proof for all tx reference blocks and for all blocks which are
504    ///   referenced by a note inclusion proof.
505    /// - Note inclusion proofs for all notes that were found in the DB.
506    /// - The block header that the batch should reference, i.e. the latest known block.
507    pub async fn get_batch_inputs(
508        &self,
509        tx_reference_blocks: BTreeSet<BlockNumber>,
510        unauthenticated_note_ids: BTreeSet<NoteId>,
511    ) -> Result<BatchInputs, GetBatchInputsError> {
512        if tx_reference_blocks.is_empty() {
513            return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
514        }
515
516        // First we grab note inclusion proofs for the known notes. These proofs only
517        // prove that the note was included in a given block. We then also need to prove that
518        // each of those blocks is included in the chain.
519        let note_proofs = self
520            .db
521            .select_note_inclusion_proofs(unauthenticated_note_ids)
522            .await
523            .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
524
525        // The set of blocks that the notes are included in.
526        let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
527
528        // Collect all blocks we need to query without duplicates, which is:
529        // - all blocks for which we need to prove note inclusion.
530        // - all blocks referenced by transactions in the batch.
531        let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
532        blocks.extend(note_blocks);
533
534        // Scoped block to automatically drop the read lock guard as soon as we're done.
535        // We also avoid accessing the db in the block as this would delay dropping the guard.
536        let (batch_reference_block, partial_mmr) = {
537            let inner_state = self.inner.read().await;
538
539            let latest_block_num = inner_state.latest_block_num();
540
541            let highest_block_num =
542                *blocks.last().expect("we should have checked for empty block references");
543            if highest_block_num > latest_block_num {
544                return Err(GetBatchInputsError::UnknownTransactionBlockReference {
545                    highest_block_num,
546                    latest_block_num,
547                });
548            }
549
550            // Remove the latest block from the to-be-tracked blocks as it will be the reference
551            // block for the batch itself and thus added to the MMR within the batch kernel, so
552            // there is no need to prove its inclusion.
553            blocks.remove(&latest_block_num);
554
555            // SAFETY:
556            // - The latest block num was retrieved from the inner blockchain from which we will
557            //   also retrieve the proofs, so it is guaranteed to exist in that chain.
558            // - We have checked that no block number in the blocks set is greater than latest block
559            //   number *and* latest block num was removed from the set. Therefore only block
560            //   numbers smaller than latest block num remain in the set. Therefore all the block
561            //   numbers are guaranteed to exist in the chain state at latest block num.
562            let partial_mmr = inner_state
563                .blockchain
564                .partial_mmr_from_blocks(&blocks, latest_block_num)
565                .expect("latest block num should exist and all blocks in set should be < than latest block");
566
567            (latest_block_num, partial_mmr)
568        };
569
570        // Fetch the reference block of the batch as part of this query, so we can avoid looking it
571        // up in a separate DB access.
572        let mut headers = self
573            .db
574            .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
575            .await
576            .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
577
578        // Find and remove the batch reference block as we don't want to add it to the chain MMR.
579        let header_index = headers
580            .iter()
581            .enumerate()
582            .find_map(|(index, header)| {
583                (header.block_num() == batch_reference_block).then_some(index)
584            })
585            .expect("DB should have returned the header of the batch reference block");
586
587        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
588        let batch_reference_block_header = headers.swap_remove(header_index);
589
590        // SAFETY: This should not error because:
591        // - we're passing exactly the block headers that we've added to the partial MMR,
592        // - so none of the block headers block numbers should exceed the chain length of the
593        //   partial MMR,
594        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
595        //
596        // We construct headers and partial MMR in concert, so they are consistent. This is why we
597        // can call the unchecked constructor.
598        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
599            .expect("partial mmr and block headers should be consistent");
600
601        Ok(BatchInputs {
602            batch_reference_block_header,
603            note_proofs,
604            partial_block_chain,
605        })
606    }
607
608    /// Loads data to synchronize a client.
609    ///
610    /// The client's request contains a list of tag prefixes, this method will return the first
611    /// block with a matching tag, or the chain tip. All the other values are filter based on this
612    /// block range.
613    ///
614    /// # Arguments
615    ///
616    /// - `block_num`: The last block *known* by the client, updates start from the next block.
617    /// - `account_ids`: Include the account's commitment if their _last change_ was in the result's
618    ///   block range.
619    /// - `note_tags`: The tags the client is interested in, result is restricted to the first block
620    ///   with any matches tags.
621    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
622    pub async fn sync_state(
623        &self,
624        block_num: BlockNumber,
625        account_ids: Vec<AccountId>,
626        note_tags: Vec<u32>,
627    ) -> Result<(StateSyncUpdate, MmrDelta), StateSyncError> {
628        let inner = self.inner.read().await;
629
630        let state_sync = self.db.get_state_sync(block_num, account_ids, note_tags).await?;
631
632        let delta = if block_num == state_sync.block_header.block_num() {
633            // The client is in sync with the chain tip.
634            MmrDelta {
635                forest: Forest::new(block_num.as_usize()),
636                data: vec![],
637            }
638        } else {
639            // Important notes about the boundary conditions:
640            //
641            // - The Mmr forest is 1-indexed whereas the block number is 0-indexed. The Mmr root
642            // contained in the block header always lag behind by one block, this is because the Mmr
643            // leaves are hashes of block headers, and we can't have self-referential hashes. These
644            // two points cancel out and don't require adjusting.
645            // - Mmr::get_delta is inclusive, whereas the sync_state request block_num is defined to
646            //   be
647            // exclusive, so the from_forest has to be adjusted with a +1
648            let from_forest = (block_num + 1).as_usize();
649            let to_forest = state_sync.block_header.block_num().as_usize();
650            inner
651                .blockchain
652                .as_mmr()
653                .get_delta(Forest::new(from_forest), Forest::new(to_forest))
654                .map_err(StateSyncError::FailedToBuildMmrDelta)?
655        };
656
657        Ok((state_sync, delta))
658    }
659
660    /// Loads data to synchronize a client's notes.
661    ///
662    /// The client's request contains a list of tags, this method will return the first
663    /// block with a matching tag, or the chain tip. All the other values are filter based on this
664    /// block range.
665    ///
666    /// # Arguments
667    ///
668    /// - `block_num`: The last block *known* by the client, updates start from the next block.
669    /// - `note_tags`: The tags the client is interested in, resulting notes are restricted to the
670    ///   first block containing a matching note.
671    #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
672    pub async fn sync_notes(
673        &self,
674        block_num: BlockNumber,
675        note_tags: Vec<u32>,
676    ) -> Result<(NoteSyncUpdate, MmrProof), NoteSyncError> {
677        let inner = self.inner.read().await;
678
679        let note_sync = self.db.get_note_sync(block_num, note_tags).await?;
680
681        let mmr_proof = inner.blockchain.open(note_sync.block_header.block_num())?;
682
683        Ok((note_sync, mmr_proof))
684    }
685
686    /// Returns data needed by the block producer to construct and prove the next block.
687    pub async fn get_block_inputs(
688        &self,
689        account_ids: Vec<AccountId>,
690        nullifiers: Vec<Nullifier>,
691        unauthenticated_notes: BTreeSet<NoteId>,
692        reference_blocks: BTreeSet<BlockNumber>,
693    ) -> Result<BlockInputs, GetBlockInputsError> {
694        // Get the note inclusion proofs from the DB.
695        // We do this first so we have to acquire the lock to the state just once. There we need the
696        // reference blocks of the note proofs to get their authentication paths in the chain MMR.
697        let unauthenticated_note_proofs = self
698            .db
699            .select_note_inclusion_proofs(unauthenticated_notes)
700            .await
701            .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
702
703        // The set of blocks that the notes are included in.
704        let note_proof_reference_blocks =
705            unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
706
707        // Collect all blocks we need to prove inclusion for, without duplicates.
708        let mut blocks = reference_blocks;
709        blocks.extend(note_proof_reference_blocks);
710
711        let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
712            self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?;
713
714        // Fetch the block headers for all blocks in the partial MMR plus the latest one which will
715        // be used as the previous block header of the block being built.
716        let mut headers = self
717            .db
718            .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
719            .await
720            .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
721
722        // Find and remove the latest block as we must not add it to the chain MMR, since it is
723        // not yet in the chain.
724        let latest_block_header_index = headers
725            .iter()
726            .enumerate()
727            .find_map(|(index, header)| {
728                (header.block_num() == latest_block_number).then_some(index)
729            })
730            .expect("DB should have returned the header of the latest block header");
731
732        // The order doesn't matter for PartialBlockchain::new, so swap remove is fine.
733        let latest_block_header = headers.swap_remove(latest_block_header_index);
734
735        // SAFETY: This should not error because:
736        // - we're passing exactly the block headers that we've added to the partial MMR,
737        // - so none of the block header's block numbers should exceed the chain length of the
738        //   partial MMR,
739        // - and we've added blocks to a BTreeSet, so there can be no duplicates.
740        //
741        // We construct headers and partial MMR in concert, so they are consistent. This is why we
742        // can call the unchecked constructor.
743        let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
744            .expect("partial mmr and block headers should be consistent");
745
746        Ok(BlockInputs::new(
747            latest_block_header,
748            partial_block_chain,
749            account_witnesses,
750            nullifier_witnesses,
751            unauthenticated_note_proofs,
752        ))
753    }
754
755    /// Get account and nullifier witnesses for the requested account IDs and nullifier as well as
756    /// the [`PartialMmr`] for the given blocks. The MMR won't contain the latest block and its
757    /// number is removed from `blocks` and returned separately.
758    ///
759    /// This method acquires the lock to the inner state and does not access the DB so we release
760    /// the lock asap.
761    async fn get_block_inputs_witnesses(
762        &self,
763        blocks: &mut BTreeSet<BlockNumber>,
764        account_ids: Vec<AccountId>,
765        nullifiers: Vec<Nullifier>,
766    ) -> Result<
767        (
768            BlockNumber,
769            BTreeMap<AccountId, AccountWitness>,
770            BTreeMap<Nullifier, NullifierWitness>,
771            PartialMmr,
772        ),
773        GetBlockInputsError,
774    > {
775        let inner = self.inner.read().await;
776
777        let latest_block_number = inner.latest_block_num();
778
779        // If `blocks` is empty, use the latest block number which will never trigger the error.
780        let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
781        if highest_block_number > latest_block_number {
782            return Err(GetBlockInputsError::UnknownBatchBlockReference {
783                highest_block_number,
784                latest_block_number,
785            });
786        }
787
788        // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its
789        // inclusion in the chain.
790        blocks.remove(&latest_block_number);
791
792        // Fetch the partial MMR at the state of the latest block with authentication paths for the
793        // provided set of blocks.
794        //
795        // SAFETY:
796        // - The latest block num was retrieved from the inner blockchain from which we will also
797        //   retrieve the proofs, so it is guaranteed to exist in that chain.
798        // - We have checked that no block number in the blocks set is greater than latest block
799        //   number *and* latest block num was removed from the set. Therefore only block numbers
800        //   smaller than latest block num remain in the set. Therefore all the block numbers are
801        //   guaranteed to exist in the chain state at latest block num.
802        let partial_mmr =
803            inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
804                "latest block num should exist and all blocks in set should be < than latest block",
805            );
806
807        // Fetch witnesses for all accounts.
808        let account_witnesses = account_ids
809            .iter()
810            .copied()
811            .map(|account_id| (account_id, inner.account_tree.open(account_id)))
812            .collect::<BTreeMap<AccountId, AccountWitness>>();
813
814        // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or
815        // not as this is done as part of proposing the block.
816        let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
817            .iter()
818            .copied()
819            .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
820            .collect();
821
822        Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
823    }
824
825    /// Returns data needed by the block producer to verify transactions validity.
826    #[instrument(target = COMPONENT, skip_all, ret)]
827    pub async fn get_transaction_inputs(
828        &self,
829        account_id: AccountId,
830        nullifiers: &[Nullifier],
831        unauthenticated_notes: Vec<NoteId>,
832    ) -> Result<TransactionInputs, DatabaseError> {
833        info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
834
835        let inner = self.inner.read().await;
836
837        let account_commitment = inner.account_tree.get(account_id);
838
839        let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
840            Some(!inner.account_tree.contains_account_id_prefix(account_id.prefix()))
841        } else {
842            None
843        };
844
845        // Non-unique account Id prefixes for new accounts are not allowed.
846        if let Some(false) = new_account_id_prefix_is_unique {
847            return Ok(TransactionInputs {
848                new_account_id_prefix_is_unique,
849                ..Default::default()
850            });
851        }
852
853        let nullifiers = nullifiers
854            .iter()
855            .map(|nullifier| NullifierInfo {
856                nullifier: *nullifier,
857                block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
858            })
859            .collect();
860
861        let found_unauthenticated_notes =
862            self.db.select_note_ids(unauthenticated_notes.clone()).await?;
863
864        Ok(TransactionInputs {
865            account_commitment,
866            nullifiers,
867            found_unauthenticated_notes,
868            new_account_id_prefix_is_unique,
869        })
870    }
871
872    /// Returns details for public (on-chain) account.
873    pub async fn get_account_details(&self, id: AccountId) -> Result<AccountInfo, DatabaseError> {
874        self.db.select_account(id).await
875    }
876
877    /// Returns details for public (on-chain) network accounts.
878    pub async fn get_network_account_details_by_prefix(
879        &self,
880        id_prefix: u32,
881    ) -> Result<Option<AccountInfo>, DatabaseError> {
882        self.db.select_network_account_by_prefix(id_prefix).await
883    }
884
885    /// Returns account proofs with optional account and storage headers.
886    pub async fn get_account_proofs(
887        &self,
888        account_requests: Vec<AccountProofRequest>,
889        known_code_commitments: BTreeSet<Word>,
890        include_headers: bool,
891    ) -> Result<(BlockNumber, Vec<proto::rpc_store::account_proofs::AccountProof>), DatabaseError>
892    {
893        // Lock inner state for the whole operation. We need to hold this lock to prevent the
894        // database, account tree and latest block number from changing during the operation,
895        // because changing one of them would lead to inconsistent state.
896        let inner_state = self.inner.read().await;
897
898        let account_ids: Vec<AccountId> =
899            account_requests.iter().map(|req| req.account_id).collect();
900
901        let state_headers = if include_headers.not() {
902            BTreeMap::<AccountId, proto::rpc_store::account_proofs::account_proof::AccountStateHeader>::default()
903        } else {
904            let infos = self.db.select_accounts_by_ids(account_ids.clone()).await?;
905            if account_ids.len() > infos.len() {
906                let found_ids = infos.iter().map(|info| info.summary.account_id).collect();
907                return Err(DatabaseError::AccountsNotFoundInDb(
908                    BTreeSet::from_iter(account_ids).difference(&found_ids).copied().collect(),
909                ));
910            }
911
912            let mut headers_map = BTreeMap::new();
913
914            // Iterate and build state headers for public accounts
915            for request in account_requests {
916                let account_info = infos
917                    .iter()
918                    .find(|info| info.summary.account_id == request.account_id)
919                    .expect("retrieved accounts were validated against request");
920
921                if let Some(details) = &account_info.details {
922                    let mut storage_slot_map_keys = Vec::new();
923
924                    for StorageMapKeysProof { storage_index, storage_keys } in
925                        &request.storage_requests
926                    {
927                        if let Some(StorageSlot::Map(storage_map)) =
928                            details.storage().slots().get(*storage_index as usize)
929                        {
930                            for map_key in storage_keys {
931                                let proof = storage_map.open(map_key);
932
933                                let slot_map_key = proto::rpc_store::account_proofs::account_proof::account_state_header::StorageSlotMapProof {
934                                    storage_slot: u32::from(*storage_index),
935                                    smt_proof: proof.to_bytes(),
936                                };
937                                storage_slot_map_keys.push(slot_map_key);
938                            }
939                        } else {
940                            return Err(AccountError::StorageSlotNotMap(*storage_index).into());
941                        }
942                    }
943
944                    // Only include unknown account codes
945                    let account_code = known_code_commitments
946                        .contains(&details.code().commitment())
947                        .not()
948                        .then(|| details.code().to_bytes());
949
950                    let state_header =
951                        proto::rpc_store::account_proofs::account_proof::AccountStateHeader {
952                            header: Some(AccountHeader::from(details).into()),
953                            storage_header: details.storage().to_header().to_bytes(),
954                            account_code,
955                            storage_maps: storage_slot_map_keys,
956                        };
957
958                    headers_map.insert(account_info.summary.account_id, state_header);
959                }
960            }
961
962            headers_map
963        };
964
965        let responses = account_ids
966            .into_iter()
967            .map(|account_id| {
968                let witness = inner_state.account_tree.open(account_id);
969                let state_header = state_headers.get(&account_id).cloned();
970
971                let witness_record = AccountWitnessRecord { account_id, witness };
972
973                proto::rpc_store::account_proofs::AccountProof {
974                    witness: Some(witness_record.into()),
975                    state_header,
976                }
977            })
978            .collect();
979
980        Ok((inner_state.latest_block_num(), responses))
981    }
982
983    /// Returns storage map values for syncing within a block range.
984    pub(crate) async fn get_storage_map_sync_values(
985        &self,
986        account_id: AccountId,
987        block_from: BlockNumber,
988        block_to: BlockNumber,
989    ) -> Result<StorageMapValuesPage, DatabaseError> {
990        self.db.select_storage_map_sync_values(account_id, block_from, block_to).await
991    }
992
993    /// Loads a block from the block store. Return `Ok(None)` if the block is not found.
994    pub async fn load_block(
995        &self,
996        block_num: BlockNumber,
997    ) -> Result<Option<Vec<u8>>, DatabaseError> {
998        if block_num > self.latest_block_num().await {
999            return Ok(None);
1000        }
1001        self.block_store.load_block(block_num).await.map_err(Into::into)
1002    }
1003
1004    /// Returns the latest block number.
1005    pub async fn latest_block_num(&self) -> BlockNumber {
1006        self.inner.read().await.latest_block_num()
1007    }
1008
1009    /// Runs database optimization.
1010    pub async fn optimize_db(&self) -> Result<(), DatabaseError> {
1011        self.db.optimize().await
1012    }
1013
1014    /// Returns account vault updates for specified account within a block range.
1015    pub async fn sync_account_vault(
1016        &self,
1017        account_id: AccountId,
1018        block_from: BlockNumber,
1019        block_to: BlockNumber,
1020    ) -> Result<(BlockNumber, Vec<AccountVaultValue>), DatabaseError> {
1021        self.db.get_account_vault_sync(account_id, block_from, block_to).await
1022    }
1023
1024    /// Returns the unprocessed network notes, along with the next pagination token.
1025    pub async fn get_unconsumed_network_notes(
1026        &self,
1027        page: Page,
1028    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
1029        self.db.select_unconsumed_network_notes(page).await
1030    }
1031
1032    /// Returns the network notes for an account that are unconsumed by a specified block number,
1033    /// along with the next pagination token.
1034    pub async fn get_unconsumed_network_notes_for_account(
1035        &self,
1036        network_account_id_prefix: NetworkAccountPrefix,
1037        block_num: BlockNumber,
1038        page: Page,
1039    ) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
1040        self.db
1041            .select_unconsumed_network_notes_for_account(network_account_id_prefix, block_num, page)
1042            .await
1043    }
1044}
1045
1046// UTILITIES
1047// ================================================================================================
1048
1049#[instrument(level = "info", target = COMPONENT, skip_all)]
1050async fn load_nullifier_tree(db: &mut Db) -> Result<NullifierTree, StateInitializationError> {
1051    let nullifiers = db.select_all_nullifiers().await?;
1052
1053    NullifierTree::with_entries(nullifiers.into_iter().map(|info| (info.nullifier, info.block_num)))
1054        .map_err(StateInitializationError::FailedToCreateNullifierTree)
1055}
1056
1057#[instrument(level = "info", target = COMPONENT, skip_all)]
1058async fn load_mmr(db: &mut Db) -> Result<Mmr, StateInitializationError> {
1059    let block_commitments: Vec<Word> = db
1060        .select_all_block_headers()
1061        .await?
1062        .iter()
1063        .map(BlockHeader::commitment)
1064        .collect();
1065
1066    Ok(block_commitments.into())
1067}
1068
1069#[instrument(level = "info", target = COMPONENT, skip_all)]
1070async fn load_account_tree(db: &mut Db) -> Result<AccountTree, StateInitializationError> {
1071    let account_data = db.select_all_account_commitments().await?.into_iter().collect::<Vec<_>>();
1072
1073    AccountTree::with_entries(account_data)
1074        .map_err(StateInitializationError::FailedToCreateAccountsTree)
1075}