use std::ops::RangeInclusive;
use diesel::prelude::{Insertable, Queryable};
use diesel::query_dsl::methods::SelectDsl;
use diesel::{
BoolExpressionMethods,
ExpressionMethods,
QueryDsl,
QueryableByName,
RunQueryDsl,
Selectable,
SelectableHelper,
SqliteConnection,
};
use miden_node_utils::limiter::{
MAX_RESPONSE_PAYLOAD_BYTES,
QueryParamAccountIdLimit,
QueryParamLimiter,
};
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteId, Nullifier};
use miden_protocol::transaction::{OrderedTransactionHeaders, TransactionId};
use miden_protocol::utils::{Deserializable, Serializable};
use super::DatabaseError;
use crate::COMPONENT;
use crate::db::models::conv::SqlTypeConvert;
use crate::db::models::{serialize_vec, vec_raw_try_into};
use crate::db::schema;
#[derive(Debug, Clone, PartialEq, Queryable, Selectable, QueryableByName)]
#[diesel(table_name = schema::transactions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct TransactionRecordRaw {
account_id: Vec<u8>,
block_num: i64,
transaction_id: Vec<u8>,
initial_state_commitment: Vec<u8>,
final_state_commitment: Vec<u8>,
nullifiers: Vec<u8>,
output_notes: Vec<u8>,
size_in_bytes: i64,
}
impl TryInto<crate::db::TransactionRecord> for TransactionRecordRaw {
type Error = DatabaseError;
fn try_into(self) -> Result<crate::db::TransactionRecord, Self::Error> {
use miden_protocol::Word;
let initial_state_commitment = self.initial_state_commitment;
let final_state_commitment = self.final_state_commitment;
let nullifiers_binary = self.nullifiers;
let output_notes_binary = self.output_notes;
let nullifiers: Vec<Nullifier> = Deserializable::read_from_bytes(&nullifiers_binary)?;
let output_notes: Vec<NoteId> = Deserializable::read_from_bytes(&output_notes_binary)?;
Ok(crate::db::TransactionRecord {
account_id: AccountId::read_from_bytes(&self.account_id[..])?,
block_num: BlockNumber::from_raw_sql(self.block_num)?,
transaction_id: TransactionId::read_from_bytes(&self.transaction_id[..])?,
initial_state_commitment: Word::read_from_bytes(&initial_state_commitment)?,
final_state_commitment: Word::read_from_bytes(&final_state_commitment)?,
nullifiers,
output_notes,
})
}
}
#[tracing::instrument(
target = COMPONENT,
skip_all,
err,
)]
pub(crate) fn insert_transactions(
conn: &mut SqliteConnection,
block_num: BlockNumber,
transactions: &OrderedTransactionHeaders,
) -> Result<usize, DatabaseError> {
let rows: Vec<_> = transactions
.as_slice()
.iter()
.map(|tx| TransactionSummaryRowInsert::new(tx, block_num))
.collect();
let count = diesel::insert_into(schema::transactions::table).values(rows).execute(conn)?;
Ok(count)
}
#[derive(Debug, Clone, PartialEq, Insertable)]
#[diesel(table_name = schema::transactions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct TransactionSummaryRowInsert {
transaction_id: Vec<u8>,
account_id: Vec<u8>,
block_num: i64,
initial_state_commitment: Vec<u8>,
final_state_commitment: Vec<u8>,
nullifiers: Vec<u8>,
output_notes: Vec<u8>,
size_in_bytes: i64,
}
impl TransactionSummaryRowInsert {
#[expect(
clippy::cast_possible_wrap,
reason = "We will not approach the item count where i64 and usize cause issues"
)]
fn new(
transaction_header: &miden_protocol::transaction::TransactionHeader,
block_num: BlockNumber,
) -> Self {
const HEADER_BASE_SIZE: usize = 4 + 32 + 16 + 64;
let nullifiers: Vec<Nullifier> = transaction_header
.input_notes()
.iter()
.map(miden_protocol::transaction::InputNoteCommitment::nullifier)
.collect();
let nullifiers_binary = nullifiers.to_bytes();
let output_note_ids: Vec<NoteId> = transaction_header
.output_notes()
.iter()
.map(miden_protocol::note::NoteHeader::id)
.collect();
let output_notes_binary = output_note_ids.to_bytes();
let nullifiers_size = (transaction_header.input_notes().num_notes() * 32) as usize;
let output_notes_size = transaction_header.output_notes().len() * 500;
let size_in_bytes = (HEADER_BASE_SIZE + nullifiers_size + output_notes_size) as i64;
Self {
transaction_id: transaction_header.id().to_bytes(),
account_id: transaction_header.account_id().to_bytes(),
block_num: block_num.to_raw_sql(),
initial_state_commitment: transaction_header.initial_state_commitment().to_bytes(),
final_state_commitment: transaction_header.final_state_commitment().to_bytes(),
nullifiers: nullifiers_binary,
output_notes: output_notes_binary,
size_in_bytes,
}
}
}
pub fn select_transactions_records(
conn: &mut SqliteConnection,
account_ids: &[AccountId],
block_range: RangeInclusive<BlockNumber>,
) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
const NUM_TXS_PER_CHUNK: i64 = 1000;
QueryParamAccountIdLimit::check(account_ids.len())?;
let max_payload_bytes =
i64::try_from(MAX_RESPONSE_PAYLOAD_BYTES).expect("payload limit fits within i64");
if block_range.is_empty() {
return Err(DatabaseError::InvalidBlockRange {
from: *block_range.start(),
to: *block_range.end(),
});
}
let desired_account_ids = serialize_vec(account_ids);
let mut all_transactions = Vec::new();
let mut total_size = 0i64;
let mut last_block_num: Option<i64> = None;
let mut last_transaction_id: Option<Vec<u8>> = None;
loop {
let mut query =
SelectDsl::select(schema::transactions::table, TransactionRecordRaw::as_select())
.filter(schema::transactions::block_num.ge(block_range.start().to_raw_sql()))
.filter(schema::transactions::block_num.le(block_range.end().to_raw_sql()))
.filter(schema::transactions::account_id.eq_any(&desired_account_ids))
.into_boxed();
if let (Some(last_block), Some(last_tx_id)) = (last_block_num, &last_transaction_id) {
query = query.filter(
schema::transactions::block_num
.gt(last_block)
.or(schema::transactions::block_num
.eq(last_block)
.and(schema::transactions::transaction_id.gt(last_tx_id))),
);
}
let chunk = query
.order((
schema::transactions::block_num.asc(),
schema::transactions::transaction_id.asc(),
))
.limit(NUM_TXS_PER_CHUNK)
.load::<TransactionRecordRaw>(conn)
.map_err(DatabaseError::from)?;
let mut added_from_chunk = 0;
for tx in chunk {
if total_size + tx.size_in_bytes <= max_payload_bytes {
total_size += tx.size_in_bytes;
last_block_num = Some(tx.block_num);
last_transaction_id = Some(tx.transaction_id.clone());
all_transactions.push(tx);
added_from_chunk += 1;
} else {
break;
}
}
if added_from_chunk < NUM_TXS_PER_CHUNK {
break;
}
}
if total_size >= max_payload_bytes {
let last_block_num = last_block_num.expect(
"guaranteed to have processed at least one transaction when size limit is reached",
);
let filtered_transactions = vec_raw_try_into(
all_transactions.into_iter().take_while(|row| row.block_num != last_block_num),
)?;
let last_included_block = BlockNumber::from_raw_sql(last_block_num.saturating_sub(1))?;
Ok((last_included_block, filtered_transactions))
} else {
Ok((*block_range.end(), vec_raw_try_into(all_transactions)?))
}
}