use std::ops::RangeInclusive;
use diesel::prelude::{Insertable, Queryable};
use diesel::query_dsl::methods::SelectDsl;
use diesel::{
BoolExpressionMethods,
ExpressionMethods,
QueryDsl,
QueryableByName,
RunQueryDsl,
Selectable,
SelectableHelper,
SqliteConnection,
};
use miden_node_utils::limiter::{
MAX_RESPONSE_PAYLOAD_BYTES,
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteCommitmentLimit,
};
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::NoteHeader;
use miden_protocol::transaction::{
InputNoteCommitment,
InputNotes,
OrderedTransactionHeaders,
TransactionHeader,
TransactionId,
};
use miden_protocol::utils::serde::{Deserializable, Serializable};
use super::{DatabaseError, select_note_sync_records};
use crate::COMPONENT;
use crate::db::models::conv::SqlTypeConvert;
use crate::db::models::serialize_vec;
use crate::db::schema;
/// Raw `transactions` row as read back from SQLite.
///
/// All protocol values are stored in their serialized byte form and are
/// decoded into domain types by `with_output_note_proofs`.
///
/// NOTE: `Queryable` maps columns to fields positionally, so the field order
/// here must match the column order of `schema::transactions`.
#[derive(Debug, Clone, PartialEq, Queryable, Selectable, QueryableByName)]
#[diesel(table_name = schema::transactions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct TransactionRecordRaw {
    /// Serialized `AccountId` the transaction was executed against.
    account_id: Vec<u8>,
    /// Block number in its raw SQL integer representation.
    block_num: i64,
    /// Serialized `TransactionId`.
    transaction_id: Vec<u8>,
    /// Serialized account state commitment before the transaction.
    initial_state_commitment: Vec<u8>,
    /// Serialized account state commitment after the transaction.
    final_state_commitment: Vec<u8>,
    /// Serialized `Vec<InputNoteCommitment>`.
    input_notes: Vec<u8>,
    /// Serialized `Vec<NoteHeader>` of the transaction's output notes.
    output_notes: Vec<u8>,
    /// Estimated response-payload size of this record, in bytes
    /// (computed at insert time; see `TransactionSummaryRowInsert::new`).
    size_in_bytes: i64,
    /// Serialized transaction fee (a `FungibleAsset`).
    fee: Vec<u8>,
}
#[tracing::instrument(
target = COMPONENT,
skip_all,
err,
)]
pub(crate) fn insert_transactions(
conn: &mut SqliteConnection,
block_num: BlockNumber,
transactions: &OrderedTransactionHeaders,
) -> Result<usize, DatabaseError> {
let rows: Vec<_> = transactions
.as_slice()
.iter()
.map(|tx| TransactionSummaryRowInsert::new(tx, block_num))
.collect();
let count = diesel::insert_into(schema::transactions::table).values(rows).execute(conn)?;
Ok(count)
}
/// Insertable row for the `transactions` table.
///
/// Built from a `TransactionHeader` via [`TransactionSummaryRowInsert::new`];
/// all protocol values are stored in their serialized byte form.
#[derive(Debug, Clone, PartialEq, Insertable)]
#[diesel(table_name = schema::transactions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct TransactionSummaryRowInsert {
    /// Serialized `TransactionId`.
    transaction_id: Vec<u8>,
    /// Serialized `AccountId` the transaction was executed against.
    account_id: Vec<u8>,
    /// Block number converted via `BlockNumber::to_raw_sql`.
    block_num: i64,
    /// Serialized account state commitment before the transaction.
    initial_state_commitment: Vec<u8>,
    /// Serialized account state commitment after the transaction.
    final_state_commitment: Vec<u8>,
    /// Serialized `Vec<InputNoteCommitment>`.
    input_notes: Vec<u8>,
    /// Serialized `Vec<NoteHeader>` of the transaction's output notes.
    output_notes: Vec<u8>,
    /// Estimated response-payload size of this record, in bytes.
    size_in_bytes: i64,
    /// Serialized transaction fee (a `FungibleAsset`).
    fee: Vec<u8>,
}
impl TransactionSummaryRowInsert {
    /// Builds an insertable row from a transaction header and the block it
    /// was included in.
    ///
    /// `size_in_bytes` is an estimate of the record's size in a sync
    /// response, derived from a fixed per-header overhead plus per-note
    /// estimates; it is later compared against the response payload limit
    /// when records are read back.
    #[expect(
        clippy::cast_possible_wrap,
        reason = "We will not approach the item count where i64 and usize cause issues"
    )]
    fn new(
        transaction_header: &miden_protocol::transaction::TransactionHeader,
        block_num: BlockNumber,
    ) -> Self {
        // Fixed size estimate for the header-level fields of a record.
        const HEADER_BASE_SIZE_BYTES: usize = 4 + 32 + 16 + 64;
        // Estimated size contributed by each input-note commitment.
        const INPUT_NOTE_COMMITMENT_SIZE_BYTES: usize = 64;
        // Estimated size contributed by each output-note sync record.
        const OUTPUT_NOTE_SYNC_RECORD_SIZE_BYTES: usize = 700;

        // Collect the note data into owned vectors so they can be serialized
        // with `Serializable::to_bytes`.
        let collected_inputs: Vec<InputNoteCommitment> =
            transaction_header.input_notes().iter().cloned().collect();
        let collected_outputs: Vec<NoteHeader> = transaction_header.output_notes().to_vec();

        let estimated_size = HEADER_BASE_SIZE_BYTES
            + (transaction_header.input_notes().num_notes() as usize)
                * INPUT_NOTE_COMMITMENT_SIZE_BYTES
            + transaction_header.output_notes().len() * OUTPUT_NOTE_SYNC_RECORD_SIZE_BYTES;

        Self {
            transaction_id: transaction_header.id().to_bytes(),
            account_id: transaction_header.account_id().to_bytes(),
            block_num: block_num.to_raw_sql(),
            initial_state_commitment: transaction_header.initial_state_commitment().to_bytes(),
            final_state_commitment: transaction_header.final_state_commitment().to_bytes(),
            input_notes: collected_inputs.to_bytes(),
            output_notes: collected_outputs.to_bytes(),
            size_in_bytes: estimated_size as i64,
            fee: transaction_header.fee().to_bytes(),
        }
    }
}
/// Selects transaction records for the given accounts within `block_range`,
/// bounded by the maximum response payload size.
///
/// Rows are read in `(block_num, transaction_id)` order via keyset pagination
/// in chunks of up to `NUM_TXS_PER_CHUNK`, accumulating each row's estimated
/// `size_in_bytes` until the range is exhausted or accepting another row
/// would exceed `MAX_RESPONSE_PAYLOAD_BYTES`.
///
/// Returns the highest block number whose transactions are *completely*
/// contained in the result, together with the records. When the payload
/// limit is hit mid-block, all records of that (possibly partial) last block
/// are dropped and the preceding block number is reported so callers can
/// resume from there.
///
/// # Errors
/// - [`DatabaseError::InvalidBlockRange`] when `block_range` is empty.
/// - Any error raised by the underlying queries or record deserialization.
pub fn select_transactions_records(
    conn: &mut SqliteConnection,
    account_ids: &[AccountId],
    block_range: RangeInclusive<BlockNumber>,
) -> Result<(BlockNumber, Vec<crate::db::TransactionRecord>), DatabaseError> {
    const NUM_TXS_PER_CHUNK: i64 = 1000;

    QueryParamAccountIdLimit::check(account_ids.len())?;

    let max_payload_bytes =
        i64::try_from(MAX_RESPONSE_PAYLOAD_BYTES).expect("payload limit fits within i64");

    if block_range.is_empty() {
        return Err(DatabaseError::InvalidBlockRange {
            from: *block_range.start(),
            to: *block_range.end(),
        });
    }

    let desired_account_ids = serialize_vec(account_ids);

    let mut all_transactions = Vec::new();
    let mut total_size = 0i64;
    // Keyset-pagination cursor: position of the last accepted row.
    let mut last_block_num: Option<i64> = None;
    let mut last_transaction_id: Option<Vec<u8>> = None;
    // Set when a row is rejected because it would push the payload over the
    // limit. BUGFIX: the previous check (`total_size >= max_payload_bytes`
    // after the loop) only fired when the accumulated size landed *exactly*
    // on the limit; in the common case a truncated result was reported as
    // covering the full requested range, silently dropping transactions.
    let mut limit_reached = false;

    loop {
        let mut query =
            SelectDsl::select(schema::transactions::table, TransactionRecordRaw::as_select())
                .filter(schema::transactions::block_num.ge(block_range.start().to_raw_sql()))
                .filter(schema::transactions::block_num.le(block_range.end().to_raw_sql()))
                .filter(schema::transactions::account_id.eq_any(&desired_account_ids))
                .into_boxed();

        // Resume strictly after the last accepted (block_num, transaction_id).
        if let (Some(last_block), Some(last_tx_id)) = (last_block_num, &last_transaction_id) {
            query = query.filter(
                schema::transactions::block_num
                    .gt(last_block)
                    .or(schema::transactions::block_num
                        .eq(last_block)
                        .and(schema::transactions::transaction_id.gt(last_tx_id))),
            );
        }

        let chunk = query
            .order((
                schema::transactions::block_num.asc(),
                schema::transactions::transaction_id.asc(),
            ))
            .limit(NUM_TXS_PER_CHUNK)
            .load::<TransactionRecordRaw>(conn)
            .map_err(DatabaseError::from)?;

        let mut added_from_chunk = 0;
        for tx in chunk {
            if total_size + tx.size_in_bytes > max_payload_bytes {
                limit_reached = true;
                break;
            }
            total_size += tx.size_in_bytes;
            last_block_num = Some(tx.block_num);
            last_transaction_id = Some(tx.transaction_id.clone());
            all_transactions.push(tx);
            added_from_chunk += 1;
        }

        // Stop when the limit was hit, or the short chunk shows the range is
        // exhausted.
        if limit_reached || added_from_chunk < NUM_TXS_PER_CHUNK {
            break;
        }
    }

    match (limit_reached, last_block_num) {
        // Payload limit hit after at least one row was accepted: drop the
        // records of the last (possibly partial) block and report the block
        // before it as the last one fully covered.
        (true, Some(last_block_num)) => {
            let filtered_transactions = with_output_note_proofs(
                conn,
                all_transactions
                    .into_iter()
                    .take_while(|row| row.block_num != last_block_num)
                    .collect(),
            )?;
            let last_included_block =
                BlockNumber::from_raw_sql(last_block_num.saturating_sub(1))?;
            Ok((last_included_block, filtered_transactions))
        },
        // Either everything fit, or (degenerate case, preserved from the
        // original behavior) the very first row alone exceeded the payload
        // limit and nothing was collected; report the end of the range.
        _ => Ok((*block_range.end(), with_output_note_proofs(conn, all_transactions)?)),
    }
}
/// Decodes raw `transactions` rows into full [`crate::db::TransactionRecord`]s,
/// looking up the sync record for each output note (in limit-sized batches)
/// and attaching the matches as `output_note_proofs`.
fn with_output_note_proofs(
    conn: &mut SqliteConnection,
    raw_transactions: Vec<TransactionRecordRaw>,
) -> Result<Vec<crate::db::TransactionRecord>, DatabaseError> {
    use miden_protocol::Word;
    use miden_protocol::asset::FungibleAsset;

    // Decode every transaction's output-note headers up front and gather all
    // note commitments so the sync records can be fetched in bulk.
    let mut decoded_output_notes = Vec::with_capacity(raw_transactions.len());
    let mut commitments = Vec::new();
    for raw in &raw_transactions {
        let headers: Vec<NoteHeader> = Deserializable::read_from_bytes(&raw.output_notes)?;
        commitments.extend(headers.iter().map(NoteHeader::to_commitment));
        decoded_output_notes.push(headers);
    }

    // Batch the lookups to respect the per-query commitment-count limit.
    let mut proofs_by_note_id = std::collections::BTreeMap::new();
    for batch in commitments.chunks(QueryParamNoteCommitmentLimit::LIMIT) {
        proofs_by_note_id.extend(select_note_sync_records(conn, batch)?);
    }

    raw_transactions
        .into_iter()
        .zip(decoded_output_notes)
        .map(|(raw, output_notes)| {
            // Notes with no stored sync record are simply skipped.
            let output_note_proofs = output_notes
                .iter()
                .filter_map(|note| proofs_by_note_id.get(&note.id()).cloned())
                .collect();

            let header = TransactionHeader::new_unchecked(
                TransactionId::read_from_bytes(&raw.transaction_id)?,
                AccountId::read_from_bytes(&raw.account_id)?,
                Word::read_from_bytes(&raw.initial_state_commitment)?,
                Word::read_from_bytes(&raw.final_state_commitment)?,
                InputNotes::new_unchecked(Deserializable::read_from_bytes(&raw.input_notes)?),
                output_notes,
                FungibleAsset::read_from_bytes(&raw.fee)?,
            );

            Ok(crate::db::TransactionRecord {
                block_num: BlockNumber::from_raw_sql(raw.block_num)?,
                header,
                output_note_proofs,
            })
        })
        .collect()
}