1#![deny(missing_docs)]
2use error::{BigBangError, CriticalError};
12pub use essential_node_db as db;
13use essential_node_types::{block_notify::BlockTx, BigBang};
14use essential_relayer::Relayer;
15use essential_types::ContentAddress;
16pub use handles::node::Handle;
17pub use validate::validate_dry_run;
18pub use validate::validate_solution_set_dry_run;
19use validation::validation_stream;
20
21mod error;
22mod handles;
23#[cfg(any(feature = "test-utils", test))]
24#[allow(missing_docs)]
25pub mod test_utils;
26pub mod validate;
27mod validation;
28
/// Options controlling which parts of the node [`run`] starts.
#[derive(Clone, Debug)]
pub struct RunConfig {
    /// Endpoint handed to the relayer as its source.
    ///
    /// When `None`, [`run`] does not start the relayer.
    pub relayer_source_endpoint: Option<String>,
    /// Whether [`run`] should start the validation stream.
    pub run_validation: bool,
}
38
39#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
50pub async fn ensure_big_bang_block(
51 conn_pool: &db::ConnectionPool,
52 big_bang: &BigBang,
53) -> Result<ContentAddress, BigBangError> {
54 let bb_block = big_bang.block();
55 let bb_block_ca = essential_hash::content_addr(&bb_block);
56
57 #[cfg(feature = "tracing")]
58 tracing::debug!("Big Bang Block CA: {bb_block_ca}");
59
60 match conn_pool.list_blocks(0..1).await?.into_iter().next() {
62 None => {
64 #[cfg(feature = "tracing")]
65 tracing::debug!("Big Bang Block not found - inserting into DB");
66 let bbb_ca = bb_block_ca.clone();
67 conn_pool
68 .acquire_then(|conn| {
69 db::with_tx(conn, move |tx| {
70 db::insert_block(tx, &bb_block)?;
71 db::finalize_block(tx, &bbb_ca)?;
72 Ok::<_, rusqlite::Error>(())
73 })
74 })
75 .await?;
76 }
77 Some(block) => {
79 let ca = essential_hash::content_addr(&block);
80 if ca != bb_block_ca {
81 return Err(BigBangError::UnexpectedBlock {
82 expected: bb_block_ca,
83 found: ca,
84 });
85 }
86 #[cfg(feature = "tracing")]
87 tracing::debug!("Big Bang Block already exists");
88 }
89 }
90
91 if conn_pool.get_validation_progress().await?.is_none() {
93 #[cfg(feature = "tracing")]
94 tracing::debug!("Starting validation progress at Big Bang Block CA");
95 conn_pool
96 .update_validation_progress(bb_block_ca.clone())
97 .await?;
98 }
99
100 Ok(bb_block_ca)
101}
102
103pub fn run(
111 conn_pool: db::ConnectionPool,
112 conf: RunConfig,
113 contract_registry: ContentAddress,
114 program_registry: ContentAddress,
115 block_notify: BlockTx,
116) -> Result<Handle, CriticalError> {
117 let RunConfig {
118 run_validation,
119 relayer_source_endpoint,
120 } = conf;
121
122 let relayer_handle = if let Some(relayer_source_endpoint) = relayer_source_endpoint {
124 let relayer = Relayer::new(relayer_source_endpoint.as_str())?;
125 Some(relayer.run(conn_pool.clone(), block_notify.clone())?)
126 } else {
127 None
128 };
129
130 let validation_handle = if run_validation {
132 Some(validation_stream(
133 conn_pool.clone(),
134 contract_registry,
135 program_registry,
136 block_notify.new_listener(),
137 )?)
138 } else {
139 None
140 };
141
142 Ok(Handle::new(relayer_handle, validation_handle))
143}