1use error::{
6 BuildBlockError, CheckSetError, CheckSetsError, InvalidSet, LastBlockHeaderError,
7 PredicateProgramsError, QueryPredicateError, QueryProgramError, SetPredicatesError,
8};
9use essential_builder_db::{self as builder_db};
10use essential_builder_types::SolutionSetFailure;
11use essential_check::{self as check, solution::CheckPredicateConfig, vm::Gas};
12pub use essential_node as node;
13use essential_node_db as node_db;
14use essential_node_types::{block_state_solution, BigBang, Block, BlockHeader};
15use essential_types::{
16 predicate::Predicate,
17 solution::{Solution, SolutionSet},
18 ContentAddress, PredicateAddress, Program, Word,
19};
20use std::{collections::HashMap, num::NonZero, ops::Range, sync::Arc, time::Duration};
21
22pub mod error;
23pub mod state;
24
25#[derive(Clone, Debug, Eq, Hash, PartialEq)]
27pub struct Config {
28 pub solution_set_failures_to_keep: u32,
33 pub solution_set_attempts_per_block: NonZero<u32>,
37 pub parallel_chunk_size: NonZero<usize>,
44 pub check: Arc<CheckPredicateConfig>,
48 pub contract_registry: PredicateAddress,
50 pub program_registry: PredicateAddress,
52 pub block_state: PredicateAddress,
54}
55
56#[derive(Debug)]
58pub struct SolutionSetsSummary {
59 pub succeeded: Vec<(ContentAddress, Gas)>,
61 pub failed: Vec<(ContentAddress, SolutionSetIndex, InvalidSet)>,
63}
64
65pub type SolutionSetIndex = u32;
67
68type BlockNum = i64;
69
70impl Config {
71 pub const DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT: u32 = 10_000;
73 pub const DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK: u32 = 10_000;
75
76 pub fn default_parallel_chunk_size() -> NonZero<usize> {
78 num_cpus::get()
79 .try_into()
80 .expect("`num_cpus::get()` must be non-zero")
81 }
82}
83
84impl Default for Config {
85 fn default() -> Self {
86 let big_bang = BigBang::default();
87 Self {
88 solution_set_failures_to_keep: Self::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT,
89 solution_set_attempts_per_block: Self::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK
90 .try_into()
91 .expect("declared const must be non-zero"),
92 parallel_chunk_size: Self::default_parallel_chunk_size(),
93 contract_registry: big_bang.contract_registry,
94 program_registry: big_bang.program_registry,
95 block_state: big_bang.block_state,
96 check: Default::default(),
97 }
98 }
99}
100
101#[cfg_attr(feature = "tracing", tracing::instrument("build", skip_all))]
134pub async fn build_block_fifo(
135 builder_conn_pool: &builder_db::ConnectionPool,
136 node_conn_pool: &node::db::ConnectionPool,
137 conf: &Config,
138) -> Result<(Option<ContentAddress>, SolutionSetsSummary), BuildBlockError> {
139 let last_block_header_opt = node_conn_pool
141 .acquire_then(|h| last_block_header(h))
142 .await?;
143
144 let block_timestamp = std::time::SystemTime::now()
146 .duration_since(std::time::UNIX_EPOCH)
147 .map_err(|_| BuildBlockError::TimestampNotMonotonic)?;
148
149 let block_number = match last_block_header_opt {
151 None => 0,
152 Some(BlockHeader {
153 number: last_block_num,
154 timestamp: last_block_ts,
155 }) => {
156 let block_num = last_block_num
157 .checked_add(1)
158 .ok_or(BuildBlockError::BlockNumberOutOfRange)?;
159 if block_timestamp <= last_block_ts {
160 return Err(BuildBlockError::TimestampNotMonotonic);
161 }
162 block_num
163 }
164 };
165
166 #[cfg(feature = "tracing")]
167 tracing::debug!("Building block {}", block_number);
168
169 let block_secs: Word = block_timestamp
172 .as_secs()
173 .try_into()
174 .map_err(|_| BuildBlockError::TimestampOutOfRange)?;
175 let solution = block_state_solution(conf.block_state.clone(), block_number, block_secs);
176 let solution_set = SolutionSet {
177 solutions: vec![solution],
178 };
179 let ca = essential_hash::content_addr(&solution_set);
180 let mut solution_sets = vec![(ca, Arc::new(solution_set))];
181
182 const MAX_TIMESTAMP_RANGE: Range<Duration> =
184 Duration::from_secs(0)..Duration::from_secs(i64::MAX as _);
185 let limit = i64::from(u32::from(conf.solution_set_attempts_per_block));
186 solution_sets.extend(
187 builder_conn_pool
188 .list_solution_sets(MAX_TIMESTAMP_RANGE, limit)
189 .await?
190 .into_iter()
191 .map(|(ca, solution_set, _ts)| (ca, Arc::new(solution_set))),
192 );
193
194 let (solution_sets, summary) =
196 check_solution_sets(node_conn_pool.clone(), block_number, &solution_sets, conf).await?;
197
198 let block = Block {
200 header: BlockHeader {
201 number: block_number,
202 timestamp: block_timestamp,
203 },
204 solution_sets: solution_sets
205 .into_iter()
206 .map(Arc::unwrap_or_clone)
207 .collect(),
208 };
209 let block_addr = essential_hash::content_addr(&block);
210 #[cfg(feature = "tracing")]
211 tracing::debug!(
212 "Built block {} with {} solution sets at {:?}",
213 block_addr,
214 block.solution_sets.len(),
215 block.header.timestamp
216 );
217
218 let skip_block = block.solution_sets.len() <= 1;
222 if skip_block {
223 #[cfg(feature = "tracing")]
224 tracing::debug!("Skipping empty block {}", block_addr);
225
226 } else {
228 node_conn_pool
231 .acquire_then(|conn| {
232 builder_db::with_tx(conn, move |tx| {
233 let block_ca = essential_hash::content_addr(&block);
234 node_db::insert_block(tx, &block)?;
235 node_db::finalize_block(tx, &block_ca)
236 })
237 })
238 .await?;
239 #[cfg(feature = "tracing")]
240 tracing::debug!("Committed and finalized block {}", block_addr);
241 }
242
243 let failures: Vec<_> = summary
245 .failed
246 .iter()
247 .map(|(ca, set_ix, invalid)| {
248 let failure = SolutionSetFailure {
249 attempt_block_num: block_number,
250 attempt_block_addr: block_addr.clone(),
251 attempt_solution_set_ix: *set_ix,
252 err_msg: format!("{invalid}").into(),
253 };
254 (ca.clone(), failure)
255 })
256 .collect();
257 let failures_to_keep = conf.solution_set_failures_to_keep;
258
259 let attempted: Vec<_> = summary
261 .succeeded
262 .iter()
263 .map(|(ca, _gas)| ca.clone())
264 .chain(summary.failed.iter().map(|(ca, _ix, _err)| ca.clone()))
265 .collect();
266
267 builder_conn_pool
268 .acquire_then(move |conn| {
269 builder_db::with_tx(conn, |tx| {
270 record_solution_set_failures(tx, failures, failures_to_keep)?;
271 builder_db::delete_solution_sets(tx, attempted)
272 })
273 })
274 .await?;
275
276 let block_addr = if skip_block { None } else { Some(block_addr) };
277 Ok((block_addr, summary))
278}
279
280fn last_block_header(
284 conn: &rusqlite::Connection,
285) -> Result<Option<BlockHeader>, LastBlockHeaderError> {
286 let block_ca = match node_db::get_latest_finalized_block_address(conn)? {
288 Some(ca) => ca,
289 None => return Ok(None),
290 };
291
292 let header = node_db::get_block_header(conn, &block_ca)?
294 .ok_or(LastBlockHeaderError::NoNumberForLastFinalizedBlock)?;
295
296 Ok(Some(header))
297}
298
299async fn check_solution_sets(
306 node_conn_pool: node::db::ConnectionPool,
307 block_num: BlockNum,
308 proposed_solution_sets: &[(ContentAddress, Arc<SolutionSet>)],
309 conf: &Config,
310) -> Result<(Vec<Arc<SolutionSet>>, SolutionSetsSummary), CheckSetsError> {
311 let chunk_size = conf.parallel_chunk_size.into();
312 let mut solution_sets = vec![];
313 let mut succeeded = vec![];
314 let mut failed = vec![];
315 let mut mutations = state::Mutations::default();
316
317 let mut chunk_start = 0;
320 while chunk_start < proposed_solution_sets.len() {
321 let chunk_end = chunk_start
323 .saturating_add(chunk_size)
324 .min(proposed_solution_sets.len());
325 let range = chunk_start..chunk_end;
326 let chunk = &proposed_solution_sets[range.clone()];
327
328 mutations.extend(range.clone().zip(chunk.iter().map(|(_, s)| &**s)));
330 let mutations_arc = Arc::new(std::mem::take(&mut mutations));
332
333 let results = check_solution_set_chunk(
335 &node_conn_pool,
336 block_num,
337 &mutations_arc,
338 range.clone().zip(chunk.iter().map(|(_, s)| s.clone())),
339 &conf.contract_registry.contract,
340 &conf.program_registry.contract,
341 &conf.check,
342 )
343 .await?;
344
345 debug_assert_eq!(
348 Arc::strong_count(&mutations_arc),
349 1,
350 "`Arc<Mutations>` not unique"
351 );
352 mutations = Arc::unwrap_or_clone(mutations_arc);
353
354 for (set_ix, (res, (set_ca, set))) in range.zip(results.into_iter().zip(chunk)) {
356 chunk_start += 1;
357 match res {
358 Ok(gas) => {
359 succeeded.push((set_ca.clone(), gas));
360 solution_sets.push(set.clone());
361 #[cfg(feature = "tracing")]
362 tracing::trace!("Solution set check success {}", set_ca);
363 }
364 Err(invalid) => {
366 mutations.remove_solution_set(set_ix);
367 let set_ix: u32 = set_ix
368 .try_into()
369 .expect("`u32::MAX` below solution set limit");
370 failed.push((set_ca.clone(), set_ix, invalid));
371 #[cfg(feature = "tracing")]
372 tracing::trace!("Solution set check failure {}", set_ca);
373 break;
374 }
375 }
376 }
377 }
378 let summary = SolutionSetsSummary { succeeded, failed };
379 Ok((solution_sets, summary))
380}
381
382async fn check_solution_set_chunk(
384 node_conn_pool: &node::db::ConnectionPool,
385 block_num: BlockNum,
386 proposed_mutations: &Arc<state::Mutations>,
387 chunk: impl IntoIterator<Item = (usize, Arc<SolutionSet>)>,
388 contract_registry: &ContentAddress,
389 program_registry: &ContentAddress,
390 check_conf: &Arc<check::solution::CheckPredicateConfig>,
391) -> Result<Vec<Result<Gas, InvalidSet>>, CheckSetsError> {
392 let checks: tokio::task::JoinSet<_> = chunk
394 .into_iter()
395 .map(move |(set_ix, set)| {
396 let mutations = proposed_mutations.clone();
397 let conn_pool = node_conn_pool.clone();
398 let check_conf = check_conf.clone();
399 let contract_registry = contract_registry.clone();
400 let program_registry = program_registry.clone();
401 let (pre, post) = state::pre_and_post_view(conn_pool, mutations, block_num, set_ix);
402 async move {
403 let res = check_set(
404 set.clone(),
405 pre,
406 post,
407 &contract_registry,
408 &program_registry,
409 check_conf,
410 )
411 .await;
412 (set_ix, res)
413 }
414 })
415 .collect();
416
417 let mut results = checks.join_all().await;
419 results.sort_by_key(|&(ix, _)| ix);
420 results
421 .into_iter()
422 .map(|(_ix, res)| res.map_err(CheckSetsError::CheckSolution))
423 .collect()
424}
425
426async fn check_set(
430 solution_set: Arc<SolutionSet>,
431 pre_state: state::View,
432 post_state: state::View,
433 contract_registry: &ContentAddress,
434 program_registry: &ContentAddress,
435 check_conf: Arc<CheckPredicateConfig>,
436) -> Result<Result<Gas, InvalidSet>, CheckSetError> {
437 let predicates =
441 match get_solution_set_predicates(contract_registry, &post_state, &solution_set.solutions)
442 .await
443 {
444 Ok(predicates) => predicates,
445 Err(SetPredicatesError::PredicateDoesNotExist(ca)) => {
446 return Ok(Err(InvalidSet::PredicateDoesNotExist(ca)));
447 }
448 Err(SetPredicatesError::QueryPredicate(err)) => match err {
449 QueryPredicateError::Decode(_)
450 | QueryPredicateError::MissingLenBytes
451 | QueryPredicateError::InvalidLenBytes => {
452 return Ok(Err(InvalidSet::PredicateInvalid));
453 }
454 QueryPredicateError::ConnPoolQuery(err) => {
455 return Err(CheckSetError::NodeQuery(err))
456 }
457 },
458 };
459
460 let programs = match get_predicates_programs(program_registry, &post_state, &predicates).await {
462 Ok(programs) => programs,
463 Err(PredicateProgramsError::ProgramDoesNotExist(ca)) => {
464 return Ok(Err(InvalidSet::ProgramDoesNotExist(ca)));
465 }
466 Err(PredicateProgramsError::QueryProgram(err)) => match err {
467 QueryProgramError::MissingLenBytes | QueryProgramError::InvalidLenBytes => {
468 return Ok(Err(InvalidSet::ProgramInvalid));
469 }
470 QueryProgramError::ConnPoolQuery(err) => return Err(CheckSetError::NodeQuery(err)),
471 },
472 };
473
474 let get_predicate = move |addr: &PredicateAddress| {
475 predicates
476 .get(&addr.predicate)
477 .cloned()
478 .expect("predicate must have been fetched in the previous step")
479 };
480
481 let get_program = move |addr: &ContentAddress| {
482 programs
483 .get(addr)
484 .cloned()
485 .expect("program must have been fetched in the previous step")
486 };
487
488 match check::solution::check_set_predicates(
490 &pre_state,
491 &post_state,
492 solution_set.clone(),
493 get_predicate,
494 get_program,
495 check_conf.clone(),
496 )
497 .await
498 {
499 Err(err) => Ok(Err(InvalidSet::Predicates(err))),
500 Ok(gas) => Ok(Ok(gas)),
501 }
502}
503
504async fn get_solution_set_predicates(
506 contract_registry: &ContentAddress,
507 view: &state::View,
508 solutions: &[Solution],
509) -> Result<HashMap<ContentAddress, Arc<Predicate>>, SetPredicatesError> {
510 let queries: tokio::task::JoinSet<_> = solutions
512 .iter()
513 .map(|solution| solution.predicate_to_solve.clone())
514 .enumerate()
515 .map(move |(ix, pred_addr)| {
516 let view = view.clone();
517 let registry = contract_registry.clone();
518 async move {
519 let pred = view.get_predicate(registry, &pred_addr).await;
520 (ix, pred)
521 }
522 })
523 .collect();
524
525 let mut map = HashMap::new();
527 let mut results = queries.join_all().await;
528 results.sort_by_key(|(ix, _)| *ix);
529 for (sol, (_ix, res)) in solutions.iter().zip(results) {
530 let ca = sol.predicate_to_solve.predicate.clone();
531 let predicate =
532 res?.ok_or_else(|| SetPredicatesError::PredicateDoesNotExist(ca.clone()))?;
533 map.insert(ca, Arc::new(predicate));
534 }
535
536 Ok(map)
537}
538
539async fn get_predicates_programs(
541 program_registry: &ContentAddress,
542 view: &state::View,
543 predicates: &HashMap<ContentAddress, Arc<Predicate>>,
544) -> Result<HashMap<ContentAddress, Arc<Program>>, PredicateProgramsError> {
545 let queries: tokio::task::JoinSet<_> = predicates
547 .iter()
548 .flat_map(|(_, pred)| {
549 pred.nodes
550 .iter()
551 .map(|node| node.program_address.clone())
552 .enumerate()
553 .map(move |(ix, prog_addr)| {
554 let view = view.clone();
555 let registry = program_registry.clone();
556 async move {
557 let prog = view.get_program(registry, &prog_addr).await;
558 (ix, prog)
559 }
560 })
561 })
562 .collect();
563
564 let mut map = HashMap::new();
566 let mut results = queries.join_all().await;
567 results.sort_by_key(|(ix, _)| *ix);
568
569 for (node, (_ix, res)) in predicates
570 .iter()
571 .flat_map(|(_, pred)| pred.nodes.iter())
572 .zip(results)
573 {
574 let ca = node.program_address.clone();
575 let program =
576 res?.ok_or_else(|| PredicateProgramsError::ProgramDoesNotExist(ca.clone()))?;
577 map.insert(ca, Arc::new(program));
578 }
579
580 Ok(map)
581}
582
583fn record_solution_set_failures(
585 builder_tx: &mut rusqlite::Transaction,
586 failures: Vec<(ContentAddress, SolutionSetFailure)>,
587 failures_to_keep: u32,
588) -> rusqlite::Result<()> {
589 if failures.is_empty() {
591 return Ok(());
592 }
593 for (ca, failure) in failures {
595 builder_db::insert_solution_set_failure(builder_tx, &ca, failure)?;
596 }
597 builder_db::delete_oldest_solution_set_failures(builder_tx, failures_to_keep)
598}