use std::{
collections::{BTreeMap, BTreeSet},
fs::{self, create_dir_all},
sync::Arc,
};
use deadpool_sqlite::{Config as SqliteConfig, Hook, HookError, Pool, Runtime};
use miden_node_proto::{
domain::account::{AccountInfo, AccountSummary},
generated::note as proto,
};
use miden_objects::{
account::{AccountDelta, AccountId},
block::{Block, BlockHeader, BlockNoteIndex, BlockNumber},
crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
note::{NoteId, NoteInclusionProof, NoteMetadata, Nullifier},
transaction::TransactionId,
utils::Serializable,
};
use rusqlite::vtab::array;
use tokio::sync::oneshot;
use tracing::{info, info_span, instrument};
use crate::{
blocks::BlockStore,
config::StoreConfig,
db::migrations::apply_migrations,
errors::{DatabaseError, DatabaseSetupError, GenesisError, NoteSyncError, StateSyncError},
genesis::GenesisState,
COMPONENT, SQL_STATEMENT_CACHE_CAPACITY,
};
mod migrations;
mod sql;
mod settings;
#[cfg(test)]
mod tests;
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
pub struct Db {
pool: Pool,
}
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
pub nullifier: Nullifier,
pub block_num: BlockNumber,
}
#[derive(Debug, PartialEq)]
pub struct TransactionSummary {
pub account_id: AccountId,
pub block_num: BlockNumber,
pub transaction_id: TransactionId,
}
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
pub block_num: BlockNumber,
pub note_index: BlockNoteIndex,
pub note_id: RpoDigest,
pub metadata: NoteMetadata,
pub details: Option<Vec<u8>>,
pub merkle_path: MerklePath,
}
impl From<NoteRecord> for proto::Note {
fn from(note: NoteRecord) -> Self {
Self {
block_num: note.block_num.as_u32(),
note_index: note.note_index.leaf_index_value().into(),
note_id: Some(note.note_id.into()),
metadata: Some(note.metadata.into()),
merkle_path: Some(Into::into(¬e.merkle_path)),
details: note.details,
}
}
}
#[derive(Debug, PartialEq)]
pub struct StateSyncUpdate {
pub notes: Vec<NoteSyncRecord>,
pub block_header: BlockHeader,
pub account_updates: Vec<AccountSummary>,
pub transactions: Vec<TransactionSummary>,
pub nullifiers: Vec<NullifierInfo>,
}
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
pub notes: Vec<NoteSyncRecord>,
pub block_header: BlockHeader,
}
#[derive(Debug, Clone, PartialEq)]
pub struct NoteSyncRecord {
pub block_num: BlockNumber,
pub note_index: BlockNoteIndex,
pub note_id: RpoDigest,
pub metadata: NoteMetadata,
pub merkle_path: MerklePath,
}
impl From<NoteSyncRecord> for proto::NoteSyncRecord {
fn from(note: NoteSyncRecord) -> Self {
Self {
note_index: note.note_index.leaf_index_value().into(),
note_id: Some(note.note_id.into()),
metadata: Some(note.metadata.into()),
merkle_path: Some(Into::into(¬e.merkle_path)),
}
}
}
impl From<NoteRecord> for NoteSyncRecord {
fn from(note: NoteRecord) -> Self {
Self {
block_num: note.block_num,
note_index: note.note_index,
note_id: note.note_id,
metadata: note.metadata,
merkle_path: note.merkle_path,
}
}
}
impl Db {
#[instrument(target = COMPONENT, skip_all)]
pub async fn setup(
config: StoreConfig,
block_store: Arc<BlockStore>,
) -> Result<Self, DatabaseSetupError> {
info!(target: COMPONENT, %config, "Connecting to the database");
if let Some(p) = config.database_filepath.parent() {
create_dir_all(p).map_err(DatabaseError::IoError)?;
}
let pool = SqliteConfig::new(config.database_filepath.clone())
.builder(Runtime::Tokio1)
.expect("Infallible")
.post_create(Hook::async_fn(move |conn, _| {
Box::pin(async move {
let _ = conn
.interact(|conn| {
array::load_module(conn)?;
conn.set_prepared_statement_cache_capacity(
SQL_STATEMENT_CACHE_CAPACITY,
);
conn.execute("PRAGMA journal_mode = WAL;", ())?;
conn.execute("PRAGMA foreign_keys = ON;", ())
})
.await
.map_err(|e| {
HookError::Message(format!("Loading carray module failed: {e}").into())
})?;
Ok(())
})
}))
.build()?;
info!(
target: COMPONENT,
sqlite = format!("{}", config.database_filepath.display()),
"Connected to the database"
);
let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
conn.interact(apply_migrations).await.map_err(|err| {
DatabaseError::InteractError(format!("Migration task failed: {err}"))
})??;
let db = Db { pool };
db.ensure_genesis_block(&config.genesis_filepath.as_path().to_string_lossy(), block_store)
.await?;
Ok(db)
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
self.pool
.get()
.await?
.interact(sql::select_all_nullifiers)
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_nullifiers_by_prefix(
&self,
prefix_len: u32,
nullifier_prefixes: Vec<u32>,
) -> Result<Vec<NullifierInfo>> {
self.pool
.get()
.await?
.interact(move |conn| {
sql::select_nullifiers_by_prefix(conn, prefix_len, &nullifier_prefixes)
})
.await
.map_err(|err| {
DatabaseError::InteractError(format!(
"Select nullifiers by prefix task failed: {err}"
))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_block_header_by_block_num(
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<BlockHeader>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_block_header_by_block_num(conn, block_number))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select block header task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_block_headers(&self, blocks: Vec<BlockNumber>) -> Result<Vec<BlockHeader>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_block_headers(conn, &blocks))
.await
.map_err(|err| {
DatabaseError::InteractError(format!(
"Select many block headers task failed: {err}"
))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
self.pool
.get()
.await?
.interact(sql::select_all_block_headers)
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_account_hashes(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
self.pool
.get()
.await?
.interact(sql::select_all_account_hashes)
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select account hashes task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_account(conn, id))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get account details task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_accounts_by_ids(
&self,
account_ids: Vec<AccountId>,
) -> Result<Vec<AccountInfo>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_accounts_by_ids(conn, &account_ids))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get accounts details task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn get_state_sync(
&self,
block_num: BlockNumber,
account_ids: Vec<AccountId>,
note_tags: Vec<u32>,
nullifier_prefixes: Vec<u32>,
) -> Result<StateSyncUpdate, StateSyncError> {
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| {
sql::get_state_sync(conn, block_num, &account_ids, ¬e_tags, &nullifier_prefixes)
})
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn get_note_sync(
&self,
block_num: BlockNumber,
note_tags: Vec<u32>,
) -> Result<NoteSyncUpdate, NoteSyncError> {
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| sql::get_note_sync(conn, block_num, ¬e_tags))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get notes sync task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_notes_by_id(conn, ¬e_ids))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_note_inclusion_proofs(
&self,
note_ids: BTreeSet<NoteId>,
) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_note_inclusion_proofs(conn, note_ids))
.await
.map_err(|err| {
DatabaseError::InteractError(format!(
"Select block note inclusion proofs task failed: {err}"
))
})?
}
#[instrument(target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
self.select_notes_by_id(note_ids)
.await
.map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
}
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn apply_block(
&self,
allow_acquire: oneshot::Sender<()>,
acquire_done: oneshot::Receiver<()>,
block: Block,
notes: Vec<NoteRecord>,
) -> Result<()> {
self.pool
.get()
.await?
.interact(move |conn| -> Result<()> {
let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
let transaction = conn.transaction()?;
sql::apply_block(
&transaction,
&block.header(),
¬es,
block.nullifiers(),
block.updated_accounts(),
)?;
let _ = allow_acquire.send(());
acquire_done.blocking_recv()?;
transaction.commit()?;
Ok(())
})
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Apply block task failed: {err}"))
})??;
Ok(())
}
pub(crate) async fn select_account_state_delta(
&self,
account_id: AccountId,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<Option<AccountDelta>> {
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| -> Result<Option<AccountDelta>> {
sql::select_account_delta(conn, account_id, from_block, to_block)
})
.await
.map_err(|err| DatabaseError::InteractError(err.to_string()))?
}
#[instrument(target = COMPONENT, skip_all, err)]
async fn ensure_genesis_block(
&self,
genesis_filepath: &str,
block_store: Arc<BlockStore>,
) -> Result<(), GenesisError> {
let genesis_block = {
let file_contents = fs::read(genesis_filepath).map_err(|source| {
GenesisError::FailedToReadGenesisFile {
genesis_filepath: genesis_filepath.to_string(),
source,
}
})?;
let genesis_state = GenesisState::read_from_bytes(&file_contents)
.map_err(GenesisError::GenesisFileDeserializationError)?;
genesis_state.into_block()?
};
let maybe_block_header_in_store = self
.select_block_header_by_block_num(Some(BlockNumber::GENESIS))
.await
.map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
let expected_genesis_header = genesis_block.header();
match maybe_block_header_in_store {
Some(block_header_in_store) => {
if expected_genesis_header != block_header_in_store {
Err(GenesisError::GenesisBlockHeaderMismatch {
expected_genesis_header: Box::new(expected_genesis_header),
block_header_in_store: Box::new(block_header_in_store),
})?;
}
},
None => {
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| -> Result<()> {
let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
let guard = span.enter();
let transaction = conn.transaction()?;
sql::apply_block(
&transaction,
&expected_genesis_header,
&[],
&[],
genesis_block.updated_accounts(),
)?;
block_store.save_block_blocking(0.into(), &genesis_block.to_bytes())?;
transaction.commit()?;
drop(guard);
Ok(())
})
.await
.map_err(GenesisError::ApplyBlockFailed)??;
},
}
Ok(())
}
}