//! essential_node_cli — `lib.rs`: The Essential Node CLI library.

1use anyhow::Context;
2use clap::{Parser, ValueEnum};
3use essential_node::{self as node, RunConfig};
4use essential_node_api as node_api;
5use essential_node_types::{block_notify::BlockTx, BigBang};
6use std::{
7    net::{SocketAddr, SocketAddrV4},
8    path::{Path, PathBuf},
9};
10
// Unit tests for the CLI (in `tests.rs`); compiled only under `cargo test`.
#[cfg(test)]
mod tests;
13
/// The Essential Node CLI.
// NOTE(review): clap's derive turns the `///` doc comments on this struct and
// its fields into the user-facing `--help` text, so they are left untouched
// here; reviewer notes below use plain `//` comments, which clap ignores.
#[derive(Parser, Clone)]
#[command(version, about)]
pub struct Args {
    /// The address to bind to for the TCP listener that will be used to serve the API.
    // Default is 0.0.0.0:0 — all IPv4 interfaces with an OS-assigned port.
    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
    bind_address: SocketAddr,
    /// The endpoint of the node that will act as the layer-1.
    ///
    /// If this is `None`, then the relayer stream will not run.
    ///
    /// Note: This will likely be replaced with an L1 RPC URL flag upon switching to
    /// use of Ethereum (or Ethereum test-net) as an L1.
    #[arg(long)]
    relayer_source_endpoint: Option<String>,
    /// Disable the validation stream.
    #[arg(long)]
    disable_validation: bool,
    /// The type of DB storage to use.
    ///
    /// In the case that "persistent" is specified, assumes the default path.
    #[arg(long, default_value_t = Db::Memory, value_enum)]
    db: Db,
    /// The path to the node's sqlite database.
    ///
    /// Specifying this overrides the `db` type as `persistent`.
    ///
    /// By default, this path will be within the user's data directory.
    // See `node_db_conf_from_args`: a `Some` here wins over `--db` entirely.
    #[arg(long)]
    db_path: Option<PathBuf>,
    /// The number of simultaneous sqlite DB connections to maintain for serving the API.
    ///
    /// By default, this is the number of available CPUs multiplied by 4.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    api_db_conn_limit: usize,
    /// The number of simultaneous sqlite DB connections to maintain for the node's relayer and
    /// validation streams.
    ///
    /// This is unique from the API connection limit in order to ensure that the node's relayer and
    /// validation streams have high-priority DB connection access.
    ///
    /// By default, this is the number of available CPUs multiplied by 4.
    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
    node_db_conn_limit: usize,
    /// Disable the tracing subscriber.
    #[arg(long)]
    disable_tracing: bool,
    /// The maximum number of TCP streams to be served simultaneously.
    #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
    tcp_conn_limit: usize,
    /// Specify a path to the `big-bang.yml` configuration.
    ///
    /// This specifies the genesis configuration, which includes items like the contract registry
    /// address, block state address and associated big-bang state.
    ///
    /// If no configuration is specified, defaults to the `BigBang::default()` implementation.
    ///
    /// To learn more, see the API docs for the `essential_node_types::BigBang` type.
    #[arg(long)]
    big_bang: Option<std::path::PathBuf>,
}
75
// CLI value for `--db`. As a `ValueEnum`, the variant names (kebab-cased by
// clap) are the accepted flag values and the `///` docs feed `--help`, so the
// doc text is left byte-identical.
#[derive(ValueEnum, Clone, Copy, Debug)]
enum Db {
    /// Temporary, in-memory storage that lasts for the duration of the process.
    Memory,
    /// Persistent storage on the local HDD or SSD.
    ///
    /// The DB path may be specified with `--db-path`.
    Persistent,
}
85
86// TODO: Lift this into the node lib?
87fn default_db_path() -> Option<PathBuf> {
88    dirs::data_dir().map(|mut path| {
89        path.extend(["essential", "node", "db.sqlite"]);
90        path
91    })
92}
93
94/// Construct the node's DB config from the parsed args.
95fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
96    let source = match (&args.db, &args.db_path) {
97        (Db::Memory, None) => node::db::pool::Source::default_memory(),
98        (_, Some(path)) => node::db::pool::Source::Path(path.clone()),
99        (Db::Persistent, None) => {
100            let Some(path) = default_db_path() else {
101                anyhow::bail!("unable to detect user's data directory for default DB path")
102            };
103            node::db::pool::Source::Path(path)
104        }
105    };
106    let conn_limit = args.node_db_conn_limit;
107    let config = node::db::pool::Config::new(source, conn_limit);
108    Ok(config)
109}
110
/// Install the global `tracing` subscriber.
///
/// The filter is taken from the environment (`RUST_LOG`-style directives),
/// defaulting to `INFO` when none is set. Errors from `try_init` (e.g. a
/// subscriber already being installed) are deliberately ignored.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    let env_filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
        .from_env_lossy();
    let _ = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .try_init();
}
121
122/// Load the big bang configuration from the yml file at the given path, or produce the default if
123/// no path is given.
124fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
125    match path {
126        None => Ok(BigBang::default()),
127        Some(path) => {
128            let big_bang_str = std::fs::read_to_string(path)
129                .context("failed to read big bang configuration from path")?;
130            serde_yaml::from_str(&big_bang_str)
131                .context("failed to deserialize big bang configuration from YAML string")
132        }
133    }
134}
/// Run the essential node.
///
/// Start-up order:
/// 1. install the tracing subscriber (unless `--disable-tracing`);
/// 2. build the node's DB connection pool and ensure the big-bang (genesis) block exists;
/// 3. start the node's relayer/validation streams per the parsed `Args`;
/// 4. serve the HTTP API on `--bind-address` with its own DB connection pool.
///
/// The function then runs until the API server finishes, ctrl-c is received,
/// or the node's streams end. Both DB pools are closed before returning.
///
/// # Errors
///
/// Returns an error if DB configuration or pool creation fails, the big-bang
/// block cannot be ensured, the node fails to start, the TCP listener cannot
/// bind, or closing either DB pool fails.
pub async fn run(args: Args) -> anyhow::Result<()> {
    // Initialise tracing.
    if !args.disable_tracing {
        #[cfg(feature = "tracing")]
        init_tracing_subscriber()
    }

    // Start the node.
    let node_db_conf = node_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
        tracing::info!("Starting node");
    }
    let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;

    // Load the big bang configuration, and ensure the big bang block exists.
    let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
    node::ensure_big_bang_block(&node_db, &big_bang)
        .await
        .context("failed to ensure big bang block")?;

    // Run the node with specified config.
    // Partial move: only these two fields are moved out; the remaining `Copy`
    // fields of `args` (bind address, conn limits) are still read below.
    let Args {
        relayer_source_endpoint,
        disable_validation,
        ..
    } = args;

    // NOTE(review): when both validation and the relayer are disabled this
    // logs just "Starting " — harmless, but could be tidied.
    #[cfg(feature = "tracing")]
    tracing::info!(
        "Starting {}{}",
        if disable_validation {
            "".to_string()
        } else {
            "validation".to_string()
        },
        if let Some(node_endpoint) = relayer_source_endpoint.as_ref() {
            format!(
                "{}relayer (relaying from {:?})",
                if disable_validation { "" } else { " and " },
                node_endpoint,
            )
        } else {
            "".to_string()
        }
    );

    // Channel over which the node notifies the API of newly committed blocks.
    let block_tx = BlockTx::new();
    let block_rx = block_tx.new_listener();

    let run_conf = RunConfig {
        relayer_source_endpoint: relayer_source_endpoint.clone(),
        run_validation: !disable_validation,
    };
    let node_handle = node::run(
        node_db.clone(),
        run_conf,
        big_bang.contract_registry.contract,
        big_bang.program_registry.contract,
        block_tx,
    )?;
    // Wrap the node's join handle so the process keeps serving the API even
    // when there is no stream work left: if both streams are disabled, or all
    // streams complete successfully, park on a never-ready future instead of
    // resolving the `select!` below (which would shut the whole process down).
    let node_future = async move {
        if relayer_source_endpoint.is_none() && disable_validation {
            std::future::pending().await
        } else {
            let r = node_handle.join().await;
            if r.is_ok() && relayer_source_endpoint.is_none() {
                #[cfg(feature = "tracing")]
                tracing::info!("Node has completed all streams and is now idle");
                std::future::pending().await
            }
            r
        }
    };

    // Run the API with its own connection pool.
    // Same DB source as the node's pool, but with the API-specific connection
    // limit so API load cannot starve the relayer/validation streams.
    let api_db_conf = node::db::pool::Config {
        conn_limit: args.api_db_conn_limit,
        ..node_db_conf
    };
    #[cfg(feature = "tracing")]
    tracing::debug!("API DB config:\n{:#?}", api_db_conf);
    let api_db = node::db::ConnectionPool::with_tables(&api_db_conf)?;
    let api_state = node_api::State {
        new_block: Some(block_rx),
        conn_pool: api_db.clone(),
    };
    let router = node_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting API server at {}", listener.local_addr()?);
    let api = node_api::serve(&router, &listener, args.tcp_conn_limit);

    // Select the first future to complete to close.
    // TODO: We should select over relayer / validation critical error here.
    let ctrl_c = tokio::signal::ctrl_c();
    tokio::select! {
        _ = api => {},
        _ = ctrl_c => {},
        r = node_future => {
            if let Err(e) = r {
                #[cfg(feature = "tracing")]
                tracing::error!("Critical error on relayer or validation stream: {e}")
            }
        },
    }

    // Close both pools explicitly so close errors are surfaced to the caller.
    node_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    api_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(())
}