1pub mod bundle;
5mod context;
6pub mod db_util;
7pub mod main;
8
9use crate::blocks::Tipset;
10use crate::chain::ChainStore;
11use crate::chain::index::ResolveNullTipset;
12use crate::chain_sync::ChainFollower;
13use crate::chain_sync::network_context::SyncNetworkContext;
14use crate::cli_shared::snapshot;
15use crate::cli_shared::{
16 chain_path,
17 cli::{CliOpts, Config},
18};
19use crate::daemon::{
20 context::{AppContext, DbType},
21 db_util::import_chain_as_forest_car,
22};
23use crate::db::gc::SnapshotGarbageCollector;
24use crate::db::ttl::EthMappingCollector;
25use crate::libp2p::{Libp2pService, PeerManager};
26use crate::message_pool::{MessagePool, MpoolConfig, MpoolLocker, NonceTracker};
27use crate::networks::{self, ChainConfig};
28use crate::rpc::RPCState;
29use crate::rpc::eth::filter::EthEventHandler;
30use crate::rpc::start_rpc;
31use crate::shim::clock::ChainEpoch;
32use crate::shim::state_tree::StateTree;
33use crate::shim::version::NetworkVersion;
34use crate::utils::misc::env::is_env_truthy;
35use crate::utils::{self, ShallowClone as _};
36use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
37use anyhow::{Context as _, bail};
38use backon::{ExponentialBuilder, Retryable};
39use dialoguer::theme::ColorfulTheme;
40use futures::{Future, FutureExt};
41use std::path::Path;
42use std::sync::Arc;
43use std::sync::OnceLock;
44use std::time::{Duration, Instant};
45use tokio::sync::broadcast::error::RecvError;
46use tokio::{
47 signal::{
48 ctrl_c,
49 unix::{SignalKind, signal},
50 },
51 sync::mpsc,
52 task::JoinSet,
53};
54use tracing::{debug, info, warn};
55
56pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
57
58fn maybe_increase_fd_limit() -> anyhow::Result<()> {
62 static DESIRED_SOFT_LIMIT: u64 = 8192;
63 let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
64
65 let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
66 if soft_before < soft_after {
67 debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
68 }
69 if soft_after < DESIRED_SOFT_LIMIT {
70 warn!(
71 "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
72 You may encounter 'too many open files' errors.",
73 );
74 }
75
76 Ok(())
77}
78
79pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
81 let start_time = chrono::Utc::now();
82 let mut terminate = signal(SignalKind::terminate())?;
83 let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
84 let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
85 let result = tokio::select! {
86 ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
87 _ = ctrl_c() => {
88 info!("Keyboard interrupt.");
89 Ok(())
90 },
91 _ = terminate.recv() => {
92 info!("Received SIGTERM.");
93 Ok(())
94 },
95 _ = shutdown_recv.recv() => {
96 info!("Client requested a shutdown.");
97 Ok(())
98 },
99 };
100 _ = rpc_server_handle.stop();
101 crate::utils::io::terminal_cleanup();
102 result
103}
104
105fn startup_init(config: &Config) -> anyhow::Result<()> {
110 maybe_increase_fd_limit()?;
111 crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
114 info!(
115 "Starting Forest daemon, version {}",
116 FOREST_VERSION_STRING.as_str()
117 );
118 Ok(())
119}
120
121async fn maybe_import_snapshot(
122 opts: &CliOpts,
123 config: &mut Config,
124 ctx: &AppContext,
125) -> anyhow::Result<()> {
126 let chain_config = ctx.state_manager.chain_config();
127 if config.client.snapshot_path.is_none() && !opts.stateless {
129 maybe_set_snapshot_path(
130 config,
131 chain_config,
132 ctx.state_manager.chain_store().heaviest_tipset().epoch(),
133 opts.auto_download_snapshot,
134 &ctx.db_meta_data.get_root_dir(),
135 )
136 .await?;
137 }
138
139 let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
140 if !opts.skip_load.unwrap_or_default()
142 && let Some(path) = &config.client.snapshot_path
143 {
144 let (car_db_path, ts) = import_chain_as_forest_car(
145 path,
146 &ctx.db_meta_data.get_forest_car_db_dir(),
147 config.client.import_mode,
148 config.client.rpc_v1_endpoint()?,
149 &crate::f3::get_f3_root(config),
150 ctx.chain_config(),
151 &snapshot_tracker,
152 )
153 .await?;
154 ctx.db
155 .read_only_files(std::iter::once(car_db_path.clone()))?;
156 let ts_epoch = ts.epoch();
157 ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
160 debug!(
161 "Loaded car DB at {} and set current head to epoch {ts_epoch}",
162 car_db_path.display(),
163 );
164 }
165
166 if !snapshot_tracker.is_completed() {
169 snapshot_tracker.not_required();
170 }
171
172 if let Some(validate_from) = config.client.snapshot_height {
173 ensure_proof_params_downloaded().await?;
175 let current_height = config
177 .client
178 .snapshot_head
179 .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
180
181 let validation_range = validation_range(current_height, validate_from)?;
182 let state_manager = ctx.state_manager.clone();
185 tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
186 .await??;
187 }
188
189 Ok(())
190}
191
192fn validation_range(
195 current: ChainEpoch,
196 from: ChainEpoch,
197) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
198 anyhow::ensure!(
199 current.is_positive(),
200 "current head epoch {current} is invalid"
201 );
202
203 let start = if from.is_negative() {
207 current.saturating_add(from).max(0)
208 } else {
209 from
210 };
211
212 anyhow::ensure!(
215 start <= current,
216 "requested validation start epoch {start} is beyond the current head at epoch {current}",
217 );
218
219 Ok(start..=current)
220}
221
222async fn maybe_start_metrics_service(
223 services: &mut JoinSet<anyhow::Result<()>>,
224 config: &Config,
225 ctx: &AppContext,
226) -> anyhow::Result<()> {
227 if config.client.enable_metrics_endpoint {
228 let prometheus_listener =
229 crate::utils::net::bind_tcp_listener(config.client.metrics_address, 0).await?;
230 info!(
231 "Prometheus server started at {}",
232 config.client.metrics_address
233 );
234 let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
235 let db = ctx.db.writer().clone();
236
237 let get_chain_head_height = Arc::new({
238 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
240 move || {
241 chain_store
242 .upgrade()
243 .map(|cs| cs.heaviest_tipset().epoch())
244 .unwrap_or_default()
245 }
246 });
247 let get_chain_head_actor_version = Arc::new({
248 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
250 move || {
251 if let Some(cs) = chain_store.upgrade()
252 && let Ok(state) = StateTree::new_from_root(
253 cs.blockstore().clone(),
254 cs.heaviest_tipset().parent_state(),
255 )
256 && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
257 && let Ok(actor_version) = bundle_meta.actor_major_version()
258 {
259 return actor_version;
260 }
261 0
262 }
263 });
264 services.spawn({
265 let chain_config = ctx.chain_config().clone();
266 let get_chain_head_height = get_chain_head_height.clone();
267 async {
268 crate::metrics::init_prometheus(
269 prometheus_listener,
270 db_directory,
271 db,
272 chain_config,
273 get_chain_head_height,
274 get_chain_head_actor_version,
275 )
276 .await
277 .context("Failed to initiate prometheus server")
278 }
279 });
280
281 crate::metrics::register_collector(Box::new(
282 networks::metrics::NetworkHeightCollector::new(
283 ctx.state_manager.chain_config().block_delay_secs,
284 ctx.state_manager
285 .chain_store()
286 .genesis_block_header()
287 .timestamp,
288 get_chain_head_height,
289 ),
290 ));
291 }
292 Ok(())
293}
294
295async fn create_p2p_service(
296 services: &mut JoinSet<anyhow::Result<()>>,
297 config: &mut Config,
298 ctx: &AppContext,
299) -> anyhow::Result<Libp2pService<DbType>> {
300 if config.network.bootstrap_peers.is_empty() {
302 config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
303 }
304
305 let peer_manager = Arc::new(PeerManager::default());
306 services.spawn(peer_manager.clone().peer_operation_event_loop_task());
307 let p2p_service = Libp2pService::new(
309 config.network.clone(),
310 Arc::clone(ctx.state_manager.chain_store()),
311 peer_manager.clone(),
312 ctx.net_keypair.clone(),
313 config.chain.genesis_name(),
314 *ctx.state_manager.chain_store().genesis_block_header().cid(),
315 )
316 .await?;
317 Ok(p2p_service)
318}
319
320fn create_mpool(
321 services: &mut JoinSet<anyhow::Result<()>>,
322 p2p_service: &Libp2pService<DbType>,
323 ctx: &AppContext,
324) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
325 Ok(MessagePool::new(
326 ctx.state_manager.chain_store().clone(),
327 p2p_service.network_sender().clone(),
328 MpoolConfig::load_config(ctx.db.writer().as_ref())?,
329 ctx.state_manager.chain_config().clone(),
330 services,
331 )
332 .map(Arc::new)?)
333}
334
335fn create_chain_follower(
336 opts: &CliOpts,
337 p2p_service: &Libp2pService<DbType>,
338 mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
339 ctx: &AppContext,
340) -> anyhow::Result<Arc<ChainFollower<DbType>>> {
341 let network_send = p2p_service.network_sender().clone();
342 let peer_manager = p2p_service.peer_manager().clone();
343 let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
344 Ok(Arc::new(ChainFollower::new(
345 ctx.state_manager.clone(),
346 network,
347 Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
348 p2p_service.network_receiver(),
349 opts.stateless,
350 mpool,
351 )))
352}
353
354fn start_chain_follower_service(
355 services: &mut JoinSet<anyhow::Result<()>>,
356 chain_follower: Arc<ChainFollower<DbType>>,
357) {
358 services.spawn(async move { chain_follower.run().await });
359}
360
361async fn maybe_start_health_check_service(
362 services: &mut JoinSet<anyhow::Result<()>>,
363 config: &Config,
364 p2p_service: &Libp2pService<DbType>,
365 chain_follower: &ChainFollower<DbType>,
366 ctx: &AppContext,
367) -> anyhow::Result<()> {
368 if config.client.enable_health_check {
369 let forest_state = crate::health::ForestState {
370 config: config.clone(),
371 chain_config: ctx.state_manager.chain_config().clone(),
372 genesis_timestamp: ctx
373 .state_manager
374 .chain_store()
375 .genesis_block_header()
376 .timestamp,
377 sync_status: chain_follower.sync_status.clone(),
378 peer_manager: p2p_service.peer_manager().clone(),
379 };
380 let healthcheck_address = forest_state.config.client.healthcheck_address;
381 info!("Healthcheck endpoint will listen at {healthcheck_address}");
382 let listener = crate::utils::net::bind_tcp_listener(healthcheck_address, 0).await?;
383 services.spawn(async move {
384 crate::health::init_healthcheck_server(forest_state, listener)
385 .await
386 .context("Failed to initiate healthcheck server")
387 });
388 } else {
389 info!("Healthcheck service is disabled");
390 }
391 Ok(())
392}
393
394fn maybe_start_gc_service(
395 services: &mut JoinSet<anyhow::Result<()>>,
396 opts: &CliOpts,
397 config: &Config,
398 chain_follower: Arc<ChainFollower<DbType>>,
399) -> anyhow::Result<()> {
400 if opts.stateless {
402 return Ok(());
403 }
404
405 let snap_gc = Arc::new(SnapshotGarbageCollector::new(chain_follower, config)?);
406
407 GLOBAL_SNAPSHOT_GC
408 .set(snap_gc.clone())
409 .ok()
410 .context("failed to set GLOBAL_SNAPSHOT_GC")?;
411
412 services.spawn({
413 let snap_gc = snap_gc.clone();
414 async move {
415 snap_gc.event_loop().await;
416 Ok(())
417 }
418 });
419
420 if !opts.no_gc {
422 services.spawn({
423 let snap_gc = snap_gc.clone();
424 async move {
425 snap_gc.scheduler_loop().await;
426 Ok(())
427 }
428 });
429 }
430
431 Ok(())
432}
433
434#[allow(clippy::too_many_arguments)]
435fn maybe_start_rpc_service(
436 services: &mut JoinSet<anyhow::Result<()>>,
437 config: &Config,
438 mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
439 chain_follower: &ChainFollower<DbType>,
440 start_time: chrono::DateTime<chrono::Utc>,
441 shutdown: mpsc::Sender<()>,
442 rpc_stop_handle: jsonrpsee::server::StopHandle,
443 ctx: &AppContext,
444) -> anyhow::Result<()> {
445 if config.client.enable_rpc {
446 let rpc_address = config.client.rpc_address;
447 let filter_list = config
448 .client
449 .rpc_filter_list
450 .as_ref()
451 .map(|path| crate::rpc::FilterList::new_from_file(path))
452 .transpose()?;
453 info!("JSON-RPC endpoint will listen at {rpc_address}");
454 let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
455 if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
456 warn!(
457 "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
458 );
459 }
460 services.spawn({
461 let state_manager = ctx.state_manager.shallow_clone();
462 let bad_blocks = chain_follower.bad_blocks.shallow_clone();
463 let sync_status = chain_follower.sync_status.shallow_clone();
464 let sync_network_context = chain_follower.network.shallow_clone();
465 let tipset_send = chain_follower.tipset_sender.clone();
466 let keystore = ctx.keystore.shallow_clone();
467 let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
468 let nonce_tracker = NonceTracker::new();
469 let mpool_locker = MpoolLocker::new();
470 let temp_dir = Arc::new(ctx.temp_dir.clone());
471 async move {
472 let rpc_listener = crate::utils::net::bind_tcp_listener(
473 rpc_address,
474 crate::rpc::default_max_connections(),
475 )
476 .await?;
477 start_rpc(
478 RPCState {
479 state_manager,
480 keystore,
481 mpool,
482 bad_blocks,
483 sync_status,
484 eth_event_handler,
485 sync_network_context,
486 start_time,
487 shutdown,
488 tipset_send,
489 snapshot_progress_tracker,
490 mpool_locker,
491 nonce_tracker,
492 temp_dir,
493 },
494 rpc_listener,
495 rpc_stop_handle,
496 filter_list,
497 )
498 .await
499 }
500 });
501 } else {
502 debug!("RPC disabled.");
503 };
504 Ok(())
505}
506
507fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
508 if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
510 return Ok(());
511 }
512
513 if !config.client.enable_rpc {
514 if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
515 tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
516 }
517 return Ok(());
518 }
519
520 if !opts.halt_after_import && !opts.stateless {
521 let rpc_endpoint = config.client.rpc_v1_endpoint()?;
522 let state_manager = &ctx.state_manager;
523 let p2p_peer_id = ctx.p2p_peer_id;
524 let admin_jwt = ctx.admin_jwt.clone();
525 tokio::task::spawn_blocking({
526 crate::rpc::f3::F3_LEASE_MANAGER
527 .set(crate::rpc::f3::F3LeaseManager::new(
528 state_manager.chain_config().network.clone(),
529 p2p_peer_id,
530 ))
531 .expect("F3 lease manager should not have been initialized before");
532 let chain_config = state_manager.chain_config().clone();
533 let f3_root = crate::f3::get_f3_root(config);
534 let crate::f3::F3Options {
535 chain_finality,
536 bootstrap_epoch,
537 initial_power_table,
538 } = crate::f3::get_f3_sidecar_params(&chain_config);
539 move || {
540 crate::f3::run_f3_sidecar_if_enabled(
541 &chain_config,
542 rpc_endpoint.to_string(),
543 admin_jwt,
544 crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
545 initial_power_table
546 .map(|i| i.to_string())
547 .unwrap_or_default(),
548 bootstrap_epoch,
549 chain_finality,
550 f3_root.display().to_string(),
551 );
552 }
553 });
554 tokio::task::spawn({
555 let chain_store = ctx.chain_store().clone();
556 async move {
557 tokio::time::sleep(Duration::from_secs(1)).await;
559 match (|| crate::rpc::f3::F3GetLatestCertificate::get())
560 .retry(ExponentialBuilder::default())
561 .await
562 {
563 Ok(f3_finalized_cert) => {
564 let f3_finalized_head = f3_finalized_cert.chain_head();
565 match chain_store
566 .chain_index()
567 .load_required_tipset(&f3_finalized_head.key)
568 {
569 Ok(ts) => {
570 chain_store.set_f3_finalized_tipset(ts);
571 tracing::info!(
572 "Set F3 finalized tipset to epoch {} and key {}",
573 f3_finalized_head.epoch,
574 f3_finalized_head.key,
575 );
576 }
577 Err(e) => {
578 tracing::error!(
579 "Failed to get F3 finalized tipset epoch {} and key {}: {e}",
580 f3_finalized_head.epoch,
581 f3_finalized_head.key
582 );
583 }
584 }
585 }
586 Err(e) => {
587 tracing::error!("Failed to get F3 latest certificate: {e:#}");
588 }
589 }
590 }
591 });
592 }
593
594 Ok(())
595}
596
597fn maybe_start_indexer_service(
598 services: &mut JoinSet<anyhow::Result<()>>,
599 opts: &CliOpts,
600 config: &Config,
601 ctx: &AppContext,
602) {
603 if config.chain_indexer.enable_indexer
604 && !opts.stateless
605 && !ctx.state_manager.chain_config().is_devnet()
606 {
607 let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
608 let chain_store = ctx.state_manager.chain_store().clone();
609 services.spawn(async move {
610 tracing::info!("Starting indexer service");
611
612 loop {
614 match head_changes_rx.recv().await {
615 Ok(changes) => {
616 for ts in changes.applies {
617 tracing::debug!("Indexing tipset {}", ts.key());
618 let delegated_messages = chain_store
619 .headers_delegated_messages(ts.block_headers().iter())?;
620 chain_store.process_signed_messages(&delegated_messages)?;
621 }
622 }
623 Err(RecvError::Lagged(n)) => {
624 warn!("indexer service lagged: skipping {n} events")
625 }
626 Err(RecvError::Closed) => break Ok(()),
627 }
628 }
629 });
630
631 if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
633 let chain_store = ctx.state_manager.chain_store().clone();
634 let chain_config = ctx.state_manager.chain_config().clone();
635 services.spawn(async move {
636 tracing::info!("Starting collector for eth_mappings");
637 let mut collector = EthMappingCollector::new(
638 chain_store.blockstore().clone(),
639 chain_config.eth_chain_id,
640 retention_epochs.into(),
641 );
642 collector.run().await
643 });
644 }
645 }
646}
647
648pub(super) async fn start(
650 start_time: chrono::DateTime<chrono::Utc>,
651 opts: CliOpts,
652 config: Config,
653 shutdown_send: mpsc::Sender<()>,
654 rpc_stop_handle: jsonrpsee::server::StopHandle,
655) -> anyhow::Result<()> {
656 startup_init(&config)?;
657 start_services(
658 start_time,
659 &opts,
660 config.clone(),
661 shutdown_send.clone(),
662 rpc_stop_handle,
663 )
664 .await
665}
666
667pub(super) async fn start_services(
668 start_time: chrono::DateTime<chrono::Utc>,
669 opts: &CliOpts,
670 mut config: Config,
671 shutdown_send: mpsc::Sender<()>,
672 rpc_stop_handle: jsonrpsee::server::StopHandle,
673) -> anyhow::Result<()> {
674 crate::metrics::reset_collector_registry();
676 let mut services = JoinSet::new();
677 let network = config.chain();
678 let ctx = AppContext::init(opts, &config).await?;
679 info!("Using network :: {network}");
680 utils::misc::display_chain_logo(config.chain());
681 if opts.exit_after_init {
682 return Ok(());
683 }
684 if !opts.stateless
685 && !opts.skip_load_actors
686 && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
687 {
688 tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
689 }
690
691 let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
692 let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
693 let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
694
695 maybe_start_rpc_service(
696 &mut services,
697 &config,
698 mpool.clone(),
699 &chain_follower,
700 start_time,
701 shutdown_send.clone(),
702 rpc_stop_handle,
703 &ctx,
704 )?;
705
706 maybe_import_snapshot(opts, &mut config, &ctx).await?;
707 if opts.halt_after_import {
708 services.shutdown().await;
710 return Ok(());
711 }
712
713 warmup_in_background(&ctx);
714 maybe_start_gc_service(&mut services, opts, &config, chain_follower.clone())?;
715 maybe_start_metrics_service(&mut services, &config, &ctx).await?;
716 maybe_start_f3_service(opts, &config, &ctx)?;
717 maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
718 .await?;
719 maybe_start_indexer_service(&mut services, opts, &config, &ctx);
720 if !opts.stateless {
721 ensure_proof_params_downloaded().await?;
722 }
723 services.spawn(p2p_service.run());
724 start_chain_follower_service(&mut services, chain_follower);
725 propagate_error(&mut services)
727 .await
728 .context("services failure")
729 .map(|_| {})
730}
731
732fn warmup_in_background(ctx: &AppContext) {
733 let cs = ctx.chain_store().clone();
735 tokio::task::spawn_blocking(move || {
736 let start = Instant::now();
737 match cs.chain_index().tipset_by_height(
738 1,
740 cs.heaviest_tipset(),
741 ResolveNullTipset::TakeOlder,
742 ) {
743 Ok(_) => {
744 tracing::info!(
745 "Successfully populated tipset_by_height cache, took {}",
746 humantime::format_duration(start.elapsed())
747 );
748 }
749 Err(e) => {
750 tracing::warn!("Failed to populate tipset_by_height cache: {e}");
751 }
752 }
753 });
754}
755
756async fn maybe_set_snapshot_path(
761 config: &mut Config,
762 chain_config: &ChainConfig,
763 epoch: ChainEpoch,
764 auto_download_snapshot: bool,
765 download_directory: &Path,
766) -> anyhow::Result<()> {
767 if !download_directory.is_dir() {
768 anyhow::bail!(
769 "`download_directory` does not exist: {}",
770 download_directory.display()
771 );
772 }
773
774 let vendor = snapshot::TrustedVendor::default();
775 let chain = config.chain();
776
777 let network_version = chain_config.network_version(epoch);
779 let network_version_is_small = network_version < NetworkVersion::V16;
780
781 let require_a_snapshot = network_version_is_small;
784 let have_a_snapshot = config.client.snapshot_path.is_some();
785
786 match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
787 (false, _, _) => {} (true, true, _) => {} (true, false, true) => {
790 const AUTO_SNAPSHOT_PATH_ENV_KEY: &str = "FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH";
791 match std::env::var(AUTO_SNAPSHOT_PATH_ENV_KEY) {
792 Ok(path) if !path.is_empty() => {
793 tracing::info!(
794 "importing snapshot from {path} set by `{AUTO_SNAPSHOT_PATH_ENV_KEY}`"
795 );
796 config.client.snapshot_path = Some(path.into());
797 }
798 _ => {
799 let (resolved_url, _num_bytes, filename) =
803 crate::cli_shared::snapshot::peek(vendor, chain).await?;
804 tracing::info!("Downloading snapshot: {filename}");
805 config.client.snapshot_path = Some(resolved_url.to_string().into());
806 }
807 }
808 }
809 (true, false, false) => {
810 let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
812 .await
813 .context("couldn't get snapshot size")?;
814 println!(
817 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
818 );
819 let message = format!(
820 "Fetch a {} snapshot? (denying will exit the program). ",
821 indicatif::HumanBytes(num_bytes)
822 );
823 let have_permission = asyncify(|| {
824 dialoguer::Confirm::with_theme(&ColorfulTheme::default())
825 .with_prompt(message)
826 .default(false)
827 .interact()
828 .unwrap_or(false)
830 })
831 .await;
832 if !have_permission {
833 bail!(
834 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
835 )
836 }
837 tracing::info!("Downloading snapshot: {filename}");
838 config.client.snapshot_path = Some(url.to_string().into());
839 }
840 };
841
842 Ok(())
843}
844
845async fn propagate_error(
848 services: &mut JoinSet<anyhow::Result<()>>,
849) -> anyhow::Result<std::convert::Infallible> {
850 while let Some(result) = services.join_next().await {
851 if let Ok(Err(error_message)) = result {
852 return Err(error_message);
853 }
854 }
855 std::future::pending().await
856}
857
858fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
863where
864 T: Send + 'static,
865{
866 tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
867}
868
869#[cfg(test)]
870mod tests {
871 use rstest::rstest;
872
873 use super::*;
874
875 #[rstest]
876 #[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!(
877 "current head epoch 0 is invalid"
878 )))]
879 #[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!(
880 "current head epoch 0 is invalid"
881 )))]
882 #[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!(
883 "requested validation start epoch 11 is beyond the current head at epoch 10"
884 )))]
885 #[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))]
886 #[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))]
887 #[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))]
888 #[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))]
889 fn test_validation_range(
890 #[case] current: ChainEpoch,
891 #[case] from: ChainEpoch,
892 #[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>,
893 ) {
894 let result = validation_range(current, from);
895 match expected {
896 Ok(expected_range) => {
897 assert_eq!(result.unwrap(), expected_range);
898 }
899 Err(_) => {
900 assert!(result.is_err());
901 }
902 }
903 }
904}