1use anyhow::Context;
2use clap::{Parser, ValueEnum};
3use essential_builder::{self as builder, build_block_fifo};
4use essential_builder_api as builder_api;
5use essential_builder_db as builder_db;
6use essential_check::solution::CheckPredicateConfig;
7use essential_node as node;
8use essential_node_api as node_api;
9use essential_node_types::{block_notify::BlockTx, BigBang};
10use std::{
11 net::{SocketAddr, SocketAddrV4},
12 num::NonZero,
13 path::{Path, PathBuf},
14 time::Duration,
15};
16
17#[cfg(test)]
18mod tests;
19
/// Command-line arguments for running the block builder together with its node.
#[derive(Parser, Clone)]
#[command(version, about)]
pub struct Args {
    /// Skip installing the tracing subscriber at startup (only relevant when built with the
    /// `tracing` feature).
    #[arg(long, default_value_t = false)]
    disable_tracing: bool,
    /// Time between block-building attempts, in milliseconds.
    #[arg(long, default_value_t = DEFAULT_BLOCK_INTERVAL_MS)]
    block_interval_ms: u32,
    /// Keep limit for solution set failures, forwarded to the builder configuration.
    #[arg(long, default_value_t = builder::Config::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT)]
    solution_set_failures_to_keep: u32,
    /// Solution set attempts per block, forwarded to the builder configuration. Must be non-zero.
    #[arg(long, default_value_t = NonZero::new(builder::Config::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK).expect("declared const must be non-zero"))]
    solution_set_attempts_per_block: NonZero<u32>,
    /// Parallel chunk size, forwarded to the builder configuration. Must be non-zero.
    #[arg(long, default_value_t = builder::Config::default_parallel_chunk_size())]
    parallel_chunk_size: NonZero<usize>,
    /// Set `collect_all_failures` on the predicate check configuration handed to the builder.
    #[arg(long)]
    solution_set_check_collects_all_failures: bool,
    /// Path to a YAML big bang configuration. When omitted, the default big bang is used.
    #[arg(long)]
    big_bang: Option<std::path::PathBuf>,

    /// Socket address the builder API listener binds to. The default is `0.0.0.0:0`.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    builder_api_bind_address: SocketAddr,
    /// TCP connection limit passed to the builder API server.
    #[arg(long, default_value_t = builder_api::DEFAULT_CONNECTION_LIMIT)]
    builder_api_tcp_conn_limit: usize,

    /// Socket address the node API listener binds to. The default is `0.0.0.0:0`.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    node_api_bind_address: SocketAddr,
    /// TCP connection limit passed to the node API server.
    #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
    node_api_tcp_conn_limit: usize,

    /// Kind of database backing the builder. Overridden by `--builder-db-path` when that is set.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    builder_db: Db,
    /// Path to the builder DB. When set it is always used; when unset and `--builder-db` is
    /// `persistent`, a default path under the user's data directory is used.
    #[arg(long)]
    builder_db_path: Option<PathBuf>,
    /// Connection limit for the builder DB connection pool.
    #[arg(long, default_value_t = builder_db::pool::Config::default_conn_limit())]
    builder_db_conn_limit: usize,

    /// Kind of database backing the node. Overridden by `--node-db-path` when that is set.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    node_db: Db,
    /// Path to the node DB. When set it is always used; when unset and `--node-db` is
    /// `persistent`, a default path under the user's data directory is used.
    #[arg(long)]
    node_db_path: Option<PathBuf>,
    /// Connection limit for the node DB connection pool.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    node_db_conn_limit: usize,

    /// Relayer source endpoint, forwarded to the node's run configuration. When this is unset
    /// and `--validation` is off, the node run task is left idle.
    #[arg(long)]
    relayer_source_endpoint: Option<String>,
    /// Enable the node's validation (sets `run_validation` in the node's run configuration).
    #[arg(long)]
    validation: bool,
}
129
/// Default value of `--block-interval-ms`: 5 seconds between block-building attempts.
const DEFAULT_BLOCK_INTERVAL_MS: u32 = 5_000;
131
/// Database backend selection shared by the `--builder-db` and `--node-db` options.
#[derive(ValueEnum, Clone, Copy, Debug)]
enum Db {
    /// In-memory database under a freshly generated unique ID (only when no DB path is given).
    Memory,
    /// Database at the given DB path, or at a default path under the user's data directory.
    Persistent,
}
141
142fn default_builder_db_path() -> Option<PathBuf> {
144 dirs::data_dir().map(|mut path| {
145 path.extend(["essential", "builder", "db.sqlite3"]);
146 path
147 })
148}
149
150fn default_node_db_path() -> Option<PathBuf> {
152 dirs::data_dir().map(|mut path| {
153 path.extend(["essential", "node", "db.sqlite3"]);
154 path
155 })
156}
157
158fn builder_db_conf_from_args(args: &Args) -> anyhow::Result<builder_db::pool::Config> {
160 let source = match (&args.builder_db, &args.builder_db_path) {
161 (Db::Memory, None) => {
162 let id = format!("__essential-builder-db-{}", uuid::Uuid::new_v4());
163 builder_db::pool::Source::Memory(id)
164 }
165 (_, Some(path)) => builder_db::pool::Source::Path(path.clone()),
166 (Db::Persistent, None) => {
167 let Some(path) = default_builder_db_path() else {
168 anyhow::bail!("unable to detect user's data directory for default DB path")
169 };
170 builder_db::pool::Source::Path(path)
171 }
172 };
173 let conn_limit = args.builder_db_conn_limit;
174 let config = builder_db::pool::Config { source, conn_limit };
175 Ok(config)
176}
177
178fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
180 let source = match (&args.node_db, &args.node_db_path) {
181 (Db::Memory, None) => {
182 let id = format!("__essential-node-db-{}", uuid::Uuid::new_v4());
183 node::db::pool::Source::Memory(id)
184 }
185 (_, Some(path)) => node::db::pool::Source::Path(path.clone()),
186 (Db::Persistent, None) => {
187 let Some(path) = default_node_db_path() else {
188 anyhow::bail!("unable to detect user's data directory for default DB path")
189 };
190 node::db::pool::Source::Path(path)
191 }
192 };
193 let conn_limit = args.node_db_conn_limit;
194 let config = node::db::pool::Config { source, conn_limit };
195 Ok(config)
196}
197
198fn builder_conf_from_args(args: &Args, big_bang: &BigBang) -> builder::Config {
200 builder::Config {
201 solution_set_failures_to_keep: args.solution_set_failures_to_keep,
202 solution_set_attempts_per_block: args.solution_set_attempts_per_block,
203 parallel_chunk_size: args.parallel_chunk_size,
204 check: std::sync::Arc::new(CheckPredicateConfig {
205 collect_all_failures: args.solution_set_check_collects_all_failures,
206 }),
207 contract_registry: big_bang.contract_registry.clone(),
208 program_registry: big_bang.program_registry.clone(),
209 block_state: big_bang.block_state.clone(),
210 }
211}
212
213fn node_run_conf_from_args(args: &Args) -> node::RunConfig {
215 node::RunConfig {
216 relayer_source_endpoint: args.relayer_source_endpoint.clone(),
217 run_validation: args.validation,
218 }
219}
220
221fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
224 match path {
225 None => Ok(BigBang::default()),
226 Some(path) => {
227 let big_bang_str = std::fs::read_to_string(path)
228 .context("failed to read big bang configuration from path")?;
229 serde_yaml::from_str(&big_bang_str)
230 .context("failed to deserialize big bang configuration from YAML string")
231 }
232 }
233}
234
/// Installs a formatting tracing subscriber whose filter is read from the environment,
/// defaulting to the `INFO` level.
///
/// The result of `try_init` is deliberately ignored.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    let subscriber = tracing_subscriber::fmt();
    let default_level = tracing_subscriber::filter::LevelFilter::INFO;
    let env_filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(default_level.into())
        .from_env_lossy();
    let _ = subscriber.with_env_filter(env_filter).try_init();
}
245
246pub async fn run(args: Args) -> anyhow::Result<()> {
248 if !args.disable_tracing {
250 #[cfg(feature = "tracing")]
251 init_tracing_subscriber()
252 }
253
254 let node_db_conf = node_db_conf_from_args(&args)?;
256 #[cfg(feature = "tracing")]
257 {
258 tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
259 tracing::info!("Initializing node DB");
260 }
261 let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;
262
263 let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
265 node::ensure_big_bang_block(&node_db, &big_bang)
266 .await
267 .context("failed to ensure big bang block")?;
268
269 let block_tx = BlockTx::new();
271 let block_rx = block_tx.new_listener();
272 let api_state = node_api::State {
273 new_block: Some(block_rx),
274 conn_pool: node_db.clone(),
275 };
276 let router = node_api::router(api_state);
277 let listener = tokio::net::TcpListener::bind(args.node_api_bind_address).await?;
278 #[cfg(feature = "tracing")]
279 tracing::info!("Starting node API server at {}", listener.local_addr()?);
280 let node_api = node_api::serve(&router, &listener, args.node_api_tcp_conn_limit);
281
282 let builder_db_conf = builder_db_conf_from_args(&args)?;
284 #[cfg(feature = "tracing")]
285 {
286 tracing::debug!("Builder DB config:\n{:#?}", builder_db_conf);
287 tracing::info!("Initializing builder DB");
288 }
289 let builder_db = builder_db::ConnectionPool::with_tables(&builder_db_conf)?;
290
291 let api_state = builder_api::State {
293 conn_pool: builder_db.clone(),
294 };
295 let router = builder_api::router(api_state);
296 let listener = tokio::net::TcpListener::bind(args.builder_api_bind_address).await?;
297 #[cfg(feature = "tracing")]
298 tracing::info!("Starting builder API server at {}", listener.local_addr()?);
299 let builder_api = builder_api::serve(&router, &listener, args.builder_api_tcp_conn_limit);
300
301 let builder_conf = builder_conf_from_args(&args, &big_bang);
303 let block_interval = Duration::from_millis(args.block_interval_ms.into());
304 let builder = run_builder(
305 builder_db.clone(),
306 node_db.clone(),
307 block_tx.clone(),
308 builder_conf,
309 block_interval,
310 );
311
312 let node_run_conf = node_run_conf_from_args(&args);
313 let node_run = {
314 let node_db = node_db.clone();
315 async move {
316 if node_run_conf.relayer_source_endpoint.is_none() && !node_run_conf.run_validation {
317 std::future::pending().await
318 } else {
319 node::run(
320 node_db.clone(),
321 node_run_conf,
322 big_bang.contract_registry.contract,
323 big_bang.program_registry.contract,
324 block_tx,
325 )?
326 .join()
327 .await?;
328 Ok::<_, anyhow::Error>(())
329 }
330 }
331 };
332
333 let ctrl_c = tokio::signal::ctrl_c();
335 tokio::select! {
336 _ = builder_api => {},
337 _ = node_api => (),
338 _ = node_run => (),
339 _ = ctrl_c => {},
340 res = builder => res.context("Critical error during block building")?,
341 }
342
343 builder_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
344 node_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
345 Ok(())
346}
347
348async fn run_builder(
350 builder_conn_pool: builder_db::ConnectionPool,
351 node_conn_pool: node::db::ConnectionPool,
352 block_tx: BlockTx,
353 conf: builder::Config,
354 block_interval: Duration,
355) -> anyhow::Result<()> {
356 #[cfg(feature = "tracing")]
357 tracing::info!("Running the block builder");
358 #[cfg(feature = "tracing")]
359 tracing::debug!("Builder config:\n{:#?}", conf);
360 let mut interval = tokio::time::interval(block_interval);
361 loop {
362 interval.tick().await;
363 let (built_block_addr, _summary) =
364 build_block_fifo(&builder_conn_pool, &node_conn_pool, &conf).await?;
365 if built_block_addr.is_some() {
366 block_tx.notify();
367 }
368 }
369}