Skip to main content

forest/daemon/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub mod bundle;
5mod context;
6pub mod db_util;
7pub mod main;
8
9use crate::blocks::Tipset;
10use crate::chain::ChainStore;
11use crate::chain::index::ResolveNullTipset;
12use crate::chain_sync::ChainFollower;
13use crate::chain_sync::network_context::SyncNetworkContext;
14use crate::cli_shared::snapshot;
15use crate::cli_shared::{
16    chain_path,
17    cli::{CliOpts, Config},
18};
19use crate::daemon::{
20    context::{AppContext, DbType},
21    db_util::import_chain_as_forest_car,
22};
23use crate::db::gc::SnapshotGarbageCollector;
24use crate::db::ttl::EthMappingCollector;
25use crate::libp2p::{Libp2pService, PeerManager};
26use crate::message_pool::{MessagePool, MpoolConfig, MpoolLocker, NonceTracker};
27use crate::networks::{self, ChainConfig};
28use crate::rpc::RPCState;
29use crate::rpc::eth::filter::EthEventHandler;
30use crate::rpc::start_rpc;
31use crate::shim::clock::ChainEpoch;
32use crate::shim::state_tree::StateTree;
33use crate::shim::version::NetworkVersion;
34use crate::utils::misc::env::is_env_truthy;
35use crate::utils::{self, ShallowClone as _};
36use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
37use anyhow::{Context as _, bail};
38use backon::{ExponentialBuilder, Retryable};
39use dialoguer::theme::ColorfulTheme;
40use futures::{Future, FutureExt};
41use std::path::Path;
42use std::sync::Arc;
43use std::sync::OnceLock;
44use std::time::{Duration, Instant};
45use tokio::sync::broadcast::error::RecvError;
46use tokio::{
47    signal::{
48        ctrl_c,
49        unix::{SignalKind, signal},
50    },
51    sync::mpsc,
52    task::JoinSet,
53};
54use tracing::{debug, info, warn};
55
/// Process-wide handle to the snapshot garbage collector.
///
/// Starts out empty; `maybe_start_gc_service` sets it exactly once when the GC service is
/// created (which it skips for stateless nodes).
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
57
58/// Increase the file descriptor limit to a reasonable number.
59/// This prevents the node from failing if the default soft limit is too low.
60/// Note that the value is only increased, never decreased.
61fn maybe_increase_fd_limit() -> anyhow::Result<()> {
62    static DESIRED_SOFT_LIMIT: u64 = 8192;
63    let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
64
65    let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
66    if soft_before < soft_after {
67        debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
68    }
69    if soft_after < DESIRED_SOFT_LIMIT {
70        warn!(
71            "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
72            You may encounter 'too many open files' errors.",
73        );
74    }
75
76    Ok(())
77}
78
79// Start the daemon and abort if we're interrupted by ctrl-c, SIGTERM, or `forest-cli shutdown`.
80pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
81    let start_time = chrono::Utc::now();
82    let mut terminate = signal(SignalKind::terminate())?;
83    let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
84    let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
85    let result = tokio::select! {
86        ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
87        _ = ctrl_c() => {
88            info!("Keyboard interrupt.");
89            Ok(())
90        },
91        _ = terminate.recv() => {
92            info!("Received SIGTERM.");
93            Ok(())
94        },
95        _ = shutdown_recv.recv() => {
96            info!("Client requested a shutdown.");
97            Ok(())
98        },
99    };
100    _ = rpc_server_handle.stop();
101    crate::utils::io::terminal_cleanup();
102    result
103}
104
105/// This function initialize Forest with below steps
106/// - increase file descriptor limit (for parity-db)
107/// - setup proofs parameter cache directory
108/// - prints Forest version
109fn startup_init(config: &Config) -> anyhow::Result<()> {
110    maybe_increase_fd_limit()?;
111    // Sets proof parameter file download path early, the files will be checked and
112    // downloaded later right after snapshot import step
113    crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
114    info!(
115        "Starting Forest daemon, version {}",
116        FOREST_VERSION_STRING.as_str()
117    );
118    Ok(())
119}
120
121async fn maybe_import_snapshot(
122    opts: &CliOpts,
123    config: &mut Config,
124    ctx: &AppContext,
125) -> anyhow::Result<()> {
126    let chain_config = ctx.state_manager.chain_config();
127    // Sets the latest snapshot if needed for downloading later
128    if config.client.snapshot_path.is_none() && !opts.stateless {
129        maybe_set_snapshot_path(
130            config,
131            chain_config,
132            ctx.state_manager.chain_store().heaviest_tipset().epoch(),
133            opts.auto_download_snapshot,
134            &ctx.db_meta_data.get_root_dir(),
135        )
136        .await?;
137    }
138
139    let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
140    // Import chain if needed
141    if !opts.skip_load.unwrap_or_default()
142        && let Some(path) = &config.client.snapshot_path
143    {
144        let (car_db_path, ts) = import_chain_as_forest_car(
145            path,
146            &ctx.db_meta_data.get_forest_car_db_dir(),
147            config.client.import_mode,
148            config.client.rpc_v1_endpoint()?,
149            &crate::f3::get_f3_root(config),
150            ctx.chain_config(),
151            &snapshot_tracker,
152        )
153        .await?;
154        ctx.db
155            .read_only_files(std::iter::once(car_db_path.clone()))?;
156        let ts_epoch = ts.epoch();
157        // Explicitly set heaviest tipset here in case HEAD_KEY has already been set
158        // in the current setting store
159        ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
160        debug!(
161            "Loaded car DB at {} and set current head to epoch {ts_epoch}",
162            car_db_path.display(),
163        );
164    }
165
166    // If the snapshot progress state is not completed,
167    // set the state to not required
168    if !snapshot_tracker.is_completed() {
169        snapshot_tracker.not_required();
170    }
171
172    if let Some(validate_from) = config.client.snapshot_height {
173        // We've been provided a snapshot and asked to validate it
174        ensure_proof_params_downloaded().await?;
175        // Use the specified HEAD, otherwise take the current HEAD.
176        let current_height = config
177            .client
178            .snapshot_head
179            .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
180
181        let validation_range = validation_range(current_height, validate_from)?;
182        // `validate_range` is CPU-bound (drives rayon-parallel VM execution) and
183        // can run for minutes. Safer to spawn it on a blocking thread.
184        let state_manager = ctx.state_manager.clone();
185        tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
186            .await??;
187    }
188
189    Ok(())
190}
191
192/// Returns the range of epochs to validate. This includes special handling for negative `from`
193/// values, which are interpreted as offsets from the current epoch.
194fn validation_range(
195    current: ChainEpoch,
196    from: ChainEpoch,
197) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
198    anyhow::ensure!(
199        current.is_positive(),
200        "current head epoch {current} is invalid"
201    );
202
203    // Negative values scroll back from the current head (e.g. --height=-1000).
204    // `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or
205    // wrapping to a huge positive (which would silently produce an empty range).
206    let start = if from.is_negative() {
207        current.saturating_add(from).max(0)
208    } else {
209        from
210    };
211
212    // An absolute `--height` past the head would otherwise produce an empty
213    // range and silently succeed without validating anything.
214    anyhow::ensure!(
215        start <= current,
216        "requested validation start epoch {start} is beyond the current head at epoch {current}",
217    );
218
219    Ok(start..=current)
220}
221
222async fn maybe_start_metrics_service(
223    services: &mut JoinSet<anyhow::Result<()>>,
224    config: &Config,
225    ctx: &AppContext,
226) -> anyhow::Result<()> {
227    if config.client.enable_metrics_endpoint {
228        let prometheus_listener =
229            crate::utils::net::bind_tcp_listener(config.client.metrics_address, 0).await?;
230        info!(
231            "Prometheus server started at {}",
232            config.client.metrics_address
233        );
234        let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
235        let db = ctx.db.writer().clone();
236
237        let get_chain_head_height = Arc::new({
238            // Use `Weak` to not dead lock GC.
239            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
240            move || {
241                chain_store
242                    .upgrade()
243                    .map(|cs| cs.heaviest_tipset().epoch())
244                    .unwrap_or_default()
245            }
246        });
247        let get_chain_head_actor_version = Arc::new({
248            // Use `Weak` to not dead lock GC.
249            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
250            move || {
251                if let Some(cs) = chain_store.upgrade()
252                    && let Ok(state) = StateTree::new_from_root(
253                        cs.blockstore().clone(),
254                        cs.heaviest_tipset().parent_state(),
255                    )
256                    && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
257                    && let Ok(actor_version) = bundle_meta.actor_major_version()
258                {
259                    return actor_version;
260                }
261                0
262            }
263        });
264        services.spawn({
265            let chain_config = ctx.chain_config().clone();
266            let get_chain_head_height = get_chain_head_height.clone();
267            async {
268                crate::metrics::init_prometheus(
269                    prometheus_listener,
270                    db_directory,
271                    db,
272                    chain_config,
273                    get_chain_head_height,
274                    get_chain_head_actor_version,
275                )
276                .await
277                .context("Failed to initiate prometheus server")
278            }
279        });
280
281        crate::metrics::register_collector(Box::new(
282            networks::metrics::NetworkHeightCollector::new(
283                ctx.state_manager.chain_config().block_delay_secs,
284                ctx.state_manager
285                    .chain_store()
286                    .genesis_block_header()
287                    .timestamp,
288                get_chain_head_height,
289            ),
290        ));
291    }
292    Ok(())
293}
294
295async fn create_p2p_service(
296    services: &mut JoinSet<anyhow::Result<()>>,
297    config: &mut Config,
298    ctx: &AppContext,
299) -> anyhow::Result<Libp2pService<DbType>> {
300    // if bootstrap peers are not set, set them
301    if config.network.bootstrap_peers.is_empty() {
302        config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
303    }
304
305    let peer_manager = Arc::new(PeerManager::default());
306    services.spawn(peer_manager.clone().peer_operation_event_loop_task());
307    // Libp2p service setup
308    let p2p_service = Libp2pService::new(
309        config.network.clone(),
310        Arc::clone(ctx.state_manager.chain_store()),
311        peer_manager.clone(),
312        ctx.net_keypair.clone(),
313        config.chain.genesis_name(),
314        *ctx.state_manager.chain_store().genesis_block_header().cid(),
315    )
316    .await?;
317    Ok(p2p_service)
318}
319
320fn create_mpool(
321    services: &mut JoinSet<anyhow::Result<()>>,
322    p2p_service: &Libp2pService<DbType>,
323    ctx: &AppContext,
324) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
325    Ok(MessagePool::new(
326        ctx.state_manager.chain_store().clone(),
327        p2p_service.network_sender().clone(),
328        MpoolConfig::load_config(ctx.db.writer().as_ref())?,
329        ctx.state_manager.chain_config().clone(),
330        services,
331    )
332    .map(Arc::new)?)
333}
334
335fn create_chain_follower(
336    opts: &CliOpts,
337    p2p_service: &Libp2pService<DbType>,
338    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
339    ctx: &AppContext,
340) -> anyhow::Result<Arc<ChainFollower<DbType>>> {
341    let network_send = p2p_service.network_sender().clone();
342    let peer_manager = p2p_service.peer_manager().clone();
343    let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
344    Ok(Arc::new(ChainFollower::new(
345        ctx.state_manager.clone(),
346        network,
347        Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
348        p2p_service.network_receiver(),
349        opts.stateless,
350        mpool,
351    )))
352}
353
354fn start_chain_follower_service(
355    services: &mut JoinSet<anyhow::Result<()>>,
356    chain_follower: Arc<ChainFollower<DbType>>,
357) {
358    services.spawn(async move { chain_follower.run().await });
359}
360
361async fn maybe_start_health_check_service(
362    services: &mut JoinSet<anyhow::Result<()>>,
363    config: &Config,
364    p2p_service: &Libp2pService<DbType>,
365    chain_follower: &ChainFollower<DbType>,
366    ctx: &AppContext,
367) -> anyhow::Result<()> {
368    if config.client.enable_health_check {
369        let forest_state = crate::health::ForestState {
370            config: config.clone(),
371            chain_config: ctx.state_manager.chain_config().clone(),
372            genesis_timestamp: ctx
373                .state_manager
374                .chain_store()
375                .genesis_block_header()
376                .timestamp,
377            sync_status: chain_follower.sync_status.clone(),
378            peer_manager: p2p_service.peer_manager().clone(),
379        };
380        let healthcheck_address = forest_state.config.client.healthcheck_address;
381        info!("Healthcheck endpoint will listen at {healthcheck_address}");
382        let listener = crate::utils::net::bind_tcp_listener(healthcheck_address, 0).await?;
383        services.spawn(async move {
384            crate::health::init_healthcheck_server(forest_state, listener)
385                .await
386                .context("Failed to initiate healthcheck server")
387        });
388    } else {
389        info!("Healthcheck service is disabled");
390    }
391    Ok(())
392}
393
394fn maybe_start_gc_service(
395    services: &mut JoinSet<anyhow::Result<()>>,
396    opts: &CliOpts,
397    config: &Config,
398    chain_follower: Arc<ChainFollower<DbType>>,
399) -> anyhow::Result<()> {
400    // If the node is stateless, GC shouldn't get triggered even on demand.
401    if opts.stateless {
402        return Ok(());
403    }
404
405    let snap_gc = Arc::new(SnapshotGarbageCollector::new(chain_follower, config)?);
406
407    GLOBAL_SNAPSHOT_GC
408        .set(snap_gc.clone())
409        .ok()
410        .context("failed to set GLOBAL_SNAPSHOT_GC")?;
411
412    services.spawn({
413        let snap_gc = snap_gc.clone();
414        async move {
415            snap_gc.event_loop().await;
416            Ok(())
417        }
418    });
419
420    // GC shouldn't run periodically if the node is stateless or if the user has disabled it.
421    if !opts.no_gc {
422        services.spawn({
423            let snap_gc = snap_gc.clone();
424            async move {
425                snap_gc.scheduler_loop().await;
426                Ok(())
427            }
428        });
429    }
430
431    Ok(())
432}
433
434#[allow(clippy::too_many_arguments)]
435fn maybe_start_rpc_service(
436    services: &mut JoinSet<anyhow::Result<()>>,
437    config: &Config,
438    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
439    chain_follower: &ChainFollower<DbType>,
440    start_time: chrono::DateTime<chrono::Utc>,
441    shutdown: mpsc::Sender<()>,
442    rpc_stop_handle: jsonrpsee::server::StopHandle,
443    ctx: &AppContext,
444) -> anyhow::Result<()> {
445    if config.client.enable_rpc {
446        let rpc_address = config.client.rpc_address;
447        let filter_list = config
448            .client
449            .rpc_filter_list
450            .as_ref()
451            .map(|path| crate::rpc::FilterList::new_from_file(path))
452            .transpose()?;
453        info!("JSON-RPC endpoint will listen at {rpc_address}");
454        let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
455        if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
456            warn!(
457                "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
458            );
459        }
460        services.spawn({
461            let state_manager = ctx.state_manager.shallow_clone();
462            let bad_blocks = chain_follower.bad_blocks.shallow_clone();
463            let sync_status = chain_follower.sync_status.shallow_clone();
464            let sync_network_context = chain_follower.network.shallow_clone();
465            let tipset_send = chain_follower.tipset_sender.clone();
466            let keystore = ctx.keystore.shallow_clone();
467            let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
468            let nonce_tracker = NonceTracker::new();
469            let mpool_locker = MpoolLocker::new();
470            let temp_dir = Arc::new(ctx.temp_dir.clone());
471            async move {
472                let rpc_listener = crate::utils::net::bind_tcp_listener(
473                    rpc_address,
474                    crate::rpc::default_max_connections(),
475                )
476                .await?;
477                start_rpc(
478                    RPCState {
479                        state_manager,
480                        keystore,
481                        mpool,
482                        bad_blocks,
483                        sync_status,
484                        eth_event_handler,
485                        sync_network_context,
486                        start_time,
487                        shutdown,
488                        tipset_send,
489                        snapshot_progress_tracker,
490                        mpool_locker,
491                        nonce_tracker,
492                        temp_dir,
493                    },
494                    rpc_listener,
495                    rpc_stop_handle,
496                    filter_list,
497                )
498                .await
499            }
500        });
501    } else {
502        debug!("RPC disabled.");
503    };
504    Ok(())
505}
506
507fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
508    // already running
509    if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
510        return Ok(());
511    }
512
513    if !config.client.enable_rpc {
514        if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
515            tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
516        }
517        return Ok(());
518    }
519
520    if !opts.halt_after_import && !opts.stateless {
521        let rpc_endpoint = config.client.rpc_v1_endpoint()?;
522        let state_manager = &ctx.state_manager;
523        let p2p_peer_id = ctx.p2p_peer_id;
524        let admin_jwt = ctx.admin_jwt.clone();
525        tokio::task::spawn_blocking({
526            crate::rpc::f3::F3_LEASE_MANAGER
527                .set(crate::rpc::f3::F3LeaseManager::new(
528                    state_manager.chain_config().network.clone(),
529                    p2p_peer_id,
530                ))
531                .expect("F3 lease manager should not have been initialized before");
532            let chain_config = state_manager.chain_config().clone();
533            let f3_root = crate::f3::get_f3_root(config);
534            let crate::f3::F3Options {
535                chain_finality,
536                bootstrap_epoch,
537                initial_power_table,
538            } = crate::f3::get_f3_sidecar_params(&chain_config);
539            move || {
540                crate::f3::run_f3_sidecar_if_enabled(
541                    &chain_config,
542                    rpc_endpoint.to_string(),
543                    admin_jwt,
544                    crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
545                    initial_power_table
546                        .map(|i| i.to_string())
547                        .unwrap_or_default(),
548                    bootstrap_epoch,
549                    chain_finality,
550                    f3_root.display().to_string(),
551                );
552            }
553        });
554        tokio::task::spawn({
555            let chain_store = ctx.chain_store().clone();
556            async move {
557                // wait 1s to let F3 RPC server start
558                tokio::time::sleep(Duration::from_secs(1)).await;
559                match (|| crate::rpc::f3::F3GetLatestCertificate::get())
560                    .retry(ExponentialBuilder::default())
561                    .await
562                {
563                    Ok(f3_finalized_cert) => {
564                        let f3_finalized_head = f3_finalized_cert.chain_head();
565                        match chain_store
566                            .chain_index()
567                            .load_required_tipset(&f3_finalized_head.key)
568                        {
569                            Ok(ts) => {
570                                chain_store.set_f3_finalized_tipset(ts);
571                                tracing::info!(
572                                    "Set F3 finalized tipset to epoch {} and key {}",
573                                    f3_finalized_head.epoch,
574                                    f3_finalized_head.key,
575                                );
576                            }
577                            Err(e) => {
578                                tracing::error!(
579                                    "Failed to get F3 finalized tipset epoch {} and key {}: {e}",
580                                    f3_finalized_head.epoch,
581                                    f3_finalized_head.key
582                                );
583                            }
584                        }
585                    }
586                    Err(e) => {
587                        tracing::error!("Failed to get F3 latest certificate: {e:#}");
588                    }
589                }
590            }
591        });
592    }
593
594    Ok(())
595}
596
597fn maybe_start_indexer_service(
598    services: &mut JoinSet<anyhow::Result<()>>,
599    opts: &CliOpts,
600    config: &Config,
601    ctx: &AppContext,
602) {
603    if config.chain_indexer.enable_indexer
604        && !opts.stateless
605        && !ctx.state_manager.chain_config().is_devnet()
606    {
607        let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
608        let chain_store = ctx.state_manager.chain_store().clone();
609        services.spawn(async move {
610            tracing::info!("Starting indexer service");
611
612            // Continuously listen for head changes
613            loop {
614                match head_changes_rx.recv().await {
615                    Ok(changes) => {
616                        for ts in changes.applies {
617                            tracing::debug!("Indexing tipset {}", ts.key());
618                            let delegated_messages = chain_store
619                                .headers_delegated_messages(ts.block_headers().iter())?;
620                            chain_store.process_signed_messages(&delegated_messages)?;
621                        }
622                    }
623                    Err(RecvError::Lagged(n)) => {
624                        warn!("indexer service lagged: skipping {n} events")
625                    }
626                    Err(RecvError::Closed) => break Ok(()),
627                }
628            }
629        });
630
631        // Run the collector only if chain indexer is enabled
632        if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
633            let chain_store = ctx.state_manager.chain_store().clone();
634            let chain_config = ctx.state_manager.chain_config().clone();
635            services.spawn(async move {
636                tracing::info!("Starting collector for eth_mappings");
637                let mut collector = EthMappingCollector::new(
638                    chain_store.blockstore().clone(),
639                    chain_config.eth_chain_id,
640                    retention_epochs.into(),
641                );
642                collector.run().await
643            });
644        }
645    }
646}
647
648/// Starts daemon process
649pub(super) async fn start(
650    start_time: chrono::DateTime<chrono::Utc>,
651    opts: CliOpts,
652    config: Config,
653    shutdown_send: mpsc::Sender<()>,
654    rpc_stop_handle: jsonrpsee::server::StopHandle,
655) -> anyhow::Result<()> {
656    startup_init(&config)?;
657    start_services(
658        start_time,
659        &opts,
660        config.clone(),
661        shutdown_send.clone(),
662        rpc_stop_handle,
663    )
664    .await
665}
666
667pub(super) async fn start_services(
668    start_time: chrono::DateTime<chrono::Utc>,
669    opts: &CliOpts,
670    mut config: Config,
671    shutdown_send: mpsc::Sender<()>,
672    rpc_stop_handle: jsonrpsee::server::StopHandle,
673) -> anyhow::Result<()> {
674    // Cleanup the collector prometheus metrics registry on start
675    crate::metrics::reset_collector_registry();
676    let mut services = JoinSet::new();
677    let network = config.chain();
678    let ctx = AppContext::init(opts, &config).await?;
679    info!("Using network :: {network}");
680    utils::misc::display_chain_logo(config.chain());
681    if opts.exit_after_init {
682        return Ok(());
683    }
684    if !opts.stateless
685        && !opts.skip_load_actors
686        && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
687    {
688        tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
689    }
690
691    let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
692    let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
693    let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
694
695    maybe_start_rpc_service(
696        &mut services,
697        &config,
698        mpool.clone(),
699        &chain_follower,
700        start_time,
701        shutdown_send.clone(),
702        rpc_stop_handle,
703        &ctx,
704    )?;
705
706    maybe_import_snapshot(opts, &mut config, &ctx).await?;
707    if opts.halt_after_import {
708        // Cancel all async services
709        services.shutdown().await;
710        return Ok(());
711    }
712
713    warmup_in_background(&ctx);
714    maybe_start_gc_service(&mut services, opts, &config, chain_follower.clone())?;
715    maybe_start_metrics_service(&mut services, &config, &ctx).await?;
716    maybe_start_f3_service(opts, &config, &ctx)?;
717    maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
718        .await?;
719    maybe_start_indexer_service(&mut services, opts, &config, &ctx);
720    if !opts.stateless {
721        ensure_proof_params_downloaded().await?;
722    }
723    services.spawn(p2p_service.run());
724    start_chain_follower_service(&mut services, chain_follower);
725    // blocking until any of the services returns an error,
726    propagate_error(&mut services)
727        .await
728        .context("services failure")
729        .map(|_| {})
730}
731
732fn warmup_in_background(ctx: &AppContext) {
733    // Populate `tipset_by_height` cache
734    let cs = ctx.chain_store().clone();
735    tokio::task::spawn_blocking(move || {
736        let start = Instant::now();
737        match cs.chain_index().tipset_by_height(
738            // 0 would short-circuit the cache
739            1,
740            cs.heaviest_tipset(),
741            ResolveNullTipset::TakeOlder,
742        ) {
743            Ok(_) => {
744                tracing::info!(
745                    "Successfully populated tipset_by_height cache, took {}",
746                    humantime::format_duration(start.elapsed())
747                );
748            }
749            Err(e) => {
750                tracing::warn!("Failed to populate tipset_by_height cache: {e}");
751            }
752        }
753    });
754}
755
756/// If our current chain is below a supported height, we need a snapshot to bring it up
757/// to a supported height. If we've not been given a snapshot by the user, get one.
758///
759/// An [`Err`] should be considered fatal.
760async fn maybe_set_snapshot_path(
761    config: &mut Config,
762    chain_config: &ChainConfig,
763    epoch: ChainEpoch,
764    auto_download_snapshot: bool,
765    download_directory: &Path,
766) -> anyhow::Result<()> {
767    if !download_directory.is_dir() {
768        anyhow::bail!(
769            "`download_directory` does not exist: {}",
770            download_directory.display()
771        );
772    }
773
774    let vendor = snapshot::TrustedVendor::default();
775    let chain = config.chain();
776
777    // What height is our chain at right now, and what network version does that correspond to?
778    let network_version = chain_config.network_version(epoch);
779    let network_version_is_small = network_version < NetworkVersion::V16;
780
781    // We don't support small network versions (we can't validate from e.g genesis).
782    // So we need a snapshot (which will be from a recent network version)
783    let require_a_snapshot = network_version_is_small;
784    let have_a_snapshot = config.client.snapshot_path.is_some();
785
786    match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
787        (false, _, _) => {}   // noop - don't need a snapshot
788        (true, true, _) => {} // noop - we need a snapshot, and we have one
789        (true, false, true) => {
790            const AUTO_SNAPSHOT_PATH_ENV_KEY: &str = "FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH";
791            match std::env::var(AUTO_SNAPSHOT_PATH_ENV_KEY) {
792                Ok(path) if !path.is_empty() => {
793                    tracing::info!(
794                        "importing snapshot from {path} set by `{AUTO_SNAPSHOT_PATH_ENV_KEY}`"
795                    );
796                    config.client.snapshot_path = Some(path.into());
797                }
798                _ => {
799                    // Resolve the redirect URL to get the actual snapshot URL
800                    // This ensures all chunks download from the same snapshot even if
801                    // a new snapshot is published during the download
802                    let (resolved_url, _num_bytes, filename) =
803                        crate::cli_shared::snapshot::peek(vendor, chain).await?;
804                    tracing::info!("Downloading snapshot: {filename}");
805                    config.client.snapshot_path = Some(resolved_url.to_string().into());
806                }
807            }
808        }
809        (true, false, false) => {
810            // we need a snapshot, don't have one, and don't have permission to download one, so ask the user
811            let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
812                .await
813                .context("couldn't get snapshot size")?;
814            // dialoguer will double-print long lines, so manually print the first clause ourselves,
815            // then let `Confirm` handle the second.
816            println!(
817                "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
818            );
819            let message = format!(
820                "Fetch a {} snapshot? (denying will exit the program). ",
821                indicatif::HumanBytes(num_bytes)
822            );
823            let have_permission = asyncify(|| {
824                dialoguer::Confirm::with_theme(&ColorfulTheme::default())
825                    .with_prompt(message)
826                    .default(false)
827                    .interact()
828                    // e.g not a tty (or some other error), so haven't got permission.
829                    .unwrap_or(false)
830            })
831            .await;
832            if !have_permission {
833                bail!(
834                    "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
835                )
836            }
837            tracing::info!("Downloading snapshot: {filename}");
838            config.client.snapshot_path = Some(url.to_string().into());
839        }
840    };
841
842    Ok(())
843}
844
845/// returns the first error with which any of the services end, or never returns at all
846// This should return anyhow::Result<!> once the `Never` type is stabilized
847async fn propagate_error(
848    services: &mut JoinSet<anyhow::Result<()>>,
849) -> anyhow::Result<std::convert::Infallible> {
850    while let Some(result) = services.join_next().await {
851        if let Ok(Err(error_message)) = result {
852            return Err(error_message);
853        }
854    }
855    std::future::pending().await
856}
857
858/// Run the closure on a thread where blocking is allowed
859///
860/// # Panics
861/// If the closure panics
862fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
863where
864    T: Send + 'static,
865{
866    tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
867}
868
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Checks both the computed range and, for failures, the exact error message, so that a
    /// wrong expectation cannot pass unnoticed.
    #[rstest]
    #[case::current_zero(0, 1, Err("current head epoch 0 is invalid"))]
    #[case::current_negative(-1, 1, Err("current head epoch -1 is invalid"))]
    #[case::from_positive_beyond_head(
        10,
        11,
        Err("requested validation start epoch 11 is beyond the current head at epoch 10")
    )]
    #[case::from_equal_to_head(10, 10, Ok(10..=10))]
    #[case::from_positive_within_range(10, 5, Ok(5..=10))]
    #[case::from_zero(10, 0, Ok(0..=10))]
    #[case::from_negative_within_range(10, -5, Ok(5..=10))]
    #[case::from_negative_beyond_range(10, -15, Ok(0..=10))]
    #[case::from_extreme_negative(10, ChainEpoch::MIN, Ok(0..=10))]
    fn test_validation_range(
        #[case] current: ChainEpoch,
        #[case] from: ChainEpoch,
        #[case] expected: Result<std::ops::RangeInclusive<ChainEpoch>, &str>,
    ) {
        let result = validation_range(current, from);
        match expected {
            Ok(expected_range) => {
                assert_eq!(result.unwrap(), expected_range);
            }
            Err(expected_message) => {
                assert_eq!(result.unwrap_err().to_string(), expected_message);
            }
        }
    }
}
904}