essential_builder/
lib.rs

1//! A block builder implementation for the Essential protocol.
2//!
3//! The primary entrypoint to this crate is the [`build_block_fifo`] function.
4
5use error::{
6    BuildBlockError, CheckSetError, CheckSetsError, InvalidSet, LastBlockHeaderError,
7    PredicateProgramsError, QueryPredicateError, QueryProgramError, SetPredicatesError,
8};
9use essential_builder_db::{self as builder_db};
10use essential_builder_types::SolutionSetFailure;
11use essential_check::{self as check, solution::CheckPredicateConfig, vm::Gas};
12pub use essential_node as node;
13use essential_node_db as node_db;
14use essential_node_types::{block_state_solution, BigBang, Block, BlockHeader};
15use essential_types::{
16    predicate::Predicate,
17    solution::{Solution, SolutionSet},
18    ContentAddress, PredicateAddress, Program, Word,
19};
20use std::{collections::HashMap, num::NonZero, ops::Range, sync::Arc, time::Duration};
21
22pub mod error;
23pub mod state;
24
/// Block building configuration.
///
/// Passed by reference to [`build_block_fifo`]. The [`Default`] implementation fills the limits
/// from the associated constants on [`Config`] and the registry/block-state addresses from
/// `BigBang::default()`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Config {
    /// The maximum number of solution set failures to keep in the DB, used to provide feedback to the
    /// submitters.
    ///
    /// Defaults to [`Config::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT`].
    pub solution_set_failures_to_keep: u32,
    /// The maximum number of solution sets to attempt to check and include in a block.
    ///
    /// This is the limit used when reading solution sets out of the builder's pool.
    ///
    /// Defaults to [`Config::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK`].
    pub solution_set_attempts_per_block: NonZero<u32>,
    /// The number of sequential solution sets to attempt to check in parallel at a time.
    ///
    /// If greater than `solution_set_attempts_per_block`, the `solution_set_attempts_per_block`
    /// is used instead.
    ///
    /// Defaults to [`Config::default_parallel_chunk_size`], i.e. `num_cpus::get()`.
    pub parallel_chunk_size: NonZero<usize>,
    /// Configuration required by [`check::solution::check_set_predicates`].
    ///
    /// Wrapped in an `Arc` as this is shared between tasks.
    pub check: Arc<CheckPredicateConfig>,
    /// The address of the big bang contract registry contract and its predicate.
    ///
    /// Defaults to the `contract_registry` of `BigBang::default()`.
    pub contract_registry: PredicateAddress,
    /// The address of the big bang program registry contract and its predicate.
    ///
    /// Defaults to the `program_registry` of `BigBang::default()`.
    pub program_registry: PredicateAddress,
    /// The address of the big bang block state contract and its predicate.
    ///
    /// Defaults to the `block_state` of `BigBang::default()`.
    pub block_state: PredicateAddress,
}
55
/// A summary of building a block, returned by [`build_block_fifo`].
///
/// Every attempted solution set appears in exactly one of the two lists. This includes the
/// block state solution set that the builder itself places at the front of the attempted
/// sequence.
#[derive(Debug)]
pub struct SolutionSetsSummary {
    /// The address of each successful solution set, paired with the gas reported by its check.
    pub succeeded: Vec<(ContentAddress, Gas)>,
    /// The address of each failed solution set, the index at which it was attempted, and the
    /// reason it was considered invalid.
    pub failed: Vec<(ContentAddress, SolutionSetIndex, InvalidSet)>,
}
64
/// The index of a solution set within a block.
pub type SolutionSetIndex = u32;

/// The number of a block, as stored in a `BlockHeader` and passed to the solution set checks.
type BlockNum = i64;
69
70impl Config {
71    /// The default number of solution set failures that the builder will retain in its DB.
72    pub const DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT: u32 = 10_000;
73    /// The default max number of solution sets to attempt to check and include in a block.
74    pub const DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK: u32 = 10_000;
75
76    /// The default number of sequential solution sets to attempt to check in parallel.
77    pub fn default_parallel_chunk_size() -> NonZero<usize> {
78        num_cpus::get()
79            .try_into()
80            .expect("`num_cpus::get()` must be non-zero")
81    }
82}
83
84impl Default for Config {
85    fn default() -> Self {
86        let big_bang = BigBang::default();
87        Self {
88            solution_set_failures_to_keep: Self::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT,
89            solution_set_attempts_per_block: Self::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK
90                .try_into()
91                .expect("declared const must be non-zero"),
92            parallel_chunk_size: Self::default_parallel_chunk_size(),
93            contract_registry: big_bang.contract_registry,
94            program_registry: big_bang.program_registry,
95            block_state: big_bang.block_state,
96            check: Default::default(),
97        }
98    }
99}
100
101/// Naively build a block in FIFO order.
102///
103/// Attempts to build a block from the available solution sets in the pool in the order in which they
104/// were received. No attempt is made at MEV, and solution sets that don't succeed in the immediate
105/// order provided are considered failed.
106///
107/// All solution sets that are attempted (both those that succeed and those that fail) are deleted from
108/// the builder's solution set pool. Failed solution sets are recorded to the builder's `solution_set_failure`
109/// table, while the successful solution sets can be found in the block.
110///
111/// Returns the address of the block if one was successfully created alongside an in-memory
112/// [`SetsSummary`] that describes which solution sets succeeded and which ones failed for
113/// convenience.
114///
115/// # Example
116///
117/// ```no_run
118/// # async fn f() -> Result<(), essential_builder::error::BuildBlockError> {
119/// # let builder_conn_pool: essential_builder_db::ConnectionPool = todo!();
120/// # let node_conn_pool: essential_node::db::ConnectionPool = todo!();
121/// use essential_builder::{build_block_fifo, Config};
122///
123/// let config = Config::default();
124///
125/// // Build blocks in a loop.
126/// loop {
127///     build_block_fifo(&builder_conn_pool, &node_conn_pool, &config).await?;
128///
129///     // Wait some time or for an event before building next block if necessary.
130/// }
131/// # }
132/// ```
133#[cfg_attr(feature = "tracing", tracing::instrument("build", skip_all))]
134pub async fn build_block_fifo(
135    builder_conn_pool: &builder_db::ConnectionPool,
136    node_conn_pool: &node::db::ConnectionPool,
137    conf: &Config,
138) -> Result<(Option<ContentAddress>, SolutionSetsSummary), BuildBlockError> {
139    // Retrieve the last block header.
140    let last_block_header_opt = node_conn_pool
141        .acquire_then(|h| last_block_header(h))
142        .await?;
143
144    // Current timestamp as a `Duration` since `UNIX_EPOCH`.
145    let block_timestamp = std::time::SystemTime::now()
146        .duration_since(std::time::UNIX_EPOCH)
147        .map_err(|_| BuildBlockError::TimestampNotMonotonic)?;
148
149    // Determine the block number for this block.
150    let block_number = match last_block_header_opt {
151        None => 0,
152        Some(BlockHeader {
153            number: last_block_num,
154            timestamp: last_block_ts,
155        }) => {
156            let block_num = last_block_num
157                .checked_add(1)
158                .ok_or(BuildBlockError::BlockNumberOutOfRange)?;
159            if block_timestamp <= last_block_ts {
160                return Err(BuildBlockError::TimestampNotMonotonic);
161            }
162            block_num
163        }
164    };
165
166    #[cfg(feature = "tracing")]
167    tracing::debug!("Building block {}", block_number);
168
169    // TODO: Produce any "special" block-builder specific solutions here
170    // (e.g. updating block number and timestamp in the block contract).
171    let block_secs: Word = block_timestamp
172        .as_secs()
173        .try_into()
174        .map_err(|_| BuildBlockError::TimestampOutOfRange)?;
175    let solution = block_state_solution(conf.block_state.clone(), block_number, block_secs);
176    let solution_set = SolutionSet {
177        solutions: vec![solution],
178    };
179    let ca = essential_hash::content_addr(&solution_set);
180    let mut solution_sets = vec![(ca, Arc::new(solution_set))];
181
182    // Read out the oldest solution sets.
183    const MAX_TIMESTAMP_RANGE: Range<Duration> =
184        Duration::from_secs(0)..Duration::from_secs(i64::MAX as _);
185    let limit = i64::from(u32::from(conf.solution_set_attempts_per_block));
186    solution_sets.extend(
187        builder_conn_pool
188            .list_solution_sets(MAX_TIMESTAMP_RANGE, limit)
189            .await?
190            .into_iter()
191            .map(|(ca, solution_set, _ts)| (ca, Arc::new(solution_set))),
192    );
193
194    // Check all solution sets.
195    let (solution_sets, summary) =
196        check_solution_sets(node_conn_pool.clone(), block_number, &solution_sets, conf).await?;
197
198    // Construct the block.
199    let block = Block {
200        header: BlockHeader {
201            number: block_number,
202            timestamp: block_timestamp,
203        },
204        solution_sets: solution_sets
205            .into_iter()
206            .map(Arc::unwrap_or_clone)
207            .collect(),
208    };
209    let block_addr = essential_hash::content_addr(&block);
210    #[cfg(feature = "tracing")]
211    tracing::debug!(
212        "Built block {} with {} solution sets at {:?}",
213        block_addr,
214        block.solution_sets.len(),
215        block.header.timestamp
216    );
217
218    // If the block is empty, notify that we're skipping the block.
219    // FIXME: This uses `<= 1` because the first solution set is the block state solution set.
220    // This should be refactored.
221    let skip_block = block.solution_sets.len() <= 1;
222    if skip_block {
223        #[cfg(feature = "tracing")]
224        tracing::debug!("Skipping empty block {}", block_addr);
225
226    // Only insert the block if
227    } else {
228        // Commit the new block to the node DB.
229        // FIXME: Don't immediately insert and finalize when integrating with the L1.
230        node_conn_pool
231            .acquire_then(|conn| {
232                builder_db::with_tx(conn, move |tx| {
233                    let block_ca = essential_hash::content_addr(&block);
234                    node_db::insert_block(tx, &block)?;
235                    node_db::finalize_block(tx, &block_ca)
236                })
237            })
238            .await?;
239        #[cfg(feature = "tracing")]
240        tracing::debug!("Committed and finalized block {}", block_addr);
241    }
242
243    // Record solution set failures to the DB for submitter feedback.
244    let failures: Vec<_> = summary
245        .failed
246        .iter()
247        .map(|(ca, set_ix, invalid)| {
248            let failure = SolutionSetFailure {
249                attempt_block_num: block_number,
250                attempt_block_addr: block_addr.clone(),
251                attempt_solution_set_ix: *set_ix,
252                err_msg: format!("{invalid}").into(),
253            };
254            (ca.clone(), failure)
255        })
256        .collect();
257    let failures_to_keep = conf.solution_set_failures_to_keep;
258
259    // Delete all attempted solution sets, both those that succeeded and those that failed.
260    let attempted: Vec<_> = summary
261        .succeeded
262        .iter()
263        .map(|(ca, _gas)| ca.clone())
264        .chain(summary.failed.iter().map(|(ca, _ix, _err)| ca.clone()))
265        .collect();
266
267    builder_conn_pool
268        .acquire_then(move |conn| {
269            builder_db::with_tx(conn, |tx| {
270                record_solution_set_failures(tx, failures, failures_to_keep)?;
271                builder_db::delete_solution_sets(tx, attempted)
272            })
273        })
274        .await?;
275
276    let block_addr = if skip_block { None } else { Some(block_addr) };
277    Ok((block_addr, summary))
278}
279
280/// Retrieve the header for the last block.
281///
282/// Returns the block number and block timestamp in that order.
283fn last_block_header(
284    conn: &rusqlite::Connection,
285) -> Result<Option<BlockHeader>, LastBlockHeaderError> {
286    // Retrieve the last block CA.
287    let block_ca = match node_db::get_latest_finalized_block_address(conn)? {
288        Some(ca) => ca,
289        None => return Ok(None),
290    };
291
292    // Retrieve the block's number and timestamp.
293    let header = node_db::get_block_header(conn, &block_ca)?
294        .ok_or(LastBlockHeaderError::NoNumberForLastFinalizedBlock)?;
295
296    Ok(Some(header))
297}
298
299/// Check the given sequence of proposed solution sets.
300///
301/// We optimistically check `conf.parallel_chunk_size` solution sets in parallel at a time.
302/// This gives us the benefit of parallel checking, while capping the number of following solution
303/// sets that must be re-checked in the case that one of the solution sets earlier in the chunk fails to
304/// validate.
305async fn check_solution_sets(
306    node_conn_pool: node::db::ConnectionPool,
307    block_num: BlockNum,
308    proposed_solution_sets: &[(ContentAddress, Arc<SolutionSet>)],
309    conf: &Config,
310) -> Result<(Vec<Arc<SolutionSet>>, SolutionSetsSummary), CheckSetsError> {
311    let chunk_size = conf.parallel_chunk_size.into();
312    let mut solution_sets = vec![];
313    let mut succeeded = vec![];
314    let mut failed = vec![];
315    let mut mutations = state::Mutations::default();
316
317    // On each pass we process a chunk at a time.
318    // If there's a failure, the next chunk starts after the first failure in this chunk.
319    let mut chunk_start = 0;
320    while chunk_start < proposed_solution_sets.len() {
321        // The range of the chunk of solution sets to check on this pass.
322        let chunk_end = chunk_start
323            .saturating_add(chunk_size)
324            .min(proposed_solution_sets.len());
325        let range = chunk_start..chunk_end;
326        let chunk = &proposed_solution_sets[range.clone()];
327
328        // Apply the mutations from this chunk of solution sets.
329        mutations.extend(range.clone().zip(chunk.iter().map(|(_, s)| &**s)));
330        // Temporarily move mutations behind a share-able `Arc`.
331        let mutations_arc = Arc::new(std::mem::take(&mut mutations));
332
333        // Check the chunk in parallel.
334        let results = check_solution_set_chunk(
335            &node_conn_pool,
336            block_num,
337            &mutations_arc,
338            range.clone().zip(chunk.iter().map(|(_, s)| s.clone())),
339            &conf.contract_registry.contract,
340            &conf.program_registry.contract,
341            &conf.check,
342        )
343        .await?;
344
345        // Re-take ownership of the mutations.
346        // We know this is unique as `check_solution_set_chunk` has joined.
347        debug_assert_eq!(
348            Arc::strong_count(&mutations_arc),
349            1,
350            "`Arc<Mutations>` not unique"
351        );
352        mutations = Arc::unwrap_or_clone(mutations_arc);
353
354        // Process the results.
355        for (set_ix, (res, (set_ca, set))) in range.zip(results.into_iter().zip(chunk)) {
356            chunk_start += 1;
357            match res {
358                Ok(gas) => {
359                    succeeded.push((set_ca.clone(), gas));
360                    solution_sets.push(set.clone());
361                    #[cfg(feature = "tracing")]
362                    tracing::trace!("Solution set check success {}", set_ca);
363                }
364                // If a solution set was invalid, remove its mutations.
365                Err(invalid) => {
366                    mutations.remove_solution_set(set_ix);
367                    let set_ix: u32 = set_ix
368                        .try_into()
369                        .expect("`u32::MAX` below solution set limit");
370                    failed.push((set_ca.clone(), set_ix, invalid));
371                    #[cfg(feature = "tracing")]
372                    tracing::trace!("Solution set check failure {}", set_ca);
373                    break;
374                }
375            }
376        }
377    }
378    let summary = SolutionSetsSummary { succeeded, failed };
379    Ok((solution_sets, summary))
380}
381
382/// Check a sequential chunk of solution sets in parallel.
383async fn check_solution_set_chunk(
384    node_conn_pool: &node::db::ConnectionPool,
385    block_num: BlockNum,
386    proposed_mutations: &Arc<state::Mutations>,
387    chunk: impl IntoIterator<Item = (usize, Arc<SolutionSet>)>,
388    contract_registry: &ContentAddress,
389    program_registry: &ContentAddress,
390    check_conf: &Arc<check::solution::CheckPredicateConfig>,
391) -> Result<Vec<Result<Gas, InvalidSet>>, CheckSetsError> {
392    // Spawn concurrent checks for each solution set.
393    let checks: tokio::task::JoinSet<_> = chunk
394        .into_iter()
395        .map(move |(set_ix, set)| {
396            let mutations = proposed_mutations.clone();
397            let conn_pool = node_conn_pool.clone();
398            let check_conf = check_conf.clone();
399            let contract_registry = contract_registry.clone();
400            let program_registry = program_registry.clone();
401            let (pre, post) = state::pre_and_post_view(conn_pool, mutations, block_num, set_ix);
402            async move {
403                let res = check_set(
404                    set.clone(),
405                    pre,
406                    post,
407                    &contract_registry,
408                    &program_registry,
409                    check_conf,
410                )
411                .await;
412                (set_ix, res)
413            }
414        })
415        .collect();
416
417    // Await the results.
418    let mut results = checks.join_all().await;
419    results.sort_by_key(|&(ix, _)| ix);
420    results
421        .into_iter()
422        .map(|(_ix, res)| res.map_err(CheckSetsError::CheckSolution))
423        .collect()
424}
425
426/// Validate the given solution set.
427///
428/// If the solution set is valid, returns the total gas spent.
429async fn check_set(
430    solution_set: Arc<SolutionSet>,
431    pre_state: state::View,
432    post_state: state::View,
433    contract_registry: &ContentAddress,
434    program_registry: &ContentAddress,
435    check_conf: Arc<CheckPredicateConfig>,
436) -> Result<Result<Gas, InvalidSet>, CheckSetError> {
437    // Retrieve the predicates that the solution set attempts to solve from the post-state. This
438    // ensures that the solution set has access to contracts submitted as a part of the solution
439    // set.
440    let predicates =
441        match get_solution_set_predicates(contract_registry, &post_state, &solution_set.solutions)
442            .await
443        {
444            Ok(predicates) => predicates,
445            Err(SetPredicatesError::PredicateDoesNotExist(ca)) => {
446                return Ok(Err(InvalidSet::PredicateDoesNotExist(ca)));
447            }
448            Err(SetPredicatesError::QueryPredicate(err)) => match err {
449                QueryPredicateError::Decode(_)
450                | QueryPredicateError::MissingLenBytes
451                | QueryPredicateError::InvalidLenBytes => {
452                    return Ok(Err(InvalidSet::PredicateInvalid));
453                }
454                QueryPredicateError::ConnPoolQuery(err) => {
455                    return Err(CheckSetError::NodeQuery(err))
456                }
457            },
458        };
459
460    // Retrieve the programs that the predicates specify from the post-state.
461    let programs = match get_predicates_programs(program_registry, &post_state, &predicates).await {
462        Ok(programs) => programs,
463        Err(PredicateProgramsError::ProgramDoesNotExist(ca)) => {
464            return Ok(Err(InvalidSet::ProgramDoesNotExist(ca)));
465        }
466        Err(PredicateProgramsError::QueryProgram(err)) => match err {
467            QueryProgramError::MissingLenBytes | QueryProgramError::InvalidLenBytes => {
468                return Ok(Err(InvalidSet::ProgramInvalid));
469            }
470            QueryProgramError::ConnPoolQuery(err) => return Err(CheckSetError::NodeQuery(err)),
471        },
472    };
473
474    let get_predicate = move |addr: &PredicateAddress| {
475        predicates
476            .get(&addr.predicate)
477            .cloned()
478            .expect("predicate must have been fetched in the previous step")
479    };
480
481    let get_program = move |addr: &ContentAddress| {
482        programs
483            .get(addr)
484            .cloned()
485            .expect("program must have been fetched in the previous step")
486    };
487
488    // Create the post-state and check the solution set's predicates.
489    match check::solution::check_set_predicates(
490        &pre_state,
491        &post_state,
492        solution_set.clone(),
493        get_predicate,
494        get_program,
495        check_conf.clone(),
496    )
497    .await
498    {
499        Err(err) => Ok(Err(InvalidSet::Predicates(err))),
500        Ok(gas) => Ok(Ok(gas)),
501    }
502}
503
504/// Read and return all predicates required by the given solutions.
505async fn get_solution_set_predicates(
506    contract_registry: &ContentAddress,
507    view: &state::View,
508    solutions: &[Solution],
509) -> Result<HashMap<ContentAddress, Arc<Predicate>>, SetPredicatesError> {
510    // Spawn concurrent queries for each predicate.
511    let queries: tokio::task::JoinSet<_> = solutions
512        .iter()
513        .map(|solution| solution.predicate_to_solve.clone())
514        .enumerate()
515        .map(move |(ix, pred_addr)| {
516            let view = view.clone();
517            let registry = contract_registry.clone();
518            async move {
519                let pred = view.get_predicate(registry, &pred_addr).await;
520                (ix, pred)
521            }
522        })
523        .collect();
524
525    // Collect the results into a map.
526    let mut map = HashMap::new();
527    let mut results = queries.join_all().await;
528    results.sort_by_key(|(ix, _)| *ix);
529    for (sol, (_ix, res)) in solutions.iter().zip(results) {
530        let ca = sol.predicate_to_solve.predicate.clone();
531        let predicate =
532            res?.ok_or_else(|| SetPredicatesError::PredicateDoesNotExist(ca.clone()))?;
533        map.insert(ca, Arc::new(predicate));
534    }
535
536    Ok(map)
537}
538
539/// Read and return all programs required by the given predicates.
540async fn get_predicates_programs(
541    program_registry: &ContentAddress,
542    view: &state::View,
543    predicates: &HashMap<ContentAddress, Arc<Predicate>>,
544) -> Result<HashMap<ContentAddress, Arc<Program>>, PredicateProgramsError> {
545    // Spawn concurrent queries for each program.
546    let queries: tokio::task::JoinSet<_> = predicates
547        .iter()
548        .flat_map(|(_, pred)| {
549            pred.nodes
550                .iter()
551                .map(|node| node.program_address.clone())
552                .enumerate()
553                .map(move |(ix, prog_addr)| {
554                    let view = view.clone();
555                    let registry = program_registry.clone();
556                    async move {
557                        let prog = view.get_program(registry, &prog_addr).await;
558                        (ix, prog)
559                    }
560                })
561        })
562        .collect();
563
564    // Collect the results into a map.
565    let mut map = HashMap::new();
566    let mut results = queries.join_all().await;
567    results.sort_by_key(|(ix, _)| *ix);
568
569    for (node, (_ix, res)) in predicates
570        .iter()
571        .flat_map(|(_, pred)| pred.nodes.iter())
572        .zip(results)
573    {
574        let ca = node.program_address.clone();
575        let program =
576            res?.ok_or_else(|| PredicateProgramsError::ProgramDoesNotExist(ca.clone()))?;
577        map.insert(ca, Arc::new(program));
578    }
579
580    Ok(map)
581}
582
583/// Record solution set failures to the DB for submitter feedback.
584fn record_solution_set_failures(
585    builder_tx: &mut rusqlite::Transaction,
586    failures: Vec<(ContentAddress, SolutionSetFailure)>,
587    failures_to_keep: u32,
588) -> rusqlite::Result<()> {
589    // Nothing to do if no failures.
590    if failures.is_empty() {
591        return Ok(());
592    }
593    // Acquire a connection, record failures and delete old failures in one transaction.
594    for (ca, failure) in failures {
595        builder_db::insert_solution_set_failure(builder_tx, &ca, failure)?;
596    }
597    builder_db::delete_oldest_solution_set_failures(builder_tx, failures_to_keep)
598}