essential_node/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
#![deny(missing_docs)]
//! The Essential node implementation.
//!
//! The primary API for accessing blocks and contracts is provided via the
//! [`ConnectionPool`] type, accessible via the [`db`] function.
//!
//! The node, via the [`run`] function:
//! - Runs the relayer stream and syncs blocks.
//! - Performs validation.
use error::{BigBangError, CriticalError};
pub use essential_node_db as db;
use essential_node_types::{block_notify::BlockTx, BigBang};
use essential_relayer::Relayer;
use essential_types::ContentAddress;
pub use handles::node::Handle;
pub use validate::validate_dry_run;
pub use validate::validate_solution_dry_run;
use validation::validation_stream;
mod error;
mod handles;
#[cfg(any(feature = "test-utils", test))]
#[allow(missing_docs)]
pub mod test_utils;
pub mod validate;
mod validation;
/// Options for running the node.
#[derive(Clone, Debug)]
pub struct RunConfig {
/// Node endpoint to sync blocks from.
/// If `None` then the relayer stream will not run.
pub relayer_source_endpoint: Option<String>,
/// If `false` then the validation stream will not run.
pub run_validation: bool,
}
/// Ensures that a big bang block exists in the DB for the given `BigBang` configuration.
///
/// If no block exists with `block_number` `0`, this inserts the big bang block.
///
/// If a block already exists with `block_number` `0`, this validates that its [`ContentAddress`]
/// matches the `ContentAddress` of the `Block` returned from [`BigBang::block`].
///
/// If validation has not yet begun, this initializes progress to begin from the big bang `Block`.
///
/// Returns the `ContentAddress` of the big bang `Block`.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn ensure_big_bang_block(
conn_pool: &db::ConnectionPool,
big_bang: &BigBang,
) -> Result<ContentAddress, BigBangError> {
let bb_block = big_bang.block();
let bb_block_ca = essential_hash::content_addr(&bb_block);
#[cfg(feature = "tracing")]
tracing::debug!("Big Bang Block CA: {bb_block_ca}");
// List out the first block.
match conn_pool.list_blocks(0..1).await?.into_iter().next() {
// If no block at block `0` exists, insert and "finalize" the big bang block.
None => {
#[cfg(feature = "tracing")]
tracing::debug!("Big Bang Block not found - inserting into DB");
let bbb_ca = bb_block_ca.clone();
conn_pool
.acquire_then(|conn| {
db::with_tx(conn, move |tx| {
db::insert_block(tx, &bb_block)?;
db::finalize_block(tx, &bbb_ca)?;
Ok::<_, rusqlite::Error>(())
})
})
.await?;
}
// If a block already exists, ensure its the big bang block we expect.
Some(block) => {
let ca = essential_hash::content_addr(&block);
if ca != bb_block_ca {
return Err(BigBangError::UnexpectedBlock {
expected: bb_block_ca,
found: ca,
});
}
#[cfg(feature = "tracing")]
tracing::debug!("Big Bang Block already exists");
}
}
// If validation has not yet begun, ensure it begins from the big bang block.
if conn_pool.get_validation_progress().await?.is_none() {
#[cfg(feature = "tracing")]
tracing::debug!("Starting validation progress at Big Bang Block CA");
conn_pool
.update_validation_progress(bb_block_ca.clone())
.await?;
}
Ok(bb_block_ca)
}
/// Optionally run the relayer and validation stream.
///
/// Relayer will sync blocks from the node API blocks stream to node database
/// and notify validation stream of new blocks via the shared watch channel.
///
/// Returns a [`Handle`] that can be used to close the streams.
/// The streams will continue to run until the handle is dropped.
pub fn run(
conn_pool: db::ConnectionPool,
conf: RunConfig,
contract_registry: ContentAddress,
block_notify: BlockTx,
) -> Result<Handle, CriticalError> {
let RunConfig {
run_validation,
relayer_source_endpoint,
} = conf;
// Run relayer.
let relayer_handle = if let Some(relayer_source_endpoint) = relayer_source_endpoint {
let relayer = Relayer::new(relayer_source_endpoint.as_str())?;
Some(relayer.run(conn_pool.clone(), block_notify.clone())?)
} else {
None
};
// Run validation stream.
let validation_handle = if run_validation {
Some(validation_stream(
conn_pool.clone(),
contract_registry,
block_notify.new_listener(),
)?)
} else {
None
};
Ok(Handle::new(relayer_handle, validation_handle))
}