1use std::{
2 collections::{BTreeMap, BTreeSet},
3 fs::{self, create_dir_all},
4 path::PathBuf,
5 sync::Arc,
6};
7
8use miden_node_proto::{
9 domain::account::{AccountInfo, AccountSummary},
10 generated::note as proto,
11};
12use miden_objects::{
13 account::{AccountDelta, AccountId},
14 block::{BlockHeader, BlockNoteIndex, BlockNumber, ProvenBlock},
15 crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
16 note::{NoteId, NoteInclusionProof, NoteMetadata, Nullifier},
17 transaction::TransactionId,
18 utils::Serializable,
19};
20use sql::utils::{column_value_as_u64, read_block_number};
21use tokio::sync::oneshot;
22use tracing::{info, info_span, instrument};
23
24use crate::{
25 COMPONENT,
26 blocks::BlockStore,
27 db::{
28 migrations::apply_migrations,
29 pool_manager::{Pool, SqlitePoolManager},
30 },
31 errors::{DatabaseError, DatabaseSetupError, GenesisError, NoteSyncError, StateSyncError},
32 genesis::GenesisState,
33};
34
35mod migrations;
36#[macro_use]
37mod sql;
38
39mod connection;
40mod pool_manager;
41#[cfg(test)]
42mod query_plan;
43mod settings;
44#[cfg(test)]
45mod tests;
46mod transaction;
47
/// Convenience alias for results produced by the database layer.
///
/// The error type defaults to [`DatabaseError`]; callers that need a more specific error can
/// override the second type parameter.
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
49
/// Handle to the store's database.
///
/// Every query exposed by this type obtains its connection from the wrapped [`Pool`].
pub struct Db {
    /// Connection pool built in [`Db::setup`] on top of a `SqlitePoolManager`.
    pool: Pool,
}
53
/// A nullifier paired with a block number, as returned by
/// [`Db::select_nullifiers_by_prefix`].
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
    /// The nullifier itself.
    pub nullifier: Nullifier,
    /// Block number stored alongside the nullifier.
    // NOTE(review): presumably the block in which the nullifier was created — confirm against
    // the query behind `sql::select_nullifiers_by_prefix`.
    pub block_num: BlockNumber,
}
59
/// Identifies a transaction by its ID together with an account ID and a block number.
///
/// Used as the element type of [`StateSyncUpdate::transactions`].
#[derive(Debug, PartialEq)]
pub struct TransactionSummary {
    /// Account associated with the transaction.
    // NOTE(review): presumably the account the transaction was executed against — confirm.
    pub account_id: AccountId,
    /// Block number associated with the transaction.
    // NOTE(review): presumably the block that included the transaction — confirm.
    pub block_num: BlockNumber,
    /// Unique identifier of the transaction.
    pub transaction_id: TransactionId,
}
66
/// A note as read from the database; rows are decoded into this type by
/// `NoteRecord::from_row`.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
    /// Block number read from the `block_num` column.
    pub block_num: BlockNumber,
    /// Position of the note, built from the `batch_index` and `note_index` columns.
    pub note_index: BlockNoteIndex,
    /// Identifier of the note, stored as a serialized [`RpoDigest`].
    pub note_id: RpoDigest,
    /// Metadata assembled from the `sender`, `note_type`, `tag`, `execution_hint` and `aux`
    /// columns.
    pub metadata: NoteMetadata,
    /// Optional note details; `None` when the `details` column is `NULL`.
    pub details: Option<Vec<u8>>,
    /// Merkle path deserialized from the `merkle_path` column.
    pub merkle_path: MerklePath,
}
76
77impl NoteRecord {
78 const SELECT_COLUMNS: &'static str = "
80 block_num,
81 batch_index,
82 note_index,
83 note_id,
84 note_type,
85 sender,
86 tag,
87 aux,
88 execution_hint,
89 merkle_path,
90 details
91 ";
92
93 fn from_row(row: &rusqlite::Row<'_>) -> Result<Self> {
96 let block_num = read_block_number(row, 0)?;
97 let batch_idx = row.get(1)?;
98 let note_idx_in_batch = row.get(2)?;
99 let note_index = BlockNoteIndex::new(batch_idx, note_idx_in_batch)
102 .expect("batch and note index from DB should be valid");
103 let note_id = row.get_ref(3)?.as_blob()?;
104 let note_id = RpoDigest::read_from_bytes(note_id)?;
105 let note_type = row.get::<_, u8>(4)?.try_into()?;
106 let sender = AccountId::read_from_bytes(row.get_ref(5)?.as_blob()?)?;
107 let tag: u32 = row.get(6)?;
108 let aux: u64 = row.get(7)?;
109 let aux = aux.try_into().map_err(DatabaseError::InvalidFelt)?;
110 let execution_hint = column_value_as_u64(row, 8)?;
111 let merkle_path_data = row.get_ref(9)?.as_blob()?;
112 let merkle_path = MerklePath::read_from_bytes(merkle_path_data)?;
113 let details_data = row.get_ref(10)?.as_blob_or_null()?;
114 let details = details_data.map(<Vec<u8>>::read_from_bytes).transpose()?;
115
116 let metadata =
117 NoteMetadata::new(sender, note_type, tag.into(), execution_hint.try_into()?, aux)?;
118
119 Ok(NoteRecord {
120 block_num,
121 note_index,
122 note_id,
123 metadata,
124 details,
125 merkle_path,
126 })
127 }
128}
129
130impl From<NoteRecord> for proto::Note {
131 fn from(note: NoteRecord) -> Self {
132 Self {
133 block_num: note.block_num.as_u32(),
134 note_index: note.note_index.leaf_index_value().into(),
135 note_id: Some(note.note_id.into()),
136 metadata: Some(note.metadata.into()),
137 merkle_path: Some(Into::into(¬e.merkle_path)),
138 details: note.details,
139 }
140 }
141}
142
/// Data returned by [`Db::get_state_sync`].
#[derive(Debug, PartialEq)]
pub struct StateSyncUpdate {
    /// Notes included in the update.
    pub notes: Vec<NoteSyncRecord>,
    /// Block header the update refers to.
    // NOTE(review): presumably the header of the block containing `notes` — confirm against
    // `sql::get_state_sync`.
    pub block_header: BlockHeader,
    /// Summaries of the accounts reported by the update.
    pub account_updates: Vec<AccountSummary>,
    /// Summaries of the transactions reported by the update.
    pub transactions: Vec<TransactionSummary>,
}
150
/// Data returned by [`Db::get_note_sync`].
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
    /// Notes included in the update.
    pub notes: Vec<NoteSyncRecord>,
    /// Block header the update refers to.
    // NOTE(review): presumably the header of the block containing `notes` — confirm against
    // `sql::get_note_sync`.
    pub block_header: BlockHeader,
}
156
/// A note as reported in sync updates.
///
/// Carries the same fields as [`NoteRecord`] except for `details`; a `NoteRecord` can be
/// converted into this type via `From`.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteSyncRecord {
    /// Block number associated with the note.
    pub block_num: BlockNumber,
    /// Position of the note within its block.
    pub note_index: BlockNoteIndex,
    /// Identifier of the note.
    pub note_id: RpoDigest,
    /// Metadata of the note.
    pub metadata: NoteMetadata,
    /// Merkle path stored for the note.
    pub merkle_path: MerklePath,
}
165
166impl From<NoteSyncRecord> for proto::NoteSyncRecord {
167 fn from(note: NoteSyncRecord) -> Self {
168 Self {
169 note_index: note.note_index.leaf_index_value().into(),
170 note_id: Some(note.note_id.into()),
171 metadata: Some(note.metadata.into()),
172 merkle_path: Some(Into::into(¬e.merkle_path)),
173 }
174 }
175}
176
177impl From<NoteRecord> for NoteSyncRecord {
178 fn from(note: NoteRecord) -> Self {
179 Self {
180 block_num: note.block_num,
181 note_index: note.note_index,
182 note_id: note.note_id,
183 metadata: note.metadata,
184 merkle_path: note.merkle_path,
185 }
186 }
187}
188
189impl Db {
190 #[instrument(target = COMPONENT, skip_all)]
194 pub async fn setup(
195 database_filepath: PathBuf,
196 genesis_filepath: &str,
197 block_store: Arc<BlockStore>,
198 ) -> Result<Self, DatabaseSetupError> {
199 info!(target: COMPONENT, ?database_filepath, "Connecting to the database");
200
201 if let Some(p) = database_filepath.parent() {
202 create_dir_all(p).map_err(DatabaseError::IoError)?;
203 }
204
205 let sqlite_pool_manager = SqlitePoolManager::new(database_filepath.clone());
206 let pool = Pool::builder(sqlite_pool_manager).build()?;
207
208 info!(
209 target: COMPONENT,
210 sqlite= %database_filepath.display(),
211 "Connected to the database"
212 );
213
214 let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
215
216 conn.interact(apply_migrations).await.map_err(|err| {
217 DatabaseError::InteractError(format!("Migration task failed: {err}"))
218 })??;
219
220 let db = Db { pool };
221 db.ensure_genesis_block(genesis_filepath, block_store).await?;
222
223 Ok(db)
224 }
225
226 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
228 pub async fn select_all_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
229 self.pool
230 .get()
231 .await?
232 .interact(|conn| {
233 let transaction = conn.transaction()?;
234 sql::select_all_nullifiers(&transaction)
235 })
236 .await
237 .map_err(|err| {
238 DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
239 })?
240 }
241
242 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
244 pub async fn select_nullifiers_by_prefix(
245 &self,
246 prefix_len: u32,
247 nullifier_prefixes: Vec<u32>,
248 block_num: BlockNumber,
249 ) -> Result<Vec<NullifierInfo>> {
250 self.pool
251 .get()
252 .await?
253 .interact(move |conn| {
254 let transaction = conn.transaction()?;
255 sql::select_nullifiers_by_prefix(
256 &transaction,
257 prefix_len,
258 &nullifier_prefixes,
259 block_num,
260 )
261 })
262 .await
263 .map_err(|err| {
264 DatabaseError::InteractError(format!(
265 "Select nullifiers by prefix task failed: {err}"
266 ))
267 })?
268 }
269
270 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
274 pub async fn select_block_header_by_block_num(
275 &self,
276 block_number: Option<BlockNumber>,
277 ) -> Result<Option<BlockHeader>> {
278 self.pool
279 .get()
280 .await?
281 .interact(move |conn| {
282 let transaction = conn.transaction()?;
283 sql::select_block_header_by_block_num(&transaction, block_number)
284 })
285 .await
286 .map_err(|err| {
287 DatabaseError::InteractError(format!("Select block header task failed: {err}"))
288 })?
289 }
290
291 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
293 pub async fn select_block_headers(
294 &self,
295 blocks: impl Iterator<Item = BlockNumber> + Send + 'static,
296 ) -> Result<Vec<BlockHeader>> {
297 self.pool
298 .get()
299 .await?
300 .interact(move |conn| {
301 let transaction = conn.transaction()?;
302 sql::select_block_headers(&transaction, blocks)
303 })
304 .await
305 .map_err(|err| {
306 DatabaseError::InteractError(format!(
307 "Select many block headers task failed: {err}"
308 ))
309 })?
310 }
311
312 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
314 pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
315 self.pool
316 .get()
317 .await?
318 .interact(|conn| {
319 let transaction = conn.transaction()?;
320 sql::select_all_block_headers(&transaction)
321 })
322 .await
323 .map_err(|err| {
324 DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
325 })?
326 }
327
328 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
330 pub async fn select_all_account_commitments(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
331 self.pool
332 .get()
333 .await?
334 .interact(|conn| {
335 let transaction = conn.transaction()?;
336 sql::select_all_account_commitments(&transaction)
337 })
338 .await
339 .map_err(|err| {
340 DatabaseError::InteractError(format!(
341 "Select account commitments task failed: {err}"
342 ))
343 })?
344 }
345
346 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
348 pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
349 self.pool
350 .get()
351 .await?
352 .interact(move |conn| {
353 let transaction = conn.transaction()?;
354 sql::select_account(&transaction, id)
355 })
356 .await
357 .map_err(|err| {
358 DatabaseError::InteractError(format!("Get account details task failed: {err}"))
359 })?
360 }
361
362 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
364 pub async fn select_accounts_by_ids(
365 &self,
366 account_ids: Vec<AccountId>,
367 ) -> Result<Vec<AccountInfo>> {
368 self.pool
369 .get()
370 .await?
371 .interact(move |conn| {
372 let transaction = conn.transaction()?;
373 sql::select_accounts_by_ids(&transaction, &account_ids)
374 })
375 .await
376 .map_err(|err| {
377 DatabaseError::InteractError(format!("Get accounts details task failed: {err}"))
378 })?
379 }
380
381 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
382 pub async fn get_state_sync(
383 &self,
384 block_num: BlockNumber,
385 account_ids: Vec<AccountId>,
386 note_tags: Vec<u32>,
387 ) -> Result<StateSyncUpdate, StateSyncError> {
388 self.pool
389 .get()
390 .await
391 .map_err(DatabaseError::MissingDbConnection)?
392 .interact(move |conn| {
393 let transaction = conn.transaction().map_err(DatabaseError::SqliteError)?;
394 sql::get_state_sync(&transaction, block_num, &account_ids, ¬e_tags)
395 })
396 .await
397 .map_err(|err| {
398 DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
399 })?
400 }
401
402 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
403 pub async fn get_note_sync(
404 &self,
405 block_num: BlockNumber,
406 note_tags: Vec<u32>,
407 ) -> Result<NoteSyncUpdate, NoteSyncError> {
408 self.pool
409 .get()
410 .await
411 .map_err(DatabaseError::MissingDbConnection)?
412 .interact(move |conn| {
413 let transaction = conn.transaction().map_err(DatabaseError::SqliteError)?;
414 sql::get_note_sync(&transaction, block_num, ¬e_tags)
415 })
416 .await
417 .map_err(|err| {
418 DatabaseError::InteractError(format!("Get notes sync task failed: {err}"))
419 })?
420 }
421
422 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
424 pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
425 self.pool
426 .get()
427 .await?
428 .interact(move |conn| {
429 let transaction = conn.transaction()?;
430 sql::select_notes_by_id(&transaction, ¬e_ids)
431 })
432 .await
433 .map_err(|err| {
434 DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
435 })?
436 }
437
438 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
440 pub async fn select_note_inclusion_proofs(
441 &self,
442 note_ids: BTreeSet<NoteId>,
443 ) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
444 self.pool
445 .get()
446 .await?
447 .interact(move |conn| {
448 let transaction = conn.transaction()?;
449 sql::select_note_inclusion_proofs(&transaction, note_ids)
450 })
451 .await
452 .map_err(|err| {
453 DatabaseError::InteractError(format!(
454 "Select block note inclusion proofs task failed: {err}"
455 ))
456 })?
457 }
458
459 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
461 pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
462 self.select_notes_by_id(note_ids)
463 .await
464 .map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
465 }
466
467 #[instrument(target = COMPONENT, skip_all, err)]
473 pub async fn apply_block(
474 &self,
475 allow_acquire: oneshot::Sender<()>,
476 acquire_done: oneshot::Receiver<()>,
477 block: ProvenBlock,
478 notes: Vec<(NoteRecord, Option<Nullifier>)>,
479 ) -> Result<()> {
480 self.pool
481 .get()
482 .await?
483 .interact(move |conn| -> Result<()> {
484 let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
486
487 let transaction = conn.transaction()?;
488 sql::apply_block(
489 &transaction,
490 block.header(),
491 ¬es,
492 block.created_nullifiers(),
493 block.updated_accounts(),
494 )?;
495
496 let _ = allow_acquire.send(());
497 acquire_done.blocking_recv()?;
498
499 transaction.commit()?;
500
501 Ok(())
502 })
503 .await
504 .map_err(|err| {
505 DatabaseError::InteractError(format!("Apply block task failed: {err}"))
506 })??;
507
508 Ok(())
509 }
510
511 pub(crate) async fn select_account_state_delta(
517 &self,
518 account_id: AccountId,
519 from_block: BlockNumber,
520 to_block: BlockNumber,
521 ) -> Result<Option<AccountDelta>> {
522 self.pool
523 .get()
524 .await
525 .map_err(DatabaseError::MissingDbConnection)?
526 .interact(move |conn| {
527 let transaction = conn.transaction()?;
528 sql::select_account_delta(&transaction, account_id, from_block, to_block)
529 })
530 .await
531 .map_err(|err| DatabaseError::InteractError(err.to_string()))?
532 }
533
534 #[instrument(target = COMPONENT, skip_all, err)]
536 pub async fn optimize(&self) -> Result<(), DatabaseError> {
537 self.pool
538 .get()
539 .await?
540 .interact(move |conn| -> Result<()> {
541 conn.execute("PRAGMA optimize;", ())
542 .map(|_| ())
543 .map_err(DatabaseError::SqliteError)
544 })
545 .await
546 .map_err(|err| {
547 DatabaseError::InteractError(format!("Database optimization task failed: {err}"))
548 })?
549 }
550
551 #[instrument(target = COMPONENT, skip_all, err)]
558 async fn ensure_genesis_block(
559 &self,
560 genesis_filepath: &str,
561 block_store: Arc<BlockStore>,
562 ) -> Result<(), GenesisError> {
563 let genesis_block = {
564 let file_contents = fs::read(genesis_filepath).map_err(|source| {
565 GenesisError::FailedToReadGenesisFile {
566 genesis_filepath: genesis_filepath.to_string(),
567 source,
568 }
569 })?;
570
571 let genesis_state = GenesisState::read_from_bytes(&file_contents)
572 .map_err(GenesisError::GenesisFileDeserializationError)?;
573
574 genesis_state.into_block()?
575 };
576
577 let maybe_block_header_in_store = self
578 .select_block_header_by_block_num(Some(BlockNumber::GENESIS))
579 .await
580 .map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
581
582 let expected_genesis_header = genesis_block.header().clone();
583
584 match maybe_block_header_in_store {
585 Some(block_header_in_store) => {
586 if expected_genesis_header != block_header_in_store {
588 Err(GenesisError::GenesisBlockHeaderMismatch {
589 expected_genesis_header: Box::new(expected_genesis_header),
590 block_header_in_store: Box::new(block_header_in_store),
591 })?;
592 }
593 },
594 None => {
595 self.pool
597 .get()
598 .await
599 .map_err(DatabaseError::MissingDbConnection)?
600 .interact(move |conn| -> Result<()> {
601 let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
604 let guard = span.enter();
605
606 let transaction = conn.transaction()?;
607 sql::apply_block(
608 &transaction,
609 &expected_genesis_header,
610 &[],
611 &[],
612 genesis_block.updated_accounts(),
613 )?;
614
615 block_store.save_block_blocking(0.into(), &genesis_block.to_bytes())?;
616
617 transaction.commit()?;
618
619 drop(guard);
620 Ok(())
621 })
622 .await
623 .map_err(GenesisError::ApplyBlockFailed)??;
624 },
625 }
626
627 Ok(())
628 }
629}