forest/daemon/mod.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod bundle;
mod context;
pub mod db_util;
pub mod main;

use crate::blocks::Tipset;
use crate::chain::HeadChange;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{ChainFollower, SyncStatus};
use crate::cli_shared::snapshot;
use crate::cli_shared::{
    chain_path,
    cli::{CliOpts, Config},
};
use crate::daemon::{
    context::{AppContext, DbType},
    db_util::import_chain_as_forest_car,
};
use crate::db::gc::SnapshotGarbageCollector;
use crate::db::ttl::EthMappingCollector;
use crate::libp2p::{Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use crate::networks::{self, ChainConfig};
use crate::rpc::RPCState;
use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::start_rpc;
use crate::shim::clock::ChainEpoch;
use crate::shim::state_tree::StateTree;
use crate::shim::version::NetworkVersion;
use crate::utils;
use crate::utils::misc::env::is_env_truthy;
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt, select};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use tokio::{
    net::TcpListener,
    signal::{
        ctrl_c,
        unix::{SignalKind, signal},
    },
    sync::mpsc,
    task::JoinSet,
};
use tracing::{debug, info, warn};

pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();

/// Increase the file descriptor limit to a reasonable number.
/// This prevents the node from failing if the default soft limit is too low.
/// Note that the value is only increased, never decreased.
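///
/// A minimal sketch of the underlying `rlimit` crate calls (the same API the
/// body below uses); the soft limit is only ever raised, clamped to the hard limit:
/// ```ignore
/// let (soft, _hard) = rlimit::Resource::NOFILE.get()?;
/// let new_soft = rlimit::increase_nofile_limit(8192)?; // no-op if already high enough
/// ```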
fn maybe_increase_fd_limit() -> anyhow::Result<()> {
    static DESIRED_SOFT_LIMIT: u64 = 8192;
    let (soft_before, _) = rlimit::Resource::NOFILE.get()?;

    let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
    if soft_before < soft_after {
        debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
    }
    if soft_after < DESIRED_SOFT_LIMIT {
        warn!(
            "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
            You may encounter 'too many open files' errors.",
        );
    }

    Ok(())
}

/// Start the daemon and abort if we're interrupted by ctrl-c, SIGTERM, or `forest-cli shutdown`.
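///
/// A self-contained sketch of the same interruption pattern (tokio only; the
/// printed messages are illustrative):
/// ```no_run
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> anyhow::Result<()> {
/// use tokio::signal::unix::{SignalKind, signal};
/// let mut terminate = signal(SignalKind::terminate())?;
/// tokio::select! {
///     _ = tokio::signal::ctrl_c() => println!("Keyboard interrupt."),
///     _ = terminate.recv() => println!("Received SIGTERM."),
/// }
/// # Ok(())
/// # }
/// ```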
pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
    let start_time = chrono::Utc::now();
    let mut terminate = signal(SignalKind::terminate())?;
    let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);

    let result = tokio::select! {
        ret = start(start_time, opts, config, shutdown_send) => ret,
        _ = ctrl_c() => {
            info!("Keyboard interrupt.");
            Ok(())
        },
        _ = terminate.recv() => {
            info!("Received SIGTERM.");
            Ok(())
        },
        _ = shutdown_recv.recv() => {
            info!("Client requested a shutdown.");
            Ok(())
        },
    };
    crate::utils::io::terminal_cleanup();
    result
}

/// Initializes Forest with the following steps:
/// - increase the file descriptor limit (for parity-db)
/// - set up the proofs parameter cache directory
/// - print the Forest version
fn startup_init(config: &Config) -> anyhow::Result<()> {
    maybe_increase_fd_limit()?;
    // Set the proof parameter file download path early; the files will be checked
    // and downloaded later, right after the snapshot import step.
    crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
    info!(
        "Starting Forest daemon, version {}",
        FOREST_VERSION_STRING.as_str()
    );
    Ok(())
}

async fn maybe_import_snapshot(
    opts: &CliOpts,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    let chain_config = ctx.state_manager.chain_config();
    // Set the latest snapshot path, if needed, for downloading later.
    if config.client.snapshot_path.is_none() && !opts.stateless {
        maybe_set_snapshot_path(
            config,
            chain_config,
            ctx.state_manager.chain_store().heaviest_tipset().epoch(),
            opts.auto_download_snapshot,
            &ctx.db_meta_data.get_root_dir(),
        )
        .await?;
    }

    let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
    // Import the chain if needed.
    if !opts.skip_load.unwrap_or_default()
        && let Some(path) = &config.client.snapshot_path
    {
        let (car_db_path, ts) = import_chain_as_forest_car(
            path,
            &ctx.db_meta_data.get_forest_car_db_dir(),
            config.client.import_mode,
            config.client.rpc_v1_endpoint()?,
            &crate::f3::get_f3_root(config),
            ctx.chain_config(),
            &snapshot_tracker,
        )
        .await?;
        ctx.db
            .read_only_files(std::iter::once(car_db_path.clone()))?;
        let ts_epoch = ts.epoch();
        // Explicitly set the heaviest tipset here in case HEAD_KEY has already been
        // set in the settings store.
        ctx.state_manager
            .chain_store()
            .set_heaviest_tipset(ts.into())?;
        debug!(
            "Loaded car DB at {} and set current head to epoch {ts_epoch}",
            car_db_path.display(),
        );
    }

    // If the snapshot progress state has not completed, mark it as not required.
    if !snapshot_tracker.is_completed() {
        snapshot_tracker.not_required();
    }

    if let Some(validate_from) = config.client.snapshot_height {
        // We've been provided a snapshot and asked to validate it.
        ensure_proof_params_downloaded().await?;
        // Use the specified HEAD, otherwise take the current HEAD.
        let current_height = config
            .client
            .snapshot_head
            .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
        assert!(current_height.is_positive());
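        // Worked example: with the head at epoch 10000, `--height -1000` validates
        // epochs 9000..=10000, while `--height 9000` validates the same range.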
        match validate_from.is_negative() {
            // allow --height=-1000 to scroll back from the current head
            true => ctx
                .state_manager
                .validate_range((current_height + validate_from)..=current_height)?,
            false => ctx
                .state_manager
                .validate_range(validate_from..=current_height)?,
        }
    }

    Ok(())
}

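/// Optionally starts the Prometheus metrics endpoint.
///
/// The head-height and actor-version callbacks hold only a [`std::sync::Weak`]
/// reference to the chain store, so the metrics server can never keep the store
/// alive or deadlock GC. A minimal, self-contained sketch of that pattern:
/// ```
/// use std::sync::Arc;
/// let store = Arc::new(7i64);
/// let weak = Arc::downgrade(&store);
/// let get = move || weak.upgrade().map(|s| *s).unwrap_or_default();
/// assert_eq!(get(), 7);
/// drop(store);
/// assert_eq!(get(), 0); // the store is gone; the callback degrades gracefully
/// ```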
async fn maybe_start_metrics_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_metrics_endpoint {
        // Start the Prometheus server.
        let prometheus_listener = TcpListener::bind(config.client.metrics_address)
            .await
            .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
        info!(
            "Prometheus server started at {}",
            config.client.metrics_address
        );
        let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
        let db = ctx.db.writer().clone();

        let get_chain_head_height = Arc::new({
            // Use `Weak` to avoid deadlocking GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                chain_store
                    .upgrade()
                    .map(|cs| cs.heaviest_tipset().epoch())
                    .unwrap_or_default()
            }
        });
        let get_chain_head_actor_version = Arc::new({
            // Use `Weak` to avoid deadlocking GC.
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                if let Some(cs) = chain_store.upgrade()
                    && let Ok(state) = StateTree::new_from_root(
                        cs.blockstore().clone(),
                        cs.heaviest_tipset().parent_state(),
                    )
                    && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
                    && let Ok(actor_version) = bundle_meta.actor_major_version()
                {
                    return actor_version;
                }
                0
            }
        });
        services.spawn({
            let chain_config = ctx.chain_config().clone();
            let get_chain_head_height = get_chain_head_height.clone();
            async {
                crate::metrics::init_prometheus(
                    prometheus_listener,
                    db_directory,
                    db,
                    chain_config,
                    get_chain_head_height,
                    get_chain_head_actor_version,
                )
                .await
                .context("Failed to initiate prometheus server")
            }
        });

        crate::metrics::register_collector(Box::new(
            networks::metrics::NetworkHeightCollector::new(
                ctx.state_manager.chain_config().block_delay_secs,
                ctx.state_manager
                    .chain_store()
                    .genesis_block_header()
                    .timestamp,
                get_chain_head_height,
            ),
        ));
    }
    Ok(())
}

async fn create_p2p_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<Libp2pService<DbType>> {
    // If bootstrap peers are not set, fall back to the chain defaults.
    if config.network.bootstrap_peers.is_empty() {
        config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
    }

    let peer_manager = Arc::new(PeerManager::default());
    services.spawn(peer_manager.clone().peer_operation_event_loop_task());
    // Libp2p service setup
    let p2p_service = Libp2pService::new(
        config.network.clone(),
        Arc::clone(ctx.state_manager.chain_store()),
        peer_manager.clone(),
        ctx.net_keypair.clone(),
        config.chain.genesis_name(),
        *ctx.state_manager.chain_store().genesis_block_header().cid(),
    )
    .await?;
    Ok(p2p_service)
}

fn create_mpool(
    services: &mut JoinSet<anyhow::Result<()>>,
    p2p_service: &Libp2pService<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<Arc<MessagePool<MpoolRpcProvider<DbType>>>> {
    let publisher = ctx.state_manager.chain_store().publisher();
    let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone());
    Ok(MessagePool::new(
        provider,
        p2p_service.network_sender().clone(),
        MpoolConfig::load_config(ctx.db.writer().as_ref())?,
        ctx.state_manager.chain_config().clone(),
        services,
    )
    .map(Arc::new)?)
}

fn create_chain_follower(
    opts: &CliOpts,
    p2p_service: &Libp2pService<DbType>,
    mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
    ctx: &AppContext,
) -> anyhow::Result<ChainFollower<DbType>> {
    let network_send = p2p_service.network_sender().clone();
    let peer_manager = p2p_service.peer_manager().clone();
    let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
    let chain_follower = ChainFollower::new(
        ctx.state_manager.clone(),
        network,
        Arc::new(Tipset::from(
            ctx.state_manager.chain_store().genesis_block_header(),
        )),
        p2p_service.network_receiver(),
        opts.stateless,
        mpool,
    );
    Ok(chain_follower)
}

fn start_chain_follower_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    chain_follower: ChainFollower<DbType>,
) {
    services.spawn(async move { chain_follower.run().await });
}

async fn maybe_start_health_check_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    p2p_service: &Libp2pService<DbType>,
    chain_follower: &ChainFollower<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_health_check {
        let forest_state = crate::health::ForestState {
            config: config.clone(),
            chain_config: ctx.state_manager.chain_config().clone(),
            genesis_timestamp: ctx
                .state_manager
                .chain_store()
                .genesis_block_header()
                .timestamp,
            sync_status: chain_follower.sync_status.clone(),
            peer_manager: p2p_service.peer_manager().clone(),
        };
        let healthcheck_address = forest_state.config.client.healthcheck_address;
        info!("Healthcheck endpoint will listen at {healthcheck_address}");
        let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
        services.spawn(async move {
            crate::health::init_healthcheck_server(forest_state, listener)
                .await
                .context("Failed to initiate healthcheck server")
        });
    } else {
        info!("Healthcheck service is disabled");
    }
    Ok(())
}

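/// Optionally starts the JSON-RPC service.
///
/// Note the `Option<Result<_>>::transpose` idiom used for the filter list: a
/// missing list stays `None`, while a list that fails to load becomes an early
/// error. A self-contained sketch:
/// ```
/// let loaded: Option<Result<i32, std::num::ParseIntError>> = Some("42".parse());
/// let filter: Option<i32> = loaded.transpose()?;
/// assert_eq!(filter, Some(42));
/// # Ok::<(), std::num::ParseIntError>(())
/// ```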
#[allow(clippy::too_many_arguments)]
fn maybe_start_rpc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
    chain_follower: &ChainFollower<DbType>,
    start_time: chrono::DateTime<chrono::Utc>,
    shutdown: mpsc::Sender<()>,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_rpc {
        let rpc_address = config.client.rpc_address;
        let filter_list = config
            .client
            .rpc_filter_list
            .as_ref()
            .map(|path| crate::rpc::FilterList::new_from_file(path))
            .transpose()?;
        info!("JSON-RPC endpoint will listen at {rpc_address}");
        let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
        if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
            warn!(
                "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
            );
        }
        services.spawn({
            let state_manager = ctx.state_manager.clone();
            let bad_blocks = chain_follower.bad_blocks.clone();
            let sync_status = chain_follower.sync_status.clone();
            let sync_network_context = chain_follower.network.clone();
            let tipset_send = chain_follower.tipset_sender.clone();
            let keystore = ctx.keystore.clone();
            let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
            let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
            async move {
                start_rpc(
                    RPCState {
                        state_manager,
                        keystore,
                        mpool,
                        bad_blocks,
                        msgs_in_tipset,
                        sync_status,
                        eth_event_handler,
                        sync_network_context,
                        start_time,
                        shutdown,
                        tipset_send,
                        snapshot_progress_tracker,
                    },
                    rpc_address,
                    filter_list,
                )
                .await
            }
        });
    } else {
        debug!("RPC disabled.");
    }
    Ok(())
}

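/// Optionally starts the F3 sidecar.
///
/// The lease manager is a process-wide singleton guarded by a `OnceLock`-style
/// cell, so a service reboot skips re-initialization. The pattern, in miniature
/// (the `u32` payload is illustrative):
/// ```
/// use std::sync::OnceLock;
/// static MANAGER: OnceLock<u32> = OnceLock::new();
/// if MANAGER.get().is_none() {
///     MANAGER.set(7).expect("should not have been initialized before");
/// }
/// assert_eq!(MANAGER.get(), Some(&7));
/// ```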
fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
    // Already running.
    if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
        return Ok(());
    }

    if !config.client.enable_rpc {
        if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
            tracing::warn!("F3 sidecar is enabled but will not run because RPC is disabled")
        }
        return Ok(());
    }

    if !opts.halt_after_import && !opts.stateless {
        let rpc_endpoint = config.client.rpc_v1_endpoint()?;
        let state_manager = &ctx.state_manager;
        let p2p_peer_id = ctx.p2p_peer_id;
        let admin_jwt = ctx.admin_jwt.clone();
        tokio::task::spawn_blocking({
            crate::rpc::f3::F3_LEASE_MANAGER
                .set(crate::rpc::f3::F3LeaseManager::new(
                    state_manager.chain_config().network.clone(),
                    p2p_peer_id,
                ))
                .expect("F3 lease manager should not have been initialized before");
            let chain_config = state_manager.chain_config().clone();
            let f3_root = crate::f3::get_f3_root(config);
            let crate::f3::F3Options {
                chain_finality,
                bootstrap_epoch,
                initial_power_table,
            } = crate::f3::get_f3_sidecar_params(&chain_config);
            move || {
                crate::f3::run_f3_sidecar_if_enabled(
                    &chain_config,
                    rpc_endpoint.to_string(),
                    admin_jwt,
                    crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
                    initial_power_table
                        .map(|i| i.to_string())
                        .unwrap_or_default(),
                    bootstrap_epoch,
                    chain_finality,
                    f3_root.display().to_string(),
                );
            }
        });
    }

    Ok(())
}

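/// Optionally starts the chain indexer service.
///
/// The indexer subscribes to the chain store's head-change publisher and
/// processes each applied tipset in turn; the subscription loop follows the
/// usual tokio broadcast pattern (self-contained sketch; the `u64` payload is
/// illustrative):
/// ```
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, mut rx) = tokio::sync::broadcast::channel::<u64>(16);
/// tx.send(42).unwrap();
/// let head = rx.recv().await.unwrap(); // each new head is handled before the next
/// assert_eq!(head, 42);
/// # }
/// ```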
fn maybe_start_indexer_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    ctx: &AppContext,
) {
    if config.chain_indexer.enable_indexer
        && !opts.stateless
        && !ctx.state_manager.chain_config().is_devnet()
    {
        let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
        let chain_store = ctx.state_manager.chain_store().clone();
        services.spawn(async move {
            tracing::info!("Starting indexer service");

            // Continuously listen for head changes.
            loop {
                let msg = receiver.recv().await?;

                let HeadChange::Apply(ts) = msg;
                tracing::debug!("Indexing tipset {}", ts.key());

                chain_store.put_tipset_key(ts.key())?;

                let delegated_messages =
                    chain_store.headers_delegated_messages(ts.block_headers().iter())?;

                chain_store.process_signed_messages(&delegated_messages)?;
            }
        });

        // Run the collector only if the chain indexer is enabled.
        if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
            let chain_store = ctx.state_manager.chain_store().clone();
            let chain_config = ctx.state_manager.chain_config().clone();
            services.spawn(async move {
                tracing::info!("Starting collector for eth_mappings");
                let mut collector = EthMappingCollector::new(
                    chain_store.blockstore().clone(),
                    chain_config.eth_chain_id,
                    retention_epochs.into(),
                );
                collector.run().await
            });
        }
    }
}

/// Starts the daemon process.
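///
/// Runs until the services exit, restarting them whenever the snapshot GC
/// requests a reboot. The shape of that loop (a sketch; the names are the
/// local bindings in the body below):
/// ```ignore
/// loop {
///     tokio::select! {
///         _ = snap_gc_reboot_rx.recv_async() => snap_gc.cleanup_before_reboot().await,
///         result = start_services(/* ... */) => break result,
///     }
/// }
/// ```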
pub(super) async fn start(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: CliOpts,
    config: Config,
    shutdown_send: mpsc::Sender<()>,
) -> anyhow::Result<()> {
    startup_init(&config)?;
    let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
    let snap_gc = Arc::new(snap_gc);
    GLOBAL_SNAPSHOT_GC
        .set(snap_gc.clone())
        .ok()
        .context("failed to set GLOBAL_SNAPSHOT_GC")?;

    // If the node is stateless, GC shouldn't get triggered even on demand.
    if !opts.stateless {
        tokio::task::spawn({
            let snap_gc = snap_gc.clone();
            async move { snap_gc.event_loop().await }
        });
    }
    // GC shouldn't run periodically if the node is stateless or if the user has disabled it.
    if !opts.no_gc && !opts.stateless {
        tokio::task::spawn({
            let snap_gc = snap_gc.clone();
            async move { snap_gc.scheduler_loop().await }
        });
    }
    loop {
        tokio::select! {
            _ = snap_gc_reboot_rx.recv_async() => {
                snap_gc.cleanup_before_reboot().await;
            }
            result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
                snap_gc.set_db(ctx.db.clone());
                snap_gc.set_sync_status(sync_status);
                snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts| ts.epoch()).unwrap_or_default());
            }) => {
                break result
            }
        }
    }
}

pub(super) async fn start_services(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: &CliOpts,
    mut config: Config,
    shutdown_send: mpsc::Sender<()>,
    on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
    // Clean up the collector Prometheus metrics registry on start.
    crate::metrics::reset_collector_registry();
    let mut services = JoinSet::new();
    let network = config.chain();
    let ctx = AppContext::init(opts, &config).await?;
    info!("Using network :: {network}");
    utils::misc::display_chain_logo(config.chain());
    if opts.exit_after_init {
        return Ok(());
    }
    if !opts.stateless {
        ctx.state_manager.maybe_rewind_heaviest_tipset()?;
    }
    let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
    let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
    let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;

    maybe_start_rpc_service(
        &mut services,
        &config,
        mpool.clone(),
        &chain_follower,
        start_time,
        shutdown_send.clone(),
        &ctx,
    )?;

    maybe_import_snapshot(opts, &mut config, &ctx).await?;
    if opts.halt_after_import {
        // Cancel all async services.
        services.shutdown().await;
        return Ok(());
    }
    on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());
    warmup_in_background(&ctx);
    ctx.state_manager.populate_cache();
    maybe_start_metrics_service(&mut services, &config, &ctx).await?;
    maybe_start_f3_service(opts, &config, &ctx)?;
    maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
        .await?;
    maybe_start_indexer_service(&mut services, opts, &config, &ctx);
    if !opts.stateless {
        ensure_proof_params_downloaded().await?;
    }
    services.spawn(p2p_service.run());
    start_chain_follower_service(&mut services, chain_follower);
    // Block until any of the services returns an error.
    propagate_error(&mut services)
        .await
        .context("services failure")
        .map(|_| {})
}

fn warmup_in_background(ctx: &AppContext) {
    // Populate the `tipset_by_height` cache.
    let cs = ctx.chain_store().clone();
    tokio::task::spawn_blocking(move || {
        let start = Instant::now();
        match cs.chain_index().tipset_by_height(
            // 0 would short-circuit the cache
            1,
            cs.heaviest_tipset(),
            ResolveNullTipset::TakeOlder,
        ) {
            Ok(_) => {
                tracing::info!(
                    "Successfully populated tipset_by_height cache, took {}",
                    humantime::format_duration(start.elapsed())
                );
            }
            Err(e) => {
                tracing::warn!("Failed to populate tipset_by_height cache: {e}");
            }
        }
    });
}

/// If our current chain is below a supported height, we need a snapshot to bring it up
/// to a supported height. If we've not been given a snapshot by the user, get one.
///
/// An [`Err`] should be considered fatal.
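///
/// Decision table, matching the `match` in the body
/// (need snapshot, have snapshot, may auto-download):
/// - `(false, _, _)`: nothing to do
/// - `(true, true, _)`: use the configured snapshot
/// - `(true, false, true)`: point the config at the trusted vendor's stable URL
/// - `(true, false, false)`: prompt the user; denying is a fatal error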
async fn maybe_set_snapshot_path(
    config: &mut Config,
    chain_config: &ChainConfig,
    epoch: ChainEpoch,
    auto_download_snapshot: bool,
    download_directory: &Path,
) -> anyhow::Result<()> {
    if !download_directory.is_dir() {
        anyhow::bail!(
            "`download_directory` does not exist: {}",
            download_directory.display()
        );
    }

    let vendor = snapshot::TrustedVendor::default();
    let chain = config.chain();

    // What height is our chain at right now, and what network version does that correspond to?
    let network_version = chain_config.network_version(epoch);
    let network_version_is_small = network_version < NetworkVersion::V16;

    // We don't support small network versions (we can't validate from e.g. genesis),
    // so we need a snapshot (which will be from a recent network version).
    let require_a_snapshot = network_version_is_small;
    let have_a_snapshot = config.client.snapshot_path.is_some();

    match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
        (false, _, _) => {}   // noop - we don't need a snapshot
        (true, true, _) => {} // noop - we need a snapshot, and we have one
        (true, false, true) => {
            let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
            config.client.snapshot_path = Some(url.to_string().into());
        }
        (true, false, false) => {
            // We need a snapshot, don't have one, and don't have permission to
            // download one, so ask the user.
            let (url, num_bytes, _path) = crate::cli_shared::snapshot::peek(vendor, chain)
                .await
                .context("couldn't get snapshot size")?;
            // dialoguer will double-print long lines, so manually print the first clause
            // ourselves, then let `Confirm` handle the second.
            println!(
                "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
            );
            let message = format!(
                "Fetch a {} snapshot to the current directory? (denying will exit the program). ",
                indicatif::HumanBytes(num_bytes)
            );
            let have_permission = asyncify(|| {
                dialoguer::Confirm::with_theme(&ColorfulTheme::default())
                    .with_prompt(message)
                    .default(false)
                    .interact()
                    // e.g. not a tty (or some other error), so we haven't got permission.
                    .unwrap_or(false)
            })
            .await;
            if !have_permission {
                bail!(
                    "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
                )
            }
            config.client.snapshot_path = Some(url.to_string().into());
        }
    }

    Ok(())
}

/// Returns the first error with which any of the services end, or never returns at all.
// This should return `anyhow::Result<!>` once the `Never` type is stabilized.
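/// A self-contained sketch of the `JoinSet` polling pattern used here
/// (the `String` error type is illustrative):
/// ```
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let mut services = tokio::task::JoinSet::new();
/// services.spawn(async { Err::<(), String>("boom".into()) });
/// while let Some(joined) = services.join_next().await {
///     if let Ok(Err(e)) = joined {
///         eprintln!("service failed: {e}"); // the first service error wins
///         break;
///     }
/// }
/// # }
/// ```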
async fn propagate_error(
    services: &mut JoinSet<Result<(), anyhow::Error>>,
) -> anyhow::Result<std::convert::Infallible> {
    while !services.is_empty() {
        select! {
            option = services.join_next().fuse() => {
                if let Some(Ok(Err(error_message))) = option {
                    return Err(error_message)
                }
            },
        }
    }
    std::future::pending().await
}

/// Run the closure on a thread where blocking is allowed.
///
/// # Panics
/// If the closure panics
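///
/// A sketch of typical usage (moving a CPU-bound computation off the async runtime):
/// ```ignore
/// let sum = asyncify(|| (0..1_000_000u64).sum::<u64>()).await;
/// ```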
fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
where
    T: Send + 'static,
{
    tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
}