miden_node_validator/db/
mod.rs1mod migrations;
2mod models;
3mod schema;
4
5use std::path::PathBuf;
6
7use diesel::SqliteConnection;
8use diesel::dsl::exists;
9use diesel::prelude::*;
10use miden_node_db::{DatabaseError, Db, SqlTypeConvert};
11use miden_protocol::block::{BlockHeader, BlockNumber};
12use miden_protocol::transaction::TransactionId;
13use miden_protocol::utils::{Deserializable, Serializable};
14use tracing::instrument;
15
16use crate::COMPONENT;
17use crate::db::migrations::apply_migrations;
18use crate::db::models::{BlockHeaderRowInsert, ValidatedTransactionRowInsert};
19use crate::tx_validation::ValidatedTransaction;
20
21#[instrument(target = COMPONENT, skip_all)]
23pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
24 let db = Db::new(&database_filepath)?;
25 tracing::info!(
26 target: COMPONENT,
27 sqlite= %database_filepath.display(),
28 "Connected to the database"
29 );
30
31 db.query("migrations", apply_migrations).await?;
32 Ok(db)
33}
34
35#[instrument(target = COMPONENT, skip_all, fields(tx_id = %tx_info.tx_id()), err)]
37pub(crate) fn insert_transaction(
38 conn: &mut SqliteConnection,
39 tx_info: &ValidatedTransaction,
40) -> Result<usize, DatabaseError> {
41 let row = ValidatedTransactionRowInsert::new(tx_info);
42 let count = diesel::insert_into(schema::validated_transactions::table)
43 .values(row)
44 .on_conflict_do_nothing()
45 .execute(conn)?;
46 Ok(count)
47}
48
49#[instrument(target = COMPONENT, skip(conn), err)]
63pub(crate) fn find_unvalidated_transactions(
64 conn: &mut SqliteConnection,
65 tx_ids: &[TransactionId],
66) -> Result<Vec<TransactionId>, DatabaseError> {
67 let mut unvalidated_tx_ids = Vec::new();
68 for tx_id in tx_ids {
69 let exists = diesel::select(exists(
71 schema::validated_transactions::table
72 .filter(schema::validated_transactions::id.eq(tx_id.to_bytes())),
73 ))
74 .get_result::<bool>(conn)?;
75 if !exists {
77 unvalidated_tx_ids.push(*tx_id);
78 }
79 }
80 Ok(unvalidated_tx_ids)
81}
82
83#[instrument(target = COMPONENT, skip(conn, header), err)]
88pub fn upsert_block_header(
89 conn: &mut SqliteConnection,
90 header: &BlockHeader,
91) -> Result<(), DatabaseError> {
92 let row = BlockHeaderRowInsert {
93 block_num: header.block_num().to_raw_sql(),
94 block_header: header.to_bytes(),
95 };
96 diesel::replace_into(schema::block_headers::table).values(row).execute(conn)?;
97 Ok(())
98}
99
100#[instrument(target = COMPONENT, skip(conn), err)]
104pub fn load_chain_tip(conn: &mut SqliteConnection) -> Result<Option<BlockHeader>, DatabaseError> {
105 let row = schema::block_headers::table
106 .order(schema::block_headers::block_num.desc())
107 .select(schema::block_headers::block_header)
108 .first::<Vec<u8>>(conn)
109 .optional()?;
110
111 row.map(|bytes| {
112 BlockHeader::read_from_bytes(&bytes)
113 .map_err(|err| DatabaseError::deserialization("BlockHeader", err))
114 })
115 .transpose()
116}
117
118#[instrument(target = COMPONENT, skip(conn), err)]
122pub fn load_block_header(
123 conn: &mut SqliteConnection,
124 block_num: BlockNumber,
125) -> Result<Option<BlockHeader>, DatabaseError> {
126 let row = schema::block_headers::table
127 .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql()))
128 .select(schema::block_headers::block_header)
129 .first::<Vec<u8>>(conn)
130 .optional()?;
131
132 row.map(|bytes| {
133 BlockHeader::read_from_bytes(&bytes)
134 .map_err(|err| DatabaseError::deserialization("BlockHeader", err))
135 })
136 .transpose()
137}