use error::{DecodeError, QueryError};
use essential_builder_types::SolutionFailure;
use essential_hash::content_addr;
use essential_types::{solution::Solution, ContentAddress, Hash};
#[cfg(feature = "pool")]
pub use pool::ConnectionPool;
use rusqlite::{named_params, Connection, OptionalExtension, Transaction};
use serde::{Deserialize, Serialize};
use std::{ops::Range, time::Duration};
pub mod error;
#[cfg(feature = "pool")]
pub mod pool;
pub mod sql;
/// Serialize `value` into a freshly allocated byte vector using `postcard`.
///
/// # Panics
///
/// Panics if `postcard::to_allocvec` returns an error; the blob format is
/// treated as infallible for the types stored by this crate.
pub fn encode<T>(value: &T) -> Vec<u8>
where
    T: Serialize,
{
    let serialized = postcard::to_allocvec(value);
    serialized.expect("postcard serialization cannot fail")
}
pub fn decode<T>(value: &[u8]) -> Result<T, DecodeError>
where
T: for<'de> Deserialize<'de>,
{
Ok(postcard::from_bytes(value)?)
}
/// Run the `create` statement of every table listed in [`sql::table::ALL`]
/// within the given transaction.
///
/// # Errors
///
/// Stops at, and returns, the first `rusqlite` error encountered.
pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
    sql::table::ALL
        .iter()
        .try_for_each(|table| tx.execute(table.create, ()).map(|_| ()))
}
/// Record the submission of `solution` at `timestamp`.
///
/// Two statements are executed on `tx`, in order:
///
/// 1. [`sql::insert::SOLUTION`], bound to the solution's content address and
///    its `postcard`-encoded bytes.
/// 2. [`sql::insert::SUBMISSION`], bound to the same content address and the
///    timestamp split into whole seconds and sub-second nanoseconds.
///
/// Returns the content address computed for `solution`.
///
/// # Errors
///
/// Returns the first `rusqlite` error produced by either statement.
pub fn insert_solution_submission(
    tx: &Transaction,
    solution: &Solution,
    timestamp: Duration,
) -> rusqlite::Result<ContentAddress> {
    let solution_addr = content_addr(solution);

    // Store the solution itself, keyed by its content address.
    tx.execute(
        sql::insert::SOLUTION,
        named_params! {
            ":content_addr": &solution_addr.0,
            ":solution": encode(solution),
        },
    )?;

    // Store the submission record. The `Duration` is persisted as two integer
    // columns: whole seconds plus the sub-second nanosecond remainder.
    tx.execute(
        sql::insert::SUBMISSION,
        named_params! {
            ":solution_addr": &solution_addr.0,
            ":timestamp_secs": timestamp.as_secs(),
            ":timestamp_nanos": timestamp.subsec_nanos(),
        },
    )?;

    Ok(solution_addr)
}
pub fn insert_solution_failure(
conn: &Connection,
solution_ca: &ContentAddress,
solution_failure: SolutionFailure,
) -> rusqlite::Result<()> {
conn.execute(
sql::insert::SOLUTION_FAILURE,
named_params! {
":solution_addr": &solution_ca.0,
":attempt_block_num": solution_failure.attempt_block_num,
":attempt_block_addr": &solution_failure.attempt_block_addr.0,
":attempt_solution_ix": solution_failure.attempt_solution_ix,
":err_msg": solution_failure.err_msg.as_bytes(),
},
)?;
Ok(())
}
pub fn get_solution(
conn: &Connection,
ca: &ContentAddress,
) -> Result<Option<Solution>, QueryError> {
let ca_blob = &ca.0;
let mut stmt = conn.prepare(sql::query::GET_SOLUTION)?;
let solution_blob: Option<Vec<u8>> = stmt
.query_row([ca_blob], |row| row.get("solution"))
.optional()?;
Ok(solution_blob.as_deref().map(decode).transpose()?)
}
/// List stored solutions together with their content address and timestamp.
///
/// Runs [`sql::query::LIST_SOLUTIONS`] with both ends of `time_range` split
/// into whole seconds and sub-second nanoseconds, plus `limit`. How the range
/// bounds and the limit are applied is determined by that query.
///
/// Each returned row is reassembled into a `(ContentAddress, Solution,
/// Duration)` triple, with the solution decoded from its stored blob.
///
/// # Errors
///
/// Returns a [`QueryError`] on the first row that fails to be read from the
/// database or whose solution blob fails to decode.
pub fn list_solutions(
    conn: &Connection,
    time_range: Range<Duration>,
    limit: i64,
) -> Result<Vec<(ContentAddress, Solution, Duration)>, QueryError> {
    let Range { start, end } = time_range;
    let mut stmt = conn.prepare(sql::query::LIST_SOLUTIONS)?;
    let raw_rows = stmt.query_map(
        named_params! {
            ":start_secs": start.as_secs(),
            ":start_nanos": start.subsec_nanos(),
            ":end_secs": end.as_secs(),
            ":end_nanos": end.subsec_nanos(),
            ":limit": limit,
        },
        |row| {
            let addr = ContentAddress(row.get::<_, Hash>("content_addr")?);
            let encoded = row.get::<_, Vec<u8>>("solution")?;
            let secs = row.get::<_, u64>("timestamp_secs")?;
            let nanos = row.get::<_, u32>("timestamp_nanos")?;
            Ok((addr, encoded, Duration::new(secs, nanos)))
        },
    )?;

    // Decoding happens outside the row closure so that decode failures surface
    // as `QueryError` rather than having to be squeezed into `rusqlite::Error`.
    let mut solutions = Vec::new();
    for raw in raw_rows {
        let (addr, encoded, timestamp) = raw?;
        let solution: Solution = decode(&encoded)?;
        solutions.push((addr, solution, timestamp));
    }
    Ok(solutions)
}
/// List submission records as `(solution content address, timestamp)` pairs.
///
/// Runs [`sql::query::LIST_SUBMISSIONS`] with both ends of `time_range` split
/// into whole seconds and sub-second nanoseconds, plus `limit`. How the range
/// bounds and the limit are applied is determined by that query.
///
/// # Errors
///
/// Returns the first `rusqlite` error hit while preparing the statement,
/// running the query, or reading a row.
pub fn list_submissions(
    conn: &Connection,
    time_range: Range<Duration>,
    limit: i64,
) -> rusqlite::Result<Vec<(ContentAddress, Duration)>> {
    let Range { start, end } = time_range;
    let mut stmt = conn.prepare(sql::query::LIST_SUBMISSIONS)?;
    let submissions = stmt.query_map(
        named_params! {
            ":start_secs": start.as_secs(),
            ":start_nanos": start.subsec_nanos(),
            ":end_secs": end.as_secs(),
            ":end_nanos": end.subsec_nanos(),
            ":limit": limit,
        },
        |row| {
            let addr = ContentAddress(row.get::<_, Hash>("content_addr")?);
            let secs = row.get::<_, u64>("timestamp_secs")?;
            let nanos = row.get::<_, u32>("timestamp_nanos")?;
            Ok((addr, Duration::new(secs, nanos)))
        },
    )?;
    // Bound to a local (rather than chained) so the row iterator, which
    // borrows `stmt`, is consumed before `stmt` goes out of scope.
    submissions.collect()
}
/// Fetch recorded failures for the solution addressed by `ca`.
///
/// Runs [`sql::query::LATEST_SOLUTION_FAILURES`] with the solution address and
/// `limit` bound; which failures are selected, and in what order, is
/// determined by that query.
///
/// Stored error messages are read back as bytes and converted with
/// `String::from_utf8_lossy`, so invalid UTF-8 is replaced rather than
/// rejected.
///
/// # Errors
///
/// Returns the first `rusqlite` error hit while preparing the statement,
/// running the query, or reading a row.
pub fn latest_solution_failures(
    conn: &Connection,
    ca: &ContentAddress,
    limit: u32,
) -> rusqlite::Result<Vec<SolutionFailure<'static>>> {
    let mut stmt = conn.prepare(sql::query::LATEST_SOLUTION_FAILURES)?;
    let failures = stmt.query_map(
        named_params! {
            ":solution_addr": &ca.0,
            ":limit": limit,
        },
        |row| {
            // Struct fields are evaluated in source order, so columns are read
            // in the sequence they are listed here.
            Ok(SolutionFailure {
                attempt_block_num: row.get::<_, i64>("attempt_block_num")?,
                attempt_block_addr: ContentAddress(row.get::<_, Hash>("attempt_block_addr")?),
                attempt_solution_ix: row.get::<_, u32>("attempt_solution_ix")?,
                err_msg: String::from_utf8_lossy(&row.get::<_, Vec<u8>>("err_msg")?)
                    .into_owned()
                    .into(),
            })
        },
    )?;
    failures.collect()
}
/// List recorded solution failures, paginated by `offset` and `limit`.
///
/// Runs [`sql::query::LIST_SOLUTION_FAILURES`] with `offset` and `limit`
/// bound; ordering is determined by that query.
///
/// Stored error messages are read back as bytes and converted with
/// `String::from_utf8_lossy`, so invalid UTF-8 is replaced rather than
/// rejected.
///
/// # Errors
///
/// Returns the first `rusqlite` error hit while preparing the statement,
/// running the query, or reading a row.
pub fn list_solution_failures(
    conn: &Connection,
    offset: u32,
    limit: u32,
) -> rusqlite::Result<Vec<SolutionFailure<'static>>> {
    /// Rebuild an owned `SolutionFailure` from a single result row.
    fn failure_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SolutionFailure<'static>> {
        let block_num = row.get::<_, i64>("attempt_block_num")?;
        let block_addr = row.get::<_, Hash>("attempt_block_addr")?;
        let solution_ix = row.get::<_, u32>("attempt_solution_ix")?;
        let msg_bytes = row.get::<_, Vec<u8>>("err_msg")?;
        let msg = String::from_utf8_lossy(&msg_bytes).into_owned();
        Ok(SolutionFailure {
            attempt_block_num: block_num,
            attempt_block_addr: ContentAddress(block_addr),
            attempt_solution_ix: solution_ix,
            err_msg: msg.into(),
        })
    }

    let mut stmt = conn.prepare(sql::query::LIST_SOLUTION_FAILURES)?;
    let failures = stmt.query_map(
        named_params! {
            ":offset": offset,
            ":limit": limit,
        },
        failure_from_row,
    )?;
    failures.collect()
}
/// Execute [`sql::delete::SOLUTION`] for the content address `ca`.
///
/// The number of affected rows is discarded, so the call also succeeds when
/// the statement changes nothing.
///
/// # Errors
///
/// Returns any `rusqlite` error produced by the statement.
pub fn delete_solution(conn: &Connection, ca: &ContentAddress) -> rusqlite::Result<()> {
    conn.execute(
        sql::delete::SOLUTION,
        named_params! {
            ":content_addr": &ca.0,
        },
    )
    .map(|_rows_changed| ())
}
/// Delete every solution whose content address is yielded by `cas`, using
/// [`delete_solution`] for each one in iteration order.
///
/// # Errors
///
/// Stops at, and returns, the first `rusqlite` error; addresses after the
/// failing one are not processed.
pub fn delete_solutions(
    tx: &Transaction,
    cas: impl IntoIterator<Item = ContentAddress>,
) -> rusqlite::Result<()> {
    cas.into_iter()
        .try_for_each(|ca| crate::delete_solution(tx, &ca))
}
/// Execute [`sql::delete::OLDEST_SOLUTION_FAILURES`] with `keep_limit` bound
/// to the `:keep_limit` parameter.
///
/// NOTE(review): presumably this trims the failure table down to the newest
/// `keep_limit` entries; confirm against the SQL in `sql::delete`.
///
/// # Errors
///
/// Returns any `rusqlite` error produced by the statement.
pub fn delete_oldest_solution_failures(conn: &Connection, keep_limit: u32) -> rusqlite::Result<()> {
    conn.execute(
        sql::delete::OLDEST_SOLUTION_FAILURES,
        named_params! {
            ":keep_limit": keep_limit,
        },
    )
    .map(|_rows_changed| ())
}
/// Run `f` inside a new transaction on `conn`, committing only if it succeeds.
///
/// On `Ok`, the transaction is committed and the closure's value is returned.
/// On `Err`, the error is returned as-is and the transaction is dropped
/// without an explicit commit (rusqlite's default drop behaviour for a
/// transaction is to roll back).
///
/// # Errors
///
/// Returns the closure's error, or a `rusqlite` error (converted into `E`)
/// if opening or committing the transaction fails.
pub fn with_tx<T, E>(
    conn: &mut rusqlite::Connection,
    f: impl FnOnce(&mut Transaction) -> Result<T, E>,
) -> Result<T, E>
where
    E: From<rusqlite::Error>,
{
    let mut tx = conn.transaction()?;
    match f(&mut tx) {
        Ok(out) => {
            tx.commit()?;
            Ok(out)
        }
        Err(err) => Err(err),
    }
}