essential_node/
lib.rs

1#![deny(missing_docs)]
2//! The Essential node implementation.
3//!
//! The primary API for accessing blocks and contracts is provided via the
//! [`db::ConnectionPool`] type, available through the re-exported [`db`] module.
6//!
7//! The node, via the [`run`] function:
8//! - Runs the relayer stream and syncs blocks.
9//! - Performs validation.
10
11use error::{BigBangError, CriticalError};
12pub use essential_node_db as db;
13use essential_node_types::{block_notify::BlockTx, BigBang};
14use essential_relayer::Relayer;
15use essential_types::ContentAddress;
16pub use handles::node::Handle;
17pub use validate::validate_dry_run;
18pub use validate::validate_solution_set_dry_run;
19use validation::validation_stream;
20
21mod error;
22mod handles;
23#[cfg(any(feature = "test-utils", test))]
24#[allow(missing_docs)]
25pub mod test_utils;
26pub mod validate;
27mod validation;
28
/// Configuration selecting which streams the node starts when it is run.
#[derive(Debug, Clone)]
pub struct RunConfig {
    /// Endpoint of the node that blocks are synced from.
    ///
    /// When this is `None`, the relayer stream is not started.
    pub relayer_source_endpoint: Option<String>,
    /// Whether the validation stream should be started.
    ///
    /// When this is `false`, the validation stream is not started.
    pub run_validation: bool,
}
38
39/// Ensures that a big bang block exists in the DB for the given `BigBang` configuration.
40///
41/// If no block exists with `block_number` `0`, this inserts the big bang block.
42///
43/// If a block already exists with `block_number` `0`, this validates that its [`ContentAddress`]
44/// matches the `ContentAddress` of the `Block` returned from [`BigBang::block`].
45///
46/// If validation has not yet begun, this initializes progress to begin from the big bang `Block`.
47///
48/// Returns the `ContentAddress` of the big bang `Block`.
49#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
50pub async fn ensure_big_bang_block(
51    conn_pool: &db::ConnectionPool,
52    big_bang: &BigBang,
53) -> Result<ContentAddress, BigBangError> {
54    let bb_block = big_bang.block();
55    let bb_block_ca = essential_hash::content_addr(&bb_block);
56
57    #[cfg(feature = "tracing")]
58    tracing::debug!("Big Bang Block CA: {bb_block_ca}");
59
60    // List out the first block.
61    match conn_pool.list_blocks(0..1).await?.into_iter().next() {
62        // If no block at block `0` exists, insert and "finalize" the big bang block.
63        None => {
64            #[cfg(feature = "tracing")]
65            tracing::debug!("Big Bang Block not found - inserting into DB");
66            let bbb_ca = bb_block_ca.clone();
67            conn_pool
68                .acquire_then(|conn| {
69                    db::with_tx(conn, move |tx| {
70                        db::insert_block(tx, &bb_block)?;
71                        db::finalize_block(tx, &bbb_ca)?;
72                        Ok::<_, rusqlite::Error>(())
73                    })
74                })
75                .await?;
76        }
77        // If a block already exists, ensure its the big bang block we expect.
78        Some(block) => {
79            let ca = essential_hash::content_addr(&block);
80            if ca != bb_block_ca {
81                return Err(BigBangError::UnexpectedBlock {
82                    expected: bb_block_ca,
83                    found: ca,
84                });
85            }
86            #[cfg(feature = "tracing")]
87            tracing::debug!("Big Bang Block already exists");
88        }
89    }
90
91    // If validation has not yet begun, ensure it begins from the big bang block.
92    if conn_pool.get_validation_progress().await?.is_none() {
93        #[cfg(feature = "tracing")]
94        tracing::debug!("Starting validation progress at Big Bang Block CA");
95        conn_pool
96            .update_validation_progress(bb_block_ca.clone())
97            .await?;
98    }
99
100    Ok(bb_block_ca)
101}
102
103/// Optionally run the relayer and validation stream.
104///
105/// Relayer will sync blocks from the node API blocks stream to node database
106/// and notify validation stream of new blocks via the shared watch channel.
107///
108/// Returns a [`Handle`] that can be used to close the streams.
109/// The streams will continue to run until the handle is dropped.
110pub fn run(
111    conn_pool: db::ConnectionPool,
112    conf: RunConfig,
113    contract_registry: ContentAddress,
114    program_registry: ContentAddress,
115    block_notify: BlockTx,
116) -> Result<Handle, CriticalError> {
117    let RunConfig {
118        run_validation,
119        relayer_source_endpoint,
120    } = conf;
121
122    // Run relayer.
123    let relayer_handle = if let Some(relayer_source_endpoint) = relayer_source_endpoint {
124        let relayer = Relayer::new(relayer_source_endpoint.as_str())?;
125        Some(relayer.run(conn_pool.clone(), block_notify.clone())?)
126    } else {
127        None
128    };
129
130    // Run validation stream.
131    let validation_handle = if run_validation {
132        Some(validation_stream(
133            conn_pool.clone(),
134            contract_registry,
135            program_registry,
136            block_notify.new_listener(),
137        )?)
138    } else {
139        None
140    };
141
142    Ok(Handle::new(relayer_handle, validation_handle))
143}