miden_node_store/db/
mod.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    fs::{self, create_dir_all},
4    path::PathBuf,
5    sync::Arc,
6};
7
8use miden_node_proto::{
9    domain::account::{AccountInfo, AccountSummary},
10    generated::note as proto,
11};
12use miden_objects::{
13    account::{AccountDelta, AccountId},
14    block::{BlockHeader, BlockNoteIndex, BlockNumber, ProvenBlock},
15    crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
16    note::{NoteId, NoteInclusionProof, NoteMetadata, Nullifier},
17    transaction::TransactionId,
18    utils::Serializable,
19};
20use sql::utils::{column_value_as_u64, read_block_number};
21use tokio::sync::oneshot;
22use tracing::{info, info_span, instrument};
23
24use crate::{
25    COMPONENT,
26    blocks::BlockStore,
27    db::{
28        migrations::apply_migrations,
29        pool_manager::{Pool, SqlitePoolManager},
30    },
31    errors::{DatabaseError, DatabaseSetupError, GenesisError, NoteSyncError, StateSyncError},
32    genesis::GenesisState,
33};
34
35mod migrations;
36#[macro_use]
37mod sql;
38
39mod connection;
40mod pool_manager;
41#[cfg(test)]
42mod query_plan;
43mod settings;
44#[cfg(test)]
45mod tests;
46mod transaction;
47
48pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
49
50pub struct Db {
51    pool: Pool,
52}
53
54#[derive(Debug, PartialEq)]
55pub struct NullifierInfo {
56    pub nullifier: Nullifier,
57    pub block_num: BlockNumber,
58}
59
60#[derive(Debug, PartialEq)]
61pub struct TransactionSummary {
62    pub account_id: AccountId,
63    pub block_num: BlockNumber,
64    pub transaction_id: TransactionId,
65}
66
67#[derive(Debug, Clone, PartialEq)]
68pub struct NoteRecord {
69    pub block_num: BlockNumber,
70    pub note_index: BlockNoteIndex,
71    pub note_id: RpoDigest,
72    pub metadata: NoteMetadata,
73    pub details: Option<Vec<u8>>,
74    pub merkle_path: MerklePath,
75}
76
77impl NoteRecord {
78    /// Columns from the `notes` table ordered to match [`Self::from_row`].
79    const SELECT_COLUMNS: &'static str = "
80            block_num,
81            batch_index,
82            note_index,
83            note_id,
84            note_type,
85            sender,
86            tag,
87            aux,
88            execution_hint,
89            merkle_path,
90            details
91    ";
92
93    /// Parses a row from the `notes` table. The sql selection must use [`Self::SELECT_COLUMNS`] to
94    /// ensure ordering is correct.
95    fn from_row(row: &rusqlite::Row<'_>) -> Result<Self> {
96        let block_num = read_block_number(row, 0)?;
97        let batch_idx = row.get(1)?;
98        let note_idx_in_batch = row.get(2)?;
99        // SAFETY: We can assume the batch and note indices stored in the DB are valid so this
100        // should never panic.
101        let note_index = BlockNoteIndex::new(batch_idx, note_idx_in_batch)
102            .expect("batch and note index from DB should be valid");
103        let note_id = row.get_ref(3)?.as_blob()?;
104        let note_id = RpoDigest::read_from_bytes(note_id)?;
105        let note_type = row.get::<_, u8>(4)?.try_into()?;
106        let sender = AccountId::read_from_bytes(row.get_ref(5)?.as_blob()?)?;
107        let tag: u32 = row.get(6)?;
108        let aux: u64 = row.get(7)?;
109        let aux = aux.try_into().map_err(DatabaseError::InvalidFelt)?;
110        let execution_hint = column_value_as_u64(row, 8)?;
111        let merkle_path_data = row.get_ref(9)?.as_blob()?;
112        let merkle_path = MerklePath::read_from_bytes(merkle_path_data)?;
113        let details_data = row.get_ref(10)?.as_blob_or_null()?;
114        let details = details_data.map(<Vec<u8>>::read_from_bytes).transpose()?;
115
116        let metadata =
117            NoteMetadata::new(sender, note_type, tag.into(), execution_hint.try_into()?, aux)?;
118
119        Ok(NoteRecord {
120            block_num,
121            note_index,
122            note_id,
123            metadata,
124            details,
125            merkle_path,
126        })
127    }
128}
129
130impl From<NoteRecord> for proto::Note {
131    fn from(note: NoteRecord) -> Self {
132        Self {
133            block_num: note.block_num.as_u32(),
134            note_index: note.note_index.leaf_index_value().into(),
135            note_id: Some(note.note_id.into()),
136            metadata: Some(note.metadata.into()),
137            merkle_path: Some(Into::into(&note.merkle_path)),
138            details: note.details,
139        }
140    }
141}
142
143#[derive(Debug, PartialEq)]
144pub struct StateSyncUpdate {
145    pub notes: Vec<NoteSyncRecord>,
146    pub block_header: BlockHeader,
147    pub account_updates: Vec<AccountSummary>,
148    pub transactions: Vec<TransactionSummary>,
149}
150
151#[derive(Debug, PartialEq)]
152pub struct NoteSyncUpdate {
153    pub notes: Vec<NoteSyncRecord>,
154    pub block_header: BlockHeader,
155}
156
157#[derive(Debug, Clone, PartialEq)]
158pub struct NoteSyncRecord {
159    pub block_num: BlockNumber,
160    pub note_index: BlockNoteIndex,
161    pub note_id: RpoDigest,
162    pub metadata: NoteMetadata,
163    pub merkle_path: MerklePath,
164}
165
166impl From<NoteSyncRecord> for proto::NoteSyncRecord {
167    fn from(note: NoteSyncRecord) -> Self {
168        Self {
169            note_index: note.note_index.leaf_index_value().into(),
170            note_id: Some(note.note_id.into()),
171            metadata: Some(note.metadata.into()),
172            merkle_path: Some(Into::into(&note.merkle_path)),
173        }
174    }
175}
176
177impl From<NoteRecord> for NoteSyncRecord {
178    fn from(note: NoteRecord) -> Self {
179        Self {
180            block_num: note.block_num,
181            note_index: note.note_index,
182            note_id: note.note_id,
183            metadata: note.metadata,
184            merkle_path: note.merkle_path,
185        }
186    }
187}
188
189impl Db {
190    /// Open a connection to the DB, apply any pending migrations, and ensure that the genesis block
191    /// is as expected and present in the database.
192    // TODO: This span is logged in a root span, we should connect it to the parent one.
193    #[instrument(target = COMPONENT, skip_all)]
194    pub async fn setup(
195        database_filepath: PathBuf,
196        genesis_filepath: &str,
197        block_store: Arc<BlockStore>,
198    ) -> Result<Self, DatabaseSetupError> {
199        info!(target: COMPONENT, ?database_filepath, "Connecting to the database");
200
201        if let Some(p) = database_filepath.parent() {
202            create_dir_all(p).map_err(DatabaseError::IoError)?;
203        }
204
205        let sqlite_pool_manager = SqlitePoolManager::new(database_filepath.clone());
206        let pool = Pool::builder(sqlite_pool_manager).build()?;
207
208        info!(
209            target: COMPONENT,
210            sqlite= %database_filepath.display(),
211            "Connected to the database"
212        );
213
214        let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
215
216        conn.interact(apply_migrations).await.map_err(|err| {
217            DatabaseError::InteractError(format!("Migration task failed: {err}"))
218        })??;
219
220        let db = Db { pool };
221        db.ensure_genesis_block(genesis_filepath, block_store).await?;
222
223        Ok(db)
224    }
225
226    /// Loads all the nullifiers from the DB.
227    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
228    pub async fn select_all_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
229        self.pool
230            .get()
231            .await?
232            .interact(|conn| {
233                let transaction = conn.transaction()?;
234                sql::select_all_nullifiers(&transaction)
235            })
236            .await
237            .map_err(|err| {
238                DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
239            })?
240    }
241
242    /// Loads the nullifiers that match the prefixes from the DB.
243    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
244    pub async fn select_nullifiers_by_prefix(
245        &self,
246        prefix_len: u32,
247        nullifier_prefixes: Vec<u32>,
248        block_num: BlockNumber,
249    ) -> Result<Vec<NullifierInfo>> {
250        self.pool
251            .get()
252            .await?
253            .interact(move |conn| {
254                let transaction = conn.transaction()?;
255                sql::select_nullifiers_by_prefix(
256                    &transaction,
257                    prefix_len,
258                    &nullifier_prefixes,
259                    block_num,
260                )
261            })
262            .await
263            .map_err(|err| {
264                DatabaseError::InteractError(format!(
265                    "Select nullifiers by prefix task failed: {err}"
266                ))
267            })?
268    }
269
270    /// Search for a [BlockHeader] from the database by its `block_num`.
271    ///
272    /// When `block_number` is [None], the latest block header is returned.
273    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
274    pub async fn select_block_header_by_block_num(
275        &self,
276        block_number: Option<BlockNumber>,
277    ) -> Result<Option<BlockHeader>> {
278        self.pool
279            .get()
280            .await?
281            .interact(move |conn| {
282                let transaction = conn.transaction()?;
283                sql::select_block_header_by_block_num(&transaction, block_number)
284            })
285            .await
286            .map_err(|err| {
287                DatabaseError::InteractError(format!("Select block header task failed: {err}"))
288            })?
289    }
290
291    /// Loads multiple block headers from the DB.
292    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
293    pub async fn select_block_headers(
294        &self,
295        blocks: impl Iterator<Item = BlockNumber> + Send + 'static,
296    ) -> Result<Vec<BlockHeader>> {
297        self.pool
298            .get()
299            .await?
300            .interact(move |conn| {
301                let transaction = conn.transaction()?;
302                sql::select_block_headers(&transaction, blocks)
303            })
304            .await
305            .map_err(|err| {
306                DatabaseError::InteractError(format!(
307                    "Select many block headers task failed: {err}"
308                ))
309            })?
310    }
311
312    /// Loads all the block headers from the DB.
313    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
314    pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
315        self.pool
316            .get()
317            .await?
318            .interact(|conn| {
319                let transaction = conn.transaction()?;
320                sql::select_all_block_headers(&transaction)
321            })
322            .await
323            .map_err(|err| {
324                DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
325            })?
326    }
327
328    /// Loads all the account commitments from the DB.
329    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
330    pub async fn select_all_account_commitments(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
331        self.pool
332            .get()
333            .await?
334            .interact(|conn| {
335                let transaction = conn.transaction()?;
336                sql::select_all_account_commitments(&transaction)
337            })
338            .await
339            .map_err(|err| {
340                DatabaseError::InteractError(format!(
341                    "Select account commitments task failed: {err}"
342                ))
343            })?
344    }
345
346    /// Loads public account details from the DB.
347    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
348    pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
349        self.pool
350            .get()
351            .await?
352            .interact(move |conn| {
353                let transaction = conn.transaction()?;
354                sql::select_account(&transaction, id)
355            })
356            .await
357            .map_err(|err| {
358                DatabaseError::InteractError(format!("Get account details task failed: {err}"))
359            })?
360    }
361
362    /// Loads public accounts details from the DB.
363    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
364    pub async fn select_accounts_by_ids(
365        &self,
366        account_ids: Vec<AccountId>,
367    ) -> Result<Vec<AccountInfo>> {
368        self.pool
369            .get()
370            .await?
371            .interact(move |conn| {
372                let transaction = conn.transaction()?;
373                sql::select_accounts_by_ids(&transaction, &account_ids)
374            })
375            .await
376            .map_err(|err| {
377                DatabaseError::InteractError(format!("Get accounts details task failed: {err}"))
378            })?
379    }
380
381    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
382    pub async fn get_state_sync(
383        &self,
384        block_num: BlockNumber,
385        account_ids: Vec<AccountId>,
386        note_tags: Vec<u32>,
387    ) -> Result<StateSyncUpdate, StateSyncError> {
388        self.pool
389            .get()
390            .await
391            .map_err(DatabaseError::MissingDbConnection)?
392            .interact(move |conn| {
393                let transaction = conn.transaction().map_err(DatabaseError::SqliteError)?;
394                sql::get_state_sync(&transaction, block_num, &account_ids, &note_tags)
395            })
396            .await
397            .map_err(|err| {
398                DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
399            })?
400    }
401
402    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
403    pub async fn get_note_sync(
404        &self,
405        block_num: BlockNumber,
406        note_tags: Vec<u32>,
407    ) -> Result<NoteSyncUpdate, NoteSyncError> {
408        self.pool
409            .get()
410            .await
411            .map_err(DatabaseError::MissingDbConnection)?
412            .interact(move |conn| {
413                let transaction = conn.transaction().map_err(DatabaseError::SqliteError)?;
414                sql::get_note_sync(&transaction, block_num, &note_tags)
415            })
416            .await
417            .map_err(|err| {
418                DatabaseError::InteractError(format!("Get notes sync task failed: {err}"))
419            })?
420    }
421
422    /// Loads all the Note's matching a certain NoteId from the database.
423    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
424    pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
425        self.pool
426            .get()
427            .await?
428            .interact(move |conn| {
429                let transaction = conn.transaction()?;
430                sql::select_notes_by_id(&transaction, &note_ids)
431            })
432            .await
433            .map_err(|err| {
434                DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
435            })?
436    }
437
438    /// Loads inclusion proofs for notes matching the given IDs.
439    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
440    pub async fn select_note_inclusion_proofs(
441        &self,
442        note_ids: BTreeSet<NoteId>,
443    ) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
444        self.pool
445            .get()
446            .await?
447            .interact(move |conn| {
448                let transaction = conn.transaction()?;
449                sql::select_note_inclusion_proofs(&transaction, note_ids)
450            })
451            .await
452            .map_err(|err| {
453                DatabaseError::InteractError(format!(
454                    "Select block note inclusion proofs task failed: {err}"
455                ))
456            })?
457    }
458
459    /// Loads all note IDs matching a certain NoteId from the database.
460    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
461    pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
462        self.select_notes_by_id(note_ids)
463            .await
464            .map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
465    }
466
467    /// Inserts the data of a new block into the DB.
468    ///
469    /// `allow_acquire` and `acquire_done` are used to synchronize writes to the DB with writes to
470    /// the in-memory trees. Further details available on [super::state::State::apply_block].
471    // TODO: This span is logged in a root span, we should connect it to the parent one.
472    #[instrument(target = COMPONENT, skip_all, err)]
473    pub async fn apply_block(
474        &self,
475        allow_acquire: oneshot::Sender<()>,
476        acquire_done: oneshot::Receiver<()>,
477        block: ProvenBlock,
478        notes: Vec<(NoteRecord, Option<Nullifier>)>,
479    ) -> Result<()> {
480        self.pool
481            .get()
482            .await?
483            .interact(move |conn| -> Result<()> {
484                // TODO: This span is logged in a root span, we should connect it to the parent one.
485                let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
486
487                let transaction = conn.transaction()?;
488                sql::apply_block(
489                    &transaction,
490                    block.header(),
491                    &notes,
492                    block.created_nullifiers(),
493                    block.updated_accounts(),
494                )?;
495
496                let _ = allow_acquire.send(());
497                acquire_done.blocking_recv()?;
498
499                transaction.commit()?;
500
501                Ok(())
502            })
503            .await
504            .map_err(|err| {
505                DatabaseError::InteractError(format!("Apply block task failed: {err}"))
506            })??;
507
508        Ok(())
509    }
510
511    /// Merges all account deltas from the DB for given account ID and block range.
512    /// Note, that `from_block` is exclusive and `to_block` is inclusive.
513    ///
514    /// Returns `Ok(None)` if no deltas were found in the DB for the specified account within
515    /// the given block range.
516    pub(crate) async fn select_account_state_delta(
517        &self,
518        account_id: AccountId,
519        from_block: BlockNumber,
520        to_block: BlockNumber,
521    ) -> Result<Option<AccountDelta>> {
522        self.pool
523            .get()
524            .await
525            .map_err(DatabaseError::MissingDbConnection)?
526            .interact(move |conn| {
527                let transaction = conn.transaction()?;
528                sql::select_account_delta(&transaction, account_id, from_block, to_block)
529            })
530            .await
531            .map_err(|err| DatabaseError::InteractError(err.to_string()))?
532    }
533
534    /// Runs database optimization.
535    #[instrument(target = COMPONENT, skip_all, err)]
536    pub async fn optimize(&self) -> Result<(), DatabaseError> {
537        self.pool
538            .get()
539            .await?
540            .interact(move |conn| -> Result<()> {
541                conn.execute("PRAGMA optimize;", ())
542                    .map(|_| ())
543                    .map_err(DatabaseError::SqliteError)
544            })
545            .await
546            .map_err(|err| {
547                DatabaseError::InteractError(format!("Database optimization task failed: {err}"))
548            })?
549    }
550
551    // HELPERS
552    // ---------------------------------------------------------------------------------------------
553
554    /// If the database is empty, generates and stores the genesis block. Otherwise, it ensures that
555    /// the genesis block in the database is consistent with the genesis block data in the
556    /// genesis JSON file.
557    #[instrument(target = COMPONENT, skip_all, err)]
558    async fn ensure_genesis_block(
559        &self,
560        genesis_filepath: &str,
561        block_store: Arc<BlockStore>,
562    ) -> Result<(), GenesisError> {
563        let genesis_block = {
564            let file_contents = fs::read(genesis_filepath).map_err(|source| {
565                GenesisError::FailedToReadGenesisFile {
566                    genesis_filepath: genesis_filepath.to_string(),
567                    source,
568                }
569            })?;
570
571            let genesis_state = GenesisState::read_from_bytes(&file_contents)
572                .map_err(GenesisError::GenesisFileDeserializationError)?;
573
574            genesis_state.into_block()?
575        };
576
577        let maybe_block_header_in_store = self
578            .select_block_header_by_block_num(Some(BlockNumber::GENESIS))
579            .await
580            .map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
581
582        let expected_genesis_header = genesis_block.header().clone();
583
584        match maybe_block_header_in_store {
585            Some(block_header_in_store) => {
586                // ensure that expected header is what's also in the store
587                if expected_genesis_header != block_header_in_store {
588                    Err(GenesisError::GenesisBlockHeaderMismatch {
589                        expected_genesis_header: Box::new(expected_genesis_header),
590                        block_header_in_store: Box::new(block_header_in_store),
591                    })?;
592                }
593            },
594            None => {
595                // add genesis header to store
596                self.pool
597                    .get()
598                    .await
599                    .map_err(DatabaseError::MissingDbConnection)?
600                    .interact(move |conn| -> Result<()> {
601                        // TODO: This span is logged in a root span, we should connect it to the
602                        // parent one.
603                        let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
604                        let guard = span.enter();
605
606                        let transaction = conn.transaction()?;
607                        sql::apply_block(
608                            &transaction,
609                            &expected_genesis_header,
610                            &[],
611                            &[],
612                            genesis_block.updated_accounts(),
613                        )?;
614
615                        block_store.save_block_blocking(0.into(), &genesis_block.to_bytes())?;
616
617                        transaction.commit()?;
618
619                        drop(guard);
620                        Ok(())
621                    })
622                    .await
623                    .map_err(GenesisError::ApplyBlockFailed)??;
624            },
625        }
626
627        Ok(())
628    }
629}