Skip to main content

miden_node_validator/db/
mod.rs

1mod migrations;
2mod models;
3mod schema;
4
5use std::path::PathBuf;
6
7use diesel::SqliteConnection;
8use diesel::dsl::exists;
9use diesel::prelude::*;
10use miden_node_db::{DatabaseError, Db, SqlTypeConvert};
11use miden_protocol::block::{BlockHeader, BlockNumber};
12use miden_protocol::transaction::TransactionId;
13use miden_protocol::utils::serde::{Deserializable, Serializable};
14use tracing::instrument;
15
16use crate::COMPONENT;
17use crate::db::migrations::apply_migrations;
18use crate::db::models::{BlockHeaderRowInsert, ValidatedTransactionRowInsert};
19use crate::tx_validation::ValidatedTransaction;
20
21/// Open a connection to the DB and apply any pending migrations.
22#[instrument(target = COMPONENT, skip_all)]
23pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
24    let db = Db::new(&database_filepath)?;
25    tracing::info!(
26        target: COMPONENT,
27        sqlite= %database_filepath.display(),
28        "Connected to the database"
29    );
30
31    db.query("migrations", apply_migrations).await?;
32    Ok(db)
33}
34
35/// Inserts a new validated transaction into the database.
36#[instrument(target = COMPONENT, skip_all, fields(tx_id = %tx_info.tx_id()), err)]
37pub(crate) fn insert_transaction(
38    conn: &mut SqliteConnection,
39    tx_info: &ValidatedTransaction,
40) -> Result<usize, DatabaseError> {
41    let row = ValidatedTransactionRowInsert::new(tx_info);
42    let count = diesel::insert_into(schema::validated_transactions::table)
43        .values(row)
44        .on_conflict_do_nothing()
45        .execute(conn)?;
46    Ok(count)
47}
48
49/// Scans the database for transaction Ids that do not exist.
50///
51/// If the resulting vector is empty, all supplied transaction ids have been validated in the past.
52///
53/// # Raw SQL
54///
55/// ```sql
56/// SELECT EXISTS(
57///   SELECT 1
58///   FROM validated_transactions
59///   WHERE id = ?
60/// );
61/// ```
62#[instrument(target = COMPONENT, skip(conn), err)]
63pub(crate) fn find_unvalidated_transactions(
64    conn: &mut SqliteConnection,
65    tx_ids: &[TransactionId],
66) -> Result<Vec<TransactionId>, DatabaseError> {
67    let mut unvalidated_tx_ids = Vec::new();
68    for tx_id in tx_ids {
69        // Check whether each transaction id exists in the database.
70        let exists = diesel::select(exists(
71            schema::validated_transactions::table
72                .filter(schema::validated_transactions::id.eq(tx_id.to_bytes())),
73        ))
74        .get_result::<bool>(conn)?;
75        // Record any transaction ids that do not exist.
76        if !exists {
77            unvalidated_tx_ids.push(*tx_id);
78        }
79    }
80    Ok(unvalidated_tx_ids)
81}
82
83/// Upserts a block header into the database.
84///
85/// Inserts a new row if no block header exists at the given block number, or replaces the
86/// existing block header if one already exists.
87#[instrument(target = COMPONENT, skip(conn, header), err)]
88pub fn upsert_block_header(
89    conn: &mut SqliteConnection,
90    header: &BlockHeader,
91) -> Result<(), DatabaseError> {
92    let row = BlockHeaderRowInsert {
93        block_num: header.block_num().to_raw_sql(),
94        block_header: header.to_bytes(),
95    };
96    diesel::replace_into(schema::block_headers::table).values(row).execute(conn)?;
97    Ok(())
98}
99
100/// Loads the chain tip (block header with the highest block number) from the database.
101///
102/// Returns `None` if no block headers have been persisted (i.e. bootstrap has not been run).
103#[instrument(target = COMPONENT, skip(conn), err)]
104pub fn load_chain_tip(conn: &mut SqliteConnection) -> Result<Option<BlockHeader>, DatabaseError> {
105    let row = schema::block_headers::table
106        .order(schema::block_headers::block_num.desc())
107        .select(schema::block_headers::block_header)
108        .first::<Vec<u8>>(conn)
109        .optional()?;
110
111    row.map(|bytes| {
112        BlockHeader::read_from_bytes(&bytes)
113            .map_err(|err| DatabaseError::deserialization("BlockHeader", err))
114    })
115    .transpose()
116}
117
118/// Loads a block header by its block number.
119///
120/// Returns `None` if no block header exists at the given block number.
121#[instrument(target = COMPONENT, skip(conn), err)]
122pub fn load_block_header(
123    conn: &mut SqliteConnection,
124    block_num: BlockNumber,
125) -> Result<Option<BlockHeader>, DatabaseError> {
126    let row = schema::block_headers::table
127        .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql()))
128        .select(schema::block_headers::block_header)
129        .first::<Vec<u8>>(conn)
130        .optional()?;
131
132    row.map(|bytes| {
133        BlockHeader::read_from_bytes(&bytes)
134            .map_err(|err| DatabaseError::deserialization("BlockHeader", err))
135    })
136    .transpose()
137}