miden_node_store/db/
mod.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    fs::{self, create_dir_all},
4    sync::Arc,
5};
6
7use deadpool_sqlite::{Config as SqliteConfig, Hook, HookError, Pool, Runtime};
8use miden_node_proto::{
9    domain::account::{AccountInfo, AccountSummary},
10    generated::note as proto,
11};
12use miden_objects::{
13    account::{AccountDelta, AccountId},
14    block::{Block, BlockHeader, BlockNoteIndex, BlockNumber},
15    crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
16    note::{NoteId, NoteInclusionProof, NoteMetadata, Nullifier},
17    transaction::TransactionId,
18    utils::Serializable,
19};
20use rusqlite::vtab::array;
21use tokio::sync::oneshot;
22use tracing::{info, info_span, instrument};
23
24use crate::{
25    blocks::BlockStore,
26    config::StoreConfig,
27    db::migrations::apply_migrations,
28    errors::{DatabaseError, DatabaseSetupError, GenesisError, NoteSyncError, StateSyncError},
29    genesis::GenesisState,
30    COMPONENT, SQL_STATEMENT_CACHE_CAPACITY,
31};
32
33mod migrations;
34mod sql;
35
36mod settings;
37#[cfg(test)]
38mod tests;
39
/// Result type used by the database layer; the error type defaults to [`DatabaseError`].
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
41
/// Handle to the store database.
///
/// All queries are executed on connections borrowed from the wrapped connection pool.
pub struct Db {
    /// Pool of SQLite connections, built in [`Db::setup`].
    pool: Pool,
}
45
/// A nullifier paired with a block number, as returned by nullifier queries and state sync.
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
    /// The nullifier value.
    pub nullifier: Nullifier,
    /// Block number associated with the nullifier.
    // NOTE(review): presumably the block in which the nullifier was recorded — confirm in `sql`.
    pub block_num: BlockNumber,
}
51
/// Identifies a transaction together with an account ID and a block number.
///
/// Instances are returned as part of [`StateSyncUpdate`].
#[derive(Debug, PartialEq)]
pub struct TransactionSummary {
    /// ID of the account associated with the transaction.
    pub account_id: AccountId,
    /// Block number associated with the transaction.
    // NOTE(review): presumably the block that included the transaction — confirm in `sql`.
    pub block_num: BlockNumber,
    /// ID of the transaction.
    pub transaction_id: TransactionId,
}
58
/// A note as handled by the database layer.
///
/// Converts into [`proto::Note`] for the wire format, and into [`NoteSyncRecord`] by dropping
/// `details`.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
    /// Block number associated with the note.
    pub block_num: BlockNumber,
    /// Index of the note within its block.
    pub note_index: BlockNoteIndex,
    /// ID of the note, stored as a raw digest.
    pub note_id: RpoDigest,
    /// Metadata of the note.
    pub metadata: NoteMetadata,
    /// Optional raw bytes carrying the note details.
    // NOTE(review): presumably `None` for notes whose details are not stored — TODO confirm.
    pub details: Option<Vec<u8>>,
    /// Merkle path stored alongside the note.
    // NOTE(review): presumably the note's inclusion path in the block's note tree — TODO confirm.
    pub merkle_path: MerklePath,
}
68
69impl From<NoteRecord> for proto::Note {
70    fn from(note: NoteRecord) -> Self {
71        Self {
72            block_num: note.block_num.as_u32(),
73            note_index: note.note_index.leaf_index_value().into(),
74            note_id: Some(note.note_id.into()),
75            metadata: Some(note.metadata.into()),
76            merkle_path: Some(Into::into(&note.merkle_path)),
77            details: note.details,
78        }
79    }
80}
81
/// Result of a state sync query, as returned by [`Db::get_state_sync`].
#[derive(Debug, PartialEq)]
pub struct StateSyncUpdate {
    /// Notes included in the update.
    pub notes: Vec<NoteSyncRecord>,
    /// Block header the update refers to.
    pub block_header: BlockHeader,
    /// Summaries of the accounts included in the update.
    pub account_updates: Vec<AccountSummary>,
    /// Summaries of the transactions included in the update.
    pub transactions: Vec<TransactionSummary>,
    /// Nullifiers included in the update.
    pub nullifiers: Vec<NullifierInfo>,
}
90
/// Result of a note sync query, as returned by [`Db::get_note_sync`].
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
    /// Notes included in the update.
    pub notes: Vec<NoteSyncRecord>,
    /// Block header the update refers to.
    pub block_header: BlockHeader,
}
96
/// A note as returned by the sync queries.
///
/// Carries the same fields as [`NoteRecord`] except for `details`.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteSyncRecord {
    /// Block number associated with the note.
    pub block_num: BlockNumber,
    /// Index of the note within its block.
    pub note_index: BlockNoteIndex,
    /// ID of the note, stored as a raw digest.
    pub note_id: RpoDigest,
    /// Metadata of the note.
    pub metadata: NoteMetadata,
    /// Merkle path stored alongside the note.
    // NOTE(review): presumably the note's inclusion path in the block's note tree — TODO confirm.
    pub merkle_path: MerklePath,
}
105
106impl From<NoteSyncRecord> for proto::NoteSyncRecord {
107    fn from(note: NoteSyncRecord) -> Self {
108        Self {
109            note_index: note.note_index.leaf_index_value().into(),
110            note_id: Some(note.note_id.into()),
111            metadata: Some(note.metadata.into()),
112            merkle_path: Some(Into::into(&note.merkle_path)),
113        }
114    }
115}
116
117impl From<NoteRecord> for NoteSyncRecord {
118    fn from(note: NoteRecord) -> Self {
119        Self {
120            block_num: note.block_num,
121            note_index: note.note_index,
122            note_id: note.note_id,
123            metadata: note.metadata,
124            merkle_path: note.merkle_path,
125        }
126    }
127}
128
129impl Db {
130    /// Open a connection to the DB, apply any pending migrations, and ensure that the genesis block
131    /// is as expected and present in the database.
132    // TODO: This span is logged in a root span, we should connect it to the parent one.
133    #[instrument(target = COMPONENT, skip_all)]
134    pub async fn setup(
135        config: StoreConfig,
136        block_store: Arc<BlockStore>,
137    ) -> Result<Self, DatabaseSetupError> {
138        info!(target: COMPONENT, %config, "Connecting to the database");
139
140        if let Some(p) = config.database_filepath.parent() {
141            create_dir_all(p).map_err(DatabaseError::IoError)?;
142        }
143
144        let pool = SqliteConfig::new(config.database_filepath.clone())
145            .builder(Runtime::Tokio1)
146            .expect("Infallible")
147            .post_create(Hook::async_fn(move |conn, _| {
148                Box::pin(async move {
149                    let _ = conn
150                        .interact(|conn| {
151                            // Feature used to support `IN` and `NOT IN` queries. We need to load
152                            // this module for every connection we create to the DB to support the
153                            // queries we want to run
154                            array::load_module(conn)?;
155
156                            // Increase the statement cache size.
157                            conn.set_prepared_statement_cache_capacity(
158                                SQL_STATEMENT_CACHE_CAPACITY,
159                            );
160
161                            // Enable the WAL mode. This allows concurrent reads while the
162                            // transaction is being written, this is required for proper
163                            // synchronization of the servers in-memory and on-disk representations
164                            // (see [State::apply_block])
165                            conn.execute("PRAGMA journal_mode = WAL;", ())?;
166
167                            // Enable foreign key checks.
168                            conn.execute("PRAGMA foreign_keys = ON;", ())
169                        })
170                        .await
171                        .map_err(|e| {
172                            HookError::Message(format!("Loading carray module failed: {e}").into())
173                        })?;
174
175                    Ok(())
176                })
177            }))
178            .build()?;
179
180        info!(
181            target: COMPONENT,
182            sqlite = format!("{}", config.database_filepath.display()),
183            "Connected to the database"
184        );
185
186        let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
187
188        conn.interact(apply_migrations).await.map_err(|err| {
189            DatabaseError::InteractError(format!("Migration task failed: {err}"))
190        })??;
191
192        let db = Db { pool };
193        db.ensure_genesis_block(&config.genesis_filepath.as_path().to_string_lossy(), block_store)
194            .await?;
195
196        Ok(db)
197    }
198
199    /// Loads all the nullifiers from the DB.
200    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
201    pub async fn select_all_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
202        self.pool
203            .get()
204            .await?
205            .interact(sql::select_all_nullifiers)
206            .await
207            .map_err(|err| {
208                DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
209            })?
210    }
211
212    /// Loads the nullifiers that match the prefixes from the DB.
213    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
214    pub async fn select_nullifiers_by_prefix(
215        &self,
216        prefix_len: u32,
217        nullifier_prefixes: Vec<u32>,
218    ) -> Result<Vec<NullifierInfo>> {
219        self.pool
220            .get()
221            .await?
222            .interact(move |conn| {
223                sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes)
224            })
225            .await
226            .map_err(|err| {
227                DatabaseError::InteractError(format!(
228                    "Select nullifiers by prefix task failed: {err}"
229                ))
230            })?
231    }
232
233    /// Search for a [BlockHeader] from the database by its `block_num`.
234    ///
235    /// When `block_number` is [None], the latest block header is returned.
236    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
237    pub async fn select_block_header_by_block_num(
238        &self,
239        block_number: Option<BlockNumber>,
240    ) -> Result<Option<BlockHeader>> {
241        self.pool
242            .get()
243            .await?
244            .interact(move |conn| sql::select_block_header_by_block_num(conn, block_number))
245            .await
246            .map_err(|err| {
247                DatabaseError::InteractError(format!("Select block header task failed: {err}"))
248            })?
249    }
250
251    /// Loads multiple block headers from the DB.
252    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
253    pub async fn select_block_headers(&self, blocks: Vec<BlockNumber>) -> Result<Vec<BlockHeader>> {
254        self.pool
255            .get()
256            .await?
257            .interact(move |conn| sql::select_block_headers(conn, &blocks))
258            .await
259            .map_err(|err| {
260                DatabaseError::InteractError(format!(
261                    "Select many block headers task failed: {err}"
262                ))
263            })?
264    }
265
266    /// Loads all the block headers from the DB.
267    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
268    pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
269        self.pool
270            .get()
271            .await?
272            .interact(sql::select_all_block_headers)
273            .await
274            .map_err(|err| {
275                DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
276            })?
277    }
278
279    /// Loads all the account hashes from the DB.
280    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
281    pub async fn select_all_account_hashes(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
282        self.pool
283            .get()
284            .await?
285            .interact(sql::select_all_account_hashes)
286            .await
287            .map_err(|err| {
288                DatabaseError::InteractError(format!("Select account hashes task failed: {err}"))
289            })?
290    }
291
292    /// Loads public account details from the DB.
293    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
294    pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
295        self.pool
296            .get()
297            .await?
298            .interact(move |conn| sql::select_account(conn, id))
299            .await
300            .map_err(|err| {
301                DatabaseError::InteractError(format!("Get account details task failed: {err}"))
302            })?
303    }
304
305    /// Loads public accounts details from the DB.
306    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
307    pub async fn select_accounts_by_ids(
308        &self,
309        account_ids: Vec<AccountId>,
310    ) -> Result<Vec<AccountInfo>> {
311        self.pool
312            .get()
313            .await?
314            .interact(move |conn| sql::select_accounts_by_ids(conn, &account_ids))
315            .await
316            .map_err(|err| {
317                DatabaseError::InteractError(format!("Get accounts details task failed: {err}"))
318            })?
319    }
320
321    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
322    pub async fn get_state_sync(
323        &self,
324        block_num: BlockNumber,
325        account_ids: Vec<AccountId>,
326        note_tags: Vec<u32>,
327        nullifier_prefixes: Vec<u32>,
328    ) -> Result<StateSyncUpdate, StateSyncError> {
329        self.pool
330            .get()
331            .await
332            .map_err(DatabaseError::MissingDbConnection)?
333            .interact(move |conn| {
334                sql::get_state_sync(conn, block_num, &account_ids, &note_tags, &nullifier_prefixes)
335            })
336            .await
337            .map_err(|err| {
338                DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
339            })?
340    }
341
342    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
343    pub async fn get_note_sync(
344        &self,
345        block_num: BlockNumber,
346        note_tags: Vec<u32>,
347    ) -> Result<NoteSyncUpdate, NoteSyncError> {
348        self.pool
349            .get()
350            .await
351            .map_err(DatabaseError::MissingDbConnection)?
352            .interact(move |conn| sql::get_note_sync(conn, block_num, &note_tags))
353            .await
354            .map_err(|err| {
355                DatabaseError::InteractError(format!("Get notes sync task failed: {err}"))
356            })?
357    }
358
359    /// Loads all the Note's matching a certain NoteId from the database.
360    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
361    pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
362        self.pool
363            .get()
364            .await?
365            .interact(move |conn| sql::select_notes_by_id(conn, &note_ids))
366            .await
367            .map_err(|err| {
368                DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
369            })?
370    }
371
372    /// Loads inclusion proofs for notes matching the given IDs.
373    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
374    pub async fn select_note_inclusion_proofs(
375        &self,
376        note_ids: BTreeSet<NoteId>,
377    ) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
378        self.pool
379            .get()
380            .await?
381            .interact(move |conn| sql::select_note_inclusion_proofs(conn, note_ids))
382            .await
383            .map_err(|err| {
384                DatabaseError::InteractError(format!(
385                    "Select block note inclusion proofs task failed: {err}"
386                ))
387            })?
388    }
389
390    /// Loads all note IDs matching a certain NoteId from the database.
391    #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
392    pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
393        self.select_notes_by_id(note_ids)
394            .await
395            .map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
396    }
397
398    /// Inserts the data of a new block into the DB.
399    ///
400    /// `allow_acquire` and `acquire_done` are used to synchronize writes to the DB with writes to
401    /// the in-memory trees. Further details available on [super::state::State::apply_block].
402    // TODO: This span is logged in a root span, we should connect it to the parent one.
403    #[instrument(target = COMPONENT, skip_all, err)]
404    pub async fn apply_block(
405        &self,
406        allow_acquire: oneshot::Sender<()>,
407        acquire_done: oneshot::Receiver<()>,
408        block: Block,
409        notes: Vec<NoteRecord>,
410    ) -> Result<()> {
411        self.pool
412            .get()
413            .await?
414            .interact(move |conn| -> Result<()> {
415                // TODO: This span is logged in a root span, we should connect it to the parent one.
416                let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
417
418                let transaction = conn.transaction()?;
419                sql::apply_block(
420                    &transaction,
421                    &block.header(),
422                    &notes,
423                    block.nullifiers(),
424                    block.updated_accounts(),
425                )?;
426
427                let _ = allow_acquire.send(());
428                acquire_done.blocking_recv()?;
429
430                transaction.commit()?;
431
432                Ok(())
433            })
434            .await
435            .map_err(|err| {
436                DatabaseError::InteractError(format!("Apply block task failed: {err}"))
437            })??;
438
439        Ok(())
440    }
441
442    /// Merges all account deltas from the DB for given account ID and block range.
443    /// Note, that `from_block` is exclusive and `to_block` is inclusive.
444    ///
445    /// Returns `Ok(None)` if no deltas were found in the DB for the specified account within
446    /// the given block range.
447    pub(crate) async fn select_account_state_delta(
448        &self,
449        account_id: AccountId,
450        from_block: BlockNumber,
451        to_block: BlockNumber,
452    ) -> Result<Option<AccountDelta>> {
453        self.pool
454            .get()
455            .await
456            .map_err(DatabaseError::MissingDbConnection)?
457            .interact(move |conn| -> Result<Option<AccountDelta>> {
458                sql::select_account_delta(conn, account_id, from_block, to_block)
459            })
460            .await
461            .map_err(|err| DatabaseError::InteractError(err.to_string()))?
462    }
463
464    // HELPERS
465    // ---------------------------------------------------------------------------------------------
466
467    /// If the database is empty, generates and stores the genesis block. Otherwise, it ensures that
468    /// the genesis block in the database is consistent with the genesis block data in the
469    /// genesis JSON file.
470    #[instrument(target = COMPONENT, skip_all, err)]
471    async fn ensure_genesis_block(
472        &self,
473        genesis_filepath: &str,
474        block_store: Arc<BlockStore>,
475    ) -> Result<(), GenesisError> {
476        let genesis_block = {
477            let file_contents = fs::read(genesis_filepath).map_err(|source| {
478                GenesisError::FailedToReadGenesisFile {
479                    genesis_filepath: genesis_filepath.to_string(),
480                    source,
481                }
482            })?;
483
484            let genesis_state = GenesisState::read_from_bytes(&file_contents)
485                .map_err(GenesisError::GenesisFileDeserializationError)?;
486
487            genesis_state.into_block()?
488        };
489
490        let maybe_block_header_in_store = self
491            .select_block_header_by_block_num(Some(BlockNumber::GENESIS))
492            .await
493            .map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
494
495        let expected_genesis_header = genesis_block.header();
496
497        match maybe_block_header_in_store {
498            Some(block_header_in_store) => {
499                // ensure that expected header is what's also in the store
500                if expected_genesis_header != block_header_in_store {
501                    Err(GenesisError::GenesisBlockHeaderMismatch {
502                        expected_genesis_header: Box::new(expected_genesis_header),
503                        block_header_in_store: Box::new(block_header_in_store),
504                    })?;
505                }
506            },
507            None => {
508                // add genesis header to store
509                self.pool
510                    .get()
511                    .await
512                    .map_err(DatabaseError::MissingDbConnection)?
513                    .interact(move |conn| -> Result<()> {
514                        // TODO: This span is logged in a root span, we should connect it to the
515                        // parent one.
516                        let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
517                        let guard = span.enter();
518
519                        let transaction = conn.transaction()?;
520                        sql::apply_block(
521                            &transaction,
522                            &expected_genesis_header,
523                            &[],
524                            &[],
525                            genesis_block.updated_accounts(),
526                        )?;
527
528                        block_store.save_block_blocking(0.into(), &genesis_block.to_bytes())?;
529
530                        transaction.commit()?;
531
532                        drop(guard);
533                        Ok(())
534                    })
535                    .await
536                    .map_err(GenesisError::ApplyBlockFailed)??;
537            },
538        }
539
540        Ok(())
541    }
542}