essential_node_cli/
lib.rs1use anyhow::Context;
2use clap::{Parser, ValueEnum};
3use essential_node::{self as node, RunConfig};
4use essential_node_api as node_api;
5use essential_node_types::{block_notify::BlockTx, BigBang};
6use std::{
7 net::{SocketAddr, SocketAddrV4},
8 path::{Path, PathBuf},
9};
10
11#[cfg(test)]
12mod tests;
13
14#[derive(Parser, Clone)]
16#[command(version, about)]
17pub struct Args {
18 #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
20 bind_address: SocketAddr,
21 #[arg(long)]
28 relayer_source_endpoint: Option<String>,
29 #[arg(long)]
31 disable_validation: bool,
32 #[arg(long, default_value_t = Db::Memory, value_enum)]
36 db: Db,
37 #[arg(long)]
43 db_path: Option<PathBuf>,
44 #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
48 api_db_conn_limit: usize,
49 #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
57 node_db_conn_limit: usize,
58 #[arg(long)]
60 disable_tracing: bool,
61 #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
63 tcp_conn_limit: usize,
64 #[arg(long)]
73 big_bang: Option<std::path::PathBuf>,
74}
75
76#[derive(ValueEnum, Clone, Copy, Debug)]
77enum Db {
78 Memory,
80 Persistent,
84}
85
86fn default_db_path() -> Option<PathBuf> {
88 dirs::data_dir().map(|mut path| {
89 path.extend(["essential", "node", "db.sqlite"]);
90 path
91 })
92}
93
94fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
96 let source = match (&args.db, &args.db_path) {
97 (Db::Memory, None) => node::db::pool::Source::default_memory(),
98 (_, Some(path)) => node::db::pool::Source::Path(path.clone()),
99 (Db::Persistent, None) => {
100 let Some(path) = default_db_path() else {
101 anyhow::bail!("unable to detect user's data directory for default DB path")
102 };
103 node::db::pool::Source::Path(path)
104 }
105 };
106 let conn_limit = args.node_db_conn_limit;
107 let config = node::db::pool::Config::new(source, conn_limit);
108 Ok(config)
109}
110
/// Installs a formatting tracing subscriber whose filter is read from the environment,
/// falling back to the `INFO` level when no directive is supplied.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    use tracing_subscriber::{filter::LevelFilter, EnvFilter};

    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();

    // The result of `try_init` is deliberately discarded: failing to install the
    // subscriber must not prevent the node from starting.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .try_init();
}
121
122fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
125 match path {
126 None => Ok(BigBang::default()),
127 Some(path) => {
128 let big_bang_str = std::fs::read_to_string(path)
129 .context("failed to read big bang configuration from path")?;
130 serde_yaml::from_str(&big_bang_str)
131 .context("failed to deserialize big bang configuration from YAML string")
132 }
133 }
134}
/// Runs the node together with its API server until the API future, the node future or a
/// Ctrl-C signal completes, then closes both database connection pools.
///
/// Steps, in order:
/// 1. install the tracing subscriber (only with the `tracing` feature and unless
///    `disable_tracing` is set);
/// 2. open the node's DB connection pool and ensure the big bang block;
/// 3. start the node via `node::run`;
/// 4. open a second connection pool for the API and serve the API on `bind_address`;
/// 5. wait for the first of API / Ctrl-C / node to finish and close both pools.
///
/// # Errors
///
/// Returns an error when building the DB config, creating either connection pool, loading
/// the big bang configuration, ensuring the big bang block, starting the node, binding the
/// TCP listener, or closing either pool fails. An error returned by the running node is
/// only logged (with the `tracing` feature); it does not by itself make this function
/// return `Err`.
pub async fn run(args: Args) -> anyhow::Result<()> {
    // Tracing is opt-out at runtime and only available when compiled with the feature.
    if !args.disable_tracing {
        #[cfg(feature = "tracing")]
        init_tracing_subscriber()
    }

    let node_db_conf = node_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
        tracing::info!("Starting node");
    }
    let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;

    // The big bang block must be ensured before the node is started.
    let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
    node::ensure_big_bang_block(&node_db, &big_bang)
        .await
        .context("failed to ensure big bang block")?;

    // Take the relayer endpoint out of `args`; the other fields read further down
    // (`api_db_conn_limit`, `bind_address`, `tcp_conn_limit`) remain accessible.
    let Args {
        relayer_source_endpoint,
        disable_validation,
        ..
    } = args;

    // Log which of validation / relayer are being started.
    #[cfg(feature = "tracing")]
    tracing::info!(
        "Starting {}{}",
        if disable_validation {
            "".to_string()
        } else {
            "validation".to_string()
        },
        if let Some(node_endpoint) = relayer_source_endpoint.as_ref() {
            format!(
                "{}relayer (relaying from {:?})",
                if disable_validation { "" } else { " and " },
                node_endpoint,
            )
        } else {
            "".to_string()
        }
    );

    // The sender goes to the node, the listener to the API state below.
    let block_tx = BlockTx::new();
    let block_rx = block_tx.new_listener();

    let run_conf = RunConfig {
        // Cloned because the endpoint is also moved into `node_future` below.
        relayer_source_endpoint: relayer_source_endpoint.clone(),
        run_validation: !disable_validation,
    };
    let node_handle = node::run(
        node_db.clone(),
        run_conf,
        big_bang.contract_registry.contract,
        big_bang.program_registry.contract,
        block_tx,
    )?;
    let node_future = async move {
        if relayer_source_endpoint.is_none() && disable_validation {
            // Neither relayer nor validation was requested: never resolve, so that the
            // `select!` below is only ever completed by the API or Ctrl-C.
            std::future::pending().await
        } else {
            let r = node_handle.join().await;
            // A successful join without a relayer keeps the process alive (idle) rather
            // than completing this future; errors, and any result when a relayer is
            // configured, fall through and are returned.
            if r.is_ok() && relayer_source_endpoint.is_none() {
                #[cfg(feature = "tracing")]
                tracing::info!("Node has completed all streams and is now idle");
                std::future::pending().await
            }
            r
        }
    };

    // The API gets its own pool: same config as the node's, but with its own
    // connection limit.
    let api_db_conf = node::db::pool::Config {
        conn_limit: args.api_db_conn_limit,
        ..node_db_conf
    };
    #[cfg(feature = "tracing")]
    tracing::debug!("API DB config:\n{:#?}", api_db_conf);
    let api_db = node::db::ConnectionPool::with_tables(&api_db_conf)?;
    let api_state = node_api::State {
        new_block: Some(block_rx),
        conn_pool: api_db.clone(),
    };
    let router = node_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting API server at {}", listener.local_addr()?);
    let api = node_api::serve(&router, &listener, args.tcp_conn_limit);

    let ctrl_c = tokio::signal::ctrl_c();
    // Wait for whichever future completes first; the remaining ones are dropped.
    tokio::select! {
        _ = api => {},
        _ = ctrl_c => {},
        r = node_future => {
            // A node error is reported but not propagated; shutdown continues below.
            if let Err(e) = r {
                #[cfg(feature = "tracing")]
                tracing::error!("Critical error on relayer or validation stream: {e}")
            }
        },
    }

    // Close both pools explicitly, converting their errors into `anyhow` errors.
    node_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    api_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(())
}