// essential_node_cli/lib.rs
use anyhow::Context;
use clap::{Parser, ValueEnum};
use essential_node::{self as node, RunConfig};
use essential_node_api as node_api;
use essential_node_types::{block_notify::BlockTx, BigBang};
use std::{
net::{SocketAddr, SocketAddrV4},
path::{Path, PathBuf},
};
#[cfg(test)]
mod tests;
/// Command-line arguments for running the node together with its API server.
#[derive(Parser, Clone)]
#[command(version, about)]
pub struct Args {
    /// The address that the API server's TCP listener binds to.
    ///
    /// Defaults to the unspecified IPv4 address with port 0.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    bind_address: SocketAddr,
    /// The endpoint of the node to relay from.
    ///
    /// When omitted, no relayer source is configured for the node.
    #[arg(long)]
    relayer_source_endpoint: Option<String>,
    /// Disable validation.
    #[arg(long)]
    disable_validation: bool,
    /// The type of DB storage to use.
    ///
    /// When "persistent" is chosen without a DB path, a default path within the user's data
    /// directory is used.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    db: Db,
    /// The path to the node's database.
    ///
    /// Providing a path takes precedence over the DB type: the database at this path is used
    /// regardless of which DB type was selected.
    #[arg(long)]
    db_path: Option<PathBuf>,
    /// The connection limit of the DB connection pool used by the API.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    api_db_conn_limit: usize,
    /// The connection limit of the DB connection pool used by the node.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    node_db_conn_limit: usize,
    /// Skip initialising the tracing subscriber.
    #[arg(long)]
    disable_tracing: bool,
    /// The TCP connection limit handed to the API server.
    #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
    tcp_conn_limit: usize,
    /// The path to a YAML file holding the big bang configuration.
    ///
    /// When omitted, the default big bang configuration is used.
    #[arg(long)]
    big_bang: Option<PathBuf>,
}
/// The type of DB storage selectable via the `db` argument.
#[derive(ValueEnum, Clone, Copy, Debug)]
enum Db {
/// Use the DB pool's default in-memory source (unless a DB path is given explicitly).
Memory,
/// Use a DB stored at a path: either the given DB path or the default path.
Persistent,
}
/// The default location of the node's DB file: `essential/node/db.sqlite` beneath the
/// directory reported by `dirs::data_dir`.
///
/// Returns `None` when `dirs::data_dir` yields no directory.
fn default_db_path() -> Option<PathBuf> {
    let mut db_path = dirs::data_dir()?;
    for component in ["essential", "node", "db.sqlite"] {
        db_path.push(component);
    }
    Some(db_path)
}
/// Build the node's DB connection pool configuration from the CLI arguments.
///
/// The DB source is chosen as follows:
/// - an explicit `db_path` always wins, whatever `db` is set to;
/// - otherwise `Db::Memory` selects the pool's default in-memory source;
/// - otherwise `Db::Persistent` selects the path given by `default_db_path`.
///
/// The pool's connection limit is taken from `node_db_conn_limit`.
///
/// # Errors
///
/// Fails if a persistent DB is requested without a path and no default path can be determined.
fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
    let source = if let Some(explicit_path) = args.db_path.as_ref() {
        node::db::pool::Source::Path(explicit_path.clone())
    } else {
        match args.db {
            Db::Memory => node::db::pool::Source::default_memory(),
            Db::Persistent => {
                let fallback_path = default_db_path().ok_or_else(|| {
                    anyhow::anyhow!("unable to detect user's data directory for default DB path")
                })?;
                node::db::pool::Source::Path(fallback_path)
            }
        }
    };
    Ok(node::db::pool::Config::new(source, args.node_db_conn_limit))
}
/// Install a formatting tracing subscriber whose filter is read from the environment, using
/// `INFO` as the default directive.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    use tracing_subscriber::{filter::LevelFilter, EnvFilter};

    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();
    // The result of `try_init` is deliberately discarded: a failure to install the subscriber
    // is not treated as an error here.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .try_init();
}
/// Load the big bang configuration from the YAML file at `path`, or fall back to
/// `BigBang::default()` when no path is given.
///
/// # Errors
///
/// Fails if the file cannot be read, or if its contents cannot be deserialized as YAML into a
/// `BigBang`.
fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
    let Some(config_path) = path else {
        return Ok(BigBang::default());
    };
    let yaml = std::fs::read_to_string(config_path)
        .context("failed to read big bang configuration from path")?;
    let big_bang: BigBang = serde_yaml::from_str(&yaml)
        .context("failed to deserialize big bang configuration from YAML string")?;
    Ok(big_bang)
}
/// Run the node and its API server with the given arguments.
///
/// In order, this:
/// 1. initialises the tracing subscriber (unless disabled or the `tracing` feature is off),
/// 2. opens the node's DB connection pool and ensures the big bang block is present,
/// 3. starts the node with the configured relayer source and validation setting,
/// 4. opens a second DB connection pool for the API and binds the API's TCP listener,
/// 5. waits until the API server finishes, Ctrl-C is received, or the node future resolves,
/// 6. closes both DB connection pools.
///
/// # Errors
///
/// Returns an error if the DB configuration cannot be built, a connection pool cannot be
/// created, the big bang configuration cannot be loaded or applied, the node fails to start,
/// the API listener cannot be bound, or a connection pool fails to close.
///
/// An error produced by the running node itself is only logged (with the `tracing` feature);
/// it is not returned.
pub async fn run(args: Args) -> anyhow::Result<()> {
    if !args.disable_tracing {
        #[cfg(feature = "tracing")]
        init_tracing_subscriber()
    }

    // Set up the node's DB connection pool.
    let node_db_conf = node_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
        tracing::info!("Starting node");
    }
    let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;

    // Make sure the big bang block exists before any stream starts.
    let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
    node::ensure_big_bang_block(&node_db, &big_bang)
        .await
        .context("failed to ensure big bang block")?;

    // Only these two fields are moved out of `args`; the remaining fields stay accessible.
    let Args {
        relayer_source_endpoint,
        disable_validation,
        ..
    } = args;

    #[cfg(feature = "tracing")]
    tracing::info!(
        "Starting {}{}",
        if disable_validation {
            "".to_string()
        } else {
            "validation".to_string()
        },
        if let Some(node_endpoint) = relayer_source_endpoint.as_ref() {
            format!(
                "{}relayer (relaying from {:?})",
                if disable_validation { "" } else { " and " },
                node_endpoint,
            )
        } else {
            "".to_string()
        }
    );

    // The sender is handed to the node; the listener is handed to the API state below.
    let block_tx = BlockTx::new();
    let block_rx = block_tx.new_listener();

    let run_conf = RunConfig {
        relayer_source_endpoint: relayer_source_endpoint.clone(),
        run_validation: !disable_validation,
    };
    let node_handle = node::run(
        node_db.clone(),
        run_conf,
        big_bang.contract_registry.contract,
        block_tx,
    )?;

    let node_future = async move {
        if relayer_source_endpoint.is_none() && disable_validation {
            // With neither a relayer source nor validation, never resolve, so that the
            // `select!` below is only driven by the API server and Ctrl-C.
            std::future::pending().await
        } else {
            let r = node_handle.join().await;
            if r.is_ok() && relayer_source_endpoint.is_none() {
                // The node finished cleanly and there is no relayer source: stay idle rather
                // than resolving, so that the API keeps being served.
                #[cfg(feature = "tracing")]
                tracing::info!("Node has completed all streams and is now idle");
                std::future::pending().await
            }
            r
        }
    };

    // The API gets its own pool: same configuration as the node's, but with its own limit.
    let api_db_conf = node::db::pool::Config {
        conn_limit: args.api_db_conn_limit,
        ..node_db_conf
    };
    #[cfg(feature = "tracing")]
    tracing::debug!("API DB config:\n{:#?}", api_db_conf);
    let api_db = node::db::ConnectionPool::with_tables(&api_db_conf)?;

    let api_state = node_api::State {
        new_block: Some(block_rx),
        conn_pool: api_db.clone(),
    };
    let router = node_api::router(api_state);

    // Copy the address out first so the error context does not need to borrow `args`.
    let bind_address = args.bind_address;
    let listener = tokio::net::TcpListener::bind(bind_address)
        .await
        .with_context(|| format!("failed to bind API listener to {bind_address}"))?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting API server at {}", listener.local_addr()?);
    let api = node_api::serve(&router, &listener, args.tcp_conn_limit);

    // Run until the first of: the API server finishing, Ctrl-C, or the node future resolving.
    let ctrl_c = tokio::signal::ctrl_c();
    tokio::select! {
        _ = api => {},
        _ = ctrl_c => {},
        r = node_future => {
            // NOTE(review): the node's error is only logged here and `run` still returns
            // `Ok(())` afterwards - confirm that this is the intended behaviour.
            if let Err(e) = r {
                #[cfg(feature = "tracing")]
                tracing::error!("Critical error on relayer or validation stream: {e}")
            }
        },
    }

    // Attempt to close both pools before reporting any failure, so that an error from closing
    // the node's pool does not prevent the API's pool from being closed.
    let node_db_closed = node_db.close();
    let api_db_closed = api_db.close();
    node_db_closed.map_err(|e| anyhow::anyhow!("{e}"))?;
    api_db_closed.map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(())
}