use error::{
BuildBlockError, CheckSetError, CheckSetsError, InvalidSet, LastBlockHeaderError,
PredicateProgramsError, QueryPredicateError, QueryProgramError, SetPredicatesError,
};
use essential_builder_db::{self as builder_db};
use essential_builder_types::SolutionSetFailure;
use essential_check::{self as check, solution::CheckPredicateConfig, vm::Gas};
pub use essential_node as node;
use essential_node_db as node_db;
use essential_node_types::{block_state_solution, BigBang, Block, BlockHeader};
use essential_types::{
predicate::Predicate,
solution::{Solution, SolutionSet},
ContentAddress, PredicateAddress, Program, Word,
};
use std::{collections::HashMap, num::NonZero, ops::Range, sync::Arc, time::Duration};
pub mod error;
pub mod state;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Config {
pub solution_set_failures_to_keep: u32,
pub solution_set_attempts_per_block: NonZero<u32>,
pub parallel_chunk_size: NonZero<usize>,
pub check: Arc<CheckPredicateConfig>,
pub contract_registry: PredicateAddress,
pub program_registry: PredicateAddress,
pub block_state: PredicateAddress,
}
#[derive(Debug)]
pub struct SolutionSetsSummary {
pub succeeded: Vec<(ContentAddress, Gas)>,
pub failed: Vec<(ContentAddress, SolutionSetIndex, InvalidSet)>,
}
pub type SolutionSetIndex = u32;
type BlockNum = i64;
impl Config {
pub const DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT: u32 = 10_000;
pub const DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK: u32 = 10_000;
pub fn default_parallel_chunk_size() -> NonZero<usize> {
num_cpus::get()
.try_into()
.expect("`num_cpus::get()` must be non-zero")
}
}
impl Default for Config {
fn default() -> Self {
let big_bang = BigBang::default();
Self {
solution_set_failures_to_keep: Self::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT,
solution_set_attempts_per_block: Self::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK
.try_into()
.expect("declared const must be non-zero"),
parallel_chunk_size: Self::default_parallel_chunk_size(),
contract_registry: big_bang.contract_registry,
program_registry: big_bang.program_registry,
block_state: big_bang.block_state,
check: Default::default(),
}
}
}
#[cfg_attr(feature = "tracing", tracing::instrument("build", skip_all))]
pub async fn build_block_fifo(
builder_conn_pool: &builder_db::ConnectionPool,
node_conn_pool: &node::db::ConnectionPool,
conf: &Config,
) -> Result<(Option<ContentAddress>, SolutionSetsSummary), BuildBlockError> {
let last_block_header_opt = node_conn_pool
.acquire_then(|h| last_block_header(h))
.await?;
let block_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| BuildBlockError::TimestampNotMonotonic)?;
let block_number = match last_block_header_opt {
None => 0,
Some(BlockHeader {
number: last_block_num,
timestamp: last_block_ts,
}) => {
let block_num = last_block_num
.checked_add(1)
.ok_or(BuildBlockError::BlockNumberOutOfRange)?;
if block_timestamp <= last_block_ts {
return Err(BuildBlockError::TimestampNotMonotonic);
}
block_num
}
};
#[cfg(feature = "tracing")]
tracing::debug!("Building block {}", block_number);
let block_secs: Word = block_timestamp
.as_secs()
.try_into()
.map_err(|_| BuildBlockError::TimestampOutOfRange)?;
let solution = block_state_solution(conf.block_state.clone(), block_number, block_secs);
let solution_set = SolutionSet {
solutions: vec![solution],
};
let ca = essential_hash::content_addr(&solution_set);
let mut solution_sets = vec![(ca, Arc::new(solution_set))];
const MAX_TIMESTAMP_RANGE: Range<Duration> =
Duration::from_secs(0)..Duration::from_secs(i64::MAX as _);
let limit = i64::from(u32::from(conf.solution_set_attempts_per_block));
solution_sets.extend(
builder_conn_pool
.list_solution_sets(MAX_TIMESTAMP_RANGE, limit)
.await?
.into_iter()
.map(|(ca, solution_set, _ts)| (ca, Arc::new(solution_set))),
);
let (solution_sets, summary) =
check_solution_sets(node_conn_pool.clone(), block_number, &solution_sets, conf).await?;
let block = Block {
header: BlockHeader {
number: block_number,
timestamp: block_timestamp,
},
solution_sets: solution_sets
.into_iter()
.map(Arc::unwrap_or_clone)
.collect(),
};
let block_addr = essential_hash::content_addr(&block);
#[cfg(feature = "tracing")]
tracing::debug!(
"Built block {} with {} solution sets at {:?}",
block_addr,
block.solution_sets.len(),
block.header.timestamp
);
let skip_block = block.solution_sets.len() <= 1;
if skip_block {
#[cfg(feature = "tracing")]
tracing::debug!("Skipping empty block {}", block_addr);
} else {
node_conn_pool
.acquire_then(|conn| {
builder_db::with_tx(conn, move |tx| {
let block_ca = essential_hash::content_addr(&block);
node_db::insert_block(tx, &block)?;
node_db::finalize_block(tx, &block_ca)
})
})
.await?;
#[cfg(feature = "tracing")]
tracing::debug!("Committed and finalized block {}", block_addr);
}
let failures: Vec<_> = summary
.failed
.iter()
.map(|(ca, set_ix, invalid)| {
let failure = SolutionSetFailure {
attempt_block_num: block_number,
attempt_block_addr: block_addr.clone(),
attempt_solution_set_ix: *set_ix,
err_msg: format!("{invalid}").into(),
};
(ca.clone(), failure)
})
.collect();
let failures_to_keep = conf.solution_set_failures_to_keep;
let attempted: Vec<_> = summary
.succeeded
.iter()
.map(|(ca, _gas)| ca.clone())
.chain(summary.failed.iter().map(|(ca, _ix, _err)| ca.clone()))
.collect();
builder_conn_pool
.acquire_then(move |conn| {
builder_db::with_tx(conn, |tx| {
record_solution_set_failures(tx, failures, failures_to_keep)?;
builder_db::delete_solution_sets(tx, attempted)
})
})
.await?;
let block_addr = if skip_block { None } else { Some(block_addr) };
Ok((block_addr, summary))
}
fn last_block_header(
conn: &rusqlite::Connection,
) -> Result<Option<BlockHeader>, LastBlockHeaderError> {
let block_ca = match node_db::get_latest_finalized_block_address(conn)? {
Some(ca) => ca,
None => return Ok(None),
};
let header = node_db::get_block_header(conn, &block_ca)?
.ok_or(LastBlockHeaderError::NoNumberForLastFinalizedBlock)?;
Ok(Some(header))
}
async fn check_solution_sets(
node_conn_pool: node::db::ConnectionPool,
block_num: BlockNum,
proposed_solution_sets: &[(ContentAddress, Arc<SolutionSet>)],
conf: &Config,
) -> Result<(Vec<Arc<SolutionSet>>, SolutionSetsSummary), CheckSetsError> {
let chunk_size = conf.parallel_chunk_size.into();
let mut solution_sets = vec![];
let mut succeeded = vec![];
let mut failed = vec![];
let mut mutations = state::Mutations::default();
let mut chunk_start = 0;
while chunk_start < proposed_solution_sets.len() {
let chunk_end = chunk_start
.saturating_add(chunk_size)
.min(proposed_solution_sets.len());
let range = chunk_start..chunk_end;
let chunk = &proposed_solution_sets[range.clone()];
mutations.extend(range.clone().zip(chunk.iter().map(|(_, s)| &**s)));
let mutations_arc = Arc::new(std::mem::take(&mut mutations));
let results = check_solution_set_chunk(
&node_conn_pool,
block_num,
&mutations_arc,
range.clone().zip(chunk.iter().map(|(_, s)| s.clone())),
&conf.contract_registry.contract,
&conf.program_registry.contract,
&conf.check,
)
.await?;
debug_assert_eq!(
Arc::strong_count(&mutations_arc),
1,
"`Arc<Mutations>` not unique"
);
mutations = Arc::unwrap_or_clone(mutations_arc);
for (set_ix, (res, (set_ca, set))) in range.zip(results.into_iter().zip(chunk)) {
chunk_start += 1;
match res {
Ok(gas) => {
succeeded.push((set_ca.clone(), gas));
solution_sets.push(set.clone());
#[cfg(feature = "tracing")]
tracing::trace!("Solution set check success {}", set_ca);
}
Err(invalid) => {
mutations.remove_solution_set(set_ix);
let set_ix: u32 = set_ix
.try_into()
.expect("`u32::MAX` below solution set limit");
failed.push((set_ca.clone(), set_ix, invalid));
#[cfg(feature = "tracing")]
tracing::trace!("Solution set check failure {}", set_ca);
break;
}
}
}
}
let summary = SolutionSetsSummary { succeeded, failed };
Ok((solution_sets, summary))
}
async fn check_solution_set_chunk(
node_conn_pool: &node::db::ConnectionPool,
block_num: BlockNum,
proposed_mutations: &Arc<state::Mutations>,
chunk: impl IntoIterator<Item = (usize, Arc<SolutionSet>)>,
contract_registry: &ContentAddress,
program_registry: &ContentAddress,
check_conf: &Arc<check::solution::CheckPredicateConfig>,
) -> Result<Vec<Result<Gas, InvalidSet>>, CheckSetsError> {
let checks: tokio::task::JoinSet<_> = chunk
.into_iter()
.map(move |(set_ix, set)| {
let mutations = proposed_mutations.clone();
let conn_pool = node_conn_pool.clone();
let check_conf = check_conf.clone();
let contract_registry = contract_registry.clone();
let program_registry = program_registry.clone();
let (pre, post) = state::pre_and_post_view(conn_pool, mutations, block_num, set_ix);
async move {
let res = check_set(
set.clone(),
pre,
post,
&contract_registry,
&program_registry,
check_conf,
)
.await;
(set_ix, res)
}
})
.collect();
let mut results = checks.join_all().await;
results.sort_by_key(|&(ix, _)| ix);
results
.into_iter()
.map(|(_ix, res)| res.map_err(CheckSetsError::CheckSolution))
.collect()
}
async fn check_set(
solution_set: Arc<SolutionSet>,
pre_state: state::View,
post_state: state::View,
contract_registry: &ContentAddress,
program_registry: &ContentAddress,
check_conf: Arc<CheckPredicateConfig>,
) -> Result<Result<Gas, InvalidSet>, CheckSetError> {
let predicates =
match get_solution_set_predicates(contract_registry, &post_state, &solution_set.solutions)
.await
{
Ok(predicates) => predicates,
Err(SetPredicatesError::PredicateDoesNotExist(ca)) => {
return Ok(Err(InvalidSet::PredicateDoesNotExist(ca)));
}
Err(SetPredicatesError::QueryPredicate(err)) => match err {
QueryPredicateError::Decode(_)
| QueryPredicateError::MissingLenBytes
| QueryPredicateError::InvalidLenBytes => {
return Ok(Err(InvalidSet::PredicateInvalid));
}
QueryPredicateError::ConnPoolQuery(err) => {
return Err(CheckSetError::NodeQuery(err))
}
},
};
let programs = match get_predicates_programs(program_registry, &post_state, &predicates).await {
Ok(programs) => programs,
Err(PredicateProgramsError::ProgramDoesNotExist(ca)) => {
return Ok(Err(InvalidSet::ProgramDoesNotExist(ca)));
}
Err(PredicateProgramsError::QueryProgram(err)) => match err {
QueryProgramError::MissingLenBytes | QueryProgramError::InvalidLenBytes => {
return Ok(Err(InvalidSet::ProgramInvalid));
}
QueryProgramError::ConnPoolQuery(err) => return Err(CheckSetError::NodeQuery(err)),
},
};
let get_predicate = move |addr: &PredicateAddress| {
predicates
.get(&addr.predicate)
.cloned()
.expect("predicate must have been fetched in the previous step")
};
let get_program = move |addr: &ContentAddress| {
programs
.get(addr)
.cloned()
.expect("program must have been fetched in the previous step")
};
match check::solution::check_set_predicates(
&pre_state,
&post_state,
solution_set.clone(),
get_predicate,
get_program,
check_conf.clone(),
)
.await
{
Err(err) => Ok(Err(InvalidSet::Predicates(err))),
Ok(gas) => Ok(Ok(gas)),
}
}
async fn get_solution_set_predicates(
contract_registry: &ContentAddress,
view: &state::View,
solutions: &[Solution],
) -> Result<HashMap<ContentAddress, Arc<Predicate>>, SetPredicatesError> {
let queries: tokio::task::JoinSet<_> = solutions
.iter()
.map(|solution| solution.predicate_to_solve.clone())
.enumerate()
.map(move |(ix, pred_addr)| {
let view = view.clone();
let registry = contract_registry.clone();
async move {
let pred = view.get_predicate(registry, &pred_addr).await;
(ix, pred)
}
})
.collect();
let mut map = HashMap::new();
let mut results = queries.join_all().await;
results.sort_by_key(|(ix, _)| *ix);
for (sol, (_ix, res)) in solutions.iter().zip(results) {
let ca = sol.predicate_to_solve.predicate.clone();
let predicate =
res?.ok_or_else(|| SetPredicatesError::PredicateDoesNotExist(ca.clone()))?;
map.insert(ca, Arc::new(predicate));
}
Ok(map)
}
async fn get_predicates_programs(
program_registry: &ContentAddress,
view: &state::View,
predicates: &HashMap<ContentAddress, Arc<Predicate>>,
) -> Result<HashMap<ContentAddress, Arc<Program>>, PredicateProgramsError> {
let queries: tokio::task::JoinSet<_> = predicates
.iter()
.flat_map(|(_, pred)| {
pred.nodes
.iter()
.map(|node| node.program_address.clone())
.enumerate()
.map(move |(ix, prog_addr)| {
let view = view.clone();
let registry = program_registry.clone();
async move {
let prog = view.get_program(registry, &prog_addr).await;
(ix, prog)
}
})
})
.collect();
let mut map = HashMap::new();
let mut results = queries.join_all().await;
results.sort_by_key(|(ix, _)| *ix);
for (node, (_ix, res)) in predicates
.iter()
.flat_map(|(_, pred)| pred.nodes.iter())
.zip(results)
{
let ca = node.program_address.clone();
let program =
res?.ok_or_else(|| PredicateProgramsError::ProgramDoesNotExist(ca.clone()))?;
map.insert(ca, Arc::new(program));
}
Ok(map)
}
fn record_solution_set_failures(
builder_tx: &mut rusqlite::Transaction,
failures: Vec<(ContentAddress, SolutionSetFailure)>,
failures_to_keep: u32,
) -> rusqlite::Result<()> {
if failures.is_empty() {
return Ok(());
}
for (ca, failure) in failures {
builder_db::insert_solution_set_failure(builder_tx, &ca, failure)?;
}
builder_db::delete_oldest_solution_set_failures(builder_tx, failures_to_keep)
}