use std::{
collections::BTreeSet,
fs::{self, create_dir_all},
sync::Arc,
};
use deadpool_sqlite::{Config as SqliteConfig, Hook, HookError, Pool, Runtime};
use miden_node_proto::{
domain::accounts::{AccountInfo, AccountSummary},
generated::note::Note as NotePb,
};
use miden_objects::{
block::{Block, BlockNoteIndex},
crypto::{hash::rpo::RpoDigest, merkle::MerklePath, utils::Deserializable},
notes::{NoteId, NoteMetadata, Nullifier},
transaction::TransactionId,
utils::Serializable,
BlockHeader, GENESIS_BLOCK,
};
use rusqlite::vtab::array;
use tokio::sync::oneshot;
use tracing::{info, info_span, instrument};
use crate::{
blocks::BlockStore,
config::StoreConfig,
db::migrations::apply_migrations,
errors::{DatabaseError, DatabaseSetupError, GenesisError, StateSyncError},
genesis::GenesisState,
types::{AccountId, BlockNumber},
COMPONENT,
};
mod migrations;
mod sql;
mod settings;
#[cfg(test)]
mod tests;
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
pub struct Db {
pool: Pool,
}
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
pub nullifier: Nullifier,
pub block_num: BlockNumber,
}
#[derive(Debug, PartialEq)]
pub struct TransactionSummary {
pub account_id: AccountId,
pub block_num: BlockNumber,
pub transaction_id: TransactionId,
}
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
pub block_num: BlockNumber,
pub note_index: BlockNoteIndex,
pub note_id: RpoDigest,
pub metadata: NoteMetadata,
pub details: Option<Vec<u8>>,
pub merkle_path: MerklePath,
}
impl From<NoteRecord> for NotePb {
fn from(note: NoteRecord) -> Self {
Self {
block_num: note.block_num,
note_index: note.note_index.to_absolute_index() as u32,
note_id: Some(note.note_id.into()),
metadata: Some(note.metadata.into()),
merkle_path: Some(note.merkle_path.into()),
details: note.details,
}
}
}
#[derive(Debug, PartialEq)]
pub struct StateSyncUpdate {
pub notes: Vec<NoteRecord>,
pub block_header: BlockHeader,
pub chain_tip: BlockNumber,
pub account_updates: Vec<AccountSummary>,
pub transactions: Vec<TransactionSummary>,
pub nullifiers: Vec<NullifierInfo>,
}
impl Db {
#[instrument(target = "miden-store", skip_all)]
pub async fn setup(
config: StoreConfig,
block_store: Arc<BlockStore>,
) -> Result<Self, DatabaseSetupError> {
info!(target: COMPONENT, %config, "Connecting to the database");
if let Some(p) = config.database_filepath.parent() {
create_dir_all(p).map_err(DatabaseError::IoError)?;
}
let pool = SqliteConfig::new(config.database_filepath.clone())
.builder(Runtime::Tokio1)
.expect("Infallible")
.post_create(Hook::async_fn(move |conn, _| {
Box::pin(async move {
let _ = conn
.interact(|conn| {
array::load_module(conn)?;
conn.execute("PRAGMA journal_mode = WAL;", ())?;
conn.execute("PRAGMA foreign_keys = ON;", ())
})
.await
.map_err(|e| {
HookError::Message(format!("Loading carray module failed: {e}"))
})?;
Ok(())
})
}))
.build()?;
info!(
target: COMPONENT,
sqlite = format!("{}", config.database_filepath.display()),
"Connected to the database"
);
let conn = pool.get().await.map_err(DatabaseError::MissingDbConnection)?;
conn.interact(apply_migrations).await.map_err(|err| {
DatabaseError::InteractError(format!("Migration task failed: {err}"))
})??;
let db = Db { pool };
db.ensure_genesis_block(&config.genesis_filepath.as_path().to_string_lossy(), block_store)
.await?;
Ok(db)
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_nullifiers(&self) -> Result<Vec<(Nullifier, BlockNumber)>> {
self.pool.get().await?.interact(sql::select_nullifiers).await.map_err(|err| {
DatabaseError::InteractError(format!("Select nullifiers task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_notes(&self) -> Result<Vec<NoteRecord>> {
self.pool.get().await?.interact(sql::select_notes).await.map_err(|err| {
DatabaseError::InteractError(format!("Select notes task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_accounts(&self) -> Result<Vec<AccountInfo>> {
self.pool.get().await?.interact(sql::select_accounts).await.map_err(|err| {
DatabaseError::InteractError(format!("Select accounts task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_block_header_by_block_num(
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<BlockHeader>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_block_header_by_block_num(conn, block_number))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select block header task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_block_headers(&self) -> Result<Vec<BlockHeader>> {
self.pool
.get()
.await?
.interact(sql::select_block_headers)
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select block headers task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_account_hashes(&self) -> Result<Vec<(AccountId, RpoDigest)>> {
self.pool
.get()
.await?
.interact(sql::select_account_hashes)
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select account hashes task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_account(conn, id))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get account details task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn get_state_sync(
&self,
block_num: BlockNumber,
account_ids: &[AccountId],
note_tag_prefixes: &[u32],
nullifier_prefixes: &[u32],
) -> Result<StateSyncUpdate, StateSyncError> {
let account_ids = account_ids.to_vec();
let note_tag_prefixes = note_tag_prefixes.to_vec();
let nullifier_prefixes = nullifier_prefixes.to_vec();
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| {
sql::get_state_sync(
conn,
block_num,
&account_ids,
¬e_tag_prefixes,
&nullifier_prefixes,
)
})
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Get state sync task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
self.pool
.get()
.await?
.interact(move |conn| sql::select_notes_by_id(conn, ¬e_ids))
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Select note by id task failed: {err}"))
})?
}
#[instrument(target = "miden-store", skip_all, ret(level = "debug"), err)]
pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<BTreeSet<NoteId>> {
self.select_notes_by_id(note_ids)
.await
.map(|notes| notes.into_iter().map(|note| note.note_id.into()).collect())
}
#[instrument(target = "miden-store", skip_all, err)]
pub async fn apply_block(
&self,
allow_acquire: oneshot::Sender<()>,
acquire_done: oneshot::Receiver<()>,
block: Block,
notes: Vec<NoteRecord>,
) -> Result<()> {
self.pool
.get()
.await?
.interact(move |conn| -> Result<()> {
let _span = info_span!(target: COMPONENT, "write_block_to_db").entered();
let transaction = conn.transaction()?;
sql::apply_block(
&transaction,
&block.header(),
¬es,
block.created_nullifiers(),
block.updated_accounts(),
)?;
let _ = allow_acquire.send(());
acquire_done
.blocking_recv()
.map_err(DatabaseError::ApplyBlockFailedClosedChannel)?;
transaction.commit()?;
Ok(())
})
.await
.map_err(|err| {
DatabaseError::InteractError(format!("Apply block task failed: {err}"))
})??;
Ok(())
}
#[instrument(target = "miden-store", skip_all, err)]
async fn ensure_genesis_block(
&self,
genesis_filepath: &str,
block_store: Arc<BlockStore>,
) -> Result<(), GenesisError> {
let genesis_block = {
let file_contents = fs::read(genesis_filepath).map_err(|error| {
GenesisError::FailedToReadGenesisFile {
genesis_filepath: genesis_filepath.to_string(),
error,
}
})?;
let genesis_state = GenesisState::read_from_bytes(&file_contents)
.map_err(GenesisError::GenesisFileDeserializationError)?;
genesis_state.into_block()?
};
let maybe_block_header_in_store = self
.select_block_header_by_block_num(Some(GENESIS_BLOCK))
.await
.map_err(|err| GenesisError::SelectBlockHeaderByBlockNumError(err.into()))?;
let expected_genesis_header = genesis_block.header();
match maybe_block_header_in_store {
Some(block_header_in_store) => {
if expected_genesis_header != block_header_in_store {
Err(GenesisError::GenesisBlockHeaderMismatch {
expected_genesis_header: Box::new(expected_genesis_header),
block_header_in_store: Box::new(block_header_in_store),
})?;
}
},
None => {
self.pool
.get()
.await
.map_err(DatabaseError::MissingDbConnection)?
.interact(move |conn| -> Result<()> {
let span = info_span!(target: COMPONENT, "write_genesis_block_to_db");
let guard = span.enter();
let transaction = conn.transaction()?;
sql::apply_block(
&transaction,
&expected_genesis_header,
&[],
&[],
genesis_block.updated_accounts(),
)?;
block_store.save_block_blocking(0, &genesis_block.to_bytes())?;
transaction.commit()?;
drop(guard);
Ok(())
})
.await
.map_err(|err| GenesisError::ApplyBlockFailed(err.to_string()))??;
},
}
Ok(())
}
}