#![warn(missing_docs)]
pub use error::{DecodeError, QueryError};
use essential_hash::content_addr;
#[doc(inline)]
pub use essential_node_db_sql as sql;
use essential_types::{
contract::Contract, predicate::Predicate, solution::Solution, Block, ContentAddress, Hash, Key,
Value, Word,
};
use futures::Stream;
use rusqlite::{named_params, Connection, OptionalExtension, Transaction};
use serde::{Deserialize, Serialize};
use std::{ops::Range, time::Duration};
pub use query_range::finalized;
mod error;
mod query_range;
pub trait AcquireConnection {
#[allow(async_fn_in_trait)]
async fn acquire_connection(&self) -> Option<impl 'static + AsRef<Connection>>;
}
pub trait AwaitNewBlock {
#[allow(async_fn_in_trait)]
async fn await_new_block(&mut self) -> Option<()>;
}
pub fn encode<T>(value: &T) -> Vec<u8>
where
T: Serialize,
{
postcard::to_allocvec(value).expect("postcard serialization cannot fail")
}
pub fn decode<T>(value: &[u8]) -> Result<T, DecodeError>
where
T: for<'de> Deserialize<'de>,
{
Ok(postcard::from_bytes(value)?)
}
pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
for table in sql::table::ALL {
tx.execute(table.create, ())?;
}
Ok(())
}
pub fn insert_block(tx: &Transaction, block: &Block) -> rusqlite::Result<()> {
let secs = block.timestamp.as_secs();
let nanos = block.timestamp.subsec_nanos() as u64;
let solution_hashes: Vec<ContentAddress> = block.solutions.iter().map(content_addr).collect();
let block_address =
essential_hash::block_addr::from_block_and_solutions_addrs_slice(block, &solution_hashes);
tx.execute(
sql::insert::BLOCK,
named_params! {
":block_address": block_address.0,
":number": block.number,
":timestamp_secs": secs,
":timestamp_nanos": nanos,
},
)?;
let mut stmt_solution = tx.prepare(sql::insert::SOLUTION)?;
let mut stmt_block_solution = tx.prepare(sql::insert::BLOCK_SOLUTION)?;
let mut stmt_mutation = tx.prepare(sql::insert::MUTATION)?;
let mut stmt_dec_var = tx.prepare(sql::insert::DEC_VAR)?;
let mut stmt_pub_var = tx.prepare(sql::insert::PUB_VAR)?;
for (ix, (solution, ca)) in block.solutions.iter().zip(solution_hashes).enumerate() {
let solution_blob = encode(solution);
stmt_solution.execute(named_params! {
":content_hash": ca.0,
":solution": solution_blob,
})?;
stmt_block_solution.execute(named_params! {
":block_address": block_address.0,
":solution_hash": &ca.0,
":solution_index": ix,
})?;
for (data_ix, data) in solution.data.iter().enumerate() {
let contract_ca_blob = encode(&data.predicate_to_solve.contract);
for (mutation_ix, mutation) in data.state_mutations.iter().enumerate() {
let key_blob = encode(&mutation.key);
let value_blob = encode(&mutation.value);
stmt_mutation.execute(named_params! {
":solution_hash": ca.0,
":data_index": data_ix,
":mutation_index": mutation_ix,
":contract_ca": contract_ca_blob,
":key": key_blob,
":value": value_blob,
})?;
}
for (dec_var_ix, dec_var) in data.decision_variables.iter().enumerate() {
let blob = encode(&dec_var);
stmt_dec_var.execute(named_params! {
":solution_hash": ca.0,
":data_index": data_ix,
":dec_var_index": dec_var_ix,
":value": blob
})?;
}
for pub_var in &data.transient_data {
let key_blob = encode(&pub_var.key);
let value_blob = encode(&pub_var.value);
stmt_pub_var.execute(named_params! {
":solution_hash": ca.0,
":data_index": data_ix,
":key": key_blob,
":value": value_blob,
})?;
}
}
}
stmt_solution.finalize()?;
stmt_block_solution.finalize()?;
stmt_mutation.finalize()?;
stmt_dec_var.finalize()?;
stmt_pub_var.finalize()?;
Ok(())
}
pub fn finalize_block(conn: &Connection, block_address: &ContentAddress) -> rusqlite::Result<()> {
conn.execute(
sql::insert::FINALIZE_BLOCK,
named_params! {
":block_address": block_address.0,
},
)?;
Ok(())
}
pub fn insert_failed_block(
conn: &Connection,
block_address: &ContentAddress,
solution_hash: &ContentAddress,
) -> rusqlite::Result<()> {
conn.execute(
sql::insert::FAILED_BLOCK,
named_params! {
":block_address": block_address.0,
":solution_hash": solution_hash.0,
},
)?;
Ok(())
}
pub fn insert_contract(
tx: &Transaction,
contract: &Contract,
l2_block_number: Word,
) -> rusqlite::Result<()> {
let predicate_cas: Vec<_> = contract.predicates.iter().map(content_addr).collect();
let contract_ca = essential_hash::contract_addr::from_predicate_addrs(
predicate_cas.iter().cloned(),
&contract.salt,
);
let contract_ca_blob = encode(&contract_ca);
let salt_blob = encode(&contract.salt);
tx.execute(
sql::insert::CONTRACT,
named_params! {
":content_hash": contract_ca_blob,
":salt": salt_blob,
":l2_block_number": l2_block_number,
},
)?;
let mut stmt_predicate = tx.prepare(sql::insert::PREDICATE)?;
let mut stmt_contract_predicate = tx.prepare(sql::insert::CONTRACT_PREDICATE)?;
for (pred, pred_ca) in contract.predicates.iter().zip(&predicate_cas) {
let pred_blob = encode(pred);
let pred_ca_blob = encode(pred_ca);
stmt_predicate.execute(named_params! {
":content_hash": &pred_ca_blob,
":predicate": pred_blob,
})?;
stmt_contract_predicate.execute(named_params! {
":contract_hash": &contract_ca_blob,
":predicate_hash": &pred_ca_blob,
})?;
}
Ok(())
}
pub fn insert_contract_progress(
conn: &Connection,
l2_block_number: Word,
contract_ca: &ContentAddress,
) -> rusqlite::Result<()> {
let contract_ca_blob = encode(contract_ca);
conn.execute(
sql::insert::CONTRACT_PROGRESS,
named_params! {
":l2_block_number": l2_block_number,
":content_hash": contract_ca_blob,
},
)?;
Ok(())
}
pub fn update_state(
conn: &Connection,
contract_ca: &ContentAddress,
key: &Key,
value: &Value,
) -> rusqlite::Result<()> {
let contract_ca_blob = encode(contract_ca);
let key_blob = encode(key);
let value_blob = encode(value);
conn.execute(
sql::update::STATE,
named_params! {
":contract_hash": contract_ca_blob,
":key": key_blob,
":value": value_blob,
},
)?;
Ok(())
}
pub fn update_state_progress(
conn: &Connection,
block_address: &ContentAddress,
) -> rusqlite::Result<()> {
conn.execute(
sql::insert::STATE_PROGRESS,
named_params! {
":block_address": block_address.0,
},
)?;
Ok(())
}
pub fn update_validation_progress(
conn: &Connection,
block_address: &ContentAddress,
) -> rusqlite::Result<()> {
conn.execute(
sql::insert::VALIDATION_PROGRESS,
named_params! {
":block_address": block_address.0,
},
)?;
Ok(())
}
pub fn delete_state(
conn: &Connection,
contract_ca: &ContentAddress,
key: &Key,
) -> rusqlite::Result<()> {
let contract_ca_blob = encode(contract_ca);
let key_blob = encode(key);
conn.execute(
sql::update::DELETE_STATE,
named_params! {
":contract_hash": contract_ca_blob,
":key": key_blob,
},
)?;
Ok(())
}
pub fn get_contract_salt(
conn: &Connection,
ca: &ContentAddress,
) -> Result<Option<Hash>, QueryError> {
let ca_blob = encode(ca);
get_contract_salt_by_ca_blob(conn, &ca_blob)
}
fn get_contract_salt_by_ca_blob(
conn: &Connection,
ca_blob: &[u8],
) -> Result<Option<Hash>, QueryError> {
let mut stmt = conn.prepare(sql::query::GET_CONTRACT_SALT)?;
let salt_blob: Option<Vec<u8>> = stmt
.query_row([ca_blob], |row| row.get("salt"))
.optional()?;
Ok(salt_blob.as_deref().map(decode).transpose()?)
}
pub fn get_contract_predicates(
conn: &Connection,
ca: &ContentAddress,
) -> Result<Option<Vec<Predicate>>, QueryError> {
let ca_blob = encode(ca);
get_contract_predicates_by_ca_blob(conn, &ca_blob)
}
fn get_contract_predicates_by_ca_blob(
conn: &Connection,
ca_blob: &[u8],
) -> Result<Option<Vec<Predicate>>, QueryError> {
let mut stmt = conn.prepare(sql::query::GET_CONTRACT_PREDICATES)?;
let pred_blobs = stmt.query_map([ca_blob], |row| row.get::<_, Vec<u8>>("predicate"))?;
let mut predicates: Vec<Predicate> = vec![];
for pred_blob in pred_blobs {
predicates.push(decode(&pred_blob?)?);
}
Ok(Some(predicates))
}
pub fn get_contract(
conn: &Connection,
ca: &ContentAddress,
) -> Result<Option<Contract>, QueryError> {
let ca_blob = encode(ca);
let Some(salt) = get_contract_salt_by_ca_blob(conn, &ca_blob)? else {
return Ok(None);
};
let Some(predicates) = get_contract_predicates_by_ca_blob(conn, &ca_blob)? else {
return Ok(None);
};
Ok(Some(Contract { salt, predicates }))
}
pub fn get_contract_progress(
conn: &Connection,
) -> Result<Option<(Word, ContentAddress)>, QueryError> {
let Some((l2_block_number, content_hash)) = conn
.query_row(sql::query::GET_CONTRACT_PROGRESS, [], |row| {
let l2_block_number: Word = row.get("l2_block_number")?;
let content_hash: Vec<u8> = row.get("content_hash")?;
Ok((l2_block_number, content_hash))
})
.optional()?
else {
return Ok(None);
};
let content_hash = decode(&content_hash)?;
Ok(Some((l2_block_number, content_hash)))
}
pub fn get_predicate(
conn: &Connection,
predicate_ca: &ContentAddress,
) -> Result<Option<Predicate>, QueryError> {
let predicate_ca_blob = encode(predicate_ca);
let mut stmt = conn.prepare(sql::query::GET_PREDICATE)?;
let pred_blob: Option<Vec<u8>> = stmt
.query_row(
named_params! {
":predicate_hash": predicate_ca_blob,
},
|row| row.get("predicate"),
)
.optional()?;
Ok(pred_blob.as_deref().map(decode).transpose()?)
}
pub fn get_solution(
conn: &Connection,
ca: &ContentAddress,
) -> Result<Option<Solution>, QueryError> {
let mut stmt = conn.prepare(sql::query::GET_SOLUTION)?;
let solution_blob: Option<Vec<u8>> = stmt
.query_row([ca.0], |row| row.get("solution"))
.optional()?;
Ok(solution_blob.as_deref().map(decode).transpose()?)
}
pub fn query_state(
conn: &Connection,
contract_ca: &ContentAddress,
key: &Key,
) -> Result<Option<Value>, QueryError> {
use rusqlite::OptionalExtension;
let contract_ca_blob = encode(contract_ca);
let key_blob = encode(key);
let mut stmt = conn.prepare(sql::query::GET_STATE)?;
let value_blob: Option<Vec<u8>> = stmt
.query_row([contract_ca_blob, key_blob], |row| row.get("value"))
.optional()?;
Ok(value_blob.as_deref().map(decode).transpose()?)
}
pub fn get_block_number(
conn: &Connection,
block_address: &ContentAddress,
) -> Result<Option<Word>, rusqlite::Error> {
conn.query_row(
sql::query::GET_BLOCK_NUMBER,
named_params! {
":block_address": block_address.0,
},
|row| row.get::<_, Option<Word>>("number"),
)
}
pub fn get_latest_finalized_block_address(
conn: &Connection,
) -> Result<Option<ContentAddress>, rusqlite::Error> {
conn.query_row(sql::query::GET_LATEST_FINALIZED_BLOCK_ADDRESS, [], |row| {
row.get::<_, Hash>("block_address").map(ContentAddress)
})
.optional()
}
pub fn get_state_progress(conn: &Connection) -> Result<Option<ContentAddress>, QueryError> {
let mut stmt = conn.prepare(sql::query::GET_STATE_PROGRESS)?;
let value: Option<ContentAddress> = stmt
.query_row([], |row| {
let block_address: Hash = row.get("block_address")?;
Ok(ContentAddress(block_address))
})
.optional()?;
Ok(value)
}
pub fn get_validation_progress(conn: &Connection) -> Result<Option<ContentAddress>, QueryError> {
let mut stmt = conn.prepare(sql::query::GET_VALIDATION_PROGRESS)?;
let value: Option<ContentAddress> = stmt
.query_row([], |row| {
let block_address: Hash = row.get("block_address")?;
Ok(ContentAddress(block_address))
})
.optional()?;
Ok(value)
}
pub fn list_blocks(conn: &Connection, block_range: Range<Word>) -> Result<Vec<Block>, QueryError> {
let mut stmt = conn.prepare(sql::query::LIST_BLOCKS)?;
let rows = stmt.query_map(
named_params! {
":start_block": block_range.start,
":end_block": block_range.end,
},
|row| {
let block_address: essential_types::Hash = row.get("block_address")?;
let block_number: Word = row.get("number")?;
let timestamp_secs: u64 = row.get("timestamp_secs")?;
let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
let solution_blob: Vec<u8> = row.get("solution")?;
let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
Ok((block_address, block_number, timestamp, solution_blob))
},
)?;
let mut blocks: Vec<Block> = vec![];
let mut last_block_address = None;
for res in rows {
let (block_address, block_number, timestamp, solution_blob): (
essential_types::Hash,
Word,
Duration,
Vec<u8>,
) = res?;
let block = match last_block_address {
Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
_ => {
last_block_address = Some(block_address);
blocks.push(Block {
number: block_number,
timestamp,
solutions: vec![],
});
blocks.last_mut().expect("last block must exist")
}
};
let solution: Solution = decode(&solution_blob)?;
block.solutions.push(solution);
}
Ok(blocks)
}
pub fn list_blocks_by_time(
conn: &Connection,
range: Range<Duration>,
page_size: i64,
page_number: i64,
) -> Result<Vec<Block>, QueryError> {
let mut stmt = conn.prepare(sql::query::LIST_BLOCKS_BY_TIME)?;
let rows = stmt.query_map(
named_params! {
":start_secs": range.start.as_secs(),
":start_nanos": range.start.subsec_nanos(),
":end_secs": range.end.as_secs(),
":end_nanos": range.end.subsec_nanos(),
":page_size": page_size,
":page_number": page_number,
},
|row| {
let block_address: essential_types::Hash = row.get("block_address")?;
let block_number: Word = row.get("number")?;
let timestamp_secs: u64 = row.get("timestamp_secs")?;
let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
let solution_blob: Vec<u8> = row.get("solution")?;
let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
Ok((block_address, block_number, timestamp, solution_blob))
},
)?;
let mut blocks: Vec<Block> = vec![];
let mut last_block_address: Option<essential_types::Hash> = None;
for res in rows {
let (block_address, block_number, timestamp, solution_blob): (
essential_types::Hash,
Word,
Duration,
Vec<u8>,
) = res?;
let block = match last_block_address {
Some(n) if n == block_address => blocks.last_mut().expect("last block must exist"),
_ => {
last_block_address = Some(block_address);
blocks.push(Block {
number: block_number,
timestamp,
solutions: vec![],
});
blocks.last_mut().expect("last block must exist")
}
};
let solution: Solution = decode(&solution_blob)?;
block.solutions.push(solution);
}
Ok(blocks)
}
pub fn list_contracts(
conn: &Connection,
block_range: Range<Word>,
) -> Result<Vec<(Word, Vec<Contract>)>, QueryError> {
let mut stmt = conn.prepare(sql::query::LIST_CONTRACTS)?;
let rows = stmt.query_map(
named_params! {
":start_block": block_range.start,
":end_block": block_range.end,
},
|row| {
let block_num: Word = row.get("l2_block_number")?;
let salt_blob: Vec<u8> = row.get("salt")?;
let contract_ca_blob: Vec<u8> = row.get("content_hash")?;
let pred_blob: Vec<u8> = row.get("predicate")?;
Ok((block_num, contract_ca_blob, salt_blob, pred_blob))
},
)?;
let mut blocks: Vec<(Word, Vec<Contract>)> = vec![];
let mut last_block_num: Option<Word> = None;
let mut last_contract_ca = None;
for res in rows {
let (l2_block_num, ca_blob, salt_blob, pred_blob): (Word, Vec<u8>, Vec<u8>, Vec<u8>) = res?;
let contract_ca: ContentAddress = decode(&ca_blob)?;
let salt: Hash = decode(&salt_blob)?;
let block = match last_block_num {
Some(n) if n == l2_block_num => blocks.last_mut().expect("block entry must exist"),
_ => {
last_block_num = Some(l2_block_num);
last_contract_ca = None;
blocks.push((l2_block_num, vec![]));
blocks.last_mut().expect("block entry must exist")
}
};
let contract = match last_contract_ca {
Some(ref ca) if ca == &contract_ca => block.1.last_mut().expect("entry must exist"),
_ => {
last_contract_ca = Some(contract_ca);
let predicates = vec![];
block.1.push(Contract { salt, predicates });
block.1.last_mut().expect("entry must exist")
}
};
let pred: Predicate = decode(&pred_blob)?;
contract.predicates.push(pred);
}
Ok(blocks)
}
pub fn list_failed_blocks(
conn: &Connection,
block_range: Range<Word>,
) -> Result<Vec<(Word, ContentAddress)>, QueryError> {
let mut stmt = conn.prepare(sql::query::LIST_FAILED_BLOCKS)?;
let rows = stmt.query_map(
named_params! {
":start_block": block_range.start,
":end_block": block_range.end,
},
|row| {
let block_number: Word = row.get("number")?;
let solution_hash: Hash = row.get("content_hash")?;
Ok((block_number, ContentAddress(solution_hash)))
},
)?;
let mut failed_blocks = vec![];
for res in rows {
let (block_number, solution_hash) = res?;
failed_blocks.push((block_number, solution_hash));
}
Ok(failed_blocks)
}
pub fn list_unchecked_blocks(
conn: &Connection,
block_range: Range<Word>,
) -> Result<Vec<Block>, QueryError> {
let mut stmt = conn.prepare(sql::query::LIST_UNCHECKED_BLOCKS)?;
let rows = stmt.query_map(
named_params! {
":start_block": block_range.start,
":end_block": block_range.end,
},
|row| {
let block_address: essential_types::Hash = row.get("block_address")?;
let block_number: Word = row.get("number")?;
let timestamp_secs: u64 = row.get("timestamp_secs")?;
let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
let solution_blob: Vec<u8> = row.get("solution")?;
let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
Ok((block_address, block_number, timestamp, solution_blob))
},
)?;
let mut blocks: Vec<Block> = vec![];
let mut last_block_address = None;
for res in rows {
let (block_address, block_number, timestamp, solution_blob): (
essential_types::Hash,
Word,
Duration,
Vec<u8>,
) = res?;
let block = match last_block_address {
Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
_ => {
last_block_address = Some(block_address);
blocks.push(Block {
number: block_number,
timestamp,
solutions: vec![],
});
blocks.last_mut().expect("last block must exist")
}
};
let solution: Solution = decode(&solution_blob)?;
block.solutions.push(solution);
}
Ok(blocks)
}
pub fn subscribe_blocks(
start_block: Word,
acquire_conn: impl AcquireConnection,
await_new_block: impl AwaitNewBlock,
) -> impl Stream<Item = Result<Block, QueryError>> {
let init = (start_block, acquire_conn, await_new_block);
futures::stream::unfold(init, move |(block_ix, acq_conn, mut new_block)| {
let next_ix = block_ix + 1;
async move {
loop {
let conn = acq_conn.acquire_connection().await?;
let res = list_blocks(conn.as_ref(), block_ix..next_ix);
std::mem::drop(conn);
match res {
Err(err) => return Some((Err(err), (block_ix, acq_conn, new_block))),
Ok(mut vec) => match vec.pop() {
None => new_block.await_new_block().await?,
Some(block) => return Some((Ok(block), (next_ix, acq_conn, new_block))),
},
}
}
}
})
}