essential_builder_cli/
lib.rs

1use anyhow::Context;
2use clap::{Parser, ValueEnum};
3use essential_builder::{self as builder, build_block_fifo};
4use essential_builder_api as builder_api;
5use essential_builder_db as builder_db;
6use essential_check::solution::CheckPredicateConfig;
7use essential_node as node;
8use essential_node_api as node_api;
9use essential_node_types::{block_notify::BlockTx, BigBang};
10use std::{
11    net::{SocketAddr, SocketAddrV4},
12    num::NonZero,
13    path::{Path, PathBuf},
14    time::Duration,
15};
16
17#[cfg(test)]
18mod tests;
19
20/// The Essential Builder CLI.
21#[derive(Parser, Clone)]
22#[command(version, about)]
23pub struct Args {
24    /// Disable the tracing subscriber.
25    #[arg(long, default_value_t = false)]
26    disable_tracing: bool,
27    /// Specify the interval at which the builder will attempt to build new blocks.
28    ///
29    /// This is a temporary parameter, mainly named at emulating the experience of waiting for the
30    /// L1 block time.
31    #[arg(long, default_value_t = DEFAULT_BLOCK_INTERVAL_MS)]
32    block_interval_ms: u32,
33    /// The maximum number of solution set failures to keep in the DB, used to provide feedback to the
34    /// submitters.
35    #[arg(long, default_value_t = builder::Config::DEFAULT_SOLUTION_SET_FAILURE_KEEP_LIMIT)]
36    solution_set_failures_to_keep: u32,
37    /// The maximum number of solution sets to attempt to check and include in a block.
38    #[arg(long, default_value_t = NonZero::new(builder::Config::DEFAULT_SOLUTION_SET_ATTEMPTS_PER_BLOCK).expect("declared const must be non-zero"))]
39    solution_set_attempts_per_block: NonZero<u32>,
40    /// The number of sequential solution sets to attempt to check in parallel at a time.
41    ///
42    /// If greater than `solution-set-attempts-per-block`, the `solution-set-attempts-per-block`
43    /// is used instead.
44    ///
45    /// If unspecified, uses `num_cpus::get()`.
46    #[arg(long, default_value_t = builder::Config::default_parallel_chunk_size())]
47    parallel_chunk_size: NonZero<usize>,
48    /// Whether or not to wait and collect all failures during solution set checking after a single
49    /// state read or constraint fails.
50    ///
51    /// Potentially useful for debugging or testing tools.
52    #[arg(long)]
53    solution_set_check_collects_all_failures: bool,
54    /// Specify a path to the `big-bang.yml` configuration.
55    ///
56    /// This specifies the genesis configuration, which includes items like the contract registry
57    /// address, block state address and associated big-bang state.
58    ///
59    /// If no configuration is specified, defaults to the `BigBang::default()` implementation.
60    ///
61    /// To learn more, see the API docs for the `essential_node_types::BigBang` type.
62    #[arg(long)]
63    big_bang: Option<std::path::PathBuf>,
64
65    // ----- builder API -----
66    /// The address to bind to for the builder API's TCP listener.
67    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
68    builder_api_bind_address: SocketAddr,
69    /// The maximum number of builder API TCP streams to be served.
70    #[arg(long, default_value_t = builder_api::DEFAULT_CONNECTION_LIMIT)]
71    builder_api_tcp_conn_limit: usize,
72
73    // ----- node API -----
74    /// The address to bind to for the node API's TCP listener.
75    #[arg(long, default_value_t = SocketAddrV4::new([0; 4].into(), 0).into())]
76    node_api_bind_address: SocketAddr,
77    /// The maximum number of node API TCP streams to be served.
78    #[arg(long, default_value_t = node_api::DEFAULT_CONNECTION_LIMIT)]
79    node_api_tcp_conn_limit: usize,
80
81    // ----- builder DB -----
82    /// The type of builder DB storage to use.
83    ///
84    /// In the case that "persistent" is specified, assumes the default path.
85    #[arg(long, default_value_t = Db::Memory, value_enum)]
86    builder_db: Db,
87    /// The path to the builder's sqlite database.
88    ///
89    /// Specifying this overrides the `builder_db` type as `persistent`.
90    ///
91    /// By default, this path will be within the user's data directory.
92    #[arg(long)]
93    builder_db_path: Option<PathBuf>,
94    /// The number of simultaneous sqlite DB connections to maintain for serving the API.
95    ///
96    /// By default, this is the number of available CPUs multiplied by 4.
97    #[arg(long, default_value_t = builder_db::pool::Config::default_conn_limit())]
98    builder_db_conn_limit: usize,
99
100    // ----- node DB -----
101    /// The type of node DB storage to use.
102    ///
103    /// In the case that "persistent" is specified, assumes the default path.
104    #[arg(long, default_value_t = Db::Memory, value_enum)]
105    node_db: Db,
106    /// The path to the node's sqlite database.
107    ///
108    /// Specifying this overrides the `node_db` type as `persistent`.
109    ///
110    /// By default, this path will be within the user's data directory.
111    #[arg(long)]
112    node_db_path: Option<PathBuf>,
113    /// The number of simultaneous sqlite DB connections to maintain for serving the API.
114    ///
115    /// By default, this is the number of available CPUs multiplied by 4.
116    #[arg(long, default_value_t = node::db::pool::Config::default_conn_limit())]
117    node_db_conn_limit: usize,
118
119    // ----- run node -----
120    /// The endpoint of the node that will act as the layer-1 for the relayer.
121    ///
122    /// If this is `Some`, then the relayer stream will run.
123    #[arg(long)]
124    relayer_source_endpoint: Option<String>,
125    /// Run the validation stream of the node.
126    #[arg(long)]
127    validation: bool,
128}
129
/// The default block-building interval in milliseconds, used as the default value of the
/// `block_interval_ms` argument in `Args`.
const DEFAULT_BLOCK_INTERVAL_MS: u32 = 5_000;
131
132#[derive(ValueEnum, Clone, Copy, Debug)]
133enum Db {
134    /// Temporary, in-memory storage that lasts for the duration of the process.
135    Memory,
136    /// Persistent storage on the local HDD or SSD.
137    ///
138    /// The DB path may be specified with `--db-path`.
139    Persistent,
140}
141
142// The default path to the builder's DB.
143fn default_builder_db_path() -> Option<PathBuf> {
144    dirs::data_dir().map(|mut path| {
145        path.extend(["essential", "builder", "db.sqlite3"]);
146        path
147    })
148}
149
150// The default path to the node's DB.
151fn default_node_db_path() -> Option<PathBuf> {
152    dirs::data_dir().map(|mut path| {
153        path.extend(["essential", "node", "db.sqlite3"]);
154        path
155    })
156}
157
158/// Construct the builder's DB config from the parsed args.
159fn builder_db_conf_from_args(args: &Args) -> anyhow::Result<builder_db::pool::Config> {
160    let source = match (&args.builder_db, &args.builder_db_path) {
161        (Db::Memory, None) => {
162            let id = format!("__essential-builder-db-{}", uuid::Uuid::new_v4());
163            builder_db::pool::Source::Memory(id)
164        }
165        (_, Some(path)) => builder_db::pool::Source::Path(path.clone()),
166        (Db::Persistent, None) => {
167            let Some(path) = default_builder_db_path() else {
168                anyhow::bail!("unable to detect user's data directory for default DB path")
169            };
170            builder_db::pool::Source::Path(path)
171        }
172    };
173    let conn_limit = args.builder_db_conn_limit;
174    let config = builder_db::pool::Config { source, conn_limit };
175    Ok(config)
176}
177
178/// Construct the node's DB config from the parsed args.
179fn node_db_conf_from_args(args: &Args) -> anyhow::Result<node::db::pool::Config> {
180    let source = match (&args.node_db, &args.node_db_path) {
181        (Db::Memory, None) => {
182            let id = format!("__essential-node-db-{}", uuid::Uuid::new_v4());
183            node::db::pool::Source::Memory(id)
184        }
185        (_, Some(path)) => node::db::pool::Source::Path(path.clone()),
186        (Db::Persistent, None) => {
187            let Some(path) = default_node_db_path() else {
188                anyhow::bail!("unable to detect user's data directory for default DB path")
189            };
190            node::db::pool::Source::Path(path)
191        }
192    };
193    let conn_limit = args.node_db_conn_limit;
194    let config = node::db::pool::Config { source, conn_limit };
195    Ok(config)
196}
197
198/// Construct the builder's block-building config from the parsed args.
199fn builder_conf_from_args(args: &Args, big_bang: &BigBang) -> builder::Config {
200    builder::Config {
201        solution_set_failures_to_keep: args.solution_set_failures_to_keep,
202        solution_set_attempts_per_block: args.solution_set_attempts_per_block,
203        parallel_chunk_size: args.parallel_chunk_size,
204        check: std::sync::Arc::new(CheckPredicateConfig {
205            collect_all_failures: args.solution_set_check_collects_all_failures,
206        }),
207        contract_registry: big_bang.contract_registry.clone(),
208        program_registry: big_bang.program_registry.clone(),
209        block_state: big_bang.block_state.clone(),
210    }
211}
212
213/// Construct the node's run config from the parsed args.
214fn node_run_conf_from_args(args: &Args) -> node::RunConfig {
215    node::RunConfig {
216        relayer_source_endpoint: args.relayer_source_endpoint.clone(),
217        run_validation: args.validation,
218    }
219}
220
221/// Load the big bang configuration from the yml file at the given path, or produce the default if
222/// no path is given.
223fn load_big_bang_or_default(path: Option<&Path>) -> anyhow::Result<BigBang> {
224    match path {
225        None => Ok(BigBang::default()),
226        Some(path) => {
227            let big_bang_str = std::fs::read_to_string(path)
228                .context("failed to read big bang configuration from path")?;
229            serde_yaml::from_str(&big_bang_str)
230                .context("failed to deserialize big bang configuration from YAML string")
231        }
232    }
233}
234
/// Install a formatting tracing subscriber whose filter is read from the environment and falls
/// back to the `INFO` level by default.
///
/// The result of `try_init` is deliberately ignored.
#[cfg(feature = "tracing")]
fn init_tracing_subscriber() {
    use tracing_subscriber::{filter::LevelFilter, EnvFilter};

    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();
    let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
    let _ = subscriber.try_init();
}
245
/// Run the essential builder.
///
/// In order, this:
///
/// 1. initialises the tracing subscriber (unless disabled via `args`),
/// 2. opens the node DB and ensures the big bang block exists,
/// 3. prepares the node API server,
/// 4. opens the builder DB and prepares the builder API server,
/// 5. prepares the block-building loop and the node run future,
/// 6. waits for the first of those futures (or ctrl-c) to complete, then closes both DB pools.
///
/// # Errors
///
/// Returns an error if DB configuration or initialisation fails, the big bang configuration
/// cannot be loaded or applied, a TCP listener cannot be bound, the block builder returns an
/// error, or closing either DB pool fails.
pub async fn run(args: Args) -> anyhow::Result<()> {
    // Initialise tracing.
    // Without the `tracing` feature the body of this `if` is compiled out and nothing happens.
    if !args.disable_tracing {
        #[cfg(feature = "tracing")]
        init_tracing_subscriber()
    }

    // Initialize the node DB.
    let node_db_conf = node_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Node DB config:\n{:#?}", node_db_conf);
        tracing::info!("Initializing node DB");
    }
    let node_db = node::db::ConnectionPool::with_tables(&node_db_conf)?;

    // Load the big bang configuration, and ensure the big bang block exists.
    let big_bang = load_big_bang_or_default(args.big_bang.as_deref())?;
    node::ensure_big_bang_block(&node_db, &big_bang)
        .await
        .context("failed to ensure big bang block")?;

    // Run the node API.
    // The node API is handed a listener on `block_tx`, which the block builder notifies
    // whenever it builds a block.
    let block_tx = BlockTx::new();
    let block_rx = block_tx.new_listener();
    let api_state = node_api::State {
        new_block: Some(block_rx),
        conn_pool: node_db.clone(),
    };
    let router = node_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.node_api_bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting node API server at {}", listener.local_addr()?);
    // The future is only created here; it is driven by the `select!` below.
    let node_api = node_api::serve(&router, &listener, args.node_api_tcp_conn_limit);

    // Initialize the builder DB.
    let builder_db_conf = builder_db_conf_from_args(&args)?;
    #[cfg(feature = "tracing")]
    {
        tracing::debug!("Builder DB config:\n{:#?}", builder_db_conf);
        tracing::info!("Initializing builder DB");
    }
    let builder_db = builder_db::ConnectionPool::with_tables(&builder_db_conf)?;

    // Run the builder API.
    // `api_state`, `router` and `listener` intentionally shadow the node API bindings above.
    let api_state = builder_api::State {
        conn_pool: builder_db.clone(),
    };
    let router = builder_api::router(api_state);
    let listener = tokio::net::TcpListener::bind(args.builder_api_bind_address).await?;
    #[cfg(feature = "tracing")]
    tracing::info!("Starting builder API server at {}", listener.local_addr()?);
    let builder_api = builder_api::serve(&router, &listener, args.builder_api_tcp_conn_limit);

    // Run the block builder.
    let builder_conf = builder_conf_from_args(&args, &big_bang);
    let block_interval = Duration::from_millis(args.block_interval_ms.into());
    let builder = run_builder(
        builder_db.clone(),
        node_db.clone(),
        block_tx.clone(),
        builder_conf,
        block_interval,
    );

    // Prepare the node's run future (relayer and/or validation streams).
    let node_run_conf = node_run_conf_from_args(&args);
    let node_run = {
        let node_db = node_db.clone();
        async move {
            // With neither a relayer endpoint nor validation requested, this future never
            // resolves, so it can never be the branch that completes the `select!` below.
            if node_run_conf.relayer_source_endpoint.is_none() && !node_run_conf.run_validation {
                std::future::pending().await
            } else {
                node::run(
                    node_db.clone(),
                    node_run_conf,
                    big_bang.contract_registry.contract,
                    big_bang.program_registry.contract,
                    block_tx,
                )?
                .join()
                .await?;
                Ok::<_, anyhow::Error>(())
            }
        }
    };

    // Select the first future to complete to close.
    // NOTE(review): the outputs of `builder_api`, `node_api` and `node_run` are discarded here.
    // `node_run` yields a `Result`, so an error from the node is not propagated to the caller —
    // confirm this is intended.
    // NOTE(review): the `?` on the `builder` branch returns early on error, which skips the
    // explicit `close` calls below — confirm this is acceptable.
    let ctrl_c = tokio::signal::ctrl_c();
    tokio::select! {
        _ = builder_api => {},
        _ = node_api => (),
        _ = node_run => (),
        _ = ctrl_c => {},
        res = builder => res.context("Critical error during block building")?,
    }

    // Close both DB pools, converting their close errors into `anyhow` errors via `Display`.
    builder_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    node_db.close().map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(())
}
347
348/// Run the block building loop forever, or until we encounter a critical error.
349async fn run_builder(
350    builder_conn_pool: builder_db::ConnectionPool,
351    node_conn_pool: node::db::ConnectionPool,
352    block_tx: BlockTx,
353    conf: builder::Config,
354    block_interval: Duration,
355) -> anyhow::Result<()> {
356    #[cfg(feature = "tracing")]
357    tracing::info!("Running the block builder");
358    #[cfg(feature = "tracing")]
359    tracing::debug!("Builder config:\n{:#?}", conf);
360    let mut interval = tokio::time::interval(block_interval);
361    loop {
362        interval.tick().await;
363        let (built_block_addr, _summary) =
364            build_block_fifo(&builder_conn_pool, &node_conn_pool, &conf).await?;
365        if built_block_addr.is_some() {
366            block_tx.notify();
367        }
368    }
369}