mod migrations;
mod models;
mod schema;
use std::path::PathBuf;
use diesel::SqliteConnection;
use diesel::dsl::exists;
use diesel::prelude::*;
use miden_node_db::{DatabaseError, Db, SqlTypeConvert};
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::transaction::TransactionId;
use miden_protocol::utils::serde::{Deserializable, Serializable};
use tracing::instrument;
use crate::COMPONENT;
use crate::db::migrations::apply_migrations;
use crate::db::models::{BlockHeaderRowInsert, ValidatedTransactionRowInsert};
use crate::tx_validation::ValidatedTransaction;
/// Opens the database stored at `database_filepath` and brings its schema up to date.
///
/// After the [`Db`] handle has been created, a log line carrying the file path is emitted and
/// the `apply_migrations` routine is run through a `"migrations"` query on that handle.
///
/// # Errors
///
/// Propagates any error produced while creating the handle or while running the migrations
/// query.
#[instrument(target = COMPONENT, skip_all)]
pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
    let database = Db::new(&database_filepath)?;

    tracing::info!(
        target: COMPONENT,
        sqlite = %database_filepath.display(),
        "Connected to the database"
    );

    // Migrations must complete before the handle is handed back to the caller.
    database.query("migrations", apply_migrations).await?;

    Ok(database)
}
/// Stores a validated transaction in the `validated_transactions` table.
///
/// The insert uses `ON CONFLICT DO NOTHING`, so a conflicting row is left untouched rather than
/// causing an error.
///
/// # Returns
///
/// The row count reported by the executed statement.
///
/// # Errors
///
/// Returns a [`DatabaseError`] if executing the statement fails.
#[instrument(target = COMPONENT, skip_all, fields(tx_id = %tx_info.tx_id()), err)]
pub(crate) fn insert_transaction(
    conn: &mut SqliteConnection,
    tx_info: &ValidatedTransaction,
) -> Result<usize, DatabaseError> {
    // Build the statement first, then run it; conflicts are silently skipped.
    let statement = diesel::insert_into(schema::validated_transactions::table)
        .values(ValidatedTransactionRowInsert::new(tx_info))
        .on_conflict_do_nothing();

    let affected_rows = statement.execute(conn)?;
    Ok(affected_rows)
}
#[instrument(target = COMPONENT, skip(conn), err)]
pub(crate) fn find_unvalidated_transactions(
conn: &mut SqliteConnection,
tx_ids: &[TransactionId],
) -> Result<Vec<TransactionId>, DatabaseError> {
let mut unvalidated_tx_ids = Vec::new();
for tx_id in tx_ids {
let exists = diesel::select(exists(
schema::validated_transactions::table
.filter(schema::validated_transactions::id.eq(tx_id.to_bytes())),
))
.get_result::<bool>(conn)?;
if !exists {
unvalidated_tx_ids.push(*tx_id);
}
}
Ok(unvalidated_tx_ids)
}
/// Writes `header` into the `block_headers` table using a `REPLACE INTO` statement.
///
/// The row consists of the header's block number and the header's serialized bytes.
///
/// # Errors
///
/// Returns a [`DatabaseError`] if executing the statement fails.
#[instrument(target = COMPONENT, skip(conn, header), err)]
pub fn upsert_block_header(
    conn: &mut SqliteConnection,
    header: &BlockHeader,
) -> Result<(), DatabaseError> {
    let block_num = header.block_num().to_raw_sql();
    let block_header = header.to_bytes();
    let new_row = BlockHeaderRowInsert { block_num, block_header };

    diesel::replace_into(schema::block_headers::table)
        .values(new_row)
        .execute(conn)?;

    Ok(())
}
/// Loads the block header with the highest block number from the `block_headers` table.
///
/// # Returns
///
/// `Ok(None)` when the query yields no row, otherwise the deserialized [`BlockHeader`].
///
/// # Errors
///
/// Returns a [`DatabaseError`] if the query fails or the stored bytes cannot be deserialized
/// into a [`BlockHeader`].
#[instrument(target = COMPONENT, skip(conn), err)]
pub fn load_chain_tip(conn: &mut SqliteConnection) -> Result<Option<BlockHeader>, DatabaseError> {
    // Descending order on the block number puts the tip first.
    let tip_bytes = schema::block_headers::table
        .order(schema::block_headers::block_num.desc())
        .select(schema::block_headers::block_header)
        .first::<Vec<u8>>(conn)
        .optional()?;

    match tip_bytes {
        None => Ok(None),
        Some(raw) => BlockHeader::read_from_bytes(&raw)
            .map(Some)
            .map_err(|err| DatabaseError::deserialization("BlockHeader", err)),
    }
}
/// Loads the block header stored under `block_num` from the `block_headers` table.
///
/// # Returns
///
/// `Ok(None)` when no row matches `block_num`, otherwise the deserialized [`BlockHeader`].
///
/// # Errors
///
/// Returns a [`DatabaseError`] if the query fails or the stored bytes cannot be deserialized
/// into a [`BlockHeader`].
#[instrument(target = COMPONENT, skip(conn), err)]
pub fn load_block_header(
    conn: &mut SqliteConnection,
    block_num: BlockNumber,
) -> Result<Option<BlockHeader>, DatabaseError> {
    let matches_block_num = schema::block_headers::block_num.eq(block_num.to_raw_sql());

    let header_bytes = schema::block_headers::table
        .filter(matches_block_num)
        .select(schema::block_headers::block_header)
        .first::<Vec<u8>>(conn)
        .optional()?;

    match header_bytes {
        None => Ok(None),
        Some(raw) => BlockHeader::read_from_bytes(&raw)
            .map(Some)
            .map_err(|err| DatabaseError::deserialization("BlockHeader", err)),
    }
}