1pub mod bundle;
5mod context;
6pub mod db_util;
7pub mod main;
8
9use crate::blocks::Tipset;
10use crate::chain::HeadChange;
11use crate::chain::index::ResolveNullTipset;
12use crate::chain_sync::network_context::SyncNetworkContext;
13use crate::chain_sync::{ChainFollower, SyncStatus};
14use crate::cli_shared::snapshot;
15use crate::cli_shared::{
16 chain_path,
17 cli::{CliOpts, Config},
18};
19use crate::daemon::{
20 context::{AppContext, DbType},
21 db_util::import_chain_as_forest_car,
22};
23use crate::db::gc::SnapshotGarbageCollector;
24use crate::db::ttl::EthMappingCollector;
25use crate::libp2p::{Libp2pService, PeerManager};
26use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
27use crate::networks::{self, ChainConfig};
28use crate::rpc::RPCState;
29use crate::rpc::eth::filter::EthEventHandler;
30use crate::rpc::start_rpc;
31use crate::shim::clock::ChainEpoch;
32use crate::shim::state_tree::StateTree;
33use crate::shim::version::NetworkVersion;
34use crate::utils;
35use crate::utils::misc::env::is_env_truthy;
36use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
37use anyhow::{Context as _, bail};
38use dialoguer::theme::ColorfulTheme;
39use futures::{Future, FutureExt};
40use std::path::Path;
41use std::sync::Arc;
42use std::sync::OnceLock;
43use std::time::Instant;
44use tokio::{
45 net::TcpListener,
46 signal::{
47 ctrl_c,
48 unix::{SignalKind, signal},
49 },
50 sync::mpsc,
51 task::JoinSet,
52};
53use tracing::{debug, info, warn};
54
/// Process-wide handle to the snapshot garbage collector.
///
/// Set exactly once by [`start`]; if it is already populated, [`start`] fails with
/// "failed to set GLOBAL_SNAPSHOT_GC".
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
56
57fn maybe_increase_fd_limit() -> anyhow::Result<()> {
61 static DESIRED_SOFT_LIMIT: u64 = 8192;
62 let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
63
64 let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
65 if soft_before < soft_after {
66 debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
67 }
68 if soft_after < DESIRED_SOFT_LIMIT {
69 warn!(
70 "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
71 You may encounter 'too many open files' errors.",
72 );
73 }
74
75 Ok(())
76}
77
78pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
80 let start_time = chrono::Utc::now();
81 let mut terminate = signal(SignalKind::terminate())?;
82 let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
83 let result = tokio::select! {
84 ret = start(start_time, opts, config, shutdown_send) => ret,
85 _ = ctrl_c() => {
86 info!("Keyboard interrupt.");
87 Ok(())
88 },
89 _ = terminate.recv() => {
90 info!("Received SIGTERM.");
91 Ok(())
92 },
93 _ = shutdown_recv.recv() => {
94 info!("Client requested a shutdown.");
95 Ok(())
96 },
97 };
98 crate::utils::io::terminal_cleanup();
99 result
100}
101
102fn startup_init(config: &Config) -> anyhow::Result<()> {
107 maybe_increase_fd_limit()?;
108 crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
111 info!(
112 "Starting Forest daemon, version {}",
113 FOREST_VERSION_STRING.as_str()
114 );
115 Ok(())
116}
117
118async fn maybe_import_snapshot(
119 opts: &CliOpts,
120 config: &mut Config,
121 ctx: &AppContext,
122) -> anyhow::Result<()> {
123 let chain_config = ctx.state_manager.chain_config();
124 if config.client.snapshot_path.is_none() && !opts.stateless {
126 maybe_set_snapshot_path(
127 config,
128 chain_config,
129 ctx.state_manager.chain_store().heaviest_tipset().epoch(),
130 opts.auto_download_snapshot,
131 &ctx.db_meta_data.get_root_dir(),
132 )
133 .await?;
134 }
135
136 let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
137 if !opts.skip_load.unwrap_or_default()
139 && let Some(path) = &config.client.snapshot_path
140 {
141 let (car_db_path, ts) = import_chain_as_forest_car(
142 path,
143 &ctx.db_meta_data.get_forest_car_db_dir(),
144 config.client.import_mode,
145 config.client.rpc_v1_endpoint()?,
146 &crate::f3::get_f3_root(config),
147 ctx.chain_config(),
148 &snapshot_tracker,
149 )
150 .await?;
151 ctx.db
152 .read_only_files(std::iter::once(car_db_path.clone()))?;
153 let ts_epoch = ts.epoch();
154 ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
157 debug!(
158 "Loaded car DB at {} and set current head to epoch {ts_epoch}",
159 car_db_path.display(),
160 );
161 }
162
163 if !snapshot_tracker.is_completed() {
166 snapshot_tracker.not_required();
167 }
168
169 if let Some(validate_from) = config.client.snapshot_height {
170 ensure_proof_params_downloaded().await?;
172 let current_height = config
174 .client
175 .snapshot_head
176 .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
177 assert!(current_height.is_positive());
178 match validate_from.is_negative() {
179 true => ctx
181 .state_manager
182 .validate_range((current_height + validate_from)..=current_height)?,
183 false => ctx
184 .state_manager
185 .validate_range(validate_from..=current_height)?,
186 }
187 }
188
189 Ok(())
190}
191
192async fn maybe_start_metrics_service(
193 services: &mut JoinSet<anyhow::Result<()>>,
194 config: &Config,
195 ctx: &AppContext,
196) -> anyhow::Result<()> {
197 if config.client.enable_metrics_endpoint {
198 let prometheus_listener = TcpListener::bind(config.client.metrics_address)
200 .await
201 .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
202 info!(
203 "Prometheus server started at {}",
204 config.client.metrics_address
205 );
206 let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
207 let db = ctx.db.writer().clone();
208
209 let get_chain_head_height = Arc::new({
210 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
212 move || {
213 chain_store
214 .upgrade()
215 .map(|cs| cs.heaviest_tipset().epoch())
216 .unwrap_or_default()
217 }
218 });
219 let get_chain_head_actor_version = Arc::new({
220 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
222 move || {
223 if let Some(cs) = chain_store.upgrade()
224 && let Ok(state) = StateTree::new_from_root(
225 cs.blockstore().clone(),
226 cs.heaviest_tipset().parent_state(),
227 )
228 && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
229 && let Ok(actor_version) = bundle_meta.actor_major_version()
230 {
231 return actor_version;
232 }
233 0
234 }
235 });
236 services.spawn({
237 let chain_config = ctx.chain_config().clone();
238 let get_chain_head_height = get_chain_head_height.clone();
239 async {
240 crate::metrics::init_prometheus(
241 prometheus_listener,
242 db_directory,
243 db,
244 chain_config,
245 get_chain_head_height,
246 get_chain_head_actor_version,
247 )
248 .await
249 .context("Failed to initiate prometheus server")
250 }
251 });
252
253 crate::metrics::register_collector(Box::new(
254 networks::metrics::NetworkHeightCollector::new(
255 ctx.state_manager.chain_config().block_delay_secs,
256 ctx.state_manager
257 .chain_store()
258 .genesis_block_header()
259 .timestamp,
260 get_chain_head_height,
261 ),
262 ));
263 }
264 Ok(())
265}
266
267async fn create_p2p_service(
268 services: &mut JoinSet<anyhow::Result<()>>,
269 config: &mut Config,
270 ctx: &AppContext,
271) -> anyhow::Result<Libp2pService<DbType>> {
272 if config.network.bootstrap_peers.is_empty() {
274 config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
275 }
276
277 let peer_manager = Arc::new(PeerManager::default());
278 services.spawn(peer_manager.clone().peer_operation_event_loop_task());
279 let p2p_service = Libp2pService::new(
281 config.network.clone(),
282 Arc::clone(ctx.state_manager.chain_store()),
283 peer_manager.clone(),
284 ctx.net_keypair.clone(),
285 config.chain.genesis_name(),
286 *ctx.state_manager.chain_store().genesis_block_header().cid(),
287 )
288 .await?;
289 Ok(p2p_service)
290}
291
292fn create_mpool(
293 services: &mut JoinSet<anyhow::Result<()>>,
294 p2p_service: &Libp2pService<DbType>,
295 ctx: &AppContext,
296) -> anyhow::Result<Arc<MessagePool<MpoolRpcProvider<DbType>>>> {
297 let publisher = ctx.state_manager.chain_store().publisher();
298 let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone());
299 Ok(MessagePool::new(
300 provider,
301 p2p_service.network_sender().clone(),
302 MpoolConfig::load_config(ctx.db.writer().as_ref())?,
303 ctx.state_manager.chain_config().clone(),
304 services,
305 )
306 .map(Arc::new)?)
307}
308
309fn create_chain_follower(
310 opts: &CliOpts,
311 p2p_service: &Libp2pService<DbType>,
312 mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
313 ctx: &AppContext,
314) -> anyhow::Result<ChainFollower<DbType>> {
315 let network_send = p2p_service.network_sender().clone();
316 let peer_manager = p2p_service.peer_manager().clone();
317 let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
318 let chain_follower = ChainFollower::new(
319 ctx.state_manager.clone(),
320 network,
321 Tipset::from(ctx.state_manager.chain_store().genesis_block_header()),
322 p2p_service.network_receiver(),
323 opts.stateless,
324 mpool,
325 );
326 Ok(chain_follower)
327}
328
329fn start_chain_follower_service(
330 services: &mut JoinSet<anyhow::Result<()>>,
331 chain_follower: ChainFollower<DbType>,
332) {
333 services.spawn(async move { chain_follower.run().await });
334}
335
336async fn maybe_start_health_check_service(
337 services: &mut JoinSet<anyhow::Result<()>>,
338 config: &Config,
339 p2p_service: &Libp2pService<DbType>,
340 chain_follower: &ChainFollower<DbType>,
341 ctx: &AppContext,
342) -> anyhow::Result<()> {
343 if config.client.enable_health_check {
344 let forest_state = crate::health::ForestState {
345 config: config.clone(),
346 chain_config: ctx.state_manager.chain_config().clone(),
347 genesis_timestamp: ctx
348 .state_manager
349 .chain_store()
350 .genesis_block_header()
351 .timestamp,
352 sync_status: chain_follower.sync_status.clone(),
353 peer_manager: p2p_service.peer_manager().clone(),
354 };
355 let healthcheck_address = forest_state.config.client.healthcheck_address;
356 info!("Healthcheck endpoint will listen at {healthcheck_address}");
357 let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
358 services.spawn(async move {
359 crate::health::init_healthcheck_server(forest_state, listener)
360 .await
361 .context("Failed to initiate healthcheck server")
362 });
363 } else {
364 info!("Healthcheck service is disabled");
365 }
366 Ok(())
367}
368
369#[allow(clippy::too_many_arguments)]
370fn maybe_start_rpc_service(
371 services: &mut JoinSet<anyhow::Result<()>>,
372 config: &Config,
373 mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
374 chain_follower: &ChainFollower<DbType>,
375 start_time: chrono::DateTime<chrono::Utc>,
376 shutdown: mpsc::Sender<()>,
377 rpc_stop_handle: jsonrpsee::server::StopHandle,
378 ctx: &AppContext,
379) -> anyhow::Result<()> {
380 if config.client.enable_rpc {
381 let rpc_address = config.client.rpc_address;
382 let filter_list = config
383 .client
384 .rpc_filter_list
385 .as_ref()
386 .map(|path| crate::rpc::FilterList::new_from_file(path))
387 .transpose()?;
388 info!("JSON-RPC endpoint will listen at {rpc_address}");
389 let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
390 if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
391 warn!(
392 "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
393 );
394 }
395 services.spawn({
396 let state_manager = ctx.state_manager.clone();
397 let bad_blocks = chain_follower.bad_blocks.clone();
398 let sync_status = chain_follower.sync_status.clone();
399 let sync_network_context = chain_follower.network.clone();
400 let tipset_send = chain_follower.tipset_sender.clone();
401 let keystore = ctx.keystore.clone();
402 let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
403 let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
404 async move {
405 let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
406 .await
407 .map_err(|e| {
408 anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
409 })
410 .unwrap();
411 start_rpc(
412 RPCState {
413 state_manager,
414 keystore,
415 mpool,
416 bad_blocks,
417 msgs_in_tipset,
418 sync_status,
419 eth_event_handler,
420 sync_network_context,
421 start_time,
422 shutdown,
423 tipset_send,
424 snapshot_progress_tracker,
425 },
426 rpc_listener,
427 rpc_stop_handle,
428 filter_list,
429 )
430 .await
431 }
432 });
433 } else {
434 debug!("RPC disabled.");
435 };
436 Ok(())
437}
438
439fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
440 if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
442 return Ok(());
443 }
444
445 if !config.client.enable_rpc {
446 if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
447 tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
448 }
449 return Ok(());
450 }
451
452 if !opts.halt_after_import && !opts.stateless {
453 let rpc_endpoint = config.client.rpc_v1_endpoint()?;
454 let state_manager = &ctx.state_manager;
455 let p2p_peer_id = ctx.p2p_peer_id;
456 let admin_jwt = ctx.admin_jwt.clone();
457 tokio::task::spawn_blocking({
458 crate::rpc::f3::F3_LEASE_MANAGER
459 .set(crate::rpc::f3::F3LeaseManager::new(
460 state_manager.chain_config().network.clone(),
461 p2p_peer_id,
462 ))
463 .expect("F3 lease manager should not have been initialized before");
464 let chain_config = state_manager.chain_config().clone();
465 let f3_root = crate::f3::get_f3_root(config);
466 let crate::f3::F3Options {
467 chain_finality,
468 bootstrap_epoch,
469 initial_power_table,
470 } = crate::f3::get_f3_sidecar_params(&chain_config);
471 move || {
472 crate::f3::run_f3_sidecar_if_enabled(
473 &chain_config,
474 rpc_endpoint.to_string(),
475 admin_jwt,
476 crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
477 initial_power_table
478 .map(|i| i.to_string())
479 .unwrap_or_default(),
480 bootstrap_epoch,
481 chain_finality,
482 f3_root.display().to_string(),
483 );
484 }
485 });
486 }
487
488 Ok(())
489}
490
491fn maybe_start_indexer_service(
492 services: &mut JoinSet<anyhow::Result<()>>,
493 opts: &CliOpts,
494 config: &Config,
495 ctx: &AppContext,
496) {
497 if config.chain_indexer.enable_indexer
498 && !opts.stateless
499 && !ctx.state_manager.chain_config().is_devnet()
500 {
501 let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
502 let chain_store = ctx.state_manager.chain_store().clone();
503 services.spawn(async move {
504 tracing::info!("Starting indexer service");
505
506 loop {
508 let msg = receiver.recv().await?;
509
510 let HeadChange::Apply(ts) = msg;
511 tracing::debug!("Indexing tipset {}", ts.key());
512
513 chain_store.put_tipset_key(ts.key())?;
514
515 let delegated_messages =
516 chain_store.headers_delegated_messages(ts.block_headers().iter())?;
517
518 chain_store.process_signed_messages(&delegated_messages)?;
519 }
520 });
521
522 if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
524 let chain_store = ctx.state_manager.chain_store().clone();
525 let chain_config = ctx.state_manager.chain_config().clone();
526 services.spawn(async move {
527 tracing::info!("Starting collector for eth_mappings");
528 let mut collector = EthMappingCollector::new(
529 chain_store.blockstore().clone(),
530 chain_config.eth_chain_id,
531 retention_epochs.into(),
532 );
533 collector.run().await
534 });
535 }
536 }
537}
538
539pub(super) async fn start(
541 start_time: chrono::DateTime<chrono::Utc>,
542 opts: CliOpts,
543 config: Config,
544 shutdown_send: mpsc::Sender<()>,
545) -> anyhow::Result<()> {
546 startup_init(&config)?;
547 let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
548 let snap_gc = Arc::new(snap_gc);
549 GLOBAL_SNAPSHOT_GC
550 .set(snap_gc.clone())
551 .ok()
552 .context("failed to set GLOBAL_SNAPSHOT_GC")?;
553
554 if !opts.stateless {
556 tokio::task::spawn({
557 let snap_gc = snap_gc.clone();
558 async move { snap_gc.event_loop().await }
559 });
560 }
561 if !opts.no_gc && !opts.stateless {
563 tokio::task::spawn({
564 let snap_gc = snap_gc.clone();
565 async move { snap_gc.scheduler_loop().await }
566 });
567 }
568 loop {
569 let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
570 tokio::select! {
571 _ = snap_gc_reboot_rx.recv_async() => {
572 if let Err(e) = rpc_server_handle.stop() {
574 tracing::warn!("failed to stop RPC server: {e}");
575 }
576 snap_gc.cleanup_before_reboot().await;
577 }
578 result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
579 snap_gc.set_db(ctx.db.clone());
580 snap_gc.set_sync_status(sync_status);
581 snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
582 }) => {
583 break result
584 }
585 }
586 }
587}
588
589pub(super) async fn start_services(
590 start_time: chrono::DateTime<chrono::Utc>,
591 opts: &CliOpts,
592 mut config: Config,
593 shutdown_send: mpsc::Sender<()>,
594 rpc_stop_handle: jsonrpsee::server::StopHandle,
595 on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
596) -> anyhow::Result<()> {
597 crate::metrics::reset_collector_registry();
599 let mut services = JoinSet::new();
600 let network = config.chain();
601 let ctx = AppContext::init(opts, &config).await?;
602 info!("Using network :: {network}");
603 utils::misc::display_chain_logo(config.chain());
604 if opts.exit_after_init {
605 return Ok(());
606 }
607 if !opts.stateless
608 && !opts.skip_load_actors
609 && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
610 {
611 tracing::warn!("error in maybe_rewind_heaviest_tipset: {e}");
612 }
613 let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
614 let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
615 let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
616
617 maybe_start_rpc_service(
618 &mut services,
619 &config,
620 mpool.clone(),
621 &chain_follower,
622 start_time,
623 shutdown_send.clone(),
624 rpc_stop_handle,
625 &ctx,
626 )?;
627
628 maybe_import_snapshot(opts, &mut config, &ctx).await?;
629 if opts.halt_after_import {
630 services.shutdown().await;
632 return Ok(());
633 }
634 on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());
635 warmup_in_background(&ctx);
636 ctx.state_manager.populate_cache();
637 maybe_start_metrics_service(&mut services, &config, &ctx).await?;
638 maybe_start_f3_service(opts, &config, &ctx)?;
639 maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
640 .await?;
641 maybe_start_indexer_service(&mut services, opts, &config, &ctx);
642 if !opts.stateless {
643 ensure_proof_params_downloaded().await?;
644 }
645 services.spawn(p2p_service.run());
646 start_chain_follower_service(&mut services, chain_follower);
647 propagate_error(&mut services)
649 .await
650 .context("services failure")
651 .map(|_| {})
652}
653
654fn warmup_in_background(ctx: &AppContext) {
655 let cs = ctx.chain_store().clone();
657 tokio::task::spawn_blocking(move || {
658 let start = Instant::now();
659 match cs.chain_index().tipset_by_height(
660 1,
662 cs.heaviest_tipset(),
663 ResolveNullTipset::TakeOlder,
664 ) {
665 Ok(_) => {
666 tracing::info!(
667 "Successfully populated tipset_by_height cache, took {}",
668 humantime::format_duration(start.elapsed())
669 );
670 }
671 Err(e) => {
672 tracing::warn!("Failed to populate tipset_by_height cache: {e}");
673 }
674 }
675 });
676}
677
678async fn maybe_set_snapshot_path(
683 config: &mut Config,
684 chain_config: &ChainConfig,
685 epoch: ChainEpoch,
686 auto_download_snapshot: bool,
687 download_directory: &Path,
688) -> anyhow::Result<()> {
689 if !download_directory.is_dir() {
690 anyhow::bail!(
691 "`download_directory` does not exist: {}",
692 download_directory.display()
693 );
694 }
695
696 let vendor = snapshot::TrustedVendor::default();
697 let chain = config.chain();
698
699 let network_version = chain_config.network_version(epoch);
701 let network_version_is_small = network_version < NetworkVersion::V16;
702
703 let require_a_snapshot = network_version_is_small;
706 let have_a_snapshot = config.client.snapshot_path.is_some();
707
708 match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
709 (false, _, _) => {} (true, true, _) => {} (true, false, true) => {
712 let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
713 config.client.snapshot_path = Some(url.to_string().into());
714 }
715 (true, false, false) => {
716 let (url, num_bytes, _path) = crate::cli_shared::snapshot::peek(vendor, chain)
718 .await
719 .context("couldn't get snapshot size")?;
720 println!(
723 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
724 );
725 let message = format!(
726 "Fetch a {} snapshot to the current directory? (denying will exit the program). ",
727 indicatif::HumanBytes(num_bytes)
728 );
729 let have_permission = asyncify(|| {
730 dialoguer::Confirm::with_theme(&ColorfulTheme::default())
731 .with_prompt(message)
732 .default(false)
733 .interact()
734 .unwrap_or(false)
736 })
737 .await;
738 if !have_permission {
739 bail!(
740 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
741 )
742 }
743 config.client.snapshot_path = Some(url.to_string().into());
744 }
745 };
746
747 Ok(())
748}
749
750async fn propagate_error(
753 services: &mut JoinSet<Result<(), anyhow::Error>>,
754) -> anyhow::Result<std::convert::Infallible> {
755 while let Some(result) = services.join_next().await {
756 if let Ok(Err(error_message)) = result {
757 return Err(error_message);
758 }
759 }
760 std::future::pending().await
761}
762
763fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
768where
769 T: Send + 'static,
770{
771 tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
772}