1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::mem::size_of;
3use std::num::NonZeroUsize;
4use std::ops::{Deref, DerefMut, RangeInclusive};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use anyhow::Context;
9use diesel::{Connection, SqliteConnection};
10use miden_crypto::dsa::ecdsa_k256_keccak::Signature;
11use miden_node_proto::domain::account::AccountInfo;
12use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES;
13use miden_protocol::Word;
14use miden_protocol::account::{AccountHeader, AccountId, AccountStorageHeader, StorageMapKey};
15use miden_protocol::asset::{Asset, AssetVaultKey};
16use miden_protocol::block::{BlockHeader, BlockNoteIndex, BlockNumber, SignedBlock};
17use miden_protocol::crypto::merkle::SparseMerklePath;
18use miden_protocol::note::{
19 NoteAttachments,
20 NoteDetails,
21 NoteId,
22 NoteInclusionProof,
23 NoteMetadata,
24 NoteScript,
25 Nullifier,
26};
27use miden_protocol::transaction::TransactionHeader;
28use miden_protocol::utils::serde::Deserializable;
29use tokio::sync::oneshot;
30use tracing::{info, instrument};
31
32use crate::COMPONENT;
33use crate::db::migrations::{bootstrap_database, migrate_database, verify_latest_schema};
34use crate::db::models::conv::SqlTypeConvert;
35use crate::db::models::queries;
36pub use crate::db::models::queries::{
37 AccountCommitmentsPage,
38 NullifiersPage,
39 PublicAccountIdsPage,
40 PublicAccountStateRootsPage,
41};
42use crate::db::models::queries::{BlockHeaderCommitment, StorageMapValuesPage};
43use crate::errors::{DatabaseError, NoteSyncError};
44use crate::genesis::GenesisBlock;
45
46const STORAGE_MAP_VALUE_PER_ROW_BYTES: usize =
47 2 * size_of::<Word>() + size_of::<u32>() + size_of::<u8>();
48
49fn default_storage_map_entries_limit() -> usize {
50 MAX_RESPONSE_PAYLOAD_BYTES / STORAGE_MAP_VALUE_PER_ROW_BYTES
51}
52
53mod migrations;
54
55#[cfg(test)]
56mod tests;
57
58pub(crate) mod models;
59
60pub(crate) mod schema;
65
/// Alias for `std::result::Result` whose error type defaults to [`DatabaseError`].
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
67
68#[derive(Copy, Clone, Debug, PartialEq, Eq)]
70pub struct DatabaseOptions {
71 pub connection_pool_size: NonZeroUsize,
73}
74
75impl Default for DatabaseOptions {
76 fn default() -> Self {
77 Self {
78 connection_pool_size: miden_node_db::default_connection_pool_size(),
79 }
80 }
81}
82
83pub struct Db {
87 db: miden_node_db::Db,
88}
89
90impl Deref for Db {
91 type Target = miden_node_db::Db;
92
93 fn deref(&self) -> &Self::Target {
94 &self.db
95 }
96}
97
98impl DerefMut for Db {
99 fn deref_mut(&mut self) -> &mut Self::Target {
100 &mut self.db
101 }
102}
103
104#[derive(Debug, Clone)]
108pub struct AccountVaultValue {
109 pub block_num: BlockNumber,
110 pub vault_key: AssetVaultKey,
111 pub asset: Option<Asset>,
113}
114
115impl AccountVaultValue {
116 pub fn from_raw_row(row: (i64, Vec<u8>, Option<Vec<u8>>)) -> Result<Self, DatabaseError> {
117 let (block_num, vault_key, asset) = row;
118 let vault_key = Word::read_from_bytes(&vault_key)?;
119 Ok(Self {
120 block_num: BlockNumber::from_raw_sql(block_num)?,
121 vault_key: AssetVaultKey::try_from(vault_key)?,
122 asset: asset.map(|b| Asset::read_from_bytes(&b)).transpose()?,
123 })
124 }
125}
126
127#[derive(Debug, PartialEq)]
128pub struct NullifierInfo {
129 pub nullifier: Nullifier,
130 pub block_num: BlockNumber,
131}
132
133impl PartialEq<(Nullifier, BlockNumber)> for NullifierInfo {
134 fn eq(&self, (nullifier, block_num): &(Nullifier, BlockNumber)) -> bool {
135 &self.nullifier == nullifier && &self.block_num == block_num
136 }
137}
138
/// A transaction header together with its block number and output note records.
#[derive(Debug, PartialEq)]
pub struct TransactionRecord {
    /// Block number stored with the transaction.
    pub block_num: BlockNumber,
    /// Header of the transaction.
    pub header: TransactionHeader,
    /// Sync records for notes associated with this transaction.
    // NOTE(review): the field name suggests these are the transaction's output notes with
    // their inclusion paths — confirm against the query that populates it.
    pub output_note_proofs: Vec<NoteSyncRecord>,
}
147
/// A full note record, including optional details, attachments and an inclusion path.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
    /// Block number stored with the note.
    pub block_num: BlockNumber,
    /// Index of the note within the block.
    pub note_index: BlockNoteIndex,
    /// Raw note ID as a [`Word`]; converted with `NoteId::from_raw` when building a
    /// [`NoteSyncRecord`].
    pub note_id: Word,
    /// Note metadata.
    pub metadata: NoteMetadata,
    /// Note details, when present.
    pub details: Option<NoteDetails>,
    /// Note attachments.
    pub attachments: NoteAttachments,
    /// Sparse Merkle path stored with the note.
    pub inclusion_path: SparseMerklePath,
}
158
/// A set of note sync records paired with a block header.
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
    /// Note records included in this update.
    pub notes: Vec<NoteSyncRecord>,
    /// Block header accompanying the notes.
    pub block_header: BlockHeader,
}
164
165#[derive(Debug, Clone, PartialEq)]
166pub struct NoteSyncRecord {
167 pub block_num: BlockNumber,
168 pub note_index: BlockNoteIndex,
169 pub note_id: NoteId,
170 pub metadata: NoteMetadata,
171 pub inclusion_path: SparseMerklePath,
172}
173
174impl From<NoteRecord> for NoteSyncRecord {
175 fn from(note: NoteRecord) -> Self {
176 Self {
177 block_num: note.block_num,
178 note_index: note.note_index,
179 note_id: NoteId::from_raw(note.note_id),
180 metadata: note.metadata,
181 inclusion_path: note.inclusion_path,
182 }
183 }
184}
185
186impl Db {
187 #[instrument(
189 target = COMPONENT,
190 name = "store.database.bootstrap",
191 skip_all,
192 fields(path=%database_filepath.display())
193 err,
194 )]
195 pub fn bootstrap(database_filepath: PathBuf, genesis: GenesisBlock) -> anyhow::Result<()> {
196 bootstrap_database(&database_filepath).context("failed to bootstrap database schema")?;
197
198 let mut conn: SqliteConnection = diesel::sqlite::SqliteConnection::establish(
199 database_filepath.to_str().context("database filepath is invalid")?,
200 )
201 .context("failed to open a database connection")?;
202
203 miden_node_db::configure_connection_on_creation(&mut conn)?;
204
205 let genesis_block = genesis.into_inner();
207 conn.transaction(move |conn| models::queries::apply_block(conn, &genesis_block, &[]))
208 .context("failed to insert genesis block")?;
209 Ok(())
210 }
211
212 #[instrument(target = COMPONENT, skip_all)]
214 pub async fn load(database_filepath: PathBuf) -> Result<Self, DatabaseError> {
215 Self::load_with_pool_size(database_filepath, miden_node_db::default_connection_pool_size())
216 .await
217 }
218
219 #[instrument(target = COMPONENT, skip_all)]
222 pub async fn load_with_pool_size(
223 database_filepath: PathBuf,
224 connection_pool_size: NonZeroUsize,
225 ) -> Result<Self, DatabaseError> {
226 verify_latest_schema(&database_filepath)?;
227
228 let db = miden_node_db::Db::new_with_pool_size(&database_filepath, connection_pool_size)?;
229 info!(
230 target: COMPONENT,
231 sqlite= %database_filepath.display(),
232 connection_pool_size = %connection_pool_size,
233 "Connected to the database"
234 );
235
236 Ok(Self { db })
237 }
238
239 #[instrument(target = COMPONENT, skip_all)]
241 pub fn migrate(database_filepath: impl AsRef<Path>) -> Result<(), DatabaseError> {
242 migrate_database(database_filepath.as_ref())?;
243 Ok(())
244 }
245
246 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
248 pub async fn select_nullifiers_paged(
249 &self,
250 page_size: std::num::NonZeroUsize,
251 after_nullifier: Option<Nullifier>,
252 ) -> Result<NullifiersPage> {
253 self.transact("read nullifiers paged", move |conn| {
254 queries::select_nullifiers_paged(conn, page_size, after_nullifier)
255 })
256 .await
257 }
258
259 #[instrument(
261 level = "debug",
262 target = COMPONENT,
263 skip_all,
264 fields(prefix_len, prefixes = nullifier_prefixes.len()),
265 ret(level = "debug"),
266 err
267 )]
268 pub async fn select_nullifiers_by_prefix(
269 &self,
270 prefix_len: u32,
271 nullifier_prefixes: Vec<u32>,
272 block_range: RangeInclusive<BlockNumber>,
273 ) -> Result<(Vec<NullifierInfo>, BlockNumber)> {
274 assert_eq!(prefix_len, 16, "Only 16-bit prefixes are supported");
275
276 self.transact("nullifieres by prefix", move |conn| {
277 let nullifier_prefixes =
278 Vec::from_iter(nullifier_prefixes.into_iter().map(|prefix| prefix as u16));
279 queries::select_nullifiers_by_prefix(
280 conn,
281 prefix_len as u8,
282 &nullifier_prefixes[..],
283 block_range,
284 )
285 })
286 .await
287 }
288
289 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
293 pub async fn select_block_header_by_block_num(
294 &self,
295 maybe_block_number: Option<BlockNumber>,
296 ) -> Result<Option<BlockHeader>> {
297 self.transact("block headers by block number", move |conn| {
298 let val = queries::select_block_header_by_block_num(conn, maybe_block_number)?;
299 Ok(val)
300 })
301 .await
302 }
303
304 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
306 pub async fn select_block_header_and_signature_by_block_num(
307 &self,
308 block_number: BlockNumber,
309 ) -> Result<Option<(BlockHeader, Signature)>> {
310 self.transact("block headers and signature by block number", move |conn| {
311 let val = queries::select_block_header_and_signature_by_block_num(conn, block_number)?;
312 Ok(val)
313 })
314 .await
315 }
316
317 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
319 pub async fn select_block_headers(
320 &self,
321 blocks: impl Iterator<Item = BlockNumber> + Send + 'static,
322 ) -> Result<Vec<BlockHeader>> {
323 self.transact("block headers from given block numbers", move |conn| {
324 let raw = queries::select_block_headers(conn, blocks)?;
325 Ok(raw)
326 })
327 .await
328 }
329
330 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
332 pub async fn select_all_block_header_commitments(&self) -> Result<Vec<BlockHeaderCommitment>> {
333 self.transact("all block headers", |conn| {
334 let raw = queries::select_all_block_header_commitments(conn)?;
335 Ok(raw)
336 })
337 .await
338 }
339
340 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
342 pub async fn select_account_commitments_paged(
343 &self,
344 page_size: std::num::NonZeroUsize,
345 after_account_id: Option<AccountId>,
346 ) -> Result<AccountCommitmentsPage> {
347 self.transact("read account commitments paged", move |conn| {
348 queries::select_account_commitments_paged(conn, page_size, after_account_id)
349 })
350 .await
351 }
352
353 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
355 pub async fn select_public_account_ids_paged(
356 &self,
357 page_size: std::num::NonZeroUsize,
358 after_account_id: Option<AccountId>,
359 ) -> Result<PublicAccountIdsPage> {
360 self.transact("read public account IDs paged", move |conn| {
361 queries::select_public_account_ids_paged(conn, page_size, after_account_id)
362 })
363 .await
364 }
365
366 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
368 pub async fn select_public_account_state_roots_paged(
369 &self,
370 page_size: std::num::NonZeroUsize,
371 after_account_id: Option<AccountId>,
372 ) -> Result<PublicAccountStateRootsPage> {
373 self.transact("read public account state roots paged", move |conn| {
374 queries::select_public_account_state_roots_paged(conn, page_size, after_account_id)
375 })
376 .await
377 }
378
379 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
381 pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
382 self.transact("Get account details", move |conn| queries::select_account(conn, id))
383 .await
384 }
385
386 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
388 pub async fn select_network_accounts_subset(
389 &self,
390 account_ids: Vec<AccountId>,
391 ) -> Result<HashSet<AccountId>> {
392 self.transact("Filter network accounts subset", move |conn| {
393 queries::select_network_accounts_subset(conn, &account_ids)
394 })
395 .await
396 }
397
398 #[instrument(target = COMPONENT, skip_all)]
402 pub async fn select_account_code_by_commitment(
403 &self,
404 code_commitment: Word,
405 ) -> Result<Option<Vec<u8>>> {
406 self.transact("Get account code by commitment", move |conn| {
407 queries::select_account_code_by_commitment(conn, code_commitment)
408 })
409 .await
410 }
411
412 #[instrument(target = COMPONENT, skip_all)]
417 pub async fn select_account_header_with_storage_header_at_block(
418 &self,
419 account_id: AccountId,
420 block_num: BlockNumber,
421 ) -> Result<Option<(AccountHeader, AccountStorageHeader)>> {
422 self.transact("Get account header with storage header at block", move |conn| {
423 queries::select_account_header_with_storage_header_at_block(conn, account_id, block_num)
424 })
425 .await
426 }
427
428 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
429 pub async fn get_note_sync_multi(
430 &self,
431 block_range: RangeInclusive<BlockNumber>,
432 note_tags: Arc<[u32]>,
433 ) -> Result<Vec<NoteSyncUpdate>, NoteSyncError> {
434 self.transact("notes sync task", move |conn| {
435 queries::get_note_sync_multi(conn, ¬e_tags, block_range, MAX_RESPONSE_PAYLOAD_BYTES)
436 })
437 .await
438 }
439
440 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
443 pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
444 self.transact("note by id", move |conn| {
445 queries::select_notes_by_id(conn, note_ids.as_slice())
446 })
447 .await
448 }
449
450 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
452 pub async fn select_existing_note_commitments(
453 &self,
454 note_commitments: Vec<Word>,
455 ) -> Result<HashSet<Word>> {
456 self.transact("note by commitment", move |conn| {
457 queries::select_existing_note_commitments(conn, note_commitments.as_slice())
458 })
459 .await
460 }
461
462 #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
464 pub async fn select_note_inclusion_proofs(
465 &self,
466 note_commitments: BTreeSet<Word>,
467 ) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
468 self.transact("block note inclusion proofs by commitment", move |conn| {
469 models::queries::select_note_inclusion_proofs(conn, ¬e_commitments)
470 })
471 .await
472 }
473
474 #[instrument(target = COMPONENT, skip_all, err)]
480 pub async fn apply_block(
481 &self,
482 allow_acquire: oneshot::Sender<()>,
483 acquire_done: oneshot::Receiver<()>,
484 signed_block: SignedBlock,
485 notes: Vec<(NoteRecord, Option<Nullifier>)>,
486 ) -> Result<()> {
487 self.transact("apply block", move |conn| -> Result<()> {
488 models::queries::apply_block(conn, &signed_block, ¬es)?;
489
490 {
493 let _span = tracing::info_span!(target: COMPONENT, "acquire_write_lock").entered();
494 if allow_acquire.send(()).is_err() {
495 tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock");
496 }
497 }
498
499 models::queries::prune_history(conn, signed_block.header().block_num())?;
500
501 let _span =
502 tracing::info_span!(target: COMPONENT, "acquire_done_lock").entered();
503 acquire_done.blocking_recv()?;
504
505 Ok(())
506 })
507 .await
508 }
509
510 pub(crate) async fn select_storage_map_sync_values(
515 &self,
516 account_id: AccountId,
517 block_range: RangeInclusive<BlockNumber>,
518 entries_limit: Option<usize>,
519 ) -> Result<StorageMapValuesPage> {
520 let entries_limit = entries_limit.unwrap_or_else(default_storage_map_entries_limit);
521
522 self.transact("select storage map sync values", move |conn| {
523 models::queries::select_account_storage_map_values_paged(
524 conn,
525 account_id,
526 block_range,
527 entries_limit,
528 )
529 })
530 .await
531 }
532
533 #[instrument(target = COMPONENT, skip_all)]
542 pub(crate) async fn reconstruct_storage_map_from_db(
543 &self,
544 account_id: AccountId,
545 slot_name: miden_protocol::account::StorageSlotName,
546 block_num: BlockNumber,
547 entries_limit: Option<usize>,
548 ) -> Result<miden_node_proto::domain::account::AccountStorageMapDetails> {
549 use miden_node_proto::domain::account::{AccountStorageMapDetails, StorageMapEntries};
550 use miden_protocol::EMPTY_WORD;
551
552 let mut values = Vec::new();
555 let mut block_range_start = BlockNumber::GENESIS;
556 let entries_limit = entries_limit.unwrap_or_else(default_storage_map_entries_limit);
557
558 let mut page = self
559 .select_storage_map_sync_values(
560 account_id,
561 block_range_start..=block_num,
562 Some(entries_limit),
563 )
564 .await?;
565
566 values.extend(page.values);
567 let mut last_block_included = page.last_block_included;
568
569 if values.is_empty() && last_block_included == block_range_start {
572 return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
573 }
574
575 loop {
576 if page.last_block_included == block_num || page.last_block_included < block_range_start
577 {
578 break;
579 }
580
581 block_range_start = page.last_block_included.child();
582 page = self
583 .select_storage_map_sync_values(
584 account_id,
585 block_range_start..=block_num,
586 Some(entries_limit),
587 )
588 .await?;
589
590 if page.last_block_included <= last_block_included {
591 return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
592 }
593
594 last_block_included = page.last_block_included;
595 values.extend(page.values);
596 }
597
598 if page.last_block_included != block_num {
599 return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
600 }
601
602 let mut latest_values = BTreeMap::<StorageMapKey, Word>::new();
604 for value in values {
605 if value.slot_name == slot_name {
606 let raw_key = value.key;
607 latest_values.insert(raw_key, value.value);
608 }
609 }
610
611 latest_values.retain(|_, v| *v != EMPTY_WORD);
613
614 if latest_values.len() > AccountStorageMapDetails::MAX_RETURN_ENTRIES {
615 return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
616 }
617
618 let entries = Vec::from_iter(latest_values.into_iter());
619 Ok(AccountStorageMapDetails {
620 slot_name,
621 entries: StorageMapEntries::AllEntries(entries),
622 })
623 }
624
625 pub async fn get_account_vault_sync(
626 &self,
627 account_id: AccountId,
628 block_range: RangeInclusive<BlockNumber>,
629 ) -> Result<(BlockNumber, Vec<AccountVaultValue>)> {
630 self.transact("account vault sync", move |conn| {
631 queries::select_account_vault_assets(conn, account_id, block_range)
632 })
633 .await
634 }
635
636 pub async fn select_note_script_by_root(&self, root: Word) -> Result<Option<NoteScript>> {
638 self.transact("note script by root", move |conn| {
639 queries::select_note_script_by_root(conn, root)
640 })
641 .await
642 }
643
644 pub async fn select_transactions_records(
651 &self,
652 account_ids: Vec<AccountId>,
653 block_range: RangeInclusive<BlockNumber>,
654 ) -> Result<(BlockNumber, Vec<TransactionRecord>)> {
655 self.transact("full transactions records", move |conn| {
656 queries::select_transactions_records(conn, &account_ids, block_range)
657 })
658 .await
659 }
660}