#![expect(
clippy::cast_possible_wrap,
reason = "We will not approach the item count where i64 and usize cause issues"
)]
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::RangeInclusive;
use diesel::prelude::{
BoolExpressionMethods,
ExpressionMethods,
Insertable,
QueryDsl,
Queryable,
QueryableByName,
Selectable,
};
use diesel::query_dsl::methods::SelectDsl;
use diesel::sqlite::Sqlite;
use diesel::{
JoinOnDsl,
NullableExpressionMethods,
OptionalExtension,
RunQueryDsl,
SelectableHelper,
SqliteConnection,
};
use miden_node_utils::limiter::{
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteCommitmentLimit,
QueryParamNoteTagLimit,
};
use miden_protocol::Word;
use miden_protocol::account::AccountId;
use miden_protocol::block::{BlockNoteIndex, BlockNumber};
use miden_protocol::crypto::merkle::SparseMerklePath;
use miden_protocol::note::{
NoteAssets,
NoteAttachment,
NoteDetails,
NoteId,
NoteInclusionProof,
NoteMetadata,
NoteRecipient,
NoteScript,
NoteStorage,
NoteTag,
NoteType,
Nullifier,
};
use miden_protocol::utils::serde::{Deserializable, Serializable};
use miden_standards::note::NetworkAccountTarget;
use crate::COMPONENT;
use crate::db::models::conv::{
SqlTypeConvert,
idx_to_raw_sql,
note_type_to_raw_sql,
raw_sql_to_idx,
};
use crate::db::models::queries::select_block_header_by_block_num;
use crate::db::models::{serialize_vec, vec_raw_try_into};
use crate::db::{DatabaseError, NoteRecord, NoteSyncRecord, NoteSyncUpdate, Page, schema};
use crate::errors::NoteSyncError;
/// Classification of a note with respect to network accounts.
///
/// The explicit discriminants are the integers written to and compared against the
/// `network_note_type` column of the `notes` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub(crate) enum NetworkNoteType {
    /// The note is not treated as a network note.
    None = 0,
    /// The note targets a single network account.
    SingleTarget = 1,
}

impl From<NetworkNoteType> for i32 {
    /// Returns the `#[repr(i32)]` discriminant of the variant.
    fn from(value: NetworkNoteType) -> Self {
        value as Self
    }
}
/// Returns the sync records of the first block inside `block_range` that contains at least one
/// note whose tag is in `note_tags` or whose sender is in `account_ids`.
///
/// The lookup runs two queries:
/// 1. find the smallest `committed_at` within the (inclusive) range that has a matching note;
/// 2. load every matching note committed at exactly that block.
///
/// An empty vector is returned when no note in the range matches.
///
/// # Errors
///
/// Fails if either parameter list exceeds its query limit, if a query fails, or if a loaded row
/// cannot be converted into a [`NoteSyncRecord`].
pub(crate) fn select_notes_since_block_by_tag_and_sender(
    conn: &mut SqliteConnection,
    account_ids: &[AccountId],
    note_tags: &[u32],
    block_range: RangeInclusive<BlockNumber>,
) -> Result<Vec<NoteSyncRecord>, DatabaseError> {
    QueryParamAccountIdLimit::check(account_ids.len())?;
    QueryParamNoteTagLimit::check(note_tags.len())?;

    // Tags are stored as signed 32-bit integers, senders as serialized bytes.
    let tags: Vec<i32> = note_tags.iter().map(|&tag| tag as i32).collect();
    let senders = serialize_vec(account_ids.iter());

    let first_block = block_range.start().to_raw_sql();
    let last_block = block_range.end().to_raw_sql();

    // Step 1: earliest block in the range that has any matching note.
    let earliest: Option<i64> =
        SelectDsl::select(schema::notes::table, schema::notes::committed_at)
            .filter(
                schema::notes::tag
                    .eq_any(&tags[..])
                    .or(schema::notes::sender.eq_any(&senders[..])),
            )
            .filter(schema::notes::committed_at.ge(first_block))
            .filter(schema::notes::committed_at.le(last_block))
            .order(schema::notes::committed_at.asc())
            .limit(1)
            .get_result(conn)
            .optional()?;

    let Some(block) = earliest else {
        return Ok(Vec::new());
    };

    // Step 2: all matching notes committed at that block.
    let rows = SelectDsl::select(schema::notes::table, NoteSyncRecordRawRow::as_select())
        .filter(schema::notes::committed_at.eq(block))
        .filter(
            schema::notes::tag
                .eq_any(&tags)
                .or(schema::notes::sender.eq_any(&senders)),
        )
        .load::<NoteSyncRecordRawRow>(conn)?;

    vec_raw_try_into(rows)
}
/// Loads the notes whose id is contained in `note_ids`, together with their scripts.
///
/// The `notes` table is left-joined with `note_scripts` on the script root, so notes without a
/// stored script are still returned (with no script attached).
///
/// NOTE(review): unlike the sibling selectors, no query-parameter limit is checked here.
///
/// # Errors
///
/// Fails if the query fails or if a loaded row cannot be converted into a [`NoteRecord`].
pub(crate) fn select_notes_by_id(
    conn: &mut SqliteConnection,
    note_ids: &[NoteId],
) -> Result<Vec<NoteRecord>, DatabaseError> {
    let note_ids = serialize_vec(note_ids);

    let q = schema::notes::table
        .left_join(
            schema::note_scripts::table
                .on(schema::notes::script_root.eq(schema::note_scripts::script_root.nullable())),
        )
        .filter(schema::notes::note_id.eq_any(&note_ids));

    let raw: Vec<_> = SelectDsl::select(
        q,
        (NoteRecordRawRow::as_select(), schema::note_scripts::script.nullable()),
    )
    .load::<(NoteRecordRawRow, Option<Vec<u8>>)>(conn)?;

    let records = vec_raw_try_into::<NoteRecord, NoteRecordWithScriptRawJoined>(
        raw.into_iter().map(NoteRecordWithScriptRawJoined::from),
    )?;
    Ok(records)
}
/// Returns the subset of `note_commitments` that is present in the `notes` table.
///
/// # Errors
///
/// Fails if `note_commitments` exceeds the query limit, if the query fails, or if a stored
/// commitment cannot be deserialized into a [`Word`].
pub(crate) fn select_existing_note_commitments(
    conn: &mut SqliteConnection,
    note_commitments: &[Word],
) -> Result<HashSet<Word>, DatabaseError> {
    QueryParamNoteCommitmentLimit::check(note_commitments.len())?;

    let note_commitments = serialize_vec(note_commitments.iter());

    let raw_commitments = SelectDsl::select(schema::notes::table, schema::notes::note_commitment)
        .filter(schema::notes::note_commitment.eq_any(&note_commitments))
        .load::<Vec<u8>>(conn)?;

    let commitments = raw_commitments
        .into_iter()
        .map(|commitment| Word::read_from_bytes(&commitment[..]))
        .collect::<Result<HashSet<_>, _>>()?;
    Ok(commitments)
}
/// Test-only helper: loads every note, ordered by ascending `committed_at`, together with its
/// script (if one is stored in `note_scripts`).
///
/// # Errors
///
/// Fails if the query fails or if a loaded row cannot be converted into a [`NoteRecord`].
#[cfg(test)]
pub(crate) fn select_all_notes(
    conn: &mut SqliteConnection,
) -> Result<Vec<NoteRecord>, DatabaseError> {
    let notes_with_scripts = schema::notes::table.left_join(
        schema::note_scripts::table
            .on(schema::notes::script_root.eq(schema::note_scripts::script_root.nullable())),
    );
    let columns = (NoteRecordRawRow::as_select(), schema::note_scripts::script.nullable());

    let rows: Vec<(NoteRecordRawRow, Option<Vec<u8>>)> =
        SelectDsl::select(notes_with_scripts, columns)
            .order_by(schema::notes::committed_at.asc())
            .load(conn)?;

    let joined = rows.into_iter().map(NoteRecordWithScriptRawJoined::from);
    vec_raw_try_into::<NoteRecord, NoteRecordWithScriptRawJoined>(joined)
}
/// Builds the inclusion proof of every stored note whose commitment is in `note_commitments`.
///
/// Rows are read in ascending `committed_at` order and keyed by note id in the returned map.
///
/// # Errors
///
/// Fails if `note_commitments` exceeds the query limit, if the query fails, or if any column
/// cannot be decoded (note id, block number, merkle path, or the proof itself).
///
/// # Panics
///
/// Panics if the stored batch/note index pair is rejected by [`BlockNoteIndex::new`].
pub(crate) fn select_note_inclusion_proofs(
    conn: &mut SqliteConnection,
    note_commitments: &BTreeSet<Word>,
) -> Result<BTreeMap<NoteId, NoteInclusionProof>, DatabaseError> {
    QueryParamNoteCommitmentLimit::check(note_commitments.len())?;

    let note_commitments = serialize_vec(note_commitments.iter());

    let raw_notes = SelectDsl::select(
        schema::notes::table,
        (
            schema::notes::committed_at,
            schema::notes::note_id,
            schema::notes::batch_index,
            schema::notes::note_index,
            schema::notes::inclusion_path,
        ),
    )
    .filter(schema::notes::note_commitment.eq_any(note_commitments))
    .order_by(schema::notes::committed_at.asc())
    .load::<(i64, Vec<u8>, i32, i32, Vec<u8>)>(conn)?;

    Result::<BTreeMap<_, _>, _>::from_iter(raw_notes.iter().map(
        |(block_num, note_id, batch_index, note_index, merkle_path)| {
            let note_id = NoteId::read_from_bytes(&note_id[..])?;
            let block_num = BlockNumber::from_raw_sql(*block_num)?;
            // The proof wants the leaf index of the note within the block's note tree.
            let node_index_in_block =
                BlockNoteIndex::new(raw_sql_to_idx(*batch_index), raw_sql_to_idx(*note_index))
                    .expect("batch and note index from DB should be valid")
                    .leaf_index_value();
            let merkle_path = SparseMerklePath::read_from_bytes(&merkle_path[..])?;
            let proof = NoteInclusionProof::new(block_num, node_index_in_block, merkle_path)?;
            Ok((note_id, proof))
        },
    ))
}
/// Loads the sync records of all stored notes whose commitment is in `note_commitments`,
/// keyed by note id.
///
/// Rows are read in ascending `committed_at` order.
///
/// # Errors
///
/// Fails if `note_commitments` exceeds the query limit, if the query fails, or if a row cannot
/// be converted into a [`NoteSyncRecord`].
pub(crate) fn select_note_sync_records(
    conn: &mut SqliteConnection,
    note_commitments: &[Word],
) -> Result<BTreeMap<NoteId, NoteSyncRecord>, DatabaseError> {
    QueryParamNoteCommitmentLimit::check(note_commitments.len())?;

    let serialized = serialize_vec(note_commitments.iter());
    let rows = SelectDsl::select(schema::notes::table, NoteSyncRecordRawRow::as_select())
        .filter(schema::notes::note_commitment.eq_any(serialized))
        .order(schema::notes::committed_at.asc())
        .load::<NoteSyncRecordRawRow>(conn)?;

    let mut records = BTreeMap::new();
    for row in rows {
        let record: NoteSyncRecord = row.try_into()?;
        let id = NoteId::from_raw(record.note_id);
        records.insert(id, record);
    }
    Ok(records)
}
/// Looks up the note script stored under `root` in the `note_scripts` table.
///
/// Returns `Ok(None)` when no script with that root exists.
///
/// # Errors
///
/// Fails if the query fails or if the stored bytes cannot be decoded into a [`NoteScript`].
pub(crate) fn select_note_script_by_root(
    conn: &mut SqliteConnection,
    root: Word,
) -> Result<Option<NoteScript>, DatabaseError> {
    let stored: Option<Vec<u8>> =
        SelectDsl::select(schema::note_scripts::table, schema::note_scripts::script)
            .filter(schema::note_scripts::script_root.eq(root.to_bytes()))
            .get_result::<Vec<u8>>(conn)
            .optional()?;

    match stored.as_ref() {
        Some(bytes) => Ok(Some(NoteScript::from_bytes(bytes)?)),
        None => Ok(None),
    }
}
/// Returns one page of single-target network notes addressed to `account_id` that are
/// unconsumed as of `block_num`.
///
/// A note qualifies when it was committed at or before `block_num` and is either not consumed
/// at all or consumed only after `block_num`.
///
/// Paging is driven by the implicit SQLite `rowid` of the `notes` table:
/// - `page.token` (defaulting to `0`) is the inclusive row id to start from;
/// - `page.size + 1` rows are requested, so the extra row reveals whether another page exists;
/// - the returned `Page` carries the row id of that extra row as the next token, or `None`
///   when the result set is exhausted.
///
/// # Errors
///
/// Fails if the query fails or if a loaded row cannot be converted into a [`NoteRecord`].
#[expect(clippy::cast_sign_loss, reason = "row_id is a positive integer")]
pub(crate) fn select_unconsumed_network_notes_by_account_id(
    conn: &mut SqliteConnection,
    account_id: AccountId,
    block_num: BlockNumber,
    mut page: Page,
) -> Result<(Vec<NoteRecord>, Page), DatabaseError> {
    // `rowid` is not part of the generated schema, so it is addressed through raw SQL fragments.
    let rowid_sel = diesel::dsl::sql::<diesel::sql_types::BigInt>("notes.rowid");
    let rowid_sel_ge = diesel::dsl::sql::<diesel::sql_types::Bool>("notes.rowid >= ")
        .bind::<diesel::sql_types::BigInt, i64>(page.token.unwrap_or_default() as i64);

    #[expect(
        clippy::items_after_statements,
        reason = "It's only relevant for a single call function"
    )]
    type RawLoadedTuple = (
        NoteRecordRawRow,
        Option<Vec<u8>>, // script bytes from the left join
        i64,             // implicit row id
    );

    #[expect(
        clippy::items_after_statements,
        reason = "It's only relevant for a single call function"
    )]
    fn split_into_raw_note_record_and_implicit_row_id(
        tuple: RawLoadedTuple,
    ) -> (NoteRecordWithScriptRawJoined, i64) {
        let (note, script, row) = tuple;
        let combined = NoteRecordWithScriptRawJoined::from((note, script));
        (combined, row)
    }

    let raw = SelectDsl::select(
        schema::notes::table.left_join(
            schema::note_scripts::table
                .on(schema::notes::script_root.eq(schema::note_scripts::script_root.nullable())),
        ),
        (
            NoteRecordRawRow::as_select(),
            schema::note_scripts::script.nullable(),
            rowid_sel.clone(),
        ),
    )
    .filter(schema::notes::network_note_type.eq(i32::from(NetworkNoteType::SingleTarget)))
    .filter(schema::notes::target_account_id.eq(Some(account_id.to_bytes())))
    .filter(schema::notes::committed_at.le(block_num.to_raw_sql()))
    .filter(
        schema::notes::consumed_at
            .is_null()
            .or(schema::notes::consumed_at.gt(block_num.to_raw_sql())),
    )
    .filter(rowid_sel_ge)
    .order(rowid_sel.asc())
    .limit(page.size.get() as i64 + 1)
    .load::<RawLoadedTuple>(conn)?;

    // Clear the incoming token unconditionally. Previously it was only cleared inside the loop,
    // so an empty result set handed the caller back its own (stale) token.
    page.token = None;

    let mut notes = Vec::with_capacity(page.size.into());
    for raw_item in raw {
        let (raw_item, row_id) = split_into_raw_note_record_and_implicit_row_id(raw_item);
        if notes.len() == page.size.get() {
            // This is the extra (size + 1)-th row: it is not returned, it only marks where the
            // next page starts.
            page.token = Some(row_id as u64);
            break;
        }
        notes.push(TryInto::<NoteRecord>::try_into(raw_item)?);
    }

    Ok((notes, page))
}
/// Produces the note-sync update for `note_tags` within `block_range`.
///
/// Selects the notes of the first block in the range that has a note matching one of the tags
/// (no sender filter is applied) and pairs them with that block's header. Returns `Ok(None)`
/// when no note matches.
///
/// # Errors
///
/// Fails if `note_tags` exceeds the query limit, if a query fails, or with
/// [`NoteSyncError::EmptyBlockHeadersTable`] when the header lookup yields nothing.
pub(crate) fn get_note_sync(
    conn: &mut SqliteConnection,
    note_tags: &[u32],
    block_range: RangeInclusive<BlockNumber>,
) -> Result<Option<NoteSyncUpdate>, NoteSyncError> {
    QueryParamNoteTagLimit::check(note_tags.len()).map_err(DatabaseError::from)?;

    let notes = select_notes_since_block_by_tag_and_sender(conn, &[], note_tags, block_range)?;

    // All returned notes share a block, so the first one identifies the header to load.
    let Some(first_note) = notes.first() else {
        return Ok(None);
    };
    let block_num = first_note.block_num;

    let maybe_header = select_block_header_by_block_num(conn, Some(block_num))?;
    let block_header = maybe_header.ok_or(NoteSyncError::EmptyBlockHeadersTable)?;

    Ok(Some(NoteSyncUpdate { notes, block_header }))
}
/// Raw `notes` row carrying the columns needed to build a [`NoteSyncRecord`].
///
/// Keep the field order stable: `Queryable` maps selected columns to fields by position.
#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
#[diesel(table_name = schema::notes)]
#[diesel(check_for_backend(Sqlite))]
pub struct NoteSyncRecordRawRow {
    /// Raw block number the note was committed at.
    pub committed_at: i64,
    /// Batch index and note index columns, embedded as a nested row.
    #[diesel(embed)]
    pub block_note_index: BlockNoteIndexRawRow,
    /// Serialized note id.
    pub note_id: Vec<u8>,
    /// Note metadata columns, embedded as a nested row.
    #[diesel(embed)]
    pub metadata: NoteMetadataRawRow,
    /// Serialized sparse merkle path of the note.
    pub inclusion_path: Vec<u8>,
}
impl TryInto<NoteSyncRecord> for NoteSyncRecordRawRow {
    type Error = DatabaseError;

    /// Decodes every raw column into its domain type.
    ///
    /// Fields are decoded in the order written below (block number, note index, note id,
    /// inclusion path, metadata); the first failure is the error that is returned.
    fn try_into(self) -> Result<NoteSyncRecord, Self::Error> {
        let Self {
            committed_at,
            block_note_index,
            note_id,
            metadata,
            inclusion_path,
        } = self;

        Ok(NoteSyncRecord {
            block_num: BlockNumber::from_raw_sql(committed_at)?,
            note_index: block_note_index.try_into()?,
            note_id: Word::read_from_bytes(&note_id[..])?,
            inclusion_path: SparseMerklePath::read_from_bytes(&inclusion_path[..])?,
            metadata: metadata.try_into()?,
        })
    }
}
/// Raw, optional detail columns of a `notes` row.
///
/// Note details are only reconstructed when all three columns are present.
#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
#[diesel(table_name = schema::notes)]
#[diesel(check_for_backend(Sqlite))]
pub struct NoteDetailsRawRow {
    /// Serialized note assets, if stored.
    pub assets: Option<Vec<u8>>,
    /// Serialized note storage, if stored.
    pub storage: Option<Vec<u8>>,
    /// Serialized serial number, if stored.
    pub serial_num: Option<Vec<u8>>,
}
/// A [`NoteRecordRawRow`] flattened together with the optional script bytes obtained from the
/// left join with `note_scripts`.
///
/// Keep the field order stable: `Queryable` maps columns to fields by position.
#[derive(Debug, Clone, PartialEq, Queryable)]
pub struct NoteRecordWithScriptRawJoined {
    /// Raw block number the note was committed at.
    pub committed_at: i64,
    /// Raw index of the batch within the block.
    pub batch_index: i32,
    /// Raw index of the note within the batch.
    pub note_index: i32,
    /// Serialized note id.
    pub note_id: Vec<u8>,
    /// Serialized note commitment.
    pub note_commitment: Vec<u8>,
    /// Raw note type.
    pub note_type: i32,
    /// Serialized sender account id.
    pub sender: Vec<u8>,
    /// Raw note tag.
    pub tag: i32,
    /// Serialized note attachment.
    pub attachment: Vec<u8>,
    /// Serialized note assets, if stored.
    pub assets: Option<Vec<u8>>,
    /// Serialized note storage, if stored.
    pub storage: Option<Vec<u8>>,
    /// Serialized serial number, if stored.
    pub serial_num: Option<Vec<u8>>,
    /// Serialized sparse merkle path of the note.
    pub inclusion_path: Vec<u8>,
    /// Serialized note script; `None` when the join found no script.
    pub script: Option<Vec<u8>>,
}
impl From<(NoteRecordRawRow, Option<Vec<u8>>)> for NoteRecordWithScriptRawJoined {
fn from((note, script): (NoteRecordRawRow, Option<Vec<u8>>)) -> Self {
let NoteRecordRawRow {
committed_at,
batch_index,
note_index,
note_id,
note_commitment,
note_type,
sender,
tag,
attachment,
assets,
storage,
serial_num,
inclusion_path,
} = note;
Self {
committed_at,
batch_index,
note_index,
note_id,
note_commitment,
note_type,
sender,
tag,
attachment,
assets,
storage,
serial_num,
inclusion_path,
script,
}
}
}
impl TryInto<NoteRecord> for NoteRecordWithScriptRawJoined {
    type Error = DatabaseError;

    /// Decodes the joined raw row into a [`NoteRecord`].
    ///
    /// Note details are rebuilt only when `assets`, `storage` and `serial_num` are all present;
    /// in that case a script is required as well and its absence is reported as a conversion
    /// error. Otherwise the record is returned without details.
    ///
    /// # Errors
    ///
    /// Fails if any column cannot be decoded, or if details are present but the script is not.
    fn try_into(self) -> Result<NoteRecord, Self::Error> {
        // Exhaustive destructuring: adding a field to the struct forces this code to be revisited.
        let NoteRecordWithScriptRawJoined {
            committed_at,
            batch_index,
            note_index,
            note_id,
            note_commitment,
            note_type,
            sender,
            tag,
            attachment,
            assets,
            storage,
            serial_num,
            inclusion_path,
            script,
        } = self;

        let index = BlockNoteIndexRawRow { batch_index, note_index };
        let metadata = NoteMetadataRawRow { note_type, sender, tag, attachment };

        let metadata = metadata.try_into()?;
        let committed_at = BlockNumber::from_raw_sql(committed_at)?;
        let note_id = Word::read_from_bytes(&note_id[..])?;
        let note_commitment = Word::read_from_bytes(&note_commitment[..])?;
        let script = script.map(|script| NoteScript::read_from_bytes(&script[..])).transpose()?;

        let details = match (assets, storage, serial_num) {
            (Some(assets), Some(storage), Some(serial_num)) => {
                let storage = NoteStorage::read_from_bytes(&storage[..])?;
                let serial_num = Word::read_from_bytes(&serial_num[..])?;
                // A recipient cannot be built without a script.
                let script = script.ok_or_else(|| {
                    miden_node_db::DatabaseError::conversiont_from_sql::<
                        NoteRecipient,
                        DatabaseError,
                        _,
                    >(None)
                })?;
                let recipient = NoteRecipient::new(serial_num, script, storage);
                let assets = NoteAssets::read_from_bytes(&assets[..])?;
                Some(NoteDetails::new(assets, recipient))
            },
            _ => None,
        };

        let inclusion_path = SparseMerklePath::read_from_bytes(&inclusion_path[..])?;
        let note_index = index.try_into()?;

        Ok(NoteRecord {
            block_num: committed_at,
            note_index,
            note_id,
            note_commitment,
            metadata,
            details,
            inclusion_path,
        })
    }
}
/// Raw `notes` row carrying every column needed to build a [`NoteRecord`], except the script,
/// which lives in the `note_scripts` table.
///
/// Keep the field order stable: `Queryable` maps selected columns to fields by position.
#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
#[diesel(table_name = schema::notes)]
#[diesel(check_for_backend(Sqlite))]
pub struct NoteRecordRawRow {
    /// Raw block number the note was committed at.
    pub committed_at: i64,
    /// Raw index of the batch within the block.
    pub batch_index: i32,
    /// Raw index of the note within the batch.
    pub note_index: i32,
    /// Serialized note id.
    pub note_id: Vec<u8>,
    /// Serialized note commitment.
    pub note_commitment: Vec<u8>,
    /// Raw note type.
    pub note_type: i32,
    /// Serialized sender account id.
    pub sender: Vec<u8>,
    /// Raw note tag.
    pub tag: i32,
    /// Serialized note attachment.
    pub attachment: Vec<u8>,
    /// Serialized note assets, if stored.
    pub assets: Option<Vec<u8>>,
    /// Serialized note storage, if stored.
    pub storage: Option<Vec<u8>>,
    /// Serialized serial number, if stored.
    pub serial_num: Option<Vec<u8>>,
    /// Serialized sparse merkle path of the note.
    pub inclusion_path: Vec<u8>,
}
/// Raw metadata columns of a `notes` row, convertible into [`NoteMetadata`].
#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
#[diesel(table_name = schema::notes)]
#[diesel(check_for_backend(Sqlite))]
pub struct NoteMetadataRawRow {
    /// Raw note type; reinterpreted as `u32` when decoded.
    note_type: i32,
    /// Serialized sender account id.
    sender: Vec<u8>,
    /// Raw note tag; reinterpreted as `u32` when decoded.
    tag: i32,
    /// Serialized note attachment.
    attachment: Vec<u8>,
}
#[expect(clippy::cast_sign_loss)]
impl TryInto<NoteMetadata> for NoteMetadataRawRow {
    type Error = DatabaseError;

    /// Decodes the raw metadata columns into [`NoteMetadata`].
    ///
    /// The signed `note_type` and `tag` columns are reinterpreted as `u32` values.
    ///
    /// # Errors
    ///
    /// Fails if the sender or attachment bytes cannot be deserialized, or if the stored note
    /// type is not a valid [`NoteType`].
    fn try_into(self) -> Result<NoteMetadata, Self::Error> {
        let Self {
            note_type: raw_note_type,
            sender: raw_sender,
            tag: raw_tag,
            attachment: raw_attachment,
        } = self;

        let sender = AccountId::read_from_bytes(&raw_sender[..])?;
        let note_type = NoteType::try_from(raw_note_type as u32)
            .map_err(miden_node_db::DatabaseError::conversiont_from_sql::<NoteType, _, _>)?;
        let tag = NoteTag::new(raw_tag as u32);
        let attachment = NoteAttachment::read_from_bytes(&raw_attachment)?;

        let metadata =
            NoteMetadata::new(sender, note_type).with_tag(tag).with_attachment(attachment);
        Ok(metadata)
    }
}
/// Raw position columns of a `notes` row, convertible into [`BlockNoteIndex`].
#[derive(Debug, Clone, PartialEq, Selectable, Queryable, QueryableByName)]
#[diesel(table_name = schema::notes)]
#[diesel(check_for_backend(Sqlite))]
pub struct BlockNoteIndexRawRow {
    /// Raw index of the batch within the block.
    pub batch_index: i32,
    /// Raw index of the note within the batch.
    pub note_index: i32,
}
#[expect(clippy::cast_sign_loss, reason = "Indices are cast to usize for ease of use")]
impl TryInto<BlockNoteIndex> for BlockNoteIndexRawRow {
    type Error = DatabaseError;

    /// Builds a [`BlockNoteIndex`] from the raw batch and note indices.
    ///
    /// # Errors
    ///
    /// Returns a conversion error when [`BlockNoteIndex::new`] rejects the index pair.
    fn try_into(self) -> Result<BlockNoteIndex, Self::Error> {
        let Self { batch_index, note_index } = self;

        match BlockNoteIndex::new(batch_index as usize, note_index as usize) {
            Some(index) => Ok(index),
            None => Err(miden_node_db::DatabaseError::conversiont_from_sql::<
                BlockNoteIndex,
                DatabaseError,
                _,
            >(None)
            .into()),
        }
    }
}
#[tracing::instrument(
target = COMPONENT,
skip_all,
err,
)]
pub(crate) fn insert_notes(
conn: &mut SqliteConnection,
notes: &[(NoteRecord, Option<Nullifier>)],
) -> Result<usize, DatabaseError> {
let count = diesel::insert_into(schema::notes::table)
.values(Vec::from_iter(
notes
.iter()
.map(|(note, nullifier)| NoteInsertRow::from((note.clone(), *nullifier))),
))
.execute(conn)?;
Ok(count)
}
/// Stores the scripts of all `notes` that carry details into the `note_scripts` table.
///
/// Notes without details are skipped. The statement is an "insert or ignore", so rows that
/// would conflict with existing ones are left untouched. Returns the row count reported by the
/// statement.
///
/// # Errors
///
/// Fails if the insert statement fails.
#[tracing::instrument(
    target = COMPONENT,
    skip_all,
    err,
)]
pub(crate) fn insert_scripts<'a>(
    conn: &mut SqliteConnection,
    notes: impl IntoIterator<Item = &'a NoteRecord>,
) -> Result<usize, DatabaseError> {
    let rows: Vec<_> = notes
        .into_iter()
        .filter_map(|note| note.details.as_ref())
        .map(|details| {
            let note_script = details.script();
            (
                schema::note_scripts::script_root.eq(note_script.root().to_bytes()),
                schema::note_scripts::script.eq(note_script.to_bytes()),
            )
        })
        .collect();

    let inserted = diesel::insert_or_ignore_into(schema::note_scripts::table)
        .values(rows)
        .execute(conn)?;
    Ok(inserted)
}
/// Row shape used to insert a note into the `notes` table.
#[derive(Debug, Clone, PartialEq, Insertable)]
#[diesel(table_name = schema::notes)]
pub struct NoteInsertRow {
    /// Raw block number the note was committed at.
    pub committed_at: i64,
    /// Raw index of the batch within the block.
    pub batch_index: i32,
    /// Raw index of the note within the batch.
    pub note_index: i32,
    /// Serialized note id.
    pub note_id: Vec<u8>,
    /// Serialized note commitment.
    pub note_commitment: Vec<u8>,
    /// Raw note type.
    pub note_type: i32,
    /// Serialized sender account id.
    pub sender: Vec<u8>,
    /// Raw note tag.
    pub tag: i32,
    /// Integer value of the note's [`NetworkNoteType`].
    pub network_note_type: i32,
    /// Serialized id of the targeted account, when the attachment names one.
    pub target_account_id: Option<Vec<u8>>,
    /// Serialized note attachment.
    pub attachment: Vec<u8>,
    /// Serialized sparse merkle path of the note.
    pub inclusion_path: Vec<u8>,
    /// Raw block number at which the note was consumed, if any.
    pub consumed_at: Option<i64>,
    /// Serialized nullifier, if one was supplied.
    pub nullifier: Option<Vec<u8>>,
    /// Serialized note assets, when details are available.
    pub assets: Option<Vec<u8>>,
    /// Serialized note storage, when details are available.
    pub storage: Option<Vec<u8>>,
    /// Serialized script root, when details are available.
    pub script_root: Option<Vec<u8>>,
    /// Serialized serial number, when details are available.
    pub serial_num: Option<Vec<u8>>,
}
impl From<(NoteRecord, Option<Nullifier>)> for NoteInsertRow {
    /// Serializes a note record and its optional nullifier into an insertable row.
    ///
    /// A note is marked [`NetworkNoteType::SingleTarget`] when its attachment decodes as a
    /// [`NetworkAccountTarget`] and its metadata is not private; otherwise it is marked
    /// [`NetworkNoteType::None`]. `consumed_at` always starts out as `None`.
    fn from((note, nullifier): (NoteRecord, Option<Nullifier>)) -> Self {
        let attachment = note.metadata.attachment();
        let target = NetworkAccountTarget::try_from(attachment).ok();

        let network_note_type = match target {
            Some(_) if !note.metadata.is_private() => NetworkNoteType::SingleTarget,
            _ => NetworkNoteType::None,
        };

        // `Option<&NoteDetails>` is `Copy`, so it can be reused for every detail column.
        let details = note.details.as_ref();

        Self {
            committed_at: note.block_num.to_raw_sql(),
            batch_index: idx_to_raw_sql(note.note_index.batch_idx()),
            note_index: idx_to_raw_sql(note.note_index.note_idx_in_batch()),
            note_id: note.note_id.to_bytes(),
            note_commitment: note.note_commitment.to_bytes(),
            note_type: note_type_to_raw_sql(note.metadata.note_type() as u8),
            sender: note.metadata.sender().to_bytes(),
            tag: note.metadata.tag().to_raw_sql(),
            network_note_type: i32::from(network_note_type),
            target_account_id: target.map(|t| t.target_id().to_bytes()),
            attachment: attachment.to_bytes(),
            inclusion_path: note.inclusion_path.to_bytes(),
            consumed_at: None,
            nullifier: nullifier.as_ref().map(|n| n.to_bytes()),
            assets: details.map(|d| d.assets().to_bytes()),
            storage: details.map(|d| d.storage().to_bytes()),
            script_root: details.map(|d| d.script().root().to_bytes()),
            serial_num: details.map(|d| d.serial_num().to_bytes()),
        }
    }
}