use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::mem::size_of;
use std::ops::{Deref, DerefMut, RangeInclusive};
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use diesel::{Connection, QueryableByName, RunQueryDsl, SqliteConnection};
use miden_node_proto::domain::account::AccountInfo;
use miden_node_proto::{BlockProofRequest, generated as proto};
use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::Word;
use miden_protocol::account::{AccountHeader, AccountId, AccountStorageHeader, StorageMapKey};
use miden_protocol::asset::{Asset, AssetVaultKey};
use miden_protocol::block::{BlockHeader, BlockNoteIndex, BlockNumber, SignedBlock};
use miden_protocol::crypto::merkle::SparseMerklePath;
use miden_protocol::note::{
NoteDetails,
NoteId,
NoteInclusionProof,
NoteMetadata,
NoteScript,
Nullifier,
};
use miden_protocol::transaction::TransactionHeader;
use miden_protocol::utils::serde::{Deserializable, Serializable};
use tokio::sync::oneshot;
use tracing::{info, instrument};
use crate::COMPONENT;
use crate::db::migrations::apply_migrations;
use crate::db::models::conv::SqlTypeConvert;
pub use crate::db::models::queries::{
AccountCommitmentsPage,
NullifiersPage,
PublicAccountIdsPage,
};
use crate::db::models::queries::{BlockHeaderCommitment, StorageMapValuesPage};
use crate::db::models::{Page, queries};
use crate::errors::{DatabaseError, NoteSyncError};
use crate::genesis::GenesisBlock;
/// Estimated serialized size of one storage-map row: two `Word`s plus a `u32` and a `u8`.
// NOTE(review): presumably key + value words, a block number, and a one-byte tag — confirm
// against the storage-map row serialization in `models::queries`.
const STORAGE_MAP_VALUE_PER_ROW_BYTES: usize =
    2 * size_of::<Word>() + size_of::<u32>() + size_of::<u8>();
/// Default cap on storage-map entries per page, sized so that a full page of rows stays
/// below the maximum allowed response payload.
fn default_storage_map_entries_limit() -> usize {
    MAX_RESPONSE_PAYLOAD_BYTES / STORAGE_MAP_VALUE_PER_ROW_BYTES
}
// Schema migrations applied on bootstrap/load.
mod migrations;
mod schema_hash;
#[cfg(test)]
mod tests;
pub(crate) mod models;
pub(crate) mod schema;
/// Crate-local result alias: database operations default to [`DatabaseError`].
pub type Result<T, E = DatabaseError> = std::result::Result<T, E>;
/// Store database handle: a thin wrapper around [`miden_node_db::Db`] that adds
/// store-specific query and update methods.
pub struct Db {
    db: miden_node_db::Db,
}
// Deref to the inner handle so callers can use its API (e.g. `transact`, `query`) directly.
// NOTE(review): `Deref` used for delegation is usually discouraged, but changing it here
// would break existing call sites.
impl Deref for Db {
    type Target = miden_node_db::Db;
    fn deref(&self) -> &Self::Target {
        &self.db
    }
}
impl DerefMut for Db {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.db
    }
}
/// One account-vault entry as read from the database.
#[derive(Debug, Clone)]
pub struct AccountVaultValue {
    // Block at which this vault value was recorded.
    pub block_num: BlockNumber,
    // Key identifying the asset slot within the account's vault.
    pub vault_key: AssetVaultKey,
    // NOTE(review): `None` presumably marks a removed/cleared entry — confirm with writers.
    pub asset: Option<Asset>,
}
impl AccountVaultValue {
    /// Decodes a raw SQL row `(block_num, vault_key_bytes, asset_bytes)` into a typed
    /// [`AccountVaultValue`].
    ///
    /// # Errors
    /// Returns a [`DatabaseError`] if any column fails to deserialize or convert.
    pub fn from_raw_row(row: (i64, Vec<u8>, Option<Vec<u8>>)) -> Result<Self, DatabaseError> {
        let (raw_block_num, raw_vault_key, raw_asset) = row;
        // Decode the key word first so deserialization errors surface in the same order
        // as before the rewrite.
        let key_word = Word::read_from_bytes(&raw_vault_key)?;
        let block_num = BlockNumber::from_raw_sql(raw_block_num)?;
        let vault_key = AssetVaultKey::try_from(key_word)?;
        let asset = match raw_asset {
            Some(bytes) => Some(Asset::read_from_bytes(&bytes)?),
            None => None,
        };
        Ok(Self { block_num, vault_key, asset })
    }
}
/// A nullifier paired with the block number it was recorded at.
#[derive(Debug, PartialEq)]
pub struct NullifierInfo {
    pub nullifier: Nullifier,
    pub block_num: BlockNumber,
}
// Convenience comparison against a bare `(nullifier, block_num)` pair, mainly for tests.
impl PartialEq<(Nullifier, BlockNumber)> for NullifierInfo {
    fn eq(&self, other: &(Nullifier, BlockNumber)) -> bool {
        self.nullifier == other.0 && self.block_num == other.1
    }
}
/// A transaction header together with its block number and the inclusion proofs of the
/// notes it produced.
#[derive(Debug, PartialEq)]
pub struct TransactionRecord {
    pub block_num: BlockNumber,
    pub header: TransactionHeader,
    pub output_note_proofs: Vec<NoteSyncRecord>,
}
impl TransactionRecord {
    /// Converts this record into its protobuf representation.
    ///
    /// Each output note is mapped to an inclusion-in-block proof; the transaction header is
    /// flattened field-by-field into the proto header message.
    pub fn into_proto(self) -> proto::rpc::TransactionRecord {
        let output_note_proofs = self
            .output_note_proofs
            .into_iter()
            .map(|n| proto::note::NoteInclusionInBlockProof {
                note_id: Some(n.note_id.into()),
                block_num: n.block_num.as_u32(),
                note_index_in_block: n.note_index.leaf_index_value().into(),
                inclusion_path: Some(n.inclusion_path.into()),
            })
            .collect();
        proto::rpc::TransactionRecord {
            header: Some(proto::transaction::TransactionHeader {
                transaction_id: Some(self.header.id().into()),
                account_id: Some(self.header.account_id().into()),
                initial_state_commitment: Some(self.header.initial_state_commitment().into()),
                final_state_commitment: Some(self.header.final_state_commitment().into()),
                input_notes: self.header.input_notes().iter().cloned().map(Into::into).collect(),
                output_notes: self.header.output_notes().iter().cloned().map(Into::into).collect(),
                // The fee is transported as a generic asset in the proto schema.
                fee: Some(Asset::from(self.header.fee()).into()),
            }),
            block_num: self.block_num.as_u32(),
            output_note_proofs,
        }
    }
}
/// A note as stored in the database, including its position within its block and the
/// sparse Merkle path proving its inclusion.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteRecord {
    pub block_num: BlockNumber,
    pub note_index: BlockNoteIndex,
    pub note_id: Word,
    pub note_commitment: Word,
    pub metadata: NoteMetadata,
    // NOTE(review): `None` presumably means the note's details are not stored (e.g. a
    // private note) — confirm against the note insertion path.
    pub details: Option<NoteDetails>,
    pub inclusion_path: SparseMerklePath,
}
impl From<NoteRecord> for proto::note::CommittedNote {
fn from(note: NoteRecord) -> Self {
let inclusion_proof = Some(proto::note::NoteInclusionInBlockProof {
note_id: Some(note.note_id.into()),
block_num: note.block_num.as_u32(),
note_index_in_block: note.note_index.leaf_index_value().into(),
inclusion_path: Some(Into::into(note.inclusion_path)),
});
let note = Some(proto::note::Note {
metadata: Some(note.metadata.into()),
details: note.details.map(|details| details.to_bytes()),
});
Self { inclusion_proof, note }
}
}
/// Result of a note-sync query: the matching note records plus the block header the
/// response is anchored at.
#[derive(Debug, PartialEq)]
pub struct NoteSyncUpdate {
    pub notes: Vec<NoteSyncRecord>,
    pub block_header: BlockHeader,
}
/// The subset of a note's data needed for sync responses: identity, metadata, and the
/// proof of inclusion within its block.
#[derive(Debug, Clone, PartialEq)]
pub struct NoteSyncRecord {
    pub block_num: BlockNumber,
    pub note_index: BlockNoteIndex,
    pub note_id: Word,
    pub metadata: NoteMetadata,
    pub inclusion_path: SparseMerklePath,
}
impl From<NoteSyncRecord> for proto::note::NoteSyncRecord {
    /// Converts a [`NoteSyncRecord`] into its protobuf form: a metadata header plus the
    /// note's inclusion-in-block proof.
    fn from(record: NoteSyncRecord) -> Self {
        let header = record.metadata.to_header().into();
        let proof = proto::note::NoteInclusionInBlockProof {
            note_id: Some(record.note_id.into()),
            block_num: record.block_num.as_u32(),
            note_index_in_block: record.note_index.leaf_index_value().into(),
            inclusion_path: Some(record.inclusion_path.into()),
        };
        Self {
            metadata_header: Some(header),
            inclusion_proof: Some(proof),
        }
    }
}
impl From<NoteRecord> for NoteSyncRecord {
fn from(note: NoteRecord) -> Self {
Self {
block_num: note.block_num,
note_index: note.note_index,
note_id: note.note_id,
metadata: note.metadata,
inclusion_path: note.inclusion_path,
}
}
}
impl Db {
#[instrument(
target = COMPONENT,
name = "store.database.bootstrap",
skip_all,
fields(path=%database_filepath.display())
err,
)]
pub fn bootstrap(database_filepath: PathBuf, genesis: GenesisBlock) -> anyhow::Result<()> {
let mut conn: SqliteConnection = diesel::sqlite::SqliteConnection::establish(
database_filepath.to_str().context("database filepath is invalid")?,
)
.context("failed to open a database connection")?;
miden_node_db::configure_connection_on_creation(&mut conn)?;
apply_migrations(&mut conn).context("failed to apply database migrations")?;
let (header, body, signature, _proof) = genesis.into_inner().into_parts();
let genesis_block = SignedBlock::new_unchecked(header, body, signature);
conn.transaction(move |conn| models::queries::apply_block(conn, &genesis_block, &[], None))
.context("failed to insert genesis block")?;
Ok(())
}
#[instrument(target = COMPONENT, skip_all)]
pub async fn load(database_filepath: PathBuf) -> Result<Self, DatabaseError> {
let db = miden_node_db::Db::new(&database_filepath)?;
info!(
target: COMPONENT,
sqlite= %database_filepath.display(),
"Connected to the database"
);
db.query("migrations", apply_migrations).await?;
Ok(Self { db })
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_nullifiers_paged(
&self,
page_size: std::num::NonZeroUsize,
after_nullifier: Option<Nullifier>,
) -> Result<NullifiersPage> {
self.transact("read nullifiers paged", move |conn| {
queries::select_nullifiers_paged(conn, page_size, after_nullifier)
})
.await
}
#[instrument(
level = "debug",
target = COMPONENT,
skip_all,
fields(prefix_len, prefixes = nullifier_prefixes.len()),
ret(level = "debug"),
err
)]
pub async fn select_nullifiers_by_prefix(
&self,
prefix_len: u32,
nullifier_prefixes: Vec<u32>,
block_range: RangeInclusive<BlockNumber>,
) -> Result<(Vec<NullifierInfo>, BlockNumber)> {
assert_eq!(prefix_len, 16, "Only 16-bit prefixes are supported");
self.transact("nullifieres by prefix", move |conn| {
let nullifier_prefixes =
Vec::from_iter(nullifier_prefixes.into_iter().map(|prefix| prefix as u16));
queries::select_nullifiers_by_prefix(
conn,
prefix_len as u8,
&nullifier_prefixes[..],
block_range,
)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_block_header_by_block_num(
&self,
maybe_block_number: Option<BlockNumber>,
) -> Result<Option<BlockHeader>> {
self.transact("block headers by block number", move |conn| {
let val = queries::select_block_header_by_block_num(conn, maybe_block_number)?;
Ok(val)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_block_headers(
&self,
blocks: impl Iterator<Item = BlockNumber> + Send + 'static,
) -> Result<Vec<BlockHeader>> {
self.transact("block headers from given block numbers", move |conn| {
let raw = queries::select_block_headers(conn, blocks)?;
Ok(raw)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_block_headers(&self) -> Result<Vec<BlockHeader>> {
self.transact("all block headers", |conn| {
let raw = queries::select_all_block_headers(conn)?;
Ok(raw)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_block_header_commitments(&self) -> Result<Vec<BlockHeaderCommitment>> {
self.transact("all block headers", |conn| {
let raw = queries::select_all_block_header_commitments(conn)?;
Ok(raw)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_account_commitments_paged(
&self,
page_size: std::num::NonZeroUsize,
after_account_id: Option<AccountId>,
) -> Result<AccountCommitmentsPage> {
self.transact("read account commitments paged", move |conn| {
queries::select_account_commitments_paged(conn, page_size, after_account_id)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_public_account_ids_paged(
&self,
page_size: std::num::NonZeroUsize,
after_account_id: Option<AccountId>,
) -> Result<PublicAccountIdsPage> {
self.transact("read public account IDs paged", move |conn| {
queries::select_public_account_ids_paged(conn, page_size, after_account_id)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_account(&self, id: AccountId) -> Result<AccountInfo> {
self.transact("Get account details", move |conn| queries::select_account(conn, id))
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_network_account_by_id(
&self,
account_id: AccountId,
) -> Result<Option<AccountInfo>> {
self.transact("Get network account by id", move |conn| {
queries::select_network_account_by_id(conn, account_id)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_all_network_account_ids(
&self,
block_range: RangeInclusive<BlockNumber>,
) -> Result<(Vec<AccountId>, BlockNumber)> {
self.transact("Get all network account IDs", move |conn| {
queries::select_all_network_account_ids(conn, block_range)
})
.await
}
#[instrument(target = COMPONENT, skip_all)]
pub async fn select_account_code_by_commitment(
&self,
code_commitment: Word,
) -> Result<Option<Vec<u8>>> {
self.transact("Get account code by commitment", move |conn| {
queries::select_account_code_by_commitment(conn, code_commitment)
})
.await
}
#[instrument(target = COMPONENT, skip_all)]
pub async fn select_account_header_with_storage_header_at_block(
&self,
account_id: AccountId,
block_num: BlockNumber,
) -> Result<Option<(AccountHeader, AccountStorageHeader)>> {
self.transact("Get account header with storage header at block", move |conn| {
queries::select_account_header_with_storage_header_at_block(conn, account_id, block_num)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn get_note_sync(
&self,
block_range: RangeInclusive<BlockNumber>,
note_tags: Arc<[u32]>,
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
self.transact("notes sync task", move |conn| {
queries::get_note_sync(conn, ¬e_tags, block_range)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
self.transact("note by id", move |conn| {
queries::select_notes_by_id(conn, note_ids.as_slice())
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_existing_note_commitments(
&self,
note_commitments: Vec<Word>,
) -> Result<HashSet<Word>> {
self.transact("note by commitment", move |conn| {
queries::select_existing_note_commitments(conn, note_commitments.as_slice())
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_note_inclusion_proofs(
&self,
note_commitments: BTreeSet<Word>,
) -> Result<BTreeMap<NoteId, NoteInclusionProof>> {
self.transact("block note inclusion proofs by commitment", move |conn| {
models::queries::select_note_inclusion_proofs(conn, ¬e_commitments)
})
.await
}
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn apply_block(
&self,
allow_acquire: oneshot::Sender<()>,
acquire_done: oneshot::Receiver<()>,
signed_block: SignedBlock,
notes: Vec<(NoteRecord, Option<Nullifier>)>,
proving_inputs: Option<BlockProofRequest>,
) -> Result<()> {
self.transact("apply block", move |conn| -> Result<()> {
models::queries::apply_block(conn, &signed_block, ¬es, proving_inputs)?;
{
let _span = tracing::info_span!(target: COMPONENT, "acquire_write_lock").entered();
if allow_acquire.send(()).is_err() {
tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock");
}
}
models::queries::prune_history(conn, signed_block.header().block_num())?;
let _span =
tracing::info_span!(target: COMPONENT, "acquire_done_lock").entered();
acquire_done.blocking_recv()?;
Ok(())
})
.await
}
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn mark_proven_and_advance_sequence(
&self,
block_num: BlockNumber,
) -> Result<Vec<BlockNumber>> {
self.transact("mark block proven", move |conn| {
mark_proven_and_advance_sequence(conn, block_num)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, err)]
pub async fn select_block_proving_inputs(
&self,
block_num: BlockNumber,
) -> Result<Option<BlockProofRequest>> {
self.transact("select block proving inputs", move |conn| {
models::queries::select_block_proving_inputs(conn, block_num)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, err)]
pub async fn select_unproven_blocks(
&self,
after: BlockNumber,
limit: usize,
) -> Result<Vec<BlockNumber>> {
self.transact("select unproven blocks", move |conn| {
models::queries::select_unproven_blocks(conn, after, limit)
})
.await
}
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_latest_proven_in_sequence_block_num(&self) -> Result<BlockNumber> {
self.transact("select latest proven block num", |conn| {
models::queries::select_latest_proven_in_sequence_block_num(conn)
})
.await
}
pub(crate) async fn select_storage_map_sync_values(
&self,
account_id: AccountId,
block_range: RangeInclusive<BlockNumber>,
entries_limit: Option<usize>,
) -> Result<StorageMapValuesPage> {
let entries_limit = entries_limit.unwrap_or_else(default_storage_map_entries_limit);
self.transact("select storage map sync values", move |conn| {
models::queries::select_account_storage_map_values_paged(
conn,
account_id,
block_range,
entries_limit,
)
})
.await
}
#[instrument(target = COMPONENT, skip_all)]
pub(crate) async fn reconstruct_storage_map_from_db(
&self,
account_id: AccountId,
slot_name: miden_protocol::account::StorageSlotName,
block_num: BlockNumber,
entries_limit: Option<usize>,
) -> Result<miden_node_proto::domain::account::AccountStorageMapDetails> {
use miden_node_proto::domain::account::{AccountStorageMapDetails, StorageMapEntries};
use miden_protocol::EMPTY_WORD;
let mut values = Vec::new();
let mut block_range_start = BlockNumber::GENESIS;
let entries_limit = entries_limit.unwrap_or_else(default_storage_map_entries_limit);
let mut page = self
.select_storage_map_sync_values(
account_id,
block_range_start..=block_num,
Some(entries_limit),
)
.await?;
values.extend(page.values);
let mut last_block_included = page.last_block_included;
if values.is_empty() && last_block_included == block_range_start {
return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
}
loop {
if page.last_block_included == block_num || page.last_block_included < block_range_start
{
break;
}
block_range_start = page.last_block_included.child();
page = self
.select_storage_map_sync_values(
account_id,
block_range_start..=block_num,
Some(entries_limit),
)
.await?;
if page.last_block_included <= last_block_included {
return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
}
last_block_included = page.last_block_included;
values.extend(page.values);
}
if page.last_block_included != block_num {
return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
}
let mut latest_values = BTreeMap::<StorageMapKey, Word>::new();
for value in values {
if value.slot_name == slot_name {
let raw_key = value.key;
latest_values.insert(raw_key, value.value);
}
}
latest_values.retain(|_, v| *v != EMPTY_WORD);
if latest_values.len() > AccountStorageMapDetails::MAX_RETURN_ENTRIES {
return Ok(AccountStorageMapDetails::limit_exceeded(slot_name));
}
let entries = Vec::from_iter(latest_values.into_iter());
Ok(AccountStorageMapDetails {
slot_name,
entries: StorageMapEntries::AllEntries(entries),
})
}
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn analyze_table_sizes(&self) -> Result<(), DatabaseError> {
self.transact("db analysis", |conn| {
#[derive(QueryableByName)]
struct TotalSize {
#[diesel(sql_type = diesel::sql_types::BigInt)]
size: i64,
}
#[derive(QueryableByName)]
struct Table {
#[diesel(sql_type = diesel::sql_types::Text)]
name: String,
#[diesel(sql_type = diesel::sql_types::BigInt)]
size: i64,
}
let tables =
diesel::sql_query("SELECT name, sum(payload) AS size FROM dbstat GROUP BY name")
.load::<Table>(conn)?;
let span = tracing::Span::current();
for Table { name, size } in tables {
span.set_attribute(format!("database.table.{name}.size"), size);
}
let total = diesel::sql_query(
"SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()",
)
.get_result::<TotalSize>(conn)?;
span.set_attribute("database.total.size", total.size);
Result::<_, DatabaseError>::Ok(())
})
.await
.inspect_err(|err| tracing::Span::current().set_error(err))?;
Ok(())
}
pub(crate) async fn select_unconsumed_network_notes(
&self,
account_id: AccountId,
block_num: BlockNumber,
page: Page,
) -> Result<(Vec<NoteRecord>, Page)> {
self.transact("unconsumed network notes for account", move |conn| {
models::queries::select_unconsumed_network_notes_by_account_id(
conn, account_id, block_num, page,
)
})
.await
}
pub async fn get_account_vault_sync(
&self,
account_id: AccountId,
block_range: RangeInclusive<BlockNumber>,
) -> Result<(BlockNumber, Vec<AccountVaultValue>)> {
self.transact("account vault sync", move |conn| {
queries::select_account_vault_assets(conn, account_id, block_range)
})
.await
}
pub async fn select_note_script_by_root(&self, root: Word) -> Result<Option<NoteScript>> {
self.transact("note script by root", move |conn| {
queries::select_note_script_by_root(conn, root)
})
.await
}
pub async fn select_transactions_records(
&self,
account_ids: Vec<AccountId>,
block_range: RangeInclusive<BlockNumber>,
) -> Result<(BlockNumber, Vec<TransactionRecord>)> {
self.transact("full transactions records", move |conn| {
queries::select_transactions_records(conn, &account_ids, block_range)
})
.await
}
}
/// Marks `block_num` as proven and advances the contiguous proven-in-sequence tip.
///
/// Clears the stored proving inputs for `block_num`, then walks the proven-but-unsequenced
/// blocks, extending the tip while each candidate is exactly `tip + 1`. Returns the blocks
/// newly brought into sequence (possibly empty).
///
/// # Errors
/// Returns [`DatabaseError::DataCorrupted`] if a proven-but-unsequenced block sits at or
/// below the current tip, which indicates inconsistent bookkeeping.
pub(crate) fn mark_proven_and_advance_sequence(
    conn: &mut SqliteConnection,
    block_num: BlockNumber,
) -> Result<Vec<BlockNumber>, DatabaseError> {
    models::queries::clear_block_proving_inputs(conn, block_num)?;
    let mut tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?;
    // NOTE(review): the early `break` below assumes `select_proven_not_in_sequence_blocks`
    // returns blocks in ascending order — confirm against the query.
    let unsequenced = models::queries::select_proven_not_in_sequence_blocks(conn)?;
    let mut newly_in_sequence = Vec::new();
    for candidate in unsequenced {
        // A proven block at or below the tip should already have been marked in-sequence.
        if candidate <= tip {
            return Err(DatabaseError::DataCorrupted(format!(
                "block {candidate} is proven but not marked in-sequence while the tip is at {tip}"
            )));
        }
        if candidate == tip + 1 {
            // Contiguous with the tip: extend the sequence.
            tip = candidate;
            newly_in_sequence.push(candidate);
        } else {
            // Gap found; later candidates cannot be contiguous either.
            break;
        }
    }
    // Persist the newly contiguous range, if any blocks were advanced.
    if let (Some(&from), Some(&to)) = (newly_in_sequence.first(), newly_in_sequence.last()) {
        models::queries::mark_blocks_as_proven_in_sequence(conn, from, to)?;
    }
    Ok(newly_in_sequence)
}