use anyhow::Context;
use clap::{Parser, ValueEnum};
use essential_builder::{self as builder, build_block_fifo};
use essential_builder_api as builder_api;
use essential_builder_db as builder_db;
use essential_check::solution::CheckPredicateConfig;
use essential_node as node;
use essential_node_api as node_api;
use essential_node_types::{block_notify::BlockTx, BigBang};
use std::{
net::{SocketAddr, SocketAddrV4},
num::NonZero,
path::{Path, PathBuf},
time::Duration,
};
#[cfg(test)]
mod tests;
/// Command-line arguments for running the block builder together with its node.
#[derive(Parser, Clone)]
#[command(version, about)]
pub struct Args {
    /// Disable the tracing subscriber (only relevant when built with the `tracing` feature).
    #[arg(long, default_value_t = false)]
    disable_tracing: bool,
    /// Interval between block building attempts, in milliseconds.
    #[arg(long, default_value_t = DEFAULT_BLOCK_INTERVAL_MS)]
    block_interval_ms: u32,
    /// Value for the builder's `solution_failures_to_keep` setting.
    #[arg(long, default_value_t = builder::Config::DEFAULT_SOLUTION_FAILURE_KEEP_LIMIT)]
    solution_failures_to_keep: u32,
    /// Value for the builder's `solution_attempts_per_block` setting (must be non-zero).
    #[arg(long, default_value_t = NonZero::new(builder::Config::DEFAULT_SOLUTION_ATTEMPTS_PER_BLOCK).expect("declared const must be non-zero"))]
    solution_attempts_per_block: NonZero<u32>,
    /// Value for the builder's `parallel_chunk_size` setting (must be non-zero).
    #[arg(long, default_value_t = builder::Config::default_parallel_chunk_size())]
    parallel_chunk_size: NonZero<usize>,
    /// Enable `collect_all_failures` in the solution check configuration.
    #[arg(long)]
    solution_check_collects_all_failures: bool,
    /// Path to a YAML big bang configuration. When omitted, the default big bang is used.
    #[arg(long)]
    big_bang: Option<PathBuf>,
    /// Socket address to bind the builder API to.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    builder_api_bind_address: SocketAddr,
    /// Connection limit passed to the builder API server.
    #[arg(long, default_value_t = builder_api::DEFAULT_CONNECTION_LIMIT)]
    builder_api_tcp_conn_limit: usize,
    /// Socket address to bind the node API to.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    node_api_bind_address: SocketAddr,
    /// Connection limit passed to the node API server.
    #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
    node_api_tcp_conn_limit: usize,
    /// Storage type for the builder DB. `persistent` uses the default path unless
    /// `--builder-db-path` is given.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    builder_db: Db,
    /// Path to the builder DB. When given, this path is used regardless of `--builder-db`.
    #[arg(long)]
    builder_db_path: Option<PathBuf>,
    /// Connection limit for the builder DB connection pool.
    #[arg(long, default_value_t = builder_db::pool::Config::default_conn_limit())]
    builder_db_conn_limit: usize,
    /// Storage type for the node DB. `persistent` uses the default path unless
    /// `--node-db-path` is given.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    node_db: Db,
    /// Path to the node DB. When given, this path is used regardless of `--node-db`.
    #[arg(long)]
    node_db_path: Option<PathBuf>,
    /// Connection limit for the node DB connection pool.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    node_db_conn_limit: usize,
    /// Relayer source endpoint forwarded to the node's run configuration.
    #[arg(long)]
    relayer_source_endpoint: Option<String>,
    /// Enable the node's `run_validation` setting.
    #[arg(long)]
    validation: bool,
}
/// Default for the `block_interval_ms` argument: 5000 milliseconds (5 seconds).
const DEFAULT_BLOCK_INTERVAL_MS: u32 = 5_000;
/// Kind of database storage selectable from the command line.
#[derive(ValueEnum, Clone, Copy, Debug)]
enum Db {
    /// Use an in-memory database.
    Memory,
    /// Use a database stored at a path on disk.
    Persistent,
}
/// Default location of the builder DB: `<data dir>/essential/builder/db.sqlite3`.
///
/// Returns `None` when `dirs::data_dir()` yields no data directory.
fn default_builder_db_path() -> Option<PathBuf> {
    let data_dir = dirs::data_dir()?;
    Some(data_dir.join("essential").join("builder").join("db.sqlite3"))
}
/// Default location of the node DB: `<data dir>/essential/node/db.sqlite3`.
///
/// Returns `None` when `dirs::data_dir()` yields no data directory.
fn default_node_db_path() -> Option<PathBuf> {
    let data_dir = dirs::data_dir()?;
    Some(data_dir.join("essential").join("node").join("db.sqlite3"))
}
/// Derive the builder DB pool configuration from the command-line arguments.
///
/// An explicit `builder_db_path` always wins. Otherwise `Db::Memory` selects a
/// uniquely named in-memory DB and `Db::Persistent` selects the default path.
///
/// # Errors
///
/// Fails when a persistent DB is requested without a path and the default
/// path cannot be determined.
fn builder_db_conf_from_args(args: &Args) -> anyhow::Result<builder_db::pool::Config> {
    let source = match &args.builder_db_path {
        // An explicit path overrides the selected DB type.
        Some(path) => builder_db::pool::Source::Path(path.clone()),
        None => match args.builder_db {
            Db::Memory => {
                // A fresh UUID gives every in-memory DB its own identifier.
                let unique = uuid::Uuid::new_v4();
                builder_db::pool::Source::Memory(format!("__essential-builder-db-{}", unique))
            }
            Db::Persistent => {
                let path = default_builder_db_path().ok_or_else(|| {
                    anyhow::anyhow!("unable to detect user's data directory for default DB path")
                })?;
                builder_db::pool::Source::Path(path)
            }
        },
    };
    Ok(builder_db::pool::Config {
        source,
        conn_limit: args.builder_db_conn_limit,
    })
}
/// Derive the node DB pool configuration from the command-line arguments.
///
/// An explicit `node_db_path` always wins. Otherwise `Db::Memory` selects a
/// uniquely named in-memory DB and `Db::Persistent` selects the default path.
///
/// # Errors
///
/// Fails when a persistent DB is requested without a path and the default
/// path cannot be determined.
fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
    let source = if let Some(path) = &args.node_db_path {
        // An explicit path overrides the selected DB type.
        node::db::pool::Source::Path(path.clone())
    } else {
        match args.node_db {
            Db::Memory => {
                // A fresh UUID gives every in-memory DB its own identifier.
                let unique = uuid::Uuid::new_v4();
                node::db::pool::Source::Memory(format!("__essential-node-db-{}", unique))
            }
            Db::Persistent => {
                let path = default_node_db_path().ok_or_else(|| {
                    anyhow::anyhow!("unable to detect user's data directory for default DB path")
                })?;
                node::db::pool::Source::Path(path)
            }
        }
    };
    Ok(node::db::pool::Config {
        source,
        conn_limit: args.node_db_conn_limit,
    })
}
/// Assemble the builder configuration from the CLI arguments and the big bang
/// configuration.
///
/// The solution limits and chunk size come from `args`; the contract registry
/// and block state are cloned from `big_bang`.
fn builder_conf_from_args(args: &Args, big_bang: &BigBang) -> builder::Config {
    let check_conf = CheckPredicateConfig {
        collect_all_failures: args.solution_check_collects_all_failures,
    };
    let contract_registry = big_bang.contract_registry.clone();
    let block_state = big_bang.block_state.clone();
    builder::Config {
        solution_failures_to_keep: args.solution_failures_to_keep,
        solution_attempts_per_block: args.solution_attempts_per_block,
        parallel_chunk_size: args.parallel_chunk_size,
        check: std::sync::Arc::new(check_conf),
        contract_registry,
        block_state,
    }
}
/// Assemble the node's run configuration from the CLI arguments.
fn node_run_conf_from_args(args: &Args) -> node::RunConfig {
    let relayer_source_endpoint = args.relayer_source_endpoint.clone();
    let run_validation = args.validation;
    node::RunConfig {
        relayer_source_endpoint,
        run_validation,
    }
}
fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
match path {
None => Ok(BigBang::default()),
Some(path) => {
let big_bang_str = std::fs::read_to_string(path)
.context("failed to read big bang configuration from path")?;
serde_yaml::from_str(&big_bang_str)
.context("failed to deserialize big bang configuration from YAML string")
}
}
}
/// Install the global `tracing` formatting subscriber.
///
/// The filter is read from the environment with `INFO` as the default
/// directive. The result of `try_init` is deliberately discarded.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    use tracing_subscriber::{filter::LevelFilter, EnvFilter};

    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();
    let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
    let _ = subscriber.try_init();
}
/// Run the builder, its node, and both HTTP APIs until one of them finishes
/// or ctrl-c is received.
///
/// Steps, in order:
/// 1. Optionally initialise tracing.
/// 2. Create the node DB and ensure the big bang block exists.
/// 3. Bind and serve the node API.
/// 4. Create the builder DB, then bind and serve the builder API.
/// 5. Start the block builder loop and, if a relayer endpoint or validation
///    is configured, the node's own run loop.
/// 6. Wait for the first of those futures (or ctrl-c) to complete, then close
///    both DB pools.
///
/// # Errors
///
/// Fails if DB setup, big bang initialisation or socket binding fails, if the
/// block builder or the node's run loop ends with an error, or if closing a
/// DB pool fails. Both pools are closed before a run error is returned.
pub async fn run(args: Args) -> anyhow::Result<()> {
    if !args.disable_tracing {
        #[cfg(feature = "tracing")]
        init_tracing_subscriber()
    }

    // Node DB and big bang block.
    let node_db_conf = node_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
        tracing::info!("Initializing node DB");
    }
    let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;
    let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
    node::ensure_big_bang_block(&node_db, &big_bang)
        .await
        .context("failed to ensure big bang block")?;

    // Node API. The block notifier is shared with the builder and node run loop.
    let block_tx = BlockTx::new();
    let block_rx = block_tx.new_listener();
    let api_state = node_api::State {
        new_block: Some(block_rx),
        conn_pool: node_db.clone(),
    };
    let router = node_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.node_api_bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting node API server at {}", listener.local_addr()?);
    let node_api = node_api::serve(&router, &listener, args.node_api_tcp_conn_limit);

    // Builder DB and builder API.
    let builder_db_conf = builder_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Builder DB config:\n{:#?}", builder_db_conf);
        tracing::info!("Initializing builder DB");
    }
    let builder_db = builder_db::ConnectionPool::with_tables(&builder_db_conf)?;
    let api_state = builder_api::State {
        conn_pool: builder_db.clone(),
    };
    let router = builder_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.builder_api_bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting builder API server at {}", listener.local_addr()?);
    let builder_api = builder_api::serve(&router, &listener, args.builder_api_tcp_conn_limit);

    // Block builder loop.
    let builder_conf = builder_conf_from_args(&args, &big_bang);
    let block_interval = Duration::from_millis(args.block_interval_ms.into());
    let builder = run_builder(
        builder_db.clone(),
        node_db.clone(),
        block_tx.clone(),
        builder_conf,
        block_interval,
    );

    // Node run loop (relayer and/or validation).
    let node_run_conf = node_run_conf_from_args(&args);
    let node_run = {
        let node_db = node_db.clone();
        async move {
            if node_run_conf.relayer_source_endpoint.is_none() && !node_run_conf.run_validation {
                // Nothing for the node to run: never complete, so this arm
                // cannot win the `select!` below.
                std::future::pending().await
            } else {
                node::run(
                    node_db,
                    node_run_conf,
                    big_bang.contract_registry.contract,
                    block_tx,
                )?
                .join()
                .await?;
                Ok::<_, anyhow::Error>(())
            }
        }
    };

    // The first future to complete ends the run. Errors from the node run
    // loop and the block builder are kept so they can be reported after the
    // DB pools have been closed.
    let ctrl_c = tokio::signal::ctrl_c();
    let outcome: anyhow::Result<()> = tokio::select! {
        _ = builder_api => Ok(()),
        _ = node_api => Ok(()),
        res = node_run => res.context("Critical error during node run"),
        _ = ctrl_c => Ok(()),
        res = builder => res.context("Critical error during block building"),
    };

    // Attempt to close both pools regardless of how the run ended.
    let builder_db_closed = builder_db.close().map_err(|e| anyhow::anyhow!("{e}"));
    let node_db_closed = node_db.close().map_err(|e| anyhow::anyhow!("{e}"));

    // A run error takes priority over any error from closing the pools.
    outcome?;
    builder_db_closed?;
    node_db_closed?;
    Ok(())
}
/// Build blocks in an endless loop, attempting one block per `block_interval`
/// tick.
///
/// Each iteration waits for the next tick, calls `build_block_fifo` with the
/// builder and node connection pools, and notifies `block_tx` when a block
/// address is returned.
///
/// # Errors
///
/// Returns an error if `block_interval` is zero or if `build_block_fifo`
/// fails. This function never returns `Ok`.
async fn run_builder(
    builder_conn_pool: builder_db::ConnectionPool,
    node_conn_pool: node::db::ConnectionPool,
    block_tx: BlockTx,
    conf: builder::Config,
    block_interval: Duration,
) -> anyhow::Result<()> {
    // `tokio::time::interval` panics on a zero period. The interval comes
    // straight from the command line, so report it as an error instead.
    anyhow::ensure!(
        !block_interval.is_zero(),
        "block interval must be greater than zero"
    );
    #[cfg(feature = "tracing")]
    tracing::info!("Running the block builder");
    #[cfg(feature = "tracing")]
    tracing::debug!("Builder config:\n{:#?}", conf);
    let mut interval = tokio::time::interval(block_interval);
    loop {
        interval.tick().await;
        let (built_block_addr, _summary) =
            build_block_fifo(&builder_conn_pool, &node_conn_pool, &conf).await?;
        // Only notify listeners when a block was actually built.
        if built_block_addr.is_some() {
            block_tx.notify();
        }
    }
}