1use std::collections::{BTreeMap, BTreeSet, HashSet};
7use std::num::NonZeroUsize;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use miden_node_proto::domain::batch::BatchInputs;
12use miden_node_utils::clap::StorageOptions;
13use miden_node_utils::formatting::format_array;
14use miden_protocol::Word;
15use miden_protocol::account::AccountId;
16use miden_protocol::block::account_tree::AccountWitness;
17use miden_protocol::block::nullifier_tree::{NullifierTree, NullifierWitness};
18use miden_protocol::block::{BlockHeader, BlockInputs, BlockNumber, Blockchain};
19use miden_protocol::crypto::merkle::mmr::{MmrProof, PartialMmr};
20use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtStorage};
21use miden_protocol::note::{NoteId, NoteScript, Nullifier};
22use miden_protocol::transaction::PartialBlockchain;
23use tokio::sync::{Mutex, RwLock, watch};
24use tracing::{Instrument, Span, info, instrument};
25
26use crate::account_state_forest::{AccountStateForest, AccountStateForestBackend};
27use crate::accounts::AccountTreeWithHistory;
28use crate::blocks::BlockStore;
29use crate::db::{Db, NoteRecord, NullifierInfo};
30use crate::errors::{
31 DatabaseError,
32 GetBatchInputsError,
33 GetBlockHeaderError,
34 GetBlockInputsError,
35 StateInitializationError,
36};
37use crate::proven_tip::ProvenTipWriter;
38use crate::{COMPONENT, DataDirectory, DatabaseOptions};
39
40const BLOCK_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(512).unwrap();
42
43const PROOF_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(512).unwrap();
45
46mod loader;
47use loader::{
48 ACCOUNT_STATE_FOREST_STORAGE_DIR,
49 ACCOUNT_TREE_STORAGE_DIR,
50 AccountForestLoader,
51 NULLIFIER_TREE_STORAGE_DIR,
52 TreeStorage,
53 TreeStorageLoader,
54 load_mmr,
55 verify_account_state_forest_consistency,
56 verify_tree_consistency,
57};
58
59mod replica;
60pub use replica::{BlockCache, BlockNotification, ProofCache, ProofNotification};
61
62mod account;
63
64mod subscription;
65pub use subscription::{
66 BlockSubscriptionEvent,
67 BlockSubscriptionStream,
68 ProofSubscriptionEvent,
69 ProofSubscriptionStream,
70 StateSubscriptionError,
71};
72
73mod apply_block;
74mod apply_proof;
75mod bootstrap;
76mod disk_monitor;
77mod sync_state;
78
79#[derive(Debug, Clone, Copy)]
84pub enum Finality {
85 Committed,
87 Proven,
89}
90
91#[derive(Debug, Default)]
95pub struct TransactionInputs {
96 pub account_commitment: Word,
97 pub nullifiers: Vec<NullifierInfo>,
98 pub found_unauthenticated_notes: HashSet<Word>,
99 pub new_account_id_prefix_is_unique: Option<bool>,
100}
101
102type BlockInputWitnesses = (
103 BlockNumber,
104 BTreeMap<AccountId, AccountWitness>,
105 BTreeMap<Nullifier, NullifierWitness>,
106 PartialMmr,
107);
108
109struct InnerState<S>
111where
112 S: SmtStorage,
113{
114 nullifier_tree: NullifierTree<LargeSmt<S>>,
115 blockchain: Blockchain,
116 account_tree: AccountTreeWithHistory<S>,
117}
118
119impl<S: SmtStorage> InnerState<S> {
120 fn latest_block_num(&self) -> BlockNumber {
122 self.blockchain
123 .chain_tip()
124 .expect("chain should always have at least the genesis block")
125 }
126}
127
128pub struct State {
133 data_directory: PathBuf,
135
136 db: Arc<Db>,
139
140 block_store: Arc<BlockStore>,
142
143 inner: RwLock<InnerState<TreeStorage>>,
147
148 forest: RwLock<AccountStateForest<AccountStateForestBackend>>,
150
151 writer: Mutex<()>,
154
155 proven_tip: ProvenTipWriter,
157
158 committed_tip_tx: watch::Sender<BlockNumber>,
161
162 pub(crate) block_cache: BlockCache,
165
166 pub(crate) proof_cache: ProofCache,
169}
170
171impl State {
172 #[instrument(target = COMPONENT, skip_all)]
180 pub async fn load(
181 data_path: &Path,
182 storage_options: StorageOptions,
183 ) -> Result<Self, StateInitializationError> {
184 Self::load_with_database_options(data_path, storage_options, DatabaseOptions::default())
185 .await
186 }
187
188 #[instrument(target = COMPONENT, skip_all)]
193 pub async fn load_with_database_options(
194 data_path: &Path,
195 storage_options: StorageOptions,
196 database_options: DatabaseOptions,
197 ) -> Result<Self, StateInitializationError> {
198 let data_directory = DataDirectory::load(data_path.to_path_buf())
199 .map_err(StateInitializationError::DataDirectoryLoadError)?;
200
201 let block_store = Arc::new(
202 BlockStore::load(data_directory.block_store_dir())
203 .map_err(StateInitializationError::BlockStoreLoadError)?,
204 );
205
206 let database_filepath = data_directory.database_path();
207 let mut db = Db::load_with_pool_size(
208 database_filepath.clone(),
209 database_options.connection_pool_size,
210 )
211 .await
212 .map_err(StateInitializationError::DatabaseLoadError)?;
213
214 let blockchain = load_mmr(&mut db).await?;
215 let latest_block_num = blockchain.chain_tip().unwrap_or(BlockNumber::GENESIS);
216
217 #[cfg(feature = "rocksdb")]
218 let (account_storage_config, nullifier_storage_config, forest_storage_config) = (
219 storage_options.account_tree.into(),
220 storage_options.nullifier_tree.into(),
221 storage_options.account_state_forest.into(),
222 );
223 #[cfg(not(feature = "rocksdb"))]
224 let (account_storage_config, nullifier_storage_config, forest_storage_config) = {
225 let _ = &storage_options;
226 ((), (), ())
227 };
228 let account_storage =
229 TreeStorage::create(data_path, &account_storage_config, ACCOUNT_TREE_STORAGE_DIR)?;
230 let account_tree = account_storage.load_account_tree(&mut db).await?;
231
232 let nullifier_storage =
233 TreeStorage::create(data_path, &nullifier_storage_config, NULLIFIER_TREE_STORAGE_DIR)?;
234 let nullifier_tree = nullifier_storage.load_nullifier_tree(&mut db).await?;
235
236 verify_tree_consistency(account_tree.root(), nullifier_tree.root(), &mut db).await?;
240
241 let account_tree = AccountTreeWithHistory::new(account_tree, latest_block_num);
242
243 let forest_backend = AccountStateForestBackend::create(
244 data_path,
245 &forest_storage_config,
246 ACCOUNT_STATE_FOREST_STORAGE_DIR,
247 )?;
248 let forest = forest_backend.load_account_state_forest(&mut db, latest_block_num).await?;
249 verify_account_state_forest_consistency(&forest, &mut db).await?;
250
251 let inner = RwLock::new(InnerState { nullifier_tree, blockchain, account_tree });
252
253 let forest = RwLock::new(forest);
254 let writer = Mutex::new(());
255 let db = Arc::new(db);
256
257 let proven_tip_init = block_store
259 .load_proven_tip()
260 .map_err(StateInitializationError::ProvenTipLoadError)?;
261 let (proven_tip, _rx) = ProvenTipWriter::new(proven_tip_init);
262
263 let (committed_tip_tx, _rx) = watch::channel(latest_block_num);
265
266 Ok(Self {
267 data_directory: data_path.to_path_buf(),
268 db,
269 block_store,
270 inner,
271 forest,
272 writer,
273 proven_tip,
274 committed_tip_tx,
275 block_cache: BlockCache::new(BLOCK_CACHE_CAPACITY),
276 proof_cache: ProofCache::new(PROOF_CACHE_CAPACITY),
277 })
278 }
279
280 pub fn subscribe_committed_tip(&self) -> watch::Receiver<BlockNumber> {
282 self.committed_tip_tx.subscribe()
283 }
284
285 pub async fn load_proving_inputs(
287 &self,
288 block_num: BlockNumber,
289 ) -> std::io::Result<Option<Vec<u8>>> {
290 self.block_store.load_proving_inputs(block_num).await
291 }
292
293 pub(crate) fn subscribe_proven_tip(&self) -> watch::Receiver<BlockNumber> {
295 self.proven_tip.subscribe()
296 }
297
298 fn with_inner_read_blocking<R>(&self, f: impl FnOnce(&InnerState<TreeStorage>) -> R) -> R {
307 let span = Span::current();
308 tokio::task::block_in_place(|| {
309 span.in_scope(|| {
310 let inner = self.inner.blocking_read();
311 f(&inner)
312 })
313 })
314 }
315
316 fn with_inner_write_blocking<R>(&self, f: impl FnOnce(&mut InnerState<TreeStorage>) -> R) -> R {
320 let span = Span::current();
321 tokio::task::block_in_place(|| {
322 span.in_scope(|| {
323 let mut inner = self.inner.blocking_write();
324 f(&mut inner)
325 })
326 })
327 }
328
329 fn with_forest_read_blocking<R>(
335 &self,
336 f: impl FnOnce(&AccountStateForest<AccountStateForestBackend>) -> R,
337 ) -> R {
338 let span = Span::current();
339 tokio::task::block_in_place(|| {
340 span.in_scope(|| {
341 let forest = self.forest.blocking_read();
342 f(&forest)
343 })
344 })
345 }
346
347 fn with_forest_write_blocking<R>(
351 &self,
352 f: impl FnOnce(&mut AccountStateForest<AccountStateForestBackend>) -> R,
353 ) -> R {
354 let span = Span::current();
355 tokio::task::block_in_place(|| {
356 span.in_scope(|| {
357 let mut forest = self.forest.blocking_write();
358 f(&mut forest)
359 })
360 })
361 }
362
363 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
371 pub async fn get_block_header(
372 &self,
373 block_num: Option<BlockNumber>,
374 include_mmr_proof: bool,
375 ) -> Result<(Option<BlockHeader>, Option<MmrProof>), GetBlockHeaderError> {
376 let block_header = self.db.select_block_header_by_block_num(block_num).await?;
377 if let Some(header) = block_header {
378 let mmr_proof = if include_mmr_proof {
379 let inner = self.inner.read().await;
380 let mmr_proof = inner.blockchain.open(header.block_num())?;
381 Some(mmr_proof)
382 } else {
383 None
384 };
385 Ok((Some(header), mmr_proof))
386 } else {
387 Ok((None, None))
388 }
389 }
390
391 pub async fn get_notes_by_id(
396 &self,
397 note_ids: Vec<NoteId>,
398 ) -> Result<Vec<NoteRecord>, DatabaseError> {
399 self.db.select_notes_by_id(note_ids).await
400 }
401
402 pub async fn get_batch_inputs(
421 &self,
422 tx_reference_blocks: BTreeSet<BlockNumber>,
423 unauthenticated_note_commitments: BTreeSet<Word>,
424 ) -> Result<BatchInputs, GetBatchInputsError> {
425 if tx_reference_blocks.is_empty() {
426 return Err(GetBatchInputsError::TransactionBlockReferencesEmpty);
427 }
428
429 let note_proofs = self
433 .db
434 .select_note_inclusion_proofs(unauthenticated_note_commitments)
435 .await
436 .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?;
437
438 let note_blocks = note_proofs.values().map(|proof| proof.location().block_num());
440
441 let mut blocks: BTreeSet<BlockNumber> = tx_reference_blocks;
445 blocks.extend(note_blocks);
446
447 let (batch_reference_block, partial_mmr) = {
450 let inner_state = self.inner.read().await;
451
452 let latest_block_num = inner_state.latest_block_num();
453
454 let highest_block_num =
455 *blocks.last().expect("we should have checked for empty block references");
456 if highest_block_num > latest_block_num {
457 return Err(GetBatchInputsError::UnknownTransactionBlockReference {
458 highest_block_num,
459 latest_block_num,
460 });
461 }
462
463 blocks.remove(&latest_block_num);
467
468 let partial_mmr = inner_state
476 .blockchain
477 .partial_mmr_from_blocks(&blocks, latest_block_num)
478 .expect("latest block num should exist and all blocks in set should be < than latest block");
479
480 (latest_block_num, partial_mmr)
481 };
482
483 let mut headers = self
486 .db
487 .select_block_headers(blocks.into_iter().chain(std::iter::once(batch_reference_block)))
488 .await
489 .map_err(GetBatchInputsError::SelectBlockHeaderError)?;
490
491 let header_index = headers
493 .iter()
494 .enumerate()
495 .find_map(|(index, header)| {
496 (header.block_num() == batch_reference_block).then_some(index)
497 })
498 .expect("DB should have returned the header of the batch reference block");
499
500 let batch_reference_block_header = headers.swap_remove(header_index);
502
503 let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
512 .expect("partial mmr and block headers should be consistent");
513
514 Ok(BatchInputs {
515 batch_reference_block_header,
516 note_proofs,
517 partial_block_chain,
518 })
519 }
520
521 pub async fn get_block_inputs(
523 &self,
524 account_ids: Vec<AccountId>,
525 nullifiers: Vec<Nullifier>,
526 unauthenticated_note_commitments: BTreeSet<Word>,
527 reference_blocks: BTreeSet<BlockNumber>,
528 ) -> Result<BlockInputs, GetBlockInputsError> {
529 let unauthenticated_note_proofs = self
533 .db
534 .select_note_inclusion_proofs(unauthenticated_note_commitments)
535 .await
536 .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?;
537
538 let note_proof_reference_blocks =
540 unauthenticated_note_proofs.values().map(|proof| proof.location().block_num());
541
542 let mut blocks = reference_blocks;
544 blocks.extend(note_proof_reference_blocks);
545
546 let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) =
547 self.get_block_inputs_witnesses(&mut blocks, &account_ids, &nullifiers)?;
548
549 let mut headers = self
552 .db
553 .select_block_headers(blocks.into_iter().chain(std::iter::once(latest_block_number)))
554 .await
555 .map_err(GetBlockInputsError::SelectBlockHeaderError)?;
556
557 let latest_block_header_index = headers
560 .iter()
561 .enumerate()
562 .find_map(|(index, header)| {
563 (header.block_num() == latest_block_number).then_some(index)
564 })
565 .expect("DB should have returned the header of the latest block header");
566
567 let latest_block_header = headers.swap_remove(latest_block_header_index);
569
570 let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers)
579 .expect("partial mmr and block headers should be consistent");
580
581 Ok(BlockInputs::new(
582 latest_block_header,
583 partial_block_chain,
584 account_witnesses,
585 nullifier_witnesses,
586 unauthenticated_note_proofs,
587 ))
588 }
589
590 fn get_block_inputs_witnesses(
597 &self,
598 blocks: &mut BTreeSet<BlockNumber>,
599 account_ids: &[AccountId],
600 nullifiers: &[Nullifier],
601 ) -> Result<BlockInputWitnesses, GetBlockInputsError> {
602 self.with_inner_read_blocking(|inner| {
603 let latest_block_number = inner.latest_block_num();
604
605 let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number);
607 if highest_block_number > latest_block_number {
608 return Err(GetBlockInputsError::UnknownBatchBlockReference {
609 highest_block_number,
610 latest_block_number,
611 });
612 }
613
614 blocks.remove(&latest_block_number);
617
618 let partial_mmr =
629 inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect(
630 "latest block num should exist and all blocks in set should be < than latest block",
631 );
632
633 let account_witnesses = account_ids
635 .iter()
636 .copied()
637 .map(|account_id| (account_id, inner.account_tree.open_latest(account_id)))
638 .collect::<BTreeMap<AccountId, AccountWitness>>();
639
640 let nullifier_witnesses: BTreeMap<Nullifier, NullifierWitness> = nullifiers
643 .iter()
644 .copied()
645 .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier)))
646 .collect();
647
648 Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr))
649 })
650 }
651
652 #[instrument(target = COMPONENT, skip_all, ret)]
654 pub async fn get_transaction_inputs(
655 &self,
656 account_id: AccountId,
657 nullifiers: &[Nullifier],
658 unauthenticated_note_commitments: Vec<Word>,
659 ) -> Result<TransactionInputs, DatabaseError> {
660 info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers));
661
662 let tree_inputs = self.with_inner_read_blocking(|inner| {
663 let account_commitment = inner.account_tree.get_latest_commitment(account_id);
664
665 let new_account_id_prefix_is_unique = if account_commitment.is_empty() {
666 Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix()))
667 } else {
668 None
669 };
670
671 if let Some(false) = new_account_id_prefix_is_unique {
673 return Err(TransactionInputs {
674 new_account_id_prefix_is_unique,
675 ..Default::default()
676 });
677 }
678
679 let nullifiers = nullifiers
680 .iter()
681 .map(|nullifier| NullifierInfo {
682 nullifier: *nullifier,
683 block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(),
684 })
685 .collect();
686
687 Ok((account_commitment, nullifiers, new_account_id_prefix_is_unique))
688 });
689 let (account_commitment, nullifiers, new_account_id_prefix_is_unique) = match tree_inputs {
690 Ok(inputs) => inputs,
691 Err(inputs) => return Ok(inputs),
692 };
693
694 let found_unauthenticated_notes = self
695 .db
696 .select_existing_note_commitments(unauthenticated_note_commitments)
697 .await?;
698
699 Ok(TransactionInputs {
700 account_commitment,
701 nullifiers,
702 found_unauthenticated_notes,
703 new_account_id_prefix_is_unique,
704 })
705 }
706
707 pub async fn filter_network_accounts(
709 &self,
710 account_ids: &[AccountId],
711 ) -> Result<HashSet<AccountId>, DatabaseError> {
712 self.db.select_network_accounts_subset(account_ids.to_vec()).await
713 }
714
715 pub async fn chain_tip(&self, finality: Finality) -> BlockNumber {
721 match finality {
722 Finality::Committed => self
723 .inner
724 .read()
725 .instrument(tracing::info_span!("acquire_inner"))
726 .await
727 .latest_block_num(),
728 Finality::Proven => self.proven_tip.read(),
729 }
730 }
731
732 pub async fn load_block(
734 &self,
735 block_num: BlockNumber,
736 ) -> Result<Option<Vec<u8>>, DatabaseError> {
737 if block_num > self.chain_tip(Finality::Committed).await {
738 return Ok(None);
739 }
740 self.block_store.load_block(block_num).await.map_err(Into::into)
741 }
742
743 pub async fn load_proof(
745 &self,
746 block_num: BlockNumber,
747 ) -> Result<Option<Vec<u8>>, DatabaseError> {
748 if block_num > self.chain_tip(Finality::Proven).await {
749 return Ok(None);
750 }
751 self.block_store.load_proof(block_num).await.map_err(Into::into)
752 }
753
754 pub async fn get_note_script_by_root(
756 &self,
757 root: Word,
758 ) -> Result<Option<NoteScript>, DatabaseError> {
759 self.db.select_note_script_by_root(root).await
760 }
761}