1use std::{
2 collections::{BTreeMap, BTreeSet},
3 fs::{self, create_dir_all},
4 sync::Arc,
5};
6
7use deadpool_sqlite::{Config as SqliteConfig, Hook, HookError, Pool, Runtime};
8use miden_node_proto::{
9 domain::account::{AccountInfo, AccountSummary},
10 generated::note as proto,
11};
12use miden_objects::{
13 account::{AccountDelta, AccountId},
14 block::{Block, BlockHeader, BlockNoteIndex, BlockNumber},
15 crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
16 note::{NoteId, NoteInclusionProof, NoteMetadata, Nullifier},
17 transaction::TransactionId,
18 utils::Serializable,
19};
20use rusqlite::vtab::array;
21use tokio::sync::oneshot;
22use tracing::{info, info_span, instrument};
23
24use crate::{
25 blocks::BlockStore,
26 config::StoreConfig,
27 db::migrations::apply_migrations,
28 errors::{DatabaseError, DatabaseSetupError, GenesisError, NoteSyncError, StateSyncError},
29 genesis::GenesisState,
30 COMPONENT, SQL_STATEMENT_CACHE_CAPACITY,
31};
32
// Provides `apply_migrations`, which `Db::setup` runs against a fresh connection.
mod migrations;
// Query implementations that the `Db` methods delegate to.
mod sql;

mod settings;
#[cfg(test)]
mod tests;

/// Result alias for this module; the error type defaults to [`DatabaseError`].
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
41
/// Handle to the store's SQLite database.
///
/// Wraps a `deadpool_sqlite` connection [`Pool`]. Every query method checks a connection out of
/// the pool and runs the matching `sql::*` function through the connection's `interact` call.
pub struct Db {
    /// Connection pool built in [`Db::setup`].
    pool: Pool,
}
45
/// A nullifier paired with a block number, as returned by
/// [`Db::select_nullifiers_by_prefix`] and carried in [`StateSyncUpdate`].
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
    pub nullifier: Nullifier,
    // NOTE(review): presumably the block in which the nullifier was recorded — confirm.
    pub block_num: BlockNumber,
}
51
/// Identifies a transaction by account, block number and transaction id; carried in
/// [`StateSyncUpdate::transactions`].
#[derive(Debug, PartialEq)]
pub struct TransactionSummary {
    pub account_id: AccountId,
    // NOTE(review): presumably the block that included the transaction — confirm.
    pub block_num: BlockNumber,
    pub transaction_id: TransactionId,
}
58
/// A note as handled by this module: its block number and index, id, metadata, optional detail
/// bytes and Merkle path.
///
/// Converts into the protobuf `proto::Note` and into the detail-less [`NoteSyncRecord`].
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
    pub block_num: BlockNumber,
    pub note_index: BlockNoteIndex,
    pub note_id: RpoDigest,
    pub metadata: NoteMetadata,
    /// Optional raw bytes; forwarded unchanged into `proto::Note::details`.
    pub details: Option<Vec<u8>>,
    pub merkle_path: MerklePath,
}
68
69impl From<NoteRecord> for proto::Note {
70 fn from(note: NoteRecord) -> Self {
71 Self {
72 block_num: note.block_num.as_u32(),
73 note_index: note.note_index.leaf_index_value().into(),
74 note_id: Some(note.note_id.into()),
75 metadata: Some(note.metadata.into()),
76 merkle_path: Some(Into::into(¬e.merkle_path)),
77 details: note.details,
78 }
79 }
80}
81
/// Payload returned by [`Db::get_state_sync`]: a block header together with the notes, account
/// updates, transactions and nullifiers selected by the query.
#[derive(Debug, PartialEq)]
pub struct StateSyncUpdate {
    pub notes: Vec<NoteSyncRecord>,
    pub block_header: BlockHeader,
    pub account_updates: Vec<AccountSummary>,
    pub transactions: Vec<TransactionSummary>,
    pub nullifiers: Vec<NullifierInfo>,
}
90
/// Payload returned by [`Db::get_note_sync`]: a block header together with the selected notes.
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
    pub notes: Vec<NoteSyncRecord>,
    pub block_header: BlockHeader,
}
96
/// The sync-oriented view of a note: the same fields as [`NoteRecord`] except `details`.
///
/// Built from a [`NoteRecord`] via `From`, and convertible into `proto::NoteSyncRecord`.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteSyncRecord {
    /// Not part of the protobuf conversion; `proto::NoteSyncRecord` is built without it.
    pub block_num: BlockNumber,
    pub note_index: BlockNoteIndex,
    pub note_id: RpoDigest,
    pub metadata: NoteMetadata,
    pub merkle_path: MerklePath,
}
105
106impl From<NoteSyncRecord> for proto::NoteSyncRecord {
107 fn from(note: NoteSyncRecord) -> Self {
108 Self {
109 note_index: note.note_index.leaf_index_value().into(),
110 note_id: Some(note.note_id.into()),
111 metadata: Some(note.metadata.into()),
112 merkle_path: Some(Into::into(¬e.merkle_path)),
113 }
114 }
115}
116
117impl From<NoteRecord> for NoteSyncRecord {
118 fn from(note: NoteRecord) -> Self {
119 Self {
120 block_num: note.block_num,
121 note_index: note.note_index,
122 note_id: note.note_id,
123 metadata: note.metadata,
124 merkle_path: note.merkle_path,
125 }
126 }
127}
128
129impl Db {
130 #[instrument(target = COMPONENT, skip_all)]
134 pub async fn setup(
135 config: StoreConfig,
136 block_store: Arc<BlockStore>,
137 ) -> Result<Self, DatabaseSetupError> {
138 info!(target: COMPONENT, %config, "Connecting to the database");
139
140 if let Some(p) = config.database_filepath.parent() {
141 create_dir_all(p).map_err(DatabaseError::IoError)?;
142 }
143
144 let pool = SqliteConfig::new(config.database_filepath.clone())
145 .builder(Runtime::Tokio1)
146 .expect("Infallible")
147 .post_create(Hook::async_fn(move |conn, _| {
148 Box::pin(async move {
149 let _ = conn
150 .interact(|conn| {
151 array::load_module(conn)?;
155
156 conn.set_prepared_statement_cache_capacity(
158 SQL_STATEMENT_CACHE_CAPACITY,
159 );
160
161 conn.execute("PRAGMA journal_mode = WAL;", ())?;
166
167 conn.execute("PRAGMA foreign_keys = ON;", ())
169 })
170 .await
171 .map_err(|e| {
172 HookError::Message(format!("Loading carray module failed: {e}").into())
173 })?;
174
175 Ok(())
176 })
177 }))
178 .build()?;
179
180 info!(
181 target: COMPONENT,
182 sqlite = format!("{}", config.database_filepath.display()),
183 "Connected to the database"
184 );
185
186 let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
187
188 conn.interact(apply_migrations).await.map_err(|err| {
189 DatabaseError::InteractError(format!("Migration task failed: {err}"))
190 })??;
191
192 let db = Db { pool };
193 db.ensure_genesis_block(&config.genesis_filepath.as_path().to_string_lossy(), block_store)
194 .await?;
195
196 Ok(db)
197 }
198
199 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
201 pub async fn select_all_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
202 self.pool
203 .get()
204 .await?
205 .interact(sql::select_all_nullifiers)
206 .await
207 .map_err(|err| {
208 DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
209 })?
210 }
211
212 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
214 pub async fn select_nullifiers_by_prefix(
215 &self,
216 prefix_len: u32,
217 nullifier_prefixes: Vec<u32>,
218 ) -> Result<Vec<NullifierInfo>> {
219 self.pool
220 .get()
221 .await?
222 .interact(move |conn| {
223 sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes)
224 })
225 .await
226 .map_err(|err| {
227 DatabaseError::InteractError(format!(
228 "Select nullifiers by prefix task failed: {err}"
229 ))
230 })?
231 }
232
233 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
237 pub async fn select_block_header_by_block_num(
238 &self,
239 block_number: Option<BlockNumber>,
240 ) -> Result<Option<BlockHeader>> {
241 self.pool
242 .get()
243 .await?
244 .interact(move |conn| sql::select_block_header_by_block_num(conn, block_number))
245 .await
246 .map_err(|err| {
247 DatabaseError::InteractError(format!("Select block header task failed: {err}"))
248 })?
249 }
250
251 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
253 pub async fn select_block_headers(&self, blocks: Vec<BlockNumber>) -> Result<Vec<BlockHeader>> {
254 self.pool
255 .get()
256 .await?
257 .interact(move |conn| sql::select_block_headers(conn, &blocks))
258 .await
259 .map_err(|err| {
260 DatabaseError::InteractError(format!(
261 "Select many block headers task failed: {err}"
262 ))
263 })?
264 }
265
266 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
268 pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
269 self.pool
270 .get()
271 .await?
272 .interact(sql::select_all_block_headers)
273 .await
274 .map_err(|err| {
275 DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
276 })?
277 }
278
279 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
281 pub async fn select_all_account_hashes(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
282 self.pool
283 .get()
284 .await?
285 .interact(sql::select_all_account_hashes)
286 .await
287 .map_err(|err| {
288 DatabaseError::InteractError(format!("Select account hashes task failed: {err}"))
289 })?
290 }
291
292 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
294 pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
295 self.pool
296 .get()
297 .await?
298 .interact(move |conn| sql::select_account(conn, id))
299 .await
300 .map_err(|err| {
301 DatabaseError::InteractError(format!("Get account details task failed: {err}"))
302 })?
303 }
304
305 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
307 pub async fn select_accounts_by_ids(
308 &self,
309 account_ids: Vec<AccountId>,
310 ) -> Result<Vec<AccountInfo>> {
311 self.pool
312 .get()
313 .await?
314 .interact(move |conn| sql::select_accounts_by_ids(conn, &account_ids))
315 .await
316 .map_err(|err| {
317 DatabaseError::InteractError(format!("Get accounts details task failed: {err}"))
318 })?
319 }
320
321 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
322 pub async fn get_state_sync(
323 &self,
324 block_num: BlockNumber,
325 account_ids: Vec<AccountId>,
326 note_tags: Vec<u32>,
327 nullifier_prefixes: Vec<u32>,
328 ) -> Result<StateSyncUpdate, StateSyncError> {
329 self.pool
330 .get()
331 .await
332 .map_err(DatabaseError::MissingDbConnection)?
333 .interact(move |conn| {
334 sql::get_state_sync(conn, block_num, &account_ids, ¬e_tags, &nullifier_prefixes)
335 })
336 .await
337 .map_err(|err| {
338 DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
339 })?
340 }
341
342 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
343 pub async fn get_note_sync(
344 &self,
345 block_num: BlockNumber,
346 note_tags: Vec<u32>,
347 ) -> Result<NoteSyncUpdate, NoteSyncError> {
348 self.pool
349 .get()
350 .await
351 .map_err(DatabaseError::MissingDbConnection)?
352 .interact(move |conn| sql::get_note_sync(conn, block_num, ¬e_tags))
353 .await
354 .map_err(|err| {
355 DatabaseError::InteractError(format!("Get notes sync task failed: {err}"))
356 })?
357 }
358
359 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
361 pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
362 self.pool
363 .get()
364 .await?
365 .interact(move |conn| sql::select_notes_by_id(conn, ¬e_ids))
366 .await
367 .map_err(|err| {
368 DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
369 })?
370 }
371
372 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
374 pub async fn select_note_inclusion_proofs(
375 &self,
376 note_ids: BTreeSet<NoteId>,
377 ) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
378 self.pool
379 .get()
380 .await?
381 .interact(move |conn| sql::select_note_inclusion_proofs(conn, note_ids))
382 .await
383 .map_err(|err| {
384 DatabaseError::InteractError(format!(
385 "Select block note inclusion proofs task failed: {err}"
386 ))
387 })?
388 }
389
390 #[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
392 pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
393 self.select_notes_by_id(note_ids)
394 .await
395 .map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
396 }
397
398 #[instrument(target = COMPONENT, skip_all, err)]
404 pub async fn apply_block(
405 &self,
406 allow_acquire: oneshot::Sender<()>,
407 acquire_done: oneshot::Receiver<()>,
408 block: Block,
409 notes: Vec<NoteRecord>,
410 ) -> Result<()> {
411 self.pool
412 .get()
413 .await?
414 .interact(move |conn| -> Result<()> {
415 let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
417
418 let transaction = conn.transaction()?;
419 sql::apply_block(
420 &transaction,
421 &block.header(),
422 ¬es,
423 block.nullifiers(),
424 block.updated_accounts(),
425 )?;
426
427 let _ = allow_acquire.send(());
428 acquire_done.blocking_recv()?;
429
430 transaction.commit()?;
431
432 Ok(())
433 })
434 .await
435 .map_err(|err| {
436 DatabaseError::InteractError(format!("Apply block task failed: {err}"))
437 })??;
438
439 Ok(())
440 }
441
442 pub(crate) async fn select_account_state_delta(
448 &self,
449 account_id: AccountId,
450 from_block: BlockNumber,
451 to_block: BlockNumber,
452 ) -> Result<Option<AccountDelta>> {
453 self.pool
454 .get()
455 .await
456 .map_err(DatabaseError::MissingDbConnection)?
457 .interact(move |conn| -> Result<Option<AccountDelta>> {
458 sql::select_account_delta(conn, account_id, from_block, to_block)
459 })
460 .await
461 .map_err(|err| DatabaseError::InteractError(err.to_string()))?
462 }
463
464 #[instrument(target = COMPONENT, skip_all, err)]
471 async fn ensure_genesis_block(
472 &self,
473 genesis_filepath: &str,
474 block_store: Arc<BlockStore>,
475 ) -> Result<(), GenesisError> {
476 let genesis_block = {
477 let file_contents = fs::read(genesis_filepath).map_err(|source| {
478 GenesisError::FailedToReadGenesisFile {
479 genesis_filepath: genesis_filepath.to_string(),
480 source,
481 }
482 })?;
483
484 let genesis_state = GenesisState::read_from_bytes(&file_contents)
485 .map_err(GenesisError::GenesisFileDeserializationError)?;
486
487 genesis_state.into_block()?
488 };
489
490 let maybe_block_header_in_store = self
491 .select_block_header_by_block_num(Some(BlockNumber::GENESIS))
492 .await
493 .map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
494
495 let expected_genesis_header = genesis_block.header();
496
497 match maybe_block_header_in_store {
498 Some(block_header_in_store) => {
499 if expected_genesis_header != block_header_in_store {
501 Err(GenesisError::GenesisBlockHeaderMismatch {
502 expected_genesis_header: Box::new(expected_genesis_header),
503 block_header_in_store: Box::new(block_header_in_store),
504 })?;
505 }
506 },
507 None => {
508 self.pool
510 .get()
511 .await
512 .map_err(DatabaseError::MissingDbConnection)?
513 .interact(move |conn| -> Result<()> {
514 let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
517 let guard = span.enter();
518
519 let transaction = conn.transaction()?;
520 sql::apply_block(
521 &transaction,
522 &expected_genesis_header,
523 &[],
524 &[],
525 genesis_block.updated_accounts(),
526 )?;
527
528 block_store.save_block_blocking(0.into(), &genesis_block.to_bytes())?;
529
530 transaction.commit()?;
531
532 drop(guard);
533 Ok(())
534 })
535 .await
536 .map_err(GenesisError::ApplyBlockFailed)??;
537 },
538 }
539
540 Ok(())
541 }
542}