
forest/daemon/mod.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod bundle;
mod context;
pub mod db_util;
pub mod main;

use crate::blocks::Tipset;
use crate::chain::ChainStore;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::ChainFollower;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::cli_shared::snapshot;
use crate::cli_shared::{
    chain_path,
    cli::{CliOpts, Config},
};
use crate::daemon::{
    context::{AppContext, DbType},
    db_util::import_chain_as_forest_car,
};
use crate::db::gc::SnapshotGarbageCollector;
use crate::db::ttl::EthMappingCollector;
use crate::libp2p::{Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolLocker, NonceTracker};
use crate::networks::{self, ChainConfig};
use crate::rpc::RPCState;
use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::start_rpc;
use crate::shim::clock::ChainEpoch;
use crate::shim::state_tree::StateTree;
use crate::shim::version::NetworkVersion;
use crate::utils::misc::env::is_env_truthy;
use crate::utils::{self, ShallowClone as _};
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use backon::{ExponentialBuilder, Retryable};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::error::RecvError;
use tokio::{
    net::TcpListener,
    signal::{
        ctrl_c,
        unix::{SignalKind, signal},
    },
    sync::mpsc,
    task::JoinSet,
};
use tracing::{debug, info, warn};

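/// Global handle to the snapshot garbage collector, set in `maybe_start_gc_service`
/// so that other parts of the node can trigger a collection on demand.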
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();

/// Increase the file descriptor limit to a reasonable number.
/// This prevents the node from failing if the default soft limit is too low.
/// Note that the value is only increased, never decreased.
fn maybe_increase_fd_limit() -> anyhow::Result<()> {
    static DESIRED_SOFT_LIMIT: u64 = 8192;
    let (soft_before, _) = rlimit::Resource::NOFILE.get()?;

    let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
    if soft_before < soft_after {
        debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
    }
    if soft_after < DESIRED_SOFT_LIMIT {
        warn!(
            "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
            You may encounter 'too many open files' errors.",
        );
    }

    Ok(())
}

// Start the daemon and abort if we're interrupted by ctrl-c, SIGTERM, or `forest-cli shutdown`.
pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
    let start_time = chrono::Utc::now();
    let mut terminate = signal(SignalKind::terminate())?;
    let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
    let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
    let result = tokio::select! {
        ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
        _ = ctrl_c() => {
            info!("Keyboard interrupt.");
            Ok(())
        },
        _ = terminate.recv() => {
            info!("Received SIGTERM.");
            Ok(())
        },
        _ = shutdown_recv.recv() => {
            info!("Client requested a shutdown.");
            Ok(())
        },
    };
    _ = rpc_server_handle.stop();
    crate::utils::io::terminal_cleanup();
    result
}

/// This function initializes Forest with the following steps:
/// - increase the file descriptor limit (for parity-db)
/// - set up the proofs parameter cache directory
/// - print the Forest version
fn startup_init(config: &Config) -> anyhow::Result<()> {
    maybe_increase_fd_limit()?;
    // Set the proof parameter file download path early; the files will be checked and
    // downloaded later, right after the snapshot import step.
    crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
    info!(
        "Starting Forest daemon, version {}",
        FOREST_VERSION_STRING.as_str()
    );
    Ok(())
}

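/// Resolves which snapshot to use (if any), imports it into the forest CAR database and
/// sets it as the new heaviest tipset, then optionally validates a range of epochs.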
async fn maybe_import_snapshot(
    opts: &CliOpts,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    let chain_config = ctx.state_manager.chain_config();
    // Set the latest snapshot path if needed, so it can be downloaded later.
    if config.client.snapshot_path.is_none() && !opts.stateless {
        maybe_set_snapshot_path(
            config,
            chain_config,
            ctx.state_manager.chain_store().heaviest_tipset().epoch(),
            opts.auto_download_snapshot,
            &ctx.db_meta_data.get_root_dir(),
        )
        .await?;
    }

    let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
    // Import chain if needed
    if !opts.skip_load.unwrap_or_default()
        && let Some(path) = &config.client.snapshot_path
    {
        let (car_db_path, ts) = import_chain_as_forest_car(
            path,
            &ctx.db_meta_data.get_forest_car_db_dir(),
            config.client.import_mode,
            config.client.rpc_v1_endpoint()?,
            &crate::f3::get_f3_root(config),
            ctx.chain_config(),
            &snapshot_tracker,
        )
        .await?;
        ctx.db
            .read_only_files(std::iter::once(car_db_path.clone()))?;
        let ts_epoch = ts.epoch();
        // Explicitly set the heaviest tipset here in case HEAD_KEY has already been set
        // in the settings store.
        ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
        debug!(
            "Loaded car DB at {} and set current head to epoch {ts_epoch}",
            car_db_path.display(),
        );
    }

    // If the snapshot progress state is not completed,
    // set the state to not required.
    if !snapshot_tracker.is_completed() {
        snapshot_tracker.not_required();
    }

    if let Some(validate_from) = config.client.snapshot_height {
        // We've been provided a snapshot and asked to validate it
        ensure_proof_params_downloaded().await?;
        // Use the specified HEAD, otherwise take the current HEAD.
        let current_height = config
            .client
            .snapshot_head
            .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());

        let validation_range = validation_range(current_height, validate_from)?;
        // `validate_range` is CPU-bound (drives rayon-parallel VM execution) and
        // can run for minutes. Safer to spawn it on a blocking thread.
        let state_manager = ctx.state_manager.clone();
        tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
            .await??;
    }

    Ok(())
}

/// Returns the range of epochs to validate. This includes special handling for negative `from`
/// values, which are interpreted as offsets from the current epoch.
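///
/// Illustrative examples (mirrors the unit tests at the bottom of this file):
///
/// ```ignore
/// // With the head at epoch 100, `from = -10` validates the last ten epochs.
/// assert_eq!(validation_range(100, -10).unwrap(), 90..=100);
/// // An absolute start past the head is an error rather than a silently empty range.
/// assert!(validation_range(100, 200).is_err());
/// ```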
fn validation_range(
    current: ChainEpoch,
    from: ChainEpoch,
) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
    anyhow::ensure!(
        current.is_positive(),
        "current head epoch {current} is invalid"
    );

    // Negative values scroll back from the current head (e.g. --height=-1000).
    // `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or
    // wrapping to a huge positive (which would silently produce an empty range).
    let start = if from.is_negative() {
        current.saturating_add(from).max(0)
    } else {
        from
    };

    // An absolute `--height` past the head would otherwise produce an empty
    // range and silently succeed without validating anything.
    anyhow::ensure!(
        start <= current,
        "requested validation start epoch {start} is beyond the current head at epoch {current}",
    );

    Ok(start..=current)
}

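/// Starts the Prometheus metrics endpoint and registers the network height collector
/// when the metrics endpoint is enabled in the config.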
async fn maybe_start_metrics_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_metrics_endpoint {
        // Bind the Prometheus server port
        let prometheus_listener = TcpListener::bind(config.client.metrics_address)
            .await
            .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
        info!(
            "Prometheus server started at {}",
            config.client.metrics_address
        );
        let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
        let db = ctx.db.writer().clone();

        let get_chain_head_height = Arc::new({
            // Use `Weak` so we don't deadlock GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                chain_store
                    .upgrade()
                    .map(|cs| cs.heaviest_tipset().epoch())
                    .unwrap_or_default()
            }
        });
        let get_chain_head_actor_version = Arc::new({
            // Use `Weak` so we don't deadlock GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                if let Some(cs) = chain_store.upgrade()
                    && let Ok(state) = StateTree::new_from_root(
                        cs.blockstore().clone(),
                        cs.heaviest_tipset().parent_state(),
                    )
                    && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
                    && let Ok(actor_version) = bundle_meta.actor_major_version()
                {
                    return actor_version;
                }
                0
            }
        });
        services.spawn({
            let chain_config = ctx.chain_config().clone();
            let get_chain_head_height = get_chain_head_height.clone();
            async {
                crate::metrics::init_prometheus(
                    prometheus_listener,
                    db_directory,
                    db,
                    chain_config,
                    get_chain_head_height,
                    get_chain_head_actor_version,
                )
                .await
                .context("Failed to initiate prometheus server")
            }
        });

        crate::metrics::register_collector(Box::new(
            networks::metrics::NetworkHeightCollector::new(
                ctx.state_manager.chain_config().block_delay_secs,
                ctx.state_manager
                    .chain_store()
                    .genesis_block_header()
                    .timestamp,
                get_chain_head_height,
            ),
        ));
    }
    Ok(())
}

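/// Creates the libp2p service and spawns the peer manager's event loop.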
async fn create_p2p_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<Libp2pService<DbType>> {
    // If bootstrap peers are not set, take them from the chain config.
    if config.network.bootstrap_peers.is_empty() {
        config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
    }

    let peer_manager = Arc::new(PeerManager::default());
    services.spawn(peer_manager.clone().peer_operation_event_loop_task());
    // Libp2p service setup
    let p2p_service = Libp2pService::new(
        config.network.clone(),
        Arc::clone(ctx.state_manager.chain_store()),
        peer_manager.clone(),
        ctx.net_keypair.clone(),
        config.chain.genesis_name(),
        *ctx.state_manager.chain_store().genesis_block_header().cid(),
    )
    .await?;
    Ok(p2p_service)
}

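/// Creates the message pool backed by the chain store and the p2p network sender.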
fn create_mpool(
    services: &mut JoinSet<anyhow::Result<()>>,
    p2p_service: &Libp2pService<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
    Ok(MessagePool::new(
        ctx.state_manager.chain_store().clone(),
        p2p_service.network_sender().clone(),
        MpoolConfig::load_config(ctx.db.writer().as_ref())?,
        ctx.state_manager.chain_config().clone(),
        services,
    )
    .map(Arc::new)?)
}

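/// Creates the chain follower that drives syncing, wired up to the p2p network and the message pool.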
fn create_chain_follower(
    opts: &CliOpts,
    p2p_service: &Libp2pService<DbType>,
    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
    ctx: &AppContext,
) -> anyhow::Result<Arc<ChainFollower<DbType>>> {
    let network_send = p2p_service.network_sender().clone();
    let peer_manager = p2p_service.peer_manager().clone();
    let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
    Ok(Arc::new(ChainFollower::new(
        ctx.state_manager.clone(),
        network,
        Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
        p2p_service.network_receiver(),
        opts.stateless,
        mpool,
    )))
}

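/// Spawns the chain follower's run loop as a background service.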
fn start_chain_follower_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    chain_follower: Arc<ChainFollower<DbType>>,
) {
    services.spawn(async move { chain_follower.run().await });
}

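/// Starts the healthcheck endpoint when it is enabled in the config.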
async fn maybe_start_health_check_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    p2p_service: &Libp2pService<DbType>,
    chain_follower: &ChainFollower<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_health_check {
        let forest_state = crate::health::ForestState {
            config: config.clone(),
            chain_config: ctx.state_manager.chain_config().clone(),
            genesis_timestamp: ctx
                .state_manager
                .chain_store()
                .genesis_block_header()
                .timestamp,
            sync_status: chain_follower.sync_status.clone(),
            peer_manager: p2p_service.peer_manager().clone(),
        };
        let healthcheck_address = forest_state.config.client.healthcheck_address;
        info!("Healthcheck endpoint will listen at {healthcheck_address}");
        let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
        services.spawn(async move {
            crate::health::init_healthcheck_server(forest_state, listener)
                .await
                .context("Failed to initiate healthcheck server")
        });
    } else {
        info!("Healthcheck service is disabled");
    }
    Ok(())
}

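/// Sets up the snapshot garbage collector: the on-demand event loop always runs (unless the
/// node is stateless), and the periodic scheduler loop only runs when GC isn't disabled.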
fn maybe_start_gc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    chain_follower: Arc<ChainFollower<DbType>>,
) -> anyhow::Result<()> {
    // If the node is stateless, GC shouldn't get triggered even on demand.
    if opts.stateless {
        return Ok(());
    }

    let snap_gc = Arc::new(SnapshotGarbageCollector::new(chain_follower, config)?);

    GLOBAL_SNAPSHOT_GC
        .set(snap_gc.clone())
        .ok()
        .context("failed to set GLOBAL_SNAPSHOT_GC")?;

    services.spawn({
        let snap_gc = snap_gc.clone();
        async move {
            snap_gc.event_loop().await;
            Ok(())
        }
    });

    // GC shouldn't run periodically if the node is stateless or if the user has disabled it.
    if !opts.no_gc {
        services.spawn({
            let snap_gc = snap_gc.clone();
            async move {
                snap_gc.scheduler_loop().await;
                Ok(())
            }
        });
    }

    Ok(())
}

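/// Starts the JSON-RPC server when RPC is enabled in the config.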
#[allow(clippy::too_many_arguments)]
fn maybe_start_rpc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
    chain_follower: &ChainFollower<DbType>,
    start_time: chrono::DateTime<chrono::Utc>,
    shutdown: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_rpc {
        let rpc_address = config.client.rpc_address;
        let filter_list = config
            .client
            .rpc_filter_list
            .as_ref()
            .map(|path| crate::rpc::FilterList::new_from_file(path))
            .transpose()?;
        info!("JSON-RPC endpoint will listen at {rpc_address}");
        let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
        if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
            warn!(
                "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
            );
        }
        services.spawn({
            let state_manager = ctx.state_manager.shallow_clone();
            let bad_blocks = chain_follower.bad_blocks.shallow_clone();
            let sync_status = chain_follower.sync_status.shallow_clone();
            let sync_network_context = chain_follower.network.shallow_clone();
            let tipset_send = chain_follower.tipset_sender.clone();
            let keystore = ctx.keystore.shallow_clone();
            let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
            let nonce_tracker = NonceTracker::new();
            let mpool_locker = MpoolLocker::new();
            let temp_dir = Arc::new(ctx.temp_dir.clone());
            async move {
                let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
                    .await
                    .map_err(|e| {
                        anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
                    })
                    // Failing to bind the RPC endpoint is unrecoverable, so panic with the error.
                    .unwrap();
                start_rpc(
                    RPCState {
                        state_manager,
                        keystore,
                        mpool,
                        bad_blocks,
                        sync_status,
                        eth_event_handler,
                        sync_network_context,
                        start_time,
                        shutdown,
                        tipset_send,
                        snapshot_progress_tracker,
                        mpool_locker,
                        nonce_tracker,
                        temp_dir,
                    },
                    rpc_listener,
                    rpc_stop_handle,
                    filter_list,
                )
                .await
            }
        });
    } else {
        debug!("RPC disabled.");
    };
    Ok(())
}

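/// Spawns the F3 sidecar when it is enabled and RPC is available, and seeds the F3
/// finalized tipset from the latest F3 certificate.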
fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
    // Already running.
    if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
        return Ok(());
    }

    if !config.client.enable_rpc {
        if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
            tracing::warn!("F3 sidecar is enabled but will not run because RPC is disabled.")
        }
        return Ok(());
    }

    if !opts.halt_after_import && !opts.stateless {
        let rpc_endpoint = config.client.rpc_v1_endpoint()?;
        let state_manager = &ctx.state_manager;
        let p2p_peer_id = ctx.p2p_peer_id;
        let admin_jwt = ctx.admin_jwt.clone();
        tokio::task::spawn_blocking({
            crate::rpc::f3::F3_LEASE_MANAGER
                .set(crate::rpc::f3::F3LeaseManager::new(
                    state_manager.chain_config().network.clone(),
                    p2p_peer_id,
                ))
                .expect("F3 lease manager should not have been initialized before");
            let chain_config = state_manager.chain_config().clone();
            let f3_root = crate::f3::get_f3_root(config);
            let crate::f3::F3Options {
                chain_finality,
                bootstrap_epoch,
                initial_power_table,
            } = crate::f3::get_f3_sidecar_params(&chain_config);
            move || {
                crate::f3::run_f3_sidecar_if_enabled(
                    &chain_config,
                    rpc_endpoint.to_string(),
                    admin_jwt,
                    crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
                    initial_power_table
                        .map(|i| i.to_string())
                        .unwrap_or_default(),
                    bootstrap_epoch,
                    chain_finality,
                    f3_root.display().to_string(),
                );
            }
        });
        tokio::task::spawn({
            let chain_store = ctx.chain_store().clone();
            async move {
                // Wait 1s to let the F3 RPC server start.
                tokio::time::sleep(Duration::from_secs(1)).await;
                match (|| crate::rpc::f3::F3GetLatestCertificate::get())
                    .retry(ExponentialBuilder::default())
                    .await
                {
                    Ok(f3_finalized_cert) => {
                        let f3_finalized_head = f3_finalized_cert.chain_head();
                        match chain_store
                            .chain_index()
                            .load_required_tipset(&f3_finalized_head.key)
                        {
                            Ok(ts) => {
                                chain_store.set_f3_finalized_tipset(ts);
                                tracing::info!(
                                    "Set F3 finalized tipset to epoch {} and key {}",
                                    f3_finalized_head.epoch,
                                    f3_finalized_head.key,
                                );
                            }
                            Err(e) => {
                                tracing::error!(
                                    "Failed to get F3 finalized tipset epoch {} and key {}: {e}",
                                    f3_finalized_head.epoch,
                                    f3_finalized_head.key
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::error!("Failed to get F3 latest certificate: {e:#}");
                    }
                }
            }
        });
    }

    Ok(())
}

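/// Starts the chain indexer service and, when configured, the `eth_mappings` TTL collector.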
fn maybe_start_indexer_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    ctx: &AppContext,
) {
    if config.chain_indexer.enable_indexer
        && !opts.stateless
        && !ctx.state_manager.chain_config().is_devnet()
    {
        let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
        let chain_store = ctx.state_manager.chain_store().clone();
        services.spawn(async move {
            tracing::info!("Starting indexer service");

            // Continuously listen for head changes
            loop {
                match head_changes_rx.recv().await {
                    Ok(changes) => {
                        for ts in changes.applies {
                            tracing::debug!("Indexing tipset {}", ts.key());
                            let delegated_messages = chain_store
                                .headers_delegated_messages(ts.block_headers().iter())?;
                            chain_store.process_signed_messages(&delegated_messages)?;
                        }
                    }
                    Err(RecvError::Lagged(n)) => {
                        warn!("indexer service lagged: skipping {n} events")
                    }
                    Err(RecvError::Closed) => break Ok(()),
                }
            }
        });

        // Run the collector only if chain indexer is enabled
        if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
            let chain_store = ctx.state_manager.chain_store().clone();
            let chain_config = ctx.state_manager.chain_config().clone();
            services.spawn(async move {
                tracing::info!("Starting collector for eth_mappings");
                let mut collector = EthMappingCollector::new(
                    chain_store.blockstore().clone(),
                    chain_config.eth_chain_id,
                    retention_epochs.into(),
                );
                collector.run().await
            });
        }
    }
}

/// Starts the daemon process.
pub(super) async fn start(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: CliOpts,
    config: Config,
    shutdown_send: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
) -> anyhow::Result<()> {
    startup_init(&config)?;
    start_services(
        start_time,
        &opts,
        config.clone(),
        shutdown_send.clone(),
        rpc_stop_handle,
    )
    .await
}

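/// Initializes the application context, wires up and starts all configured services,
/// and then blocks until one of them fails.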
pub(super) async fn start_services(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: &CliOpts,
    mut config: Config,
    shutdown_send: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
) -> anyhow::Result<()> {
    // Clean up the collector Prometheus metrics registry on start.
    crate::metrics::reset_collector_registry();
    let mut services = JoinSet::new();
    let network = config.chain();
    let ctx = AppContext::init(opts, &config).await?;
    info!("Using network :: {network}");
    utils::misc::display_chain_logo(config.chain());
    if opts.exit_after_init {
        return Ok(());
    }
    if !opts.stateless
        && !opts.skip_load_actors
        && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
    {
        tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
    }

    let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
    let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
    let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;

    maybe_start_rpc_service(
        &mut services,
        &config,
        mpool.clone(),
        &chain_follower,
        start_time,
        shutdown_send.clone(),
        rpc_stop_handle,
        &ctx,
    )?;

    maybe_import_snapshot(opts, &mut config, &ctx).await?;
    if opts.halt_after_import {
        // Cancel all async services
        services.shutdown().await;
        return Ok(());
    }

    warmup_in_background(&ctx);
    maybe_start_gc_service(&mut services, opts, &config, chain_follower.clone())?;
    maybe_start_metrics_service(&mut services, &config, &ctx).await?;
    maybe_start_f3_service(opts, &config, &ctx)?;
    maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
        .await?;
    maybe_start_indexer_service(&mut services, opts, &config, &ctx);
    if !opts.stateless {
        ensure_proof_params_downloaded().await?;
    }
    services.spawn(p2p_service.run());
    start_chain_follower_service(&mut services, chain_follower);
    // Block until any of the services returns an error.
    propagate_error(&mut services)
        .await
        .context("services failure")
        .map(|_| {})
}

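/// Warms up the `tipset_by_height` cache in a background blocking task.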
fn warmup_in_background(ctx: &AppContext) {
    // Populate `tipset_by_height` cache
    let cs = ctx.chain_store().clone();
    tokio::task::spawn_blocking(move || {
        let start = Instant::now();
        match cs.chain_index().tipset_by_height(
            // 0 would short-circuit the cache
            1,
            cs.heaviest_tipset(),
            ResolveNullTipset::TakeOlder,
        ) {
            Ok(_) => {
                tracing::info!(
                    "Successfully populated tipset_by_height cache, took {}",
                    humantime::format_duration(start.elapsed())
                );
            }
            Err(e) => {
                tracing::warn!("Failed to populate tipset_by_height cache: {e}");
            }
        }
    });
}

/// If our current chain is below a supported height, we need a snapshot to bring it up
/// to a supported height. If we've not been given a snapshot by the user, get one.
///
/// An [`Err`] should be considered fatal.
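///
/// Decision summary (mirrors the `match` below):
///
/// | need snapshot | have snapshot | auto-download | action                         |
/// |---------------|---------------|---------------|--------------------------------|
/// | no            | -             | -             | nothing                        |
/// | yes           | yes           | -             | nothing                        |
/// | yes           | no            | yes           | use env override or latest URL |
/// | yes           | no            | no            | ask the user, else bail        |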
async fn maybe_set_snapshot_path(
    config: &mut Config,
    chain_config: &ChainConfig,
    epoch: ChainEpoch,
    auto_download_snapshot: bool,
    download_directory: &Path,
) -> anyhow::Result<()> {
    if !download_directory.is_dir() {
        anyhow::bail!(
            "`download_directory` does not exist: {}",
            download_directory.display()
        );
    }

    let vendor = snapshot::TrustedVendor::default();
    let chain = config.chain();

    // What height is our chain at right now, and what network version does that correspond to?
    let network_version = chain_config.network_version(epoch);
    let network_version_is_small = network_version < NetworkVersion::V16;

    // We don't support small network versions (we can't validate from e.g. genesis),
    // so we need a snapshot (which will be from a recent network version).
    let require_a_snapshot = network_version_is_small;
    let have_a_snapshot = config.client.snapshot_path.is_some();

    match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
        (false, _, _) => {}   // noop - don't need a snapshot
        (true, true, _) => {} // noop - we need a snapshot, and we have one
        (true, false, true) => {
            const AUTO_SNAPSHOT_PATH_ENV_KEY: &str = "FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH";
            match std::env::var(AUTO_SNAPSHOT_PATH_ENV_KEY) {
                Ok(path) if !path.is_empty() => {
                    tracing::info!(
                        "importing snapshot from {path} set by `{AUTO_SNAPSHOT_PATH_ENV_KEY}`"
                    );
                    config.client.snapshot_path = Some(path.into());
                }
                _ => {
                    // Resolve the redirect URL to get the actual snapshot URL.
                    // This ensures all chunks download from the same snapshot even if
                    // a new snapshot is published during the download.
                    let (resolved_url, _num_bytes, filename) =
                        crate::cli_shared::snapshot::peek(vendor, chain).await?;
                    tracing::info!("Downloading snapshot: {filename}");
                    config.client.snapshot_path = Some(resolved_url.to_string().into());
                }
            }
        }
        (true, false, false) => {
            // We need a snapshot, don't have one, and don't have permission to download one, so ask the user.
            let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
                .await
                .context("couldn't get snapshot size")?;
            // dialoguer will double-print long lines, so manually print the first clause ourselves,
            // then let `Confirm` handle the second.
            println!(
                "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
            );
            let message = format!(
                "Fetch a {} snapshot? (denying will exit the program). ",
                indicatif::HumanBytes(num_bytes)
            );
            let have_permission = asyncify(|| {
                dialoguer::Confirm::with_theme(&ColorfulTheme::default())
                    .with_prompt(message)
                    .default(false)
                    .interact()
                    // e.g. not a tty (or some other error), so we haven't got permission.
                    .unwrap_or(false)
            })
            .await;
            if !have_permission {
                bail!(
                    "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
                )
            }
            tracing::info!("Downloading snapshot: {filename}");
            config.client.snapshot_path = Some(url.to_string().into());
        }
    };

    Ok(())
}

/// Returns the first error with which any of the services ends, or never returns at all.
// This should return `anyhow::Result<!>` once the `Never` type is stabilized.
async fn propagate_error(
    services: &mut JoinSet<anyhow::Result<()>>,
) -> anyhow::Result<std::convert::Infallible> {
    while let Some(result) = services.join_next().await {
        if let Ok(Err(error_message)) = result {
            return Err(error_message);
        }
    }
    std::future::pending().await
}

/// Run the closure on a thread where blocking is allowed
///
/// # Panics
/// If the closure panics
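///
/// Illustrative use (hypothetical call site):
///
/// ```ignore
/// let contents = asyncify(|| std::fs::read_to_string("some_file.toml")).await;
/// ```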
fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
where
    T: Send + 'static,
{
    tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
}

#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!(
        "current head epoch 0 is invalid"
    )))]
    #[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!(
        "current head epoch -1 is invalid"
    )))]
    #[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!(
        "requested validation start epoch 11 is beyond the current head at epoch 10"
    )))]
    #[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))]
    #[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))]
    #[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))]
    #[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))]
    fn test_validation_range(
        #[case] current: ChainEpoch,
        #[case] from: ChainEpoch,
        #[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>,
    ) {
        let result = validation_range(current, from);
        match expected {
            Ok(expected_range) => {
                assert_eq!(result.unwrap(), expected_range);
            }
            Err(_) => {
                assert!(result.is_err());
            }
        }
    }
}