// forest/daemon/mod.rs
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod bundle;
mod context;
pub mod db_util;
pub mod main;

use crate::blocks::Tipset;
use crate::chain::HeadChange;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{ChainFollower, SyncStatus};
use crate::cli_shared::snapshot;
use crate::cli_shared::{
    chain_path,
    cli::{CliOpts, Config},
};
use crate::daemon::{
    context::{AppContext, DbType},
    db_util::import_chain_as_forest_car,
};
use crate::db::gc::SnapshotGarbageCollector;
use crate::db::ttl::EthMappingCollector;
use crate::libp2p::{Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use crate::networks::{self, ChainConfig};
use crate::rpc::RPCState;
use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::start_rpc;
use crate::shim::clock::ChainEpoch;
use crate::shim::state_tree::StateTree;
use crate::shim::version::NetworkVersion;
use crate::utils;
use crate::utils::misc::env::is_env_truthy;
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use tokio::{
    net::TcpListener,
    signal::{
        ctrl_c,
        unix::{SignalKind, signal},
    },
    sync::mpsc,
    task::JoinSet,
};
use tracing::{debug, info, warn};

/// Process-wide handle to the snapshot garbage collector. Set exactly once
/// during daemon startup in [`start`]; other parts of the crate may read it
/// through this `OnceLock` (exact consumers are not visible in this module).
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();

57/// Increase the file descriptor limit to a reasonable number.
58/// This prevents the node from failing if the default soft limit is too low.
59/// Note that the value is only increased, never decreased.
60fn maybe_increase_fd_limit() -> anyhow::Result<()> {
61    static DESIRED_SOFT_LIMIT: u64 = 8192;
62    let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
63
64    let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
65    if soft_before < soft_after {
66        debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
67    }
68    if soft_after < DESIRED_SOFT_LIMIT {
69        warn!(
70            "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
71            You may encounter 'too many open files' errors.",
72        );
73    }
74
75    Ok(())
76}
77
// Start the daemon and abort if we're interrupted by ctrl-c, SIGTERM, or `forest-cli shutdown`.
pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
    let start_time = chrono::Utc::now();
    // Register the SIGTERM stream up-front; `signal()` can fail if the handler
    // cannot be installed.
    let mut terminate = signal(SignalKind::terminate())?;
    // Channel through which an RPC client can request a graceful shutdown;
    // the sender is handed down into `start`.
    let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
    // Race the daemon against the three interruption sources. Whichever arm
    // completes first wins; the other futures are dropped (cancelled).
    let result = tokio::select! {
        ret = start(start_time, opts, config, shutdown_send) => ret,
        _ = ctrl_c() => {
            info!("Keyboard interrupt.");
            Ok(())
        },
        _ = terminate.recv() => {
            info!("Received SIGTERM.");
            Ok(())
        },
        _ = shutdown_recv.recv() => {
            info!("Client requested a shutdown.");
            Ok(())
        },
    };
    // Restore terminal state regardless of how we exited.
    crate::utils::io::terminal_cleanup();
    result
}

102/// This function initialize Forest with below steps
103/// - increase file descriptor limit (for parity-db)
104/// - setup proofs parameter cache directory
105/// - prints Forest version
106fn startup_init(config: &Config) -> anyhow::Result<()> {
107    maybe_increase_fd_limit()?;
108    // Sets proof parameter file download path early, the files will be checked and
109    // downloaded later right after snapshot import step
110    crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
111    info!(
112        "Starting Forest daemon, version {}",
113        FOREST_VERSION_STRING.as_str()
114    );
115    Ok(())
116}
117
118async fn maybe_import_snapshot(
119    opts: &CliOpts,
120    config: &mut Config,
121    ctx: &AppContext,
122) -> anyhow::Result<()> {
123    let chain_config = ctx.state_manager.chain_config();
124    // Sets the latest snapshot if needed for downloading later
125    if config.client.snapshot_path.is_none() && !opts.stateless {
126        maybe_set_snapshot_path(
127            config,
128            chain_config,
129            ctx.state_manager.chain_store().heaviest_tipset().epoch(),
130            opts.auto_download_snapshot,
131            &ctx.db_meta_data.get_root_dir(),
132        )
133        .await?;
134    }
135
136    let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
137    // Import chain if needed
138    if !opts.skip_load.unwrap_or_default()
139        && let Some(path) = &config.client.snapshot_path
140    {
141        let (car_db_path, ts) = import_chain_as_forest_car(
142            path,
143            &ctx.db_meta_data.get_forest_car_db_dir(),
144            config.client.import_mode,
145            config.client.rpc_v1_endpoint()?,
146            &crate::f3::get_f3_root(config),
147            ctx.chain_config(),
148            &snapshot_tracker,
149        )
150        .await?;
151        ctx.db
152            .read_only_files(std::iter::once(car_db_path.clone()))?;
153        let ts_epoch = ts.epoch();
154        // Explicitly set heaviest tipset here in case HEAD_KEY has already been set
155        // in the current setting store
156        ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
157        debug!(
158            "Loaded car DB at {} and set current head to epoch {ts_epoch}",
159            car_db_path.display(),
160        );
161    }
162
163    // If the snapshot progress state is not completed,
164    // set the state to not required
165    if !snapshot_tracker.is_completed() {
166        snapshot_tracker.not_required();
167    }
168
169    if let Some(validate_from) = config.client.snapshot_height {
170        // We've been provided a snapshot and asked to validate it
171        ensure_proof_params_downloaded().await?;
172        // Use the specified HEAD, otherwise take the current HEAD.
173        let current_height = config
174            .client
175            .snapshot_head
176            .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
177        assert!(current_height.is_positive());
178        match validate_from.is_negative() {
179            // allow --height=-1000 to scroll back from the current head
180            true => ctx
181                .state_manager
182                .validate_range((current_height + validate_from)..=current_height)?,
183            false => ctx
184                .state_manager
185                .validate_range(validate_from..=current_height)?,
186        }
187    }
188
189    Ok(())
190}
191
/// Starts the Prometheus metrics endpoint when `enable_metrics_endpoint` is
/// set: binds the configured address, spawns the metrics server into
/// `services`, and registers the network-height collector.
async fn maybe_start_metrics_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_metrics_endpoint {
        // Start Prometheus server port
        let prometheus_listener = TcpListener::bind(config.client.metrics_address)
            .await
            .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
        info!(
            "Prometheus server started at {}",
            config.client.metrics_address
        );
        let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
        let db = ctx.db.writer().clone();

        // Closure reporting the current head epoch, or 0 if the chain store
        // has been dropped.
        let get_chain_head_height = Arc::new({
            // Use `Weak` to not dead lock GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                chain_store
                    .upgrade()
                    .map(|cs| cs.heaviest_tipset().epoch())
                    .unwrap_or_default()
            }
        });
        // Closure reporting the actor bundle major version at the current
        // head, or 0 if any lookup step fails.
        let get_chain_head_actor_version = Arc::new({
            // Use `Weak` to not dead lock GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                if let Some(cs) = chain_store.upgrade()
                    && let Ok(state) = StateTree::new_from_root(
                        cs.blockstore().clone(),
                        cs.heaviest_tipset().parent_state(),
                    )
                    && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
                    && let Ok(actor_version) = bundle_meta.actor_major_version()
                {
                    return actor_version;
                }
                0
            }
        });
        services.spawn({
            let chain_config = ctx.chain_config().clone();
            let get_chain_head_height = get_chain_head_height.clone();
            async {
                crate::metrics::init_prometheus(
                    prometheus_listener,
                    db_directory,
                    db,
                    chain_config,
                    get_chain_head_height,
                    get_chain_head_actor_version,
                )
                .await
                .context("Failed to initiate prometheus server")
            }
        });

        // Collector deriving the expected network height from the genesis
        // timestamp and block delay.
        crate::metrics::register_collector(Box::new(
            networks::metrics::NetworkHeightCollector::new(
                ctx.state_manager.chain_config().block_delay_secs,
                ctx.state_manager
                    .chain_store()
                    .genesis_block_header()
                    .timestamp,
                get_chain_head_height,
            ),
        ));
    }
    Ok(())
}

267async fn create_p2p_service(
268    services: &mut JoinSet<anyhow::Result<()>>,
269    config: &mut Config,
270    ctx: &AppContext,
271) -> anyhow::Result<Libp2pService<DbType>> {
272    // if bootstrap peers are not set, set them
273    if config.network.bootstrap_peers.is_empty() {
274        config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
275    }
276
277    let peer_manager = Arc::new(PeerManager::default());
278    services.spawn(peer_manager.clone().peer_operation_event_loop_task());
279    // Libp2p service setup
280    let p2p_service = Libp2pService::new(
281        config.network.clone(),
282        Arc::clone(ctx.state_manager.chain_store()),
283        peer_manager.clone(),
284        ctx.net_keypair.clone(),
285        config.chain.genesis_name(),
286        *ctx.state_manager.chain_store().genesis_block_header().cid(),
287    )
288    .await?;
289    Ok(p2p_service)
290}
291
292fn create_mpool(
293    services: &mut JoinSet<anyhow::Result<()>>,
294    p2p_service: &Libp2pService<DbType>,
295    ctx: &AppContext,
296) -> anyhow::Result<Arc<MessagePool<MpoolRpcProvider<DbType>>>> {
297    let publisher = ctx.state_manager.chain_store().publisher();
298    let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone());
299    Ok(MessagePool::new(
300        provider,
301        p2p_service.network_sender().clone(),
302        MpoolConfig::load_config(ctx.db.writer().as_ref())?,
303        ctx.state_manager.chain_config().clone(),
304        services,
305    )
306    .map(Arc::new)?)
307}
308
309fn create_chain_follower(
310    opts: &CliOpts,
311    p2p_service: &Libp2pService<DbType>,
312    mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
313    ctx: &AppContext,
314) -> anyhow::Result<ChainFollower<DbType>> {
315    let network_send = p2p_service.network_sender().clone();
316    let peer_manager = p2p_service.peer_manager().clone();
317    let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
318    let chain_follower = ChainFollower::new(
319        ctx.state_manager.clone(),
320        network,
321        Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
322        p2p_service.network_receiver(),
323        opts.stateless,
324        mpool,
325    );
326    Ok(chain_follower)
327}
328
/// Spawns the chain follower's main loop into the service set.
fn start_chain_follower_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    chain_follower: ChainFollower<DbType>,
) {
    // The follower is moved into the spawned task; any error returned by
    // `run` surfaces through the services `JoinSet` (see `propagate_error`).
    services.spawn(async move { chain_follower.run().await });
}

336async fn maybe_start_health_check_service(
337    services: &mut JoinSet<anyhow::Result<()>>,
338    config: &Config,
339    p2p_service: &Libp2pService<DbType>,
340    chain_follower: &ChainFollower<DbType>,
341    ctx: &AppContext,
342) -> anyhow::Result<()> {
343    if config.client.enable_health_check {
344        let forest_state = crate::health::ForestState {
345            config: config.clone(),
346            chain_config: ctx.state_manager.chain_config().clone(),
347            genesis_timestamp: ctx
348                .state_manager
349                .chain_store()
350                .genesis_block_header()
351                .timestamp,
352            sync_status: chain_follower.sync_status.clone(),
353            peer_manager: p2p_service.peer_manager().clone(),
354        };
355        let healthcheck_address = forest_state.config.client.healthcheck_address;
356        info!("Healthcheck endpoint will listen at {healthcheck_address}");
357        let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
358        services.spawn(async move {
359            crate::health::init_healthcheck_server(forest_state, listener)
360                .await
361                .context("Failed to initiate healthcheck server")
362        });
363    } else {
364        info!("Healthcheck service is disabled");
365    }
366    Ok(())
367}
368
369#[allow(clippy::too_many_arguments)]
370fn maybe_start_rpc_service(
371    services: &mut JoinSet<anyhow::Result<()>>,
372    config: &Config,
373    mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
374    chain_follower: &ChainFollower<DbType>,
375    start_time: chrono::DateTime<chrono::Utc>,
376    shutdown: mpsc::Sender<()>,
377    rpc_stop_handle: jsonrpsee::server::StopHandle,
378    ctx: &AppContext,
379) -> anyhow::Result<()> {
380    if config.client.enable_rpc {
381        let rpc_address = config.client.rpc_address;
382        let filter_list = config
383            .client
384            .rpc_filter_list
385            .as_ref()
386            .map(|path| crate::rpc::FilterList::new_from_file(path))
387            .transpose()?;
388        info!("JSON-RPC endpoint will listen at {rpc_address}");
389        let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
390        if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
391            warn!(
392                "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
393            );
394        }
395        services.spawn({
396            let state_manager = ctx.state_manager.clone();
397            let bad_blocks = chain_follower.bad_blocks.clone();
398            let sync_status = chain_follower.sync_status.clone();
399            let sync_network_context = chain_follower.network.clone();
400            let tipset_send = chain_follower.tipset_sender.clone();
401            let keystore = ctx.keystore.clone();
402            let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
403            let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
404            async move {
405                let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
406                    .await
407                    .map_err(|e| {
408                        anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
409                    })
410                    .unwrap();
411                start_rpc(
412                    RPCState {
413                        state_manager,
414                        keystore,
415                        mpool,
416                        bad_blocks,
417                        msgs_in_tipset,
418                        sync_status,
419                        eth_event_handler,
420                        sync_network_context,
421                        start_time,
422                        shutdown,
423                        tipset_send,
424                        snapshot_progress_tracker,
425                    },
426                    rpc_listener,
427                    rpc_stop_handle,
428                    filter_list,
429                )
430                .await
431            }
432        });
433    } else {
434        debug!("RPC disabled.");
435    };
436    Ok(())
437}
438
/// Starts the F3 (fast finality) sidecar when conditions allow.
///
/// Skipped when the sidecar is already running, when RPC is disabled (the
/// sidecar talks to the node's RPC endpoint), or when the node is stateless
/// or halting after import.
fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
    // already running
    if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
        return Ok(());
    }

    if !config.client.enable_rpc {
        if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
            tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
        }
        return Ok(());
    }

    if !opts.halt_after_import && !opts.stateless {
        let rpc_endpoint = config.client.rpc_v1_endpoint()?;
        let state_manager = &ctx.state_manager;
        let p2p_peer_id = ctx.p2p_peer_id;
        let admin_jwt = ctx.admin_jwt.clone();
        tokio::task::spawn_blocking({
            // NOTE: everything before the `move ||` closure below runs eagerly
            // on the current thread (lease-manager init, param collection);
            // only the closure itself runs on the blocking thread pool.
            crate::rpc::f3::F3_LEASE_MANAGER
                .set(crate::rpc::f3::F3LeaseManager::new(
                    state_manager.chain_config().network.clone(),
                    p2p_peer_id,
                ))
                .expect("F3 lease manager should not have been initialized before");
            let chain_config = state_manager.chain_config().clone();
            let f3_root = crate::f3::get_f3_root(config);
            let crate::f3::F3Options {
                chain_finality,
                bootstrap_epoch,
                initial_power_table,
            } = crate::f3::get_f3_sidecar_params(&chain_config);
            move || {
                // Blocking call; runs the sidecar if enabled for this chain.
                crate::f3::run_f3_sidecar_if_enabled(
                    &chain_config,
                    rpc_endpoint.to_string(),
                    admin_jwt,
                    crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
                    initial_power_table
                        .map(|i| i.to_string())
                        .unwrap_or_default(),
                    bootstrap_epoch,
                    chain_finality,
                    f3_root.display().to_string(),
                );
            }
        });
    }

    Ok(())
}

/// Starts the chain indexer service (tipset-key and delegated-message
/// indexing) and, when a GC retention period is configured, the eth_mappings
/// TTL collector. Disabled for stateless nodes and devnets.
fn maybe_start_indexer_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    ctx: &AppContext,
) {
    if config.chain_indexer.enable_indexer
        && !opts.stateless
        && !ctx.state_manager.chain_config().is_devnet()
    {
        let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
        let chain_store = ctx.state_manager.chain_store().clone();
        services.spawn(async move {
            tracing::info!("Starting indexer service");

            // Continuously listen for head changes
            loop {
                // A receive error (e.g. presumably a lagging/closed channel)
                // tears down the service via the `JoinSet`.
                let msg = receiver.recv().await?;

                let HeadChange::Apply(ts) = msg;
                tracing::debug!("Indexing tipset {}", ts.key());

                chain_store.put_tipset_key(ts.key())?;

                let delegated_messages =
                    chain_store.headers_delegated_messages(ts.block_headers().iter())?;

                chain_store.process_signed_messages(&delegated_messages)?;
            }
        });

        // Run the collector only if chain indexer is enabled
        if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
            let chain_store = ctx.state_manager.chain_store().clone();
            let chain_config = ctx.state_manager.chain_config().clone();
            services.spawn(async move {
                tracing::info!("Starting collector for eth_mappings");
                let mut collector = EthMappingCollector::new(
                    chain_store.blockstore().clone(),
                    chain_config.eth_chain_id,
                    retention_epochs.into(),
                );
                collector.run().await
            });
        }
    }
}

/// Starts daemon process.
///
/// Wires up the snapshot garbage collector (global handle, on-demand event
/// loop, periodic scheduler) and then runs `start_services` in a loop so a
/// GC-requested reboot can stop the RPC server, clean up, and restart all
/// services.
pub(super) async fn start(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: CliOpts,
    config: Config,
    shutdown_send: mpsc::Sender<()>,
) -> anyhow::Result<()> {
    startup_init(&config)?;
    let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
    let snap_gc = Arc::new(snap_gc);
    GLOBAL_SNAPSHOT_GC
        .set(snap_gc.clone())
        .ok()
        .context("failed to set GLOBAL_SNAPSHOT_GC")?;

    // If the node is stateless, GC shouldn't get triggered even on demand.
    if !opts.stateless {
        tokio::task::spawn({
            let snap_gc = snap_gc.clone();
            async move { snap_gc.event_loop().await }
        });
    }
    // GC shouldn't run periodically if the node is stateless or if the user has disabled it.
    if !opts.no_gc && !opts.stateless {
        tokio::task::spawn({
            let snap_gc = snap_gc.clone();
            async move { snap_gc.scheduler_loop().await }
        });
    }
    loop {
        // A fresh stop handle is created per iteration so that each restart
        // of the services gets its own RPC stop channel.
        let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
        tokio::select! {
            _ = snap_gc_reboot_rx.recv_async() => {
                // gracefully shutdown RPC server
                if let Err(e) = rpc_server_handle.stop() {
                    tracing::warn!("failed to stop RPC server: {e}");
                }
                snap_gc.cleanup_before_reboot().await;
                // Fall through and loop: services are restarted.
            }
            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
                // Hand the GC the handles it needs once the DB/context exist.
                snap_gc.set_db(ctx.db.clone());
                snap_gc.set_sync_status(sync_status);
                snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
            }) => {
                break result
            }
        }
    }
}

/// Builds the application context and starts every node service (p2p, mpool,
/// RPC, snapshot import, metrics, F3, healthcheck, indexer, chain follower),
/// then blocks until the first service returns an error.
///
/// `on_app_context_and_db_initialized` is invoked once the DB and app context
/// are ready (used by `start` to wire up the snapshot GC).
pub(super) async fn start_services(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: &CliOpts,
    mut config: Config,
    shutdown_send: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
    on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
    // Cleanup the collector prometheus metrics registry on start
    crate::metrics::reset_collector_registry();
    let mut services = JoinSet::new();
    let network = config.chain();
    let ctx = AppContext::init(opts, &config).await?;
    info!("Using network :: {network}");
    utils::misc::display_chain_logo(config.chain());
    if opts.exit_after_init {
        return Ok(());
    }
    // Best-effort head rewind; a failure here is logged, not fatal.
    if !opts.stateless
        && !opts.skip_load_actors
        && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
    {
        tracing::warn!("error in maybe_rewind_heaviest_tipset: {e}");
    }
    let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
    let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
    let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;

    // NOTE(review): RPC is started before the (potentially long) snapshot
    // import — presumably so progress is queryable over RPC; confirm.
    maybe_start_rpc_service(
        &mut services,
        &config,
        mpool.clone(),
        &chain_follower,
        start_time,
        shutdown_send.clone(),
        rpc_stop_handle,
        &ctx,
    )?;

    maybe_import_snapshot(opts, &mut config, &ctx).await?;
    if opts.halt_after_import {
        // Cancel all async services
        services.shutdown().await;
        return Ok(());
    }
    on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());
    warmup_in_background(&ctx);
    ctx.state_manager.populate_cache();
    maybe_start_metrics_service(&mut services, &config, &ctx).await?;
    maybe_start_f3_service(opts, &config, &ctx)?;
    maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
        .await?;
    maybe_start_indexer_service(&mut services, opts, &config, &ctx);
    if !opts.stateless {
        ensure_proof_params_downloaded().await?;
    }
    services.spawn(p2p_service.run());
    start_chain_follower_service(&mut services, chain_follower);
    // blocking until any of the services returns an error,
    propagate_error(&mut services)
        .await
        .context("services failure")
        .map(|_| {})
}

654fn warmup_in_background(ctx: &AppContext) {
655    // Populate `tipset_by_height` cache
656    let cs = ctx.chain_store().clone();
657    tokio::task::spawn_blocking(move || {
658        let start = Instant::now();
659        match cs.chain_index().tipset_by_height(
660            // 0 would short-circuit the cache
661            1,
662            cs.heaviest_tipset(),
663            ResolveNullTipset::TakeOlder,
664        ) {
665            Ok(_) => {
666                tracing::info!(
667                    "Successfully populated tipset_by_height cache, took {}",
668                    humantime::format_duration(start.elapsed())
669                );
670            }
671            Err(e) => {
672                tracing::warn!("Failed to populate tipset_by_height cache: {e}");
673            }
674        }
675    });
676}
677
/// If our current chain is below a supported height, we need a snapshot to bring it up
/// to a supported height. If we've not been given a snapshot by the user, get one.
///
/// When auto-download is disabled, interactively asks the user for permission
/// to fetch; denial (or the absence of a TTY) is a fatal error.
///
/// An [`Err`] should be considered fatal.
async fn maybe_set_snapshot_path(
    config: &mut Config,
    chain_config: &ChainConfig,
    epoch: ChainEpoch,
    auto_download_snapshot: bool,
    download_directory: &Path,
) -> anyhow::Result<()> {
    if !download_directory.is_dir() {
        anyhow::bail!(
            "`download_directory` does not exist: {}",
            download_directory.display()
        );
    }

    let vendor = snapshot::TrustedVendor::default();
    let chain = config.chain();

    // What height is our chain at right now, and what network version does that correspond to?
    let network_version = chain_config.network_version(epoch);
    let network_version_is_small = network_version < NetworkVersion::V16;

    // We don't support small network versions (we can't validate from e.g genesis).
    // So we need a snapshot (which will be from a recent network version)
    let require_a_snapshot = network_version_is_small;
    let have_a_snapshot = config.client.snapshot_path.is_some();

    // Decision table over (need, have, may-download).
    match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
        (false, _, _) => {}   // noop - don't need a snapshot
        (true, true, _) => {} // noop - we need a snapshot, and we have one
        (true, false, true) => {
            // Auto-download permitted: point the config at the vendor's stable URL.
            let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
            config.client.snapshot_path = Some(url.to_string().into());
        }
        (true, false, false) => {
            // we need a snapshot, don't have one, and don't have permission to download one, so ask the user
            let (url, num_bytes, _path) = crate::cli_shared::snapshot::peek(vendor, chain)
                .await
                .context("couldn't get snapshot size")?;
            // dialoguer will double-print long lines, so manually print the first clause ourselves,
            // then let `Confirm` handle the second.
            println!(
                "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
            );
            let message = format!(
                "Fetch a {} snapshot to the current directory? (denying will exit the program). ",
                indicatif::HumanBytes(num_bytes)
            );
            // Run the blocking TTY prompt off the async runtime.
            let have_permission = asyncify(|| {
                dialoguer::Confirm::with_theme(&ColorfulTheme::default())
                    .with_prompt(message)
                    .default(false)
                    .interact()
                    // e.g not a tty (or some other error), so haven't got permission.
                    .unwrap_or(false)
            })
            .await;
            if !have_permission {
                bail!(
                    "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
                )
            }
            config.client.snapshot_path = Some(url.to_string().into());
        }
    };

    Ok(())
}

750/// returns the first error with which any of the services end, or never returns at all
751// This should return anyhow::Result<!> once the `Never` type is stabilized
752async fn propagate_error(
753    services: &mut JoinSet<Result<(), anyhow::Error>>,
754) -> anyhow::Result<std::convert::Infallible> {
755    while let Some(result) = services.join_next().await {
756        if let Ok(Err(error_message)) = result {
757            return Err(error_message);
758        }
759    }
760    std::future::pending().await
761}
762
763/// Run the closure on a thread where blocking is allowed
764///
765/// # Panics
766/// If the closure panics
767fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
768where
769    T: Send + 'static,
770{
771    tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
772}