essential_builder_db/
lib.rs

1//! The database API for the block builder's solution set pool and related storage.
2//!
3//! The `essential-builder-db` crate provides a simple database API for managing the block
4//! builder's solution set pool and related storage, using SQLite as the underlying database. It allows
5//! you to store, query, and delete solution sets, as well as manage solution set submissions with
6//! timestamps.
7//!
8//! ## Overview
9//!
10//! - [`create_tables`]: Creates all required tables in the database.
11//! - [`insert_solution_set_submission`]: Inserts a solution set and its associated submission timestamp.
12//! - [`insert_solution_set_failure`]: Records a failure to apply a solution set to a block.
13//! - [`get_solution_set`]: Retrieves a solution set by its content address.
14//! - [`list_solution_sets`]: Lists all solution sets that were submitted within a given time range.
15//! - [`list_submissions`]: Lists submissions based on timestamp.
16//! - [`latest_solution_set_failures`]: Queries the latest failures for a given solution set.
17//! - [`delete_solution_set`]: Deletes a solution set and its submissions given the solution set's address.
18//! - [`delete_oldest_solution_set_failures`]: Deletes the oldest solution set failures until the
19//!   stored number is within a given limit.
20
21use error::{DecodeError, QueryError};
22use essential_builder_types::SolutionSetFailure;
23use essential_hash::content_addr;
24use essential_types::{solution::SolutionSet, ContentAddress, Hash};
25#[cfg(feature = "pool")]
26pub use pool::ConnectionPool;
27use rusqlite::{named_params, Connection, OptionalExtension, Transaction};
28use serde::{Deserialize, Serialize};
29use std::{ops::Range, time::Duration};
30
31pub mod error;
32#[cfg(feature = "pool")]
33pub mod pool;
34pub mod sql;
35
36/// Encodes the given value into a blob.
37///
38/// This serializes the value using postcard.
39pub fn encode<T>(value: &T) -> Vec<u8>
40where
41    T: Serialize,
42{
43    postcard::to_allocvec(value).expect("postcard serialization cannot fail")
44}
45
46/// Decodes the given blob into a value of type `T`.
47///
48/// This deserializes the bytes into a value of `T` with `postcard`.
49pub fn decode<T>(value: &[u8]) -> Result<T, DecodeError>
50where
51    T: for<'de> Deserialize<'de>,
52{
53    Ok(postcard::from_bytes(value)?)
54}
55
56/// Create all tables.
57pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
58    for table in sql::table::ALL {
59        tx.execute(table.create, ())?;
60    }
61    Ok(())
62}
63
64/// Insert a submitted solution set and the time it was received into the table.
65///
66/// This first inserts the solution set and its CA into the solution set table if it doesn't
67/// already exist, then inserts associated timestamp into the submission table.
68pub fn insert_solution_set_submission(
69    tx: &Transaction,
70    solution_set: &SolutionSet,
71    timestamp: Duration,
72) -> rusqlite::Result<ContentAddress> {
73    // Insert the solution set (or ignore if exists).
74    let ca = content_addr(solution_set);
75    let ca_blob = &ca.0;
76    let solution_set_blob = encode(solution_set);
77    tx.execute(
78        sql::insert::SOLUTION_SET,
79        named_params! {
80            ":content_addr": &ca_blob,
81            ":solution_set": solution_set_blob,
82        },
83    )?;
84
85    // Insert the submission timestamp.
86    let secs = timestamp.as_secs();
87    let nanos = timestamp.subsec_nanos();
88    tx.execute(
89        sql::insert::SUBMISSION,
90        named_params! {
91            ":solution_set_addr": ca_blob,
92            ":timestamp_secs": secs,
93            ":timestamp_nanos": nanos,
94        },
95    )?;
96
97    Ok(ca)
98}
99
100/// Record a failure to include a solution set in a block.
101///
102/// We only record that a failure occurred, the block number, solution set index at which the failure
103/// occurred, and a basic error message that can be returned to the submitter. If the user or
104/// application requires more detailed information about the failure, the block number and solution
105/// set index should be enough to reconstruct the failure with a synced node.
106pub fn insert_solution_set_failure(
107    conn: &Connection,
108    solution_set_ca: &ContentAddress,
109    solution_set_failure: SolutionSetFailure,
110) -> rusqlite::Result<()> {
111    conn.execute(
112        sql::insert::SOLUTION_SET_FAILURE,
113        named_params! {
114            ":solution_set_addr": &solution_set_ca.0,
115            ":attempt_block_num": solution_set_failure.attempt_block_num,
116            ":attempt_block_addr": &solution_set_failure.attempt_block_addr.0,
117            ":attempt_solution_set_ix": solution_set_failure.attempt_solution_set_ix,
118            ":err_msg": solution_set_failure.err_msg.as_bytes(),
119        },
120    )?;
121    Ok(())
122}
123
124/// Fetches a solution set by its content address.
125pub fn get_solution_set(
126    conn: &Connection,
127    ca: &ContentAddress,
128) -> Result<Option<SolutionSet>, QueryError> {
129    let ca_blob = &ca.0;
130    let mut stmt = conn.prepare(sql::query::GET_SOLUTION_SET)?;
131    let solution_set_blob: Option<Vec<u8>> = stmt
132        .query_row([ca_blob], |row| row.get("solution_set"))
133        .optional()?;
134    Ok(solution_set_blob.as_deref().map(decode).transpose()?)
135}
136
137/// List all solution sets that were submitted within the given time range.
138///
139/// The number of results will be limited to the given `limit`.
140///
141/// Note that if the same solution set was submitted multiple times within the given time range, they
142/// will appear multiple times in the result.
143pub fn list_solution_sets(
144    conn: &Connection,
145    time_range: Range<Duration>,
146    limit: i64,
147) -> Result<Vec<(ContentAddress, SolutionSet, Duration)>, QueryError> {
148    let mut stmt = conn.prepare(sql::query::LIST_SOLUTION_SETS)?;
149    let start_secs = time_range.start.as_secs();
150    let start_nanos = time_range.start.subsec_nanos();
151    let end_secs = time_range.end.as_secs();
152    let end_nanos = time_range.end.subsec_nanos();
153    let rows = stmt.query_map(
154        named_params! {
155            ":start_secs": start_secs,
156            ":start_nanos": start_nanos,
157            ":end_secs": end_secs,
158            ":end_nanos": end_nanos,
159            ":limit": limit,
160        },
161        |row| {
162            let solution_set_addr_blob: Hash = row.get("content_addr")?;
163            let solution_set_blob: Vec<u8> = row.get("solution_set")?;
164            let secs: u64 = row.get("timestamp_secs")?;
165            let nanos: u32 = row.get("timestamp_nanos")?;
166            let timestamp = Duration::new(secs, nanos);
167            Ok((
168                ContentAddress(solution_set_addr_blob),
169                solution_set_blob,
170                timestamp,
171            ))
172        },
173    )?;
174    rows.into_iter()
175        .map(|res| {
176            let (ca, blob, ts) = res?;
177            let solution_set = decode(&blob)?;
178            Ok((ca, solution_set, ts))
179        })
180        .collect()
181}
182
183/// List all submissions that were made within the given time range.
184///
185/// The number of results will be limited to the given `limit`.
186pub fn list_submissions(
187    conn: &Connection,
188    time_range: Range<Duration>,
189    limit: i64,
190) -> rusqlite::Result<Vec<(ContentAddress, Duration)>> {
191    let mut stmt = conn.prepare(sql::query::LIST_SUBMISSIONS)?;
192    let start_secs = time_range.start.as_secs();
193    let start_nanos = time_range.start.subsec_nanos();
194    let end_secs = time_range.end.as_secs();
195    let end_nanos = time_range.end.subsec_nanos();
196    let rows = stmt.query_map(
197        named_params! {
198            ":start_secs": start_secs,
199            ":start_nanos": start_nanos,
200            ":end_secs": end_secs,
201            ":end_nanos": end_nanos,
202            ":limit": limit,
203        },
204        |row| {
205            let solution_set_addr_blob: Hash = row.get("content_addr")?;
206            let secs: u64 = row.get("timestamp_secs")?;
207            let nanos: u32 = row.get("timestamp_nanos")?;
208            let timestamp = Duration::new(secs, nanos);
209            Ok((ContentAddress(solution_set_addr_blob), timestamp))
210        },
211    )?;
212    rows.collect()
213}
214
215/// Query the latest solution set failures for a given solution set content address.
216///
217/// Results are ordered by block number and solution set index in descending order.
218///
219/// Returns at most `limit` failures.
220pub fn latest_solution_set_failures(
221    conn: &Connection,
222    ca: &ContentAddress,
223    limit: u32,
224) -> rusqlite::Result<Vec<SolutionSetFailure<'static>>> {
225    let ca_blob = &ca.0;
226    let mut stmt = conn.prepare(sql::query::LATEST_SOLUTION_SET_FAILURES)?;
227    let rows = stmt.query_map(
228        named_params! {
229            ":solution_set_addr": ca_blob,
230            ":limit": limit,
231        },
232        |row| {
233            let attempt_block_num: i64 = row.get("attempt_block_num")?;
234            let attempt_block_addr: Hash = row.get("attempt_block_addr")?;
235            let attempt_solution_set_ix: u32 = row.get("attempt_solution_set_ix")?;
236            let err_msg_blob: Vec<u8> = row.get("err_msg")?;
237            let err_msg = String::from_utf8_lossy(&err_msg_blob).into_owned();
238            Ok(SolutionSetFailure {
239                attempt_block_num,
240                attempt_block_addr: ContentAddress(attempt_block_addr),
241                attempt_solution_set_ix,
242                err_msg: err_msg.into(),
243            })
244        },
245    )?;
246    rows.collect()
247}
248
249/// List the latest solution set failures.
250///
251/// Results are ordered by block number and solution set index in descending order.
252///
253/// Returns at most `limit` failures and starts at `offset`.
254pub fn list_solution_set_failures(
255    conn: &Connection,
256    offset: u32,
257    limit: u32,
258) -> rusqlite::Result<Vec<SolutionSetFailure<'static>>> {
259    let mut stmt = conn.prepare(sql::query::LIST_SOLUTION_SET_FAILURES)?;
260    let rows = stmt.query_map(
261        named_params! {
262            ":offset": offset,
263            ":limit": limit,
264        },
265        |row| {
266            let attempt_block_num: i64 = row.get("attempt_block_num")?;
267            let attempt_block_addr: Hash = row.get("attempt_block_addr")?;
268            let attempt_solution_set_ix: u32 = row.get("attempt_solution_set_ix")?;
269            let err_msg_blob: Vec<u8> = row.get("err_msg")?;
270            let err_msg = String::from_utf8_lossy(&err_msg_blob).into_owned();
271            Ok(SolutionSetFailure {
272                attempt_block_num,
273                attempt_block_addr: ContentAddress(attempt_block_addr),
274                attempt_solution_set_ix,
275                err_msg: err_msg.into(),
276            })
277        },
278    )?;
279    rows.collect()
280}
281
282/// Delete the solution set with the given CA from the database if it exists.
283///
284/// This also deletes all submissions associated with the specified solution set.
285pub fn delete_solution_set(conn: &Connection, ca: &ContentAddress) -> rusqlite::Result<()> {
286    let ca_blob = &ca.0;
287    conn.execute(
288        sql::delete::SOLUTION_SET,
289        named_params! {
290            ":content_addr": ca_blob,
291        },
292    )?;
293    Ok(())
294}
295
296/// Delete the solution sets with the given CAs from the database if they exist.
297///
298/// This also deletes all submissions associated with the specified solution sets.
299pub fn delete_solution_sets(
300    tx: &Transaction,
301    cas: impl IntoIterator<Item = ContentAddress>,
302) -> rusqlite::Result<()> {
303    for ca in cas {
304        crate::delete_solution_set(tx, &ca)?;
305    }
306    Ok(())
307}
308
309/// Delete the oldest solution set failures until the number of stored failures
310/// is less than or equal to `keep_limit`.
311pub fn delete_oldest_solution_set_failures(
312    conn: &Connection,
313    keep_limit: u32,
314) -> rusqlite::Result<()> {
315    conn.execute(
316        sql::delete::OLDEST_SOLUTION_SET_FAILURES,
317        named_params! {
318            ":keep_limit": keep_limit,
319        },
320    )?;
321    Ok(())
322}
323
324/// Short-hand for constructing a transaction, providing it as an argument to
325/// the given function, then committing the transaction before returning.
326pub fn with_tx<T, E>(
327    conn: &mut rusqlite::Connection,
328    f: impl FnOnce(&mut Transaction) -> Result<T, E>,
329) -> Result<T, E>
330where
331    E: From<rusqlite::Error>,
332{
333    let mut tx = conn.transaction()?;
334    let out = f(&mut tx)?;
335    tx.commit()?;
336    Ok(out)
337}