use error::{
BuildBlockError, CheckSolutionError, CheckSolutionsError, InvalidSolution,
LastBlockHeaderError, QueryPredicateError, SolutionPredicatesError,
};
use essential_builder_db::{self as builder_db};
use essential_builder_types::SolutionFailure;
use essential_check::{self as check, solution::CheckPredicateConfig, state_read_vm::Gas};
pub use essential_node as node;
use essential_node_db as node_db;
use essential_node_types::{block_state_solution, BigBang};
use essential_types::{
predicate::Predicate,
solution::{Solution, SolutionData},
Block, ContentAddress, PredicateAddress, Word,
};
use std::{collections::HashMap, num::NonZero, ops::Range, sync::Arc, time::Duration};
/// Error types returned by the block builder.
pub mod error;
/// Access to node state, including the proposed-mutation overlay applied when
/// checking solutions.
pub mod state;
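/// Configuration options for building blocks.
///
/// # Example
///
/// A minimal sketch of overriding a single option (assuming the default
/// `BigBang` addresses are suitable):
///
/// ```ignore
/// let conf = Config {
///     solution_failures_to_keep: 100,
///     ..Config::default()
/// };
/// ```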
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Config {
    /// The number of most recent solution failures to retain in the builder DB.
    pub solution_failures_to_keep: u32,
    /// The maximum number of solutions to attempt to include in a single block.
    pub solution_attempts_per_block: NonZero<u32>,
    /// The number of solutions to check in parallel at a time.
    pub parallel_chunk_size: NonZero<usize>,
    /// Configuration for the predicate checks applied to each solution.
    pub check: Arc<CheckPredicateConfig>,
    /// The address of the contract registry predicate.
    pub contract_registry: PredicateAddress,
    /// The address of the block state predicate.
    pub block_state: PredicateAddress,
}
/// A summary of the solutions attempted during the building of a block.
#[derive(Debug)]
pub struct SolutionsSummary {
    /// The solutions included in the block, along with the gas spent checking
    /// each one.
    pub succeeded: Vec<(ContentAddress, Gas)>,
    /// The solutions that failed checking, along with their index within the
    /// attempted block and the reason they were invalid.
    pub failed: Vec<(ContentAddress, SolutionIndex, InvalidSolution)>,
}

/// The index of a solution within a block.
pub type SolutionIndex = u32;

/// The number of a block.
type BlockNum = i64;
impl Config {
    /// The default for [`Config::solution_failures_to_keep`].
    pub const DEFAULT_SOLUTION_FAILURE_KEEP_LIMIT: u32 = 10_000;
    /// The default for [`Config::solution_attempts_per_block`].
    pub const DEFAULT_SOLUTION_ATTEMPTS_PER_BLOCK: u32 = 10_000;
    /// The default parallel chunk size, equal to the number of available CPUs.
    pub fn default_parallel_chunk_size() -> NonZero<usize> {
num_cpus::get()
.try_into()
.expect("`num_cpus::get()` must be non-zero")
}
}
impl Default for Config {
fn default() -> Self {
let big_bang = BigBang::default();
Self {
solution_failures_to_keep: Self::DEFAULT_SOLUTION_FAILURE_KEEP_LIMIT,
solution_attempts_per_block: Self::DEFAULT_SOLUTION_ATTEMPTS_PER_BLOCK
.try_into()
.expect("declared const must be non-zero"),
parallel_chunk_size: Self::default_parallel_chunk_size(),
contract_registry: big_bang.contract_registry,
block_state: big_bang.block_state,
check: Default::default(),
}
}
}
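/// Build a single block in FIFO order from the solutions in the builder DB's
/// pool, commit the block to the node DB, and record any solution failures
/// back to the builder DB.
///
/// Up to `conf.solution_attempts_per_block` of the oldest solutions are
/// attempted. A block-state solution carrying the new block number and
/// timestamp is always included first. All attempted solutions are removed
/// from the pool, and failures are reported in the returned
/// [`SolutionsSummary`].
///
/// # Example
///
/// A sketch of a simple build loop. Construction of the connection pools is
/// elided, as it depends on the `essential_builder_db` and `essential_node`
/// setup:
///
/// ```ignore
/// let conf = Config::default();
/// loop {
///     let summary = build_block_fifo(&builder_pool, &node_pool, &conf).await?;
///     println!(
///         "block built: {} included, {} failed",
///         summary.succeeded.len(),
///         summary.failed.len()
///     );
///     tokio::time::sleep(Duration::from_secs(5)).await;
/// }
/// ```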
#[cfg_attr(feature = "tracing", tracing::instrument("build", skip_all))]
pub async fn build_block_fifo(
builder_conn_pool: &builder_db::ConnectionPool,
node_conn_pool: &node::db::ConnectionPool,
conf: &Config,
) -> Result<SolutionsSummary, BuildBlockError> {
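    // Retrieve the header of the last finalized block, if any.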
let last_block_header_opt = node_conn_pool
.acquire_then(|h| last_block_header(h))
.await?;
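    // Take the current time as the new block's timestamp and derive the block
    // number, ensuring the timestamp is strictly greater than the last block's.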
let block_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| BuildBlockError::TimestampNotMonotonic)?;
let block_number = match last_block_header_opt {
None => 0,
Some((last_block_num, last_block_ts)) => {
let block_num = last_block_num
.checked_add(1)
.ok_or(BuildBlockError::BlockNumberOutOfRange)?;
if block_timestamp <= last_block_ts {
return Err(BuildBlockError::TimestampNotMonotonic);
}
block_num
}
};
#[cfg(feature = "tracing")]
tracing::debug!("Building block {}", block_number);
let block_secs: Word = block_timestamp
.as_secs()
.try_into()
.map_err(|_| BuildBlockError::TimestampOutOfRange)?;
let sol_data = block_state_solution(conf.block_state.clone(), block_number, block_secs);
let solution = Solution {
data: vec![sol_data],
};
let ca = essential_hash::content_addr(&solution);
let mut solutions = vec![(ca, Arc::new(solution))];
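    // Read the oldest solutions out of the pool, up to the configured attempt
    // limit. The timestamp range is unbounded so that no solution is excluded
    // based on its submission time.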
const MAX_TIMESTAMP_RANGE: Range<Duration> =
Duration::from_secs(0)..Duration::from_secs(i64::MAX as _);
let limit = i64::from(u32::from(conf.solution_attempts_per_block));
solutions.extend(
builder_conn_pool
.list_solutions(MAX_TIMESTAMP_RANGE, limit)
.await?
.into_iter()
.map(|(ca, solution, _ts)| (ca, Arc::new(solution))),
);
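    // Check the solutions in order, determining which to include in the block.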
let (solutions, summary) =
check_solutions(node_conn_pool.clone(), block_number, &solutions, conf).await?;
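    // Construct the block from the solutions that passed.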
let block = Block {
number: block_number,
timestamp: block_timestamp,
solutions: solutions.into_iter().map(Arc::unwrap_or_clone).collect(),
};
let block_addr = essential_hash::content_addr(&block);
#[cfg(feature = "tracing")]
tracing::debug!(
"Built block {} with {} solutions at {:?}",
block_addr,
block.solutions.len(),
block.timestamp
);
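    // The block-state solution is always present, so a single solution means
    // no user solutions passed checking: skip committing the empty block.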
if block.solutions.len() <= 1 {
#[cfg(feature = "tracing")]
tracing::debug!("Skipping empty block {}", block_addr);
} else {
node_conn_pool
.acquire_then(|conn| {
builder_db::with_tx(conn, move |tx| {
let block_ca = essential_hash::content_addr(&block);
node_db::insert_block(tx, &block)?;
node_db::finalize_block(tx, &block_ca)
})
})
.await?;
#[cfg(feature = "tracing")]
tracing::debug!("Committed and finalized block {}", block_addr);
}
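    // Map the failures into the form required by the builder DB, recording the
    // block and solution index at which each failure occurred.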
let failures: Vec<_> = summary
.failed
.iter()
.map(|(ca, sol_ix, invalid)| {
let failure = SolutionFailure {
attempt_block_num: block_number,
attempt_block_addr: block_addr.clone(),
attempt_solution_ix: *sol_ix,
err_msg: format!("{invalid}").into(),
};
(ca.clone(), failure)
})
.collect();
let failures_to_keep = conf.solution_failures_to_keep;
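    // Collect the CAs of all attempted solutions so they can be deleted from
    // the pool, whether they succeeded or failed.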
let attempted: Vec<_> = summary
.succeeded
.iter()
.map(|(ca, _gas)| ca.clone())
.chain(summary.failed.iter().map(|(ca, _ix, _err)| ca.clone()))
.collect();
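    // In a single transaction: record the failures, prune the oldest beyond
    // the keep limit, and delete every attempted solution from the pool.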
builder_conn_pool
.acquire_then(move |conn| {
builder_db::with_tx(conn, |tx| {
record_solution_failures(tx, failures, failures_to_keep)?;
builder_db::delete_solutions(tx, attempted)
})
})
.await?;
Ok(summary)
}
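/// Retrieve the number and timestamp of the last finalized block from the
/// node DB, or `None` if no block has been finalized yet.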
fn last_block_header(
conn: &rusqlite::Connection,
) -> Result<Option<(Word, Duration)>, LastBlockHeaderError> {
let block_ca = match node_db::get_latest_finalized_block_address(conn)? {
Some(ca) => ca,
None => return Ok(None),
};
let number = node_db::get_block_number(conn, &block_ca)?
.ok_or(LastBlockHeaderError::NoNumberForLastFinalizedBlock)?;
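    // Retrieve the block's timestamp. Exactly one block is expected within the
    // queried range; anything else means the header is unavailable.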
let timestamp = {
let range = number..number.saturating_add(2);
let blocks = node_db::list_blocks(conn, range)?;
let mut blocks: Vec<_> = blocks.into_iter().take(2).collect();
match blocks.len() {
1 => blocks.pop().expect("len is 1").timestamp,
_ => return Err(LastBlockHeaderError::NoTimestampForLastFinalizedBlock),
}
};
Ok(Some((number, timestamp)))
}
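/// Check the given sequence of proposed solutions in FIFO order, returning the
/// solutions that passed along with a [`SolutionsSummary`] of the results.
///
/// Solutions are checked in parallel chunks of `conf.parallel_chunk_size`.
/// Each solution is checked against a view of state that includes the
/// mutations proposed by all prior solutions in the sequence. When a solution
/// fails, its proposed mutations are dropped and checking resumes from the
/// following solution.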
async fn check_solutions(
node_conn_pool: node::db::ConnectionPool,
block_num: BlockNum,
proposed_solutions: &[(ContentAddress, Arc<Solution>)],
conf: &Config,
) -> Result<(Vec<Arc<Solution>>, SolutionsSummary), CheckSolutionsError> {
let chunk_size = conf.parallel_chunk_size.into();
let mut solutions = vec![];
let mut succeeded = vec![];
let mut failed = vec![];
let mut mutations = state::Mutations::default();
let mut chunk_start = 0;
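    // Check the solutions in parallel chunks. Mutations proposed by each chunk
    // are accumulated so that later chunks are checked against the state
    // produced by all prior solutions.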
while chunk_start < proposed_solutions.len() {
let chunk_end = chunk_start
.saturating_add(chunk_size)
.min(proposed_solutions.len());
let range = chunk_start..chunk_end;
let chunk = &proposed_solutions[range.clone()];
mutations.extend(range.clone().zip(chunk.iter().map(|(_, s)| &**s)));
let mutations_arc = Arc::new(std::mem::take(&mut mutations));
let results = check_solution_chunk(
&node_conn_pool,
block_num,
&mutations_arc,
range.clone().zip(chunk.iter().map(|(_, s)| s.clone())),
&conf.contract_registry.contract,
&conf.check,
)
.await?;
debug_assert_eq!(
Arc::strong_count(&mutations_arc),
1,
"`Arc<Mutations>` not unique"
);
mutations = Arc::unwrap_or_clone(mutations_arc);
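        // Process the chunk's results in FIFO order.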
for (sol_ix, (res, (sol_ca, sol))) in range.zip(results.into_iter().zip(chunk)) {
chunk_start += 1;
match res {
Ok(gas) => {
succeeded.push((sol_ca.clone(), gas));
solutions.push(sol.clone());
#[cfg(feature = "tracing")]
tracing::trace!("Solution check success {}", sol_ca);
}
Err(invalid) => {
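                    // Drop the failed solution's mutations and re-check the
                    // remaining solutions, as their results may have depended
                    // on the mutations just removed.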
mutations.remove_solution(sol_ix);
                    let sol_ix: u32 = sol_ix.try_into().expect("solution limit is below `u32::MAX`");
failed.push((sol_ca.clone(), sol_ix, invalid));
#[cfg(feature = "tracing")]
tracing::trace!("Solution check failure {}", sol_ca);
break;
}
}
}
}
let summary = SolutionsSummary { succeeded, failed };
Ok((solutions, summary))
}
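/// Check a chunk of solutions in parallel, returning one result per solution
/// in the chunk's original order.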
async fn check_solution_chunk(
node_conn_pool: &node::db::ConnectionPool,
block_num: BlockNum,
proposed_mutations: &Arc<state::Mutations>,
chunk: impl IntoIterator<Item = (usize, Arc<Solution>)>,
contract_registry: &ContentAddress,
check_conf: &Arc<check::solution::CheckPredicateConfig>,
) -> Result<Vec<Result<Gas, InvalidSolution>>, CheckSolutionsError> {
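    // Spawn a task per solution, each checking against its own view of pre and
    // post state.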
let checks: tokio::task::JoinSet<_> = chunk
.into_iter()
.map(move |(sol_ix, solution)| {
let mutations = proposed_mutations.clone();
let conn_pool = node_conn_pool.clone();
let check_conf = check_conf.clone();
let registry = contract_registry.clone();
let (pre, post) = state::pre_and_post_view(conn_pool, mutations, block_num, sol_ix);
async move {
                let res = check_solution(solution.clone(), pre, post, &registry, check_conf).await;
(sol_ix, res)
}
})
.collect();
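    // Await all checks, then restore the chunk's original order, as `join_all`
    // yields results in order of completion.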
let mut results = checks.join_all().await;
results.sort_by_key(|&(ix, _)| ix);
results
.into_iter()
.map(|(_ix, res)| res.map_err(CheckSolutionsError::CheckSolution))
.collect()
}
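/// Read the predicates required by the given solution and check the solution
/// against them.
///
/// The outer `Result` represents a failure to query the node DB, while the
/// inner `Result` indicates whether or not the solution was valid. On success,
/// yields the total gas spent checking the solution's predicates.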
async fn check_solution(
solution: Arc<Solution>,
pre_state: state::View,
post_state: state::View,
contract_registry: &ContentAddress,
check_conf: Arc<CheckPredicateConfig>,
) -> Result<Result<Gas, InvalidSolution>, CheckSolutionError> {
let predicates =
match get_solution_predicates(contract_registry, &post_state, &solution.data).await {
Ok(predicates) => predicates,
Err(SolutionPredicatesError::PredicateDoesNotExist(ca)) => {
return Ok(Err(InvalidSolution::PredicateDoesNotExist(ca)));
}
Err(SolutionPredicatesError::QueryPredicate(err)) => match err {
QueryPredicateError::Decode(_)
| QueryPredicateError::MissingLenBytes
| QueryPredicateError::InvalidLenBytes => {
return Ok(Err(InvalidSolution::PredicateInvalid));
}
QueryPredicateError::ConnPoolQuery(err) => {
return Err(CheckSolutionError::NodeQuery(err))
}
},
};
match check::solution::check_predicates(
&pre_state,
&post_state,
solution.clone(),
|addr: &PredicateAddress| predicates[&addr.predicate].clone(),
check_conf.clone(),
)
.await
{
Err(err) => Ok(Err(InvalidSolution::Predicates(err))),
Ok(gas) => Ok(Ok(gas)),
}
}
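/// Read the predicate associated with each of the solution's data from the
/// contract registry, keyed by predicate content address.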
async fn get_solution_predicates(
contract_registry: &ContentAddress,
view: &state::View,
solution_data: &[SolutionData],
) -> Result<HashMap<ContentAddress, Arc<Predicate>>, SolutionPredicatesError> {
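    // Query each solution data's predicate from the registry in parallel.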
let queries: tokio::task::JoinSet<_> = solution_data
.iter()
.map(|data| data.predicate_to_solve.clone())
.enumerate()
.map(move |(ix, pred_addr)| {
let view = view.clone();
let registry = contract_registry.clone();
async move {
let pred = view.get_predicate(registry, &pred_addr).await;
(ix, pred)
}
})
.collect();
let mut map = HashMap::new();
let mut results = queries.join_all().await;
results.sort_by_key(|(ix, _)| *ix);
for (data, (_ix, res)) in solution_data.iter().zip(results) {
let ca = data.predicate_to_solve.predicate.clone();
let predicate =
res?.ok_or_else(|| SolutionPredicatesError::PredicateDoesNotExist(ca.clone()))?;
map.insert(ca, Arc::new(predicate));
}
Ok(map)
}
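/// Record the given solution failures in the builder DB, then prune the oldest
/// failures beyond the `failures_to_keep` limit.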
fn record_solution_failures(
builder_tx: &mut rusqlite::Transaction,
failures: Vec<(ContentAddress, SolutionFailure)>,
failures_to_keep: u32,
) -> rusqlite::Result<()> {
if failures.is_empty() {
return Ok(());
}
for (ca, failure) in failures {
builder_db::insert_solution_failure(builder_tx, &ca, failure)?;
}
builder_db::delete_oldest_solution_failures(builder_tx, failures_to_keep)
}