1pub mod bundle;
5mod context;
6pub mod db_util;
7pub mod main;
8
9use crate::blocks::Tipset;
10use crate::chain::ChainStore;
11use crate::chain::index::ResolveNullTipset;
12use crate::chain_sync::ChainFollower;
13use crate::chain_sync::network_context::SyncNetworkContext;
14use crate::cli_shared::snapshot;
15use crate::cli_shared::{
16 chain_path,
17 cli::{CliOpts, Config},
18};
19use crate::daemon::{
20 context::{AppContext, DbType},
21 db_util::import_chain_as_forest_car,
22};
23use crate::db::gc::SnapshotGarbageCollector;
24use crate::db::ttl::EthMappingCollector;
25use crate::libp2p::{Libp2pService, PeerManager};
26use crate::message_pool::{MessagePool, MpoolConfig, MpoolLocker, NonceTracker};
27use crate::networks::{self, ChainConfig};
28use crate::rpc::RPCState;
29use crate::rpc::eth::filter::EthEventHandler;
30use crate::rpc::start_rpc;
31use crate::shim::clock::ChainEpoch;
32use crate::shim::state_tree::StateTree;
33use crate::shim::version::NetworkVersion;
34use crate::utils::misc::env::is_env_truthy;
35use crate::utils::{self, ShallowClone as _};
36use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
37use anyhow::{Context as _, bail};
38use backon::{ExponentialBuilder, Retryable};
39use dialoguer::theme::ColorfulTheme;
40use futures::{Future, FutureExt};
41use std::path::Path;
42use std::sync::Arc;
43use std::sync::OnceLock;
44use std::time::{Duration, Instant};
45use tokio::sync::broadcast::error::RecvError;
46use tokio::{
47 net::TcpListener,
48 signal::{
49 ctrl_c,
50 unix::{SignalKind, signal},
51 },
52 sync::mpsc,
53 task::JoinSet,
54};
55use tracing::{debug, info, warn};
56
/// Process-wide slot for the snapshot garbage collector.
///
/// The slot starts out empty and can be filled at most once;
/// `maybe_start_gc_service` stores the collector here right after creating it.
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
58
59fn maybe_increase_fd_limit() -> anyhow::Result<()> {
63 static DESIRED_SOFT_LIMIT: u64 = 8192;
64 let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
65
66 let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
67 if soft_before < soft_after {
68 debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
69 }
70 if soft_after < DESIRED_SOFT_LIMIT {
71 warn!(
72 "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
73 You may encounter 'too many open files' errors.",
74 );
75 }
76
77 Ok(())
78}
79
80pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
82 let start_time = chrono::Utc::now();
83 let mut terminate = signal(SignalKind::terminate())?;
84 let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
85 let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
86 let result = tokio::select! {
87 ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
88 _ = ctrl_c() => {
89 info!("Keyboard interrupt.");
90 Ok(())
91 },
92 _ = terminate.recv() => {
93 info!("Received SIGTERM.");
94 Ok(())
95 },
96 _ = shutdown_recv.recv() => {
97 info!("Client requested a shutdown.");
98 Ok(())
99 },
100 };
101 _ = rpc_server_handle.stop();
102 crate::utils::io::terminal_cleanup();
103 result
104}
105
106fn startup_init(config: &Config) -> anyhow::Result<()> {
111 maybe_increase_fd_limit()?;
112 crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
115 info!(
116 "Starting Forest daemon, version {}",
117 FOREST_VERSION_STRING.as_str()
118 );
119 Ok(())
120}
121
122async fn maybe_import_snapshot(
123 opts: &CliOpts,
124 config: &mut Config,
125 ctx: &AppContext,
126) -> anyhow::Result<()> {
127 let chain_config = ctx.state_manager.chain_config();
128 if config.client.snapshot_path.is_none() && !opts.stateless {
130 maybe_set_snapshot_path(
131 config,
132 chain_config,
133 ctx.state_manager.chain_store().heaviest_tipset().epoch(),
134 opts.auto_download_snapshot,
135 &ctx.db_meta_data.get_root_dir(),
136 )
137 .await?;
138 }
139
140 let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
141 if !opts.skip_load.unwrap_or_default()
143 && let Some(path) = &config.client.snapshot_path
144 {
145 let (car_db_path, ts) = import_chain_as_forest_car(
146 path,
147 &ctx.db_meta_data.get_forest_car_db_dir(),
148 config.client.import_mode,
149 config.client.rpc_v1_endpoint()?,
150 &crate::f3::get_f3_root(config),
151 ctx.chain_config(),
152 &snapshot_tracker,
153 )
154 .await?;
155 ctx.db
156 .read_only_files(std::iter::once(car_db_path.clone()))?;
157 let ts_epoch = ts.epoch();
158 ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
161 debug!(
162 "Loaded car DB at {} and set current head to epoch {ts_epoch}",
163 car_db_path.display(),
164 );
165 }
166
167 if !snapshot_tracker.is_completed() {
170 snapshot_tracker.not_required();
171 }
172
173 if let Some(validate_from) = config.client.snapshot_height {
174 ensure_proof_params_downloaded().await?;
176 let current_height = config
178 .client
179 .snapshot_head
180 .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
181
182 let validation_range = validation_range(current_height, validate_from)?;
183 let state_manager = ctx.state_manager.clone();
186 tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
187 .await??;
188 }
189
190 Ok(())
191}
192
193fn validation_range(
196 current: ChainEpoch,
197 from: ChainEpoch,
198) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
199 anyhow::ensure!(
200 current.is_positive(),
201 "current head epoch {current} is invalid"
202 );
203
204 let start = if from.is_negative() {
208 current.saturating_add(from).max(0)
209 } else {
210 from
211 };
212
213 anyhow::ensure!(
216 start <= current,
217 "requested validation start epoch {start} is beyond the current head at epoch {current}",
218 );
219
220 Ok(start..=current)
221}
222
223async fn maybe_start_metrics_service(
224 services: &mut JoinSet<anyhow::Result<()>>,
225 config: &Config,
226 ctx: &AppContext,
227) -> anyhow::Result<()> {
228 if config.client.enable_metrics_endpoint {
229 let prometheus_listener = TcpListener::bind(config.client.metrics_address)
231 .await
232 .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
233 info!(
234 "Prometheus server started at {}",
235 config.client.metrics_address
236 );
237 let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
238 let db = ctx.db.writer().clone();
239
240 let get_chain_head_height = Arc::new({
241 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
243 move || {
244 chain_store
245 .upgrade()
246 .map(|cs| cs.heaviest_tipset().epoch())
247 .unwrap_or_default()
248 }
249 });
250 let get_chain_head_actor_version = Arc::new({
251 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
253 move || {
254 if let Some(cs) = chain_store.upgrade()
255 && let Ok(state) = StateTree::new_from_root(
256 cs.blockstore().clone(),
257 cs.heaviest_tipset().parent_state(),
258 )
259 && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
260 && let Ok(actor_version) = bundle_meta.actor_major_version()
261 {
262 return actor_version;
263 }
264 0
265 }
266 });
267 services.spawn({
268 let chain_config = ctx.chain_config().clone();
269 let get_chain_head_height = get_chain_head_height.clone();
270 async {
271 crate::metrics::init_prometheus(
272 prometheus_listener,
273 db_directory,
274 db,
275 chain_config,
276 get_chain_head_height,
277 get_chain_head_actor_version,
278 )
279 .await
280 .context("Failed to initiate prometheus server")
281 }
282 });
283
284 crate::metrics::register_collector(Box::new(
285 networks::metrics::NetworkHeightCollector::new(
286 ctx.state_manager.chain_config().block_delay_secs,
287 ctx.state_manager
288 .chain_store()
289 .genesis_block_header()
290 .timestamp,
291 get_chain_head_height,
292 ),
293 ));
294 }
295 Ok(())
296}
297
298async fn create_p2p_service(
299 services: &mut JoinSet<anyhow::Result<()>>,
300 config: &mut Config,
301 ctx: &AppContext,
302) -> anyhow::Result<Libp2pService<DbType>> {
303 if config.network.bootstrap_peers.is_empty() {
305 config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
306 }
307
308 let peer_manager = Arc::new(PeerManager::default());
309 services.spawn(peer_manager.clone().peer_operation_event_loop_task());
310 let p2p_service = Libp2pService::new(
312 config.network.clone(),
313 Arc::clone(ctx.state_manager.chain_store()),
314 peer_manager.clone(),
315 ctx.net_keypair.clone(),
316 config.chain.genesis_name(),
317 *ctx.state_manager.chain_store().genesis_block_header().cid(),
318 )
319 .await?;
320 Ok(p2p_service)
321}
322
323fn create_mpool(
324 services: &mut JoinSet<anyhow::Result<()>>,
325 p2p_service: &Libp2pService<DbType>,
326 ctx: &AppContext,
327) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
328 Ok(MessagePool::new(
329 ctx.state_manager.chain_store().clone(),
330 p2p_service.network_sender().clone(),
331 MpoolConfig::load_config(ctx.db.writer().as_ref())?,
332 ctx.state_manager.chain_config().clone(),
333 services,
334 )
335 .map(Arc::new)?)
336}
337
338fn create_chain_follower(
339 opts: &CliOpts,
340 p2p_service: &Libp2pService<DbType>,
341 mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
342 ctx: &AppContext,
343) -> anyhow::Result<Arc<ChainFollower<DbType>>> {
344 let network_send = p2p_service.network_sender().clone();
345 let peer_manager = p2p_service.peer_manager().clone();
346 let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
347 Ok(Arc::new(ChainFollower::new(
348 ctx.state_manager.clone(),
349 network,
350 Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
351 p2p_service.network_receiver(),
352 opts.stateless,
353 mpool,
354 )))
355}
356
357fn start_chain_follower_service(
358 services: &mut JoinSet<anyhow::Result<()>>,
359 chain_follower: Arc<ChainFollower<DbType>>,
360) {
361 services.spawn(async move { chain_follower.run().await });
362}
363
364async fn maybe_start_health_check_service(
365 services: &mut JoinSet<anyhow::Result<()>>,
366 config: &Config,
367 p2p_service: &Libp2pService<DbType>,
368 chain_follower: &ChainFollower<DbType>,
369 ctx: &AppContext,
370) -> anyhow::Result<()> {
371 if config.client.enable_health_check {
372 let forest_state = crate::health::ForestState {
373 config: config.clone(),
374 chain_config: ctx.state_manager.chain_config().clone(),
375 genesis_timestamp: ctx
376 .state_manager
377 .chain_store()
378 .genesis_block_header()
379 .timestamp,
380 sync_status: chain_follower.sync_status.clone(),
381 peer_manager: p2p_service.peer_manager().clone(),
382 };
383 let healthcheck_address = forest_state.config.client.healthcheck_address;
384 info!("Healthcheck endpoint will listen at {healthcheck_address}");
385 let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
386 services.spawn(async move {
387 crate::health::init_healthcheck_server(forest_state, listener)
388 .await
389 .context("Failed to initiate healthcheck server")
390 });
391 } else {
392 info!("Healthcheck service is disabled");
393 }
394 Ok(())
395}
396
397fn maybe_start_gc_service(
398 services: &mut JoinSet<anyhow::Result<()>>,
399 opts: &CliOpts,
400 config: &Config,
401 chain_follower: Arc<ChainFollower<DbType>>,
402) -> anyhow::Result<()> {
403 if opts.stateless {
405 return Ok(());
406 }
407
408 let snap_gc = Arc::new(SnapshotGarbageCollector::new(chain_follower, config)?);
409
410 GLOBAL_SNAPSHOT_GC
411 .set(snap_gc.clone())
412 .ok()
413 .context("failed to set GLOBAL_SNAPSHOT_GC")?;
414
415 services.spawn({
416 let snap_gc = snap_gc.clone();
417 async move {
418 snap_gc.event_loop().await;
419 Ok(())
420 }
421 });
422
423 if !opts.no_gc {
425 services.spawn({
426 let snap_gc = snap_gc.clone();
427 async move {
428 snap_gc.scheduler_loop().await;
429 Ok(())
430 }
431 });
432 }
433
434 Ok(())
435}
436
437#[allow(clippy::too_many_arguments)]
438fn maybe_start_rpc_service(
439 services: &mut JoinSet<anyhow::Result<()>>,
440 config: &Config,
441 mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
442 chain_follower: &ChainFollower<DbType>,
443 start_time: chrono::DateTime<chrono::Utc>,
444 shutdown: mpsc::Sender<()>,
445 rpc_stop_handle: jsonrpsee::server::StopHandle,
446 ctx: &AppContext,
447) -> anyhow::Result<()> {
448 if config.client.enable_rpc {
449 let rpc_address = config.client.rpc_address;
450 let filter_list = config
451 .client
452 .rpc_filter_list
453 .as_ref()
454 .map(|path| crate::rpc::FilterList::new_from_file(path))
455 .transpose()?;
456 info!("JSON-RPC endpoint will listen at {rpc_address}");
457 let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
458 if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
459 warn!(
460 "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
461 );
462 }
463 services.spawn({
464 let state_manager = ctx.state_manager.shallow_clone();
465 let bad_blocks = chain_follower.bad_blocks.shallow_clone();
466 let sync_status = chain_follower.sync_status.shallow_clone();
467 let sync_network_context = chain_follower.network.shallow_clone();
468 let tipset_send = chain_follower.tipset_sender.clone();
469 let keystore = ctx.keystore.shallow_clone();
470 let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
471 let nonce_tracker = NonceTracker::new();
472 let mpool_locker = MpoolLocker::new();
473 let temp_dir = Arc::new(ctx.temp_dir.clone());
474 async move {
475 let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
476 .await
477 .map_err(|e| {
478 anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
479 })
480 .unwrap();
481 start_rpc(
482 RPCState {
483 state_manager,
484 keystore,
485 mpool,
486 bad_blocks,
487 sync_status,
488 eth_event_handler,
489 sync_network_context,
490 start_time,
491 shutdown,
492 tipset_send,
493 snapshot_progress_tracker,
494 mpool_locker,
495 nonce_tracker,
496 temp_dir,
497 },
498 rpc_listener,
499 rpc_stop_handle,
500 filter_list,
501 )
502 .await
503 }
504 });
505 } else {
506 debug!("RPC disabled.");
507 };
508 Ok(())
509}
510
511fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
512 if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
514 return Ok(());
515 }
516
517 if !config.client.enable_rpc {
518 if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
519 tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
520 }
521 return Ok(());
522 }
523
524 if !opts.halt_after_import && !opts.stateless {
525 let rpc_endpoint = config.client.rpc_v1_endpoint()?;
526 let state_manager = &ctx.state_manager;
527 let p2p_peer_id = ctx.p2p_peer_id;
528 let admin_jwt = ctx.admin_jwt.clone();
529 tokio::task::spawn_blocking({
530 crate::rpc::f3::F3_LEASE_MANAGER
531 .set(crate::rpc::f3::F3LeaseManager::new(
532 state_manager.chain_config().network.clone(),
533 p2p_peer_id,
534 ))
535 .expect("F3 lease manager should not have been initialized before");
536 let chain_config = state_manager.chain_config().clone();
537 let f3_root = crate::f3::get_f3_root(config);
538 let crate::f3::F3Options {
539 chain_finality,
540 bootstrap_epoch,
541 initial_power_table,
542 } = crate::f3::get_f3_sidecar_params(&chain_config);
543 move || {
544 crate::f3::run_f3_sidecar_if_enabled(
545 &chain_config,
546 rpc_endpoint.to_string(),
547 admin_jwt,
548 crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
549 initial_power_table
550 .map(|i| i.to_string())
551 .unwrap_or_default(),
552 bootstrap_epoch,
553 chain_finality,
554 f3_root.display().to_string(),
555 );
556 }
557 });
558 tokio::task::spawn({
559 let chain_store = ctx.chain_store().clone();
560 async move {
561 tokio::time::sleep(Duration::from_secs(1)).await;
563 match (|| crate::rpc::f3::F3GetLatestCertificate::get())
564 .retry(ExponentialBuilder::default())
565 .await
566 {
567 Ok(f3_finalized_cert) => {
568 let f3_finalized_head = f3_finalized_cert.chain_head();
569 match chain_store
570 .chain_index()
571 .load_required_tipset(&f3_finalized_head.key)
572 {
573 Ok(ts) => {
574 chain_store.set_f3_finalized_tipset(ts);
575 tracing::info!(
576 "Set F3 finalized tipset to epoch {} and key {}",
577 f3_finalized_head.epoch,
578 f3_finalized_head.key,
579 );
580 }
581 Err(e) => {
582 tracing::error!(
583 "Failed to get F3 finalized tipset epoch {} and key {}: {e}",
584 f3_finalized_head.epoch,
585 f3_finalized_head.key
586 );
587 }
588 }
589 }
590 Err(e) => {
591 tracing::error!("Failed to get F3 latest certificate: {e:#}");
592 }
593 }
594 }
595 });
596 }
597
598 Ok(())
599}
600
601fn maybe_start_indexer_service(
602 services: &mut JoinSet<anyhow::Result<()>>,
603 opts: &CliOpts,
604 config: &Config,
605 ctx: &AppContext,
606) {
607 if config.chain_indexer.enable_indexer
608 && !opts.stateless
609 && !ctx.state_manager.chain_config().is_devnet()
610 {
611 let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
612 let chain_store = ctx.state_manager.chain_store().clone();
613 services.spawn(async move {
614 tracing::info!("Starting indexer service");
615
616 loop {
618 match head_changes_rx.recv().await {
619 Ok(changes) => {
620 for ts in changes.applies {
621 tracing::debug!("Indexing tipset {}", ts.key());
622 let delegated_messages = chain_store
623 .headers_delegated_messages(ts.block_headers().iter())?;
624 chain_store.process_signed_messages(&delegated_messages)?;
625 }
626 }
627 Err(RecvError::Lagged(n)) => {
628 warn!("indexer service lagged: skipping {n} events")
629 }
630 Err(RecvError::Closed) => break Ok(()),
631 }
632 }
633 });
634
635 if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
637 let chain_store = ctx.state_manager.chain_store().clone();
638 let chain_config = ctx.state_manager.chain_config().clone();
639 services.spawn(async move {
640 tracing::info!("Starting collector for eth_mappings");
641 let mut collector = EthMappingCollector::new(
642 chain_store.blockstore().clone(),
643 chain_config.eth_chain_id,
644 retention_epochs.into(),
645 );
646 collector.run().await
647 });
648 }
649 }
650}
651
652pub(super) async fn start(
654 start_time: chrono::DateTime<chrono::Utc>,
655 opts: CliOpts,
656 config: Config,
657 shutdown_send: mpsc::Sender<()>,
658 rpc_stop_handle: jsonrpsee::server::StopHandle,
659) -> anyhow::Result<()> {
660 startup_init(&config)?;
661 start_services(
662 start_time,
663 &opts,
664 config.clone(),
665 shutdown_send.clone(),
666 rpc_stop_handle,
667 )
668 .await
669}
670
671pub(super) async fn start_services(
672 start_time: chrono::DateTime<chrono::Utc>,
673 opts: &CliOpts,
674 mut config: Config,
675 shutdown_send: mpsc::Sender<()>,
676 rpc_stop_handle: jsonrpsee::server::StopHandle,
677) -> anyhow::Result<()> {
678 crate::metrics::reset_collector_registry();
680 let mut services = JoinSet::new();
681 let network = config.chain();
682 let ctx = AppContext::init(opts, &config).await?;
683 info!("Using network :: {network}");
684 utils::misc::display_chain_logo(config.chain());
685 if opts.exit_after_init {
686 return Ok(());
687 }
688 if !opts.stateless
689 && !opts.skip_load_actors
690 && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
691 {
692 tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
693 }
694
695 let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
696 let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
697 let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
698
699 maybe_start_rpc_service(
700 &mut services,
701 &config,
702 mpool.clone(),
703 &chain_follower,
704 start_time,
705 shutdown_send.clone(),
706 rpc_stop_handle,
707 &ctx,
708 )?;
709
710 maybe_import_snapshot(opts, &mut config, &ctx).await?;
711 if opts.halt_after_import {
712 services.shutdown().await;
714 return Ok(());
715 }
716
717 warmup_in_background(&ctx);
718 maybe_start_gc_service(&mut services, opts, &config, chain_follower.clone())?;
719 maybe_start_metrics_service(&mut services, &config, &ctx).await?;
720 maybe_start_f3_service(opts, &config, &ctx)?;
721 maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
722 .await?;
723 maybe_start_indexer_service(&mut services, opts, &config, &ctx);
724 if !opts.stateless {
725 ensure_proof_params_downloaded().await?;
726 }
727 services.spawn(p2p_service.run());
728 start_chain_follower_service(&mut services, chain_follower);
729 propagate_error(&mut services)
731 .await
732 .context("services failure")
733 .map(|_| {})
734}
735
736fn warmup_in_background(ctx: &AppContext) {
737 let cs = ctx.chain_store().clone();
739 tokio::task::spawn_blocking(move || {
740 let start = Instant::now();
741 match cs.chain_index().tipset_by_height(
742 1,
744 cs.heaviest_tipset(),
745 ResolveNullTipset::TakeOlder,
746 ) {
747 Ok(_) => {
748 tracing::info!(
749 "Successfully populated tipset_by_height cache, took {}",
750 humantime::format_duration(start.elapsed())
751 );
752 }
753 Err(e) => {
754 tracing::warn!("Failed to populate tipset_by_height cache: {e}");
755 }
756 }
757 });
758}
759
760async fn maybe_set_snapshot_path(
765 config: &mut Config,
766 chain_config: &ChainConfig,
767 epoch: ChainEpoch,
768 auto_download_snapshot: bool,
769 download_directory: &Path,
770) -> anyhow::Result<()> {
771 if !download_directory.is_dir() {
772 anyhow::bail!(
773 "`download_directory` does not exist: {}",
774 download_directory.display()
775 );
776 }
777
778 let vendor = snapshot::TrustedVendor::default();
779 let chain = config.chain();
780
781 let network_version = chain_config.network_version(epoch);
783 let network_version_is_small = network_version < NetworkVersion::V16;
784
785 let require_a_snapshot = network_version_is_small;
788 let have_a_snapshot = config.client.snapshot_path.is_some();
789
790 match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
791 (false, _, _) => {} (true, true, _) => {} (true, false, true) => {
794 const AUTO_SNAPSHOT_PATH_ENV_KEY: &str = "FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH";
795 match std::env::var(AUTO_SNAPSHOT_PATH_ENV_KEY) {
796 Ok(path) if !path.is_empty() => {
797 tracing::info!(
798 "importing snapshot from {path} set by `{AUTO_SNAPSHOT_PATH_ENV_KEY}`"
799 );
800 config.client.snapshot_path = Some(path.into());
801 }
802 _ => {
803 let (resolved_url, _num_bytes, filename) =
807 crate::cli_shared::snapshot::peek(vendor, chain).await?;
808 tracing::info!("Downloading snapshot: {filename}");
809 config.client.snapshot_path = Some(resolved_url.to_string().into());
810 }
811 }
812 }
813 (true, false, false) => {
814 let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
816 .await
817 .context("couldn't get snapshot size")?;
818 println!(
821 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
822 );
823 let message = format!(
824 "Fetch a {} snapshot? (denying will exit the program). ",
825 indicatif::HumanBytes(num_bytes)
826 );
827 let have_permission = asyncify(|| {
828 dialoguer::Confirm::with_theme(&ColorfulTheme::default())
829 .with_prompt(message)
830 .default(false)
831 .interact()
832 .unwrap_or(false)
834 })
835 .await;
836 if !have_permission {
837 bail!(
838 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
839 )
840 }
841 tracing::info!("Downloading snapshot: {filename}");
842 config.client.snapshot_path = Some(url.to_string().into());
843 }
844 };
845
846 Ok(())
847}
848
849async fn propagate_error(
852 services: &mut JoinSet<anyhow::Result<()>>,
853) -> anyhow::Result<std::convert::Infallible> {
854 while let Some(result) = services.join_next().await {
855 if let Ok(Err(error_message)) = result {
856 return Err(error_message);
857 }
858 }
859 std::future::pending().await
860}
861
862fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
867where
868 T: Send + 'static,
869{
870 tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
871}
872
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Checks both the computed ranges and the exact error messages of
    /// `validation_range`.
    #[rstest]
    #[case::current_zero(0, 1, Err("current head epoch 0 is invalid"))]
    #[case::current_negative(-1, 1, Err("current head epoch -1 is invalid"))]
    #[case::from_positive_beyond_head(
        10,
        11,
        Err("requested validation start epoch 11 is beyond the current head at epoch 10")
    )]
    #[case::from_positive_within_range(10, 5, Ok(5..=10))]
    #[case::from_zero(10, 0, Ok(0..=10))]
    #[case::from_negative_within_range(10, -5, Ok(5..=10))]
    #[case::from_negative_beyond_range(10, -15, Ok(0..=10))]
    fn test_validation_range(
        #[case] current: ChainEpoch,
        #[case] from: ChainEpoch,
        #[case] expected: Result<std::ops::RangeInclusive<ChainEpoch>, &str>,
    ) {
        let result = validation_range(current, from);
        match expected {
            Ok(expected_range) => {
                assert_eq!(result.unwrap(), expected_range);
            }
            Err(expected_message) => {
                // Compare the message too, so a wrong or reworded error is caught.
                assert_eq!(result.unwrap_err().to_string(), expected_message);
            }
        }
    }
}