1pub mod bundle;
5mod context;
6pub mod db_util;
7pub mod main;
8
9use crate::blocks::Tipset;
10use crate::chain::HeadChange;
11use crate::chain::index::ResolveNullTipset;
12use crate::chain_sync::network_context::SyncNetworkContext;
13use crate::chain_sync::{ChainFollower, SyncStatus};
14use crate::cli_shared::snapshot;
15use crate::cli_shared::{
16 chain_path,
17 cli::{CliOpts, Config},
18};
19use crate::daemon::{
20 context::{AppContext, DbType},
21 db_util::import_chain_as_forest_car,
22};
23use crate::db::gc::SnapshotGarbageCollector;
24use crate::db::ttl::EthMappingCollector;
25use crate::libp2p::{Libp2pService, PeerManager};
26use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
27use crate::networks::{self, ChainConfig};
28use crate::rpc::RPCState;
29use crate::rpc::eth::filter::EthEventHandler;
30use crate::rpc::start_rpc;
31use crate::shim::clock::ChainEpoch;
32use crate::shim::state_tree::StateTree;
33use crate::shim::version::NetworkVersion;
34use crate::utils;
35use crate::utils::misc::env::is_env_truthy;
36use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
37use anyhow::{Context as _, bail};
38use dialoguer::theme::ColorfulTheme;
39use futures::{Future, FutureExt, select};
40use std::path::Path;
41use std::sync::Arc;
42use std::sync::OnceLock;
43use std::time::Instant;
44use tokio::{
45 net::TcpListener,
46 signal::{
47 ctrl_c,
48 unix::{SignalKind, signal},
49 },
50 sync::mpsc,
51 task::JoinSet,
52};
53use tracing::{debug, info, warn};
54
/// Process-wide handle to the snapshot garbage collector.
///
/// Populated exactly once by [`start`]; a second attempt to set it makes `start` fail.
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
56
57fn maybe_increase_fd_limit() -> anyhow::Result<()> {
61 static DESIRED_SOFT_LIMIT: u64 = 8192;
62 let (soft_before, _) = rlimit::Resource::NOFILE.get()?;
63
64 let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
65 if soft_before < soft_after {
66 debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
67 }
68 if soft_after < DESIRED_SOFT_LIMIT {
69 warn!(
70 "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
71 You may encounter 'too many open files' errors.",
72 );
73 }
74
75 Ok(())
76}
77
78pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
80 let start_time = chrono::Utc::now();
81 let mut terminate = signal(SignalKind::terminate())?;
82 let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
83
84 let result = tokio::select! {
85 ret = start(start_time, opts, config, shutdown_send) => ret,
86 _ = ctrl_c() => {
87 info!("Keyboard interrupt.");
88 Ok(())
89 },
90 _ = terminate.recv() => {
91 info!("Received SIGTERM.");
92 Ok(())
93 },
94 _ = shutdown_recv.recv() => {
95 info!("Client requested a shutdown.");
96 Ok(())
97 },
98 };
99 crate::utils::io::terminal_cleanup();
100 result
101}
102
103fn startup_init(config: &Config) -> anyhow::Result<()> {
108 maybe_increase_fd_limit()?;
109 crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
112 info!(
113 "Starting Forest daemon, version {}",
114 FOREST_VERSION_STRING.as_str()
115 );
116 Ok(())
117}
118
119async fn maybe_import_snapshot(
120 opts: &CliOpts,
121 config: &mut Config,
122 ctx: &AppContext,
123) -> anyhow::Result<()> {
124 let chain_config = ctx.state_manager.chain_config();
125 if config.client.snapshot_path.is_none() && !opts.stateless {
127 maybe_set_snapshot_path(
128 config,
129 chain_config,
130 ctx.state_manager.chain_store().heaviest_tipset().epoch(),
131 opts.auto_download_snapshot,
132 &ctx.db_meta_data.get_root_dir(),
133 )
134 .await?;
135 }
136
137 let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
138 if !opts.skip_load.unwrap_or_default()
140 && let Some(path) = &config.client.snapshot_path
141 {
142 let (car_db_path, ts) = import_chain_as_forest_car(
143 path,
144 &ctx.db_meta_data.get_forest_car_db_dir(),
145 config.client.import_mode,
146 config.client.rpc_v1_endpoint()?,
147 &crate::f3::get_f3_root(config),
148 ctx.chain_config(),
149 &snapshot_tracker,
150 )
151 .await?;
152 ctx.db
153 .read_only_files(std::iter::once(car_db_path.clone()))?;
154 let ts_epoch = ts.epoch();
155 ctx.state_manager
158 .chain_store()
159 .set_heaviest_tipset(ts.into())?;
160 debug!(
161 "Loaded car DB at {} and set current head to epoch {ts_epoch}",
162 car_db_path.display(),
163 );
164 }
165
166 if !snapshot_tracker.is_completed() {
169 snapshot_tracker.not_required();
170 }
171
172 if let Some(validate_from) = config.client.snapshot_height {
173 ensure_proof_params_downloaded().await?;
175 let current_height = config
177 .client
178 .snapshot_head
179 .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
180 assert!(current_height.is_positive());
181 match validate_from.is_negative() {
182 true => ctx
184 .state_manager
185 .validate_range((current_height + validate_from)..=current_height)?,
186 false => ctx
187 .state_manager
188 .validate_range(validate_from..=current_height)?,
189 }
190 }
191
192 Ok(())
193}
194
195async fn maybe_start_metrics_service(
196 services: &mut JoinSet<anyhow::Result<()>>,
197 config: &Config,
198 ctx: &AppContext,
199) -> anyhow::Result<()> {
200 if config.client.enable_metrics_endpoint {
201 let prometheus_listener = TcpListener::bind(config.client.metrics_address)
203 .await
204 .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
205 info!(
206 "Prometheus server started at {}",
207 config.client.metrics_address
208 );
209 let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
210 let db = ctx.db.writer().clone();
211
212 let get_chain_head_height = Arc::new({
213 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
215 move || {
216 chain_store
217 .upgrade()
218 .map(|cs| cs.heaviest_tipset().epoch())
219 .unwrap_or_default()
220 }
221 });
222 let get_chain_head_actor_version = Arc::new({
223 let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
225 move || {
226 if let Some(cs) = chain_store.upgrade()
227 && let Ok(state) = StateTree::new_from_root(
228 cs.blockstore().clone(),
229 cs.heaviest_tipset().parent_state(),
230 )
231 && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
232 && let Ok(actor_version) = bundle_meta.actor_major_version()
233 {
234 return actor_version;
235 }
236 0
237 }
238 });
239 services.spawn({
240 let chain_config = ctx.chain_config().clone();
241 let get_chain_head_height = get_chain_head_height.clone();
242 async {
243 crate::metrics::init_prometheus(
244 prometheus_listener,
245 db_directory,
246 db,
247 chain_config,
248 get_chain_head_height,
249 get_chain_head_actor_version,
250 )
251 .await
252 .context("Failed to initiate prometheus server")
253 }
254 });
255
256 crate::metrics::register_collector(Box::new(
257 networks::metrics::NetworkHeightCollector::new(
258 ctx.state_manager.chain_config().block_delay_secs,
259 ctx.state_manager
260 .chain_store()
261 .genesis_block_header()
262 .timestamp,
263 get_chain_head_height,
264 ),
265 ));
266 }
267 Ok(())
268}
269
270async fn create_p2p_service(
271 services: &mut JoinSet<anyhow::Result<()>>,
272 config: &mut Config,
273 ctx: &AppContext,
274) -> anyhow::Result<Libp2pService<DbType>> {
275 if config.network.bootstrap_peers.is_empty() {
277 config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
278 }
279
280 let peer_manager = Arc::new(PeerManager::default());
281 services.spawn(peer_manager.clone().peer_operation_event_loop_task());
282 let p2p_service = Libp2pService::new(
284 config.network.clone(),
285 Arc::clone(ctx.state_manager.chain_store()),
286 peer_manager.clone(),
287 ctx.net_keypair.clone(),
288 config.chain.genesis_name(),
289 *ctx.state_manager.chain_store().genesis_block_header().cid(),
290 )
291 .await?;
292 Ok(p2p_service)
293}
294
295fn create_mpool(
296 services: &mut JoinSet<anyhow::Result<()>>,
297 p2p_service: &Libp2pService<DbType>,
298 ctx: &AppContext,
299) -> anyhow::Result<Arc<MessagePool<MpoolRpcProvider<DbType>>>> {
300 let publisher = ctx.state_manager.chain_store().publisher();
301 let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone());
302 Ok(MessagePool::new(
303 provider,
304 p2p_service.network_sender().clone(),
305 MpoolConfig::load_config(ctx.db.writer().as_ref())?,
306 ctx.state_manager.chain_config().clone(),
307 services,
308 )
309 .map(Arc::new)?)
310}
311
312fn create_chain_follower(
313 opts: &CliOpts,
314 p2p_service: &Libp2pService<DbType>,
315 mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
316 ctx: &AppContext,
317) -> anyhow::Result<ChainFollower<DbType>> {
318 let network_send = p2p_service.network_sender().clone();
319 let peer_manager = p2p_service.peer_manager().clone();
320 let network = SyncNetworkContext::new(network_send, peer_manager, ctx.db.clone());
321 let chain_follower = ChainFollower::new(
322 ctx.state_manager.clone(),
323 network,
324 Arc::new(Tipset::from(
325 ctx.state_manager.chain_store().genesis_block_header(),
326 )),
327 p2p_service.network_receiver(),
328 opts.stateless,
329 mpool,
330 );
331 Ok(chain_follower)
332}
333
334fn start_chain_follower_service(
335 services: &mut JoinSet<anyhow::Result<()>>,
336 chain_follower: ChainFollower<DbType>,
337) {
338 services.spawn(async move { chain_follower.run().await });
339}
340
341async fn maybe_start_health_check_service(
342 services: &mut JoinSet<anyhow::Result<()>>,
343 config: &Config,
344 p2p_service: &Libp2pService<DbType>,
345 chain_follower: &ChainFollower<DbType>,
346 ctx: &AppContext,
347) -> anyhow::Result<()> {
348 if config.client.enable_health_check {
349 let forest_state = crate::health::ForestState {
350 config: config.clone(),
351 chain_config: ctx.state_manager.chain_config().clone(),
352 genesis_timestamp: ctx
353 .state_manager
354 .chain_store()
355 .genesis_block_header()
356 .timestamp,
357 sync_status: chain_follower.sync_status.clone(),
358 peer_manager: p2p_service.peer_manager().clone(),
359 };
360 let healthcheck_address = forest_state.config.client.healthcheck_address;
361 info!("Healthcheck endpoint will listen at {healthcheck_address}");
362 let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
363 services.spawn(async move {
364 crate::health::init_healthcheck_server(forest_state, listener)
365 .await
366 .context("Failed to initiate healthcheck server")
367 });
368 } else {
369 info!("Healthcheck service is disabled");
370 }
371 Ok(())
372}
373
374#[allow(clippy::too_many_arguments)]
375fn maybe_start_rpc_service(
376 services: &mut JoinSet<anyhow::Result<()>>,
377 config: &Config,
378 mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
379 chain_follower: &ChainFollower<DbType>,
380 start_time: chrono::DateTime<chrono::Utc>,
381 shutdown: mpsc::Sender<()>,
382 ctx: &AppContext,
383) -> anyhow::Result<()> {
384 if config.client.enable_rpc {
385 let rpc_address = config.client.rpc_address;
386 let filter_list = config
387 .client
388 .rpc_filter_list
389 .as_ref()
390 .map(|path| crate::rpc::FilterList::new_from_file(path))
391 .transpose()?;
392 info!("JSON-RPC endpoint will listen at {rpc_address}");
393 let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
394 if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
395 warn!(
396 "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
397 );
398 }
399 services.spawn({
400 let state_manager = ctx.state_manager.clone();
401 let bad_blocks = chain_follower.bad_blocks.clone();
402 let sync_status = chain_follower.sync_status.clone();
403 let sync_network_context = chain_follower.network.clone();
404 let tipset_send = chain_follower.tipset_sender.clone();
405 let keystore = ctx.keystore.clone();
406 let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
407 let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
408 async move {
409 start_rpc(
410 RPCState {
411 state_manager,
412 keystore,
413 mpool,
414 bad_blocks,
415 msgs_in_tipset,
416 sync_status,
417 eth_event_handler,
418 sync_network_context,
419 start_time,
420 shutdown,
421 tipset_send,
422 snapshot_progress_tracker,
423 },
424 rpc_address,
425 filter_list,
426 )
427 .await
428 }
429 });
430 } else {
431 debug!("RPC disabled.");
432 };
433 Ok(())
434}
435
436fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
437 if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
439 return Ok(());
440 }
441
442 if !config.client.enable_rpc {
443 if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
444 tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
445 }
446 return Ok(());
447 }
448
449 if !opts.halt_after_import && !opts.stateless {
450 let rpc_endpoint = config.client.rpc_v1_endpoint()?;
451 let state_manager = &ctx.state_manager;
452 let p2p_peer_id = ctx.p2p_peer_id;
453 let admin_jwt = ctx.admin_jwt.clone();
454 tokio::task::spawn_blocking({
455 crate::rpc::f3::F3_LEASE_MANAGER
456 .set(crate::rpc::f3::F3LeaseManager::new(
457 state_manager.chain_config().network.clone(),
458 p2p_peer_id,
459 ))
460 .expect("F3 lease manager should not have been initialized before");
461 let chain_config = state_manager.chain_config().clone();
462 let f3_root = crate::f3::get_f3_root(config);
463 let crate::f3::F3Options {
464 chain_finality,
465 bootstrap_epoch,
466 initial_power_table,
467 } = crate::f3::get_f3_sidecar_params(&chain_config);
468 move || {
469 crate::f3::run_f3_sidecar_if_enabled(
470 &chain_config,
471 rpc_endpoint.to_string(),
472 admin_jwt,
473 crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
474 initial_power_table
475 .map(|i| i.to_string())
476 .unwrap_or_default(),
477 bootstrap_epoch,
478 chain_finality,
479 f3_root.display().to_string(),
480 );
481 }
482 });
483 }
484
485 Ok(())
486}
487
488fn maybe_start_indexer_service(
489 services: &mut JoinSet<anyhow::Result<()>>,
490 opts: &CliOpts,
491 config: &Config,
492 ctx: &AppContext,
493) {
494 if config.chain_indexer.enable_indexer
495 && !opts.stateless
496 && !ctx.state_manager.chain_config().is_devnet()
497 {
498 let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
499 let chain_store = ctx.state_manager.chain_store().clone();
500 services.spawn(async move {
501 tracing::info!("Starting indexer service");
502
503 loop {
505 let msg = receiver.recv().await?;
506
507 let HeadChange::Apply(ts) = msg;
508 tracing::debug!("Indexing tipset {}", ts.key());
509
510 chain_store.put_tipset_key(ts.key())?;
511
512 let delegated_messages =
513 chain_store.headers_delegated_messages(ts.block_headers().iter())?;
514
515 chain_store.process_signed_messages(&delegated_messages)?;
516 }
517 });
518
519 if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
521 let chain_store = ctx.state_manager.chain_store().clone();
522 let chain_config = ctx.state_manager.chain_config().clone();
523 services.spawn(async move {
524 tracing::info!("Starting collector for eth_mappings");
525 let mut collector = EthMappingCollector::new(
526 chain_store.blockstore().clone(),
527 chain_config.eth_chain_id,
528 retention_epochs.into(),
529 );
530 collector.run().await
531 });
532 }
533 }
534}
535
536pub(super) async fn start(
538 start_time: chrono::DateTime<chrono::Utc>,
539 opts: CliOpts,
540 config: Config,
541 shutdown_send: mpsc::Sender<()>,
542) -> anyhow::Result<()> {
543 startup_init(&config)?;
544 let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
545 let snap_gc = Arc::new(snap_gc);
546 GLOBAL_SNAPSHOT_GC
547 .set(snap_gc.clone())
548 .ok()
549 .context("failed to set GLOBAL_SNAPSHOT_GC")?;
550
551 if !opts.stateless {
553 tokio::task::spawn({
554 let snap_gc = snap_gc.clone();
555 async move { snap_gc.event_loop().await }
556 });
557 }
558 if !opts.no_gc && !opts.stateless {
560 tokio::task::spawn({
561 let snap_gc = snap_gc.clone();
562 async move { snap_gc.scheduler_loop().await }
563 });
564 }
565 loop {
566 tokio::select! {
567 _ = snap_gc_reboot_rx.recv_async() => {
568 snap_gc.cleanup_before_reboot().await;
569 }
570 result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
571 snap_gc.set_db(ctx.db.clone());
572 snap_gc.set_sync_status(sync_status);
573 snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
574 }) => {
575 break result
576 }
577 }
578 }
579}
580
581pub(super) async fn start_services(
582 start_time: chrono::DateTime<chrono::Utc>,
583 opts: &CliOpts,
584 mut config: Config,
585 shutdown_send: mpsc::Sender<()>,
586 on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
587) -> anyhow::Result<()> {
588 crate::metrics::reset_collector_registry();
590 let mut services = JoinSet::new();
591 let network = config.chain();
592 let ctx = AppContext::init(opts, &config).await?;
593 info!("Using network :: {network}");
594 utils::misc::display_chain_logo(config.chain());
595 if opts.exit_after_init {
596 return Ok(());
597 }
598 if !opts.stateless {
599 ctx.state_manager.maybe_rewind_heaviest_tipset()?;
600 }
601 let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
602 let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
603 let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
604
605 maybe_start_rpc_service(
606 &mut services,
607 &config,
608 mpool.clone(),
609 &chain_follower,
610 start_time,
611 shutdown_send.clone(),
612 &ctx,
613 )?;
614
615 maybe_import_snapshot(opts, &mut config, &ctx).await?;
616 if opts.halt_after_import {
617 services.shutdown().await;
619 return Ok(());
620 }
621 on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());
622 warmup_in_background(&ctx);
623 ctx.state_manager.populate_cache();
624 maybe_start_metrics_service(&mut services, &config, &ctx).await?;
625 maybe_start_f3_service(opts, &config, &ctx)?;
626 maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
627 .await?;
628 maybe_start_indexer_service(&mut services, opts, &config, &ctx);
629 if !opts.stateless {
630 ensure_proof_params_downloaded().await?;
631 }
632 services.spawn(p2p_service.run());
633 start_chain_follower_service(&mut services, chain_follower);
634 propagate_error(&mut services)
636 .await
637 .context("services failure")
638 .map(|_| {})
639}
640
641fn warmup_in_background(ctx: &AppContext) {
642 let cs = ctx.chain_store().clone();
644 tokio::task::spawn_blocking(move || {
645 let start = Instant::now();
646 match cs.chain_index().tipset_by_height(
647 1,
649 cs.heaviest_tipset(),
650 ResolveNullTipset::TakeOlder,
651 ) {
652 Ok(_) => {
653 tracing::info!(
654 "Successfully populated tipset_by_height cache, took {}",
655 humantime::format_duration(start.elapsed())
656 );
657 }
658 Err(e) => {
659 tracing::warn!("Failed to populate tipset_by_height cache: {e}");
660 }
661 }
662 });
663}
664
665async fn maybe_set_snapshot_path(
670 config: &mut Config,
671 chain_config: &ChainConfig,
672 epoch: ChainEpoch,
673 auto_download_snapshot: bool,
674 download_directory: &Path,
675) -> anyhow::Result<()> {
676 if !download_directory.is_dir() {
677 anyhow::bail!(
678 "`download_directory` does not exist: {}",
679 download_directory.display()
680 );
681 }
682
683 let vendor = snapshot::TrustedVendor::default();
684 let chain = config.chain();
685
686 let network_version = chain_config.network_version(epoch);
688 let network_version_is_small = network_version < NetworkVersion::V16;
689
690 let require_a_snapshot = network_version_is_small;
693 let have_a_snapshot = config.client.snapshot_path.is_some();
694
695 match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
696 (false, _, _) => {} (true, true, _) => {} (true, false, true) => {
699 let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
700 config.client.snapshot_path = Some(url.to_string().into());
701 }
702 (true, false, false) => {
703 let (url, num_bytes, _path) = crate::cli_shared::snapshot::peek(vendor, chain)
705 .await
706 .context("couldn't get snapshot size")?;
707 println!(
710 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
711 );
712 let message = format!(
713 "Fetch a {} snapshot to the current directory? (denying will exit the program). ",
714 indicatif::HumanBytes(num_bytes)
715 );
716 let have_permission = asyncify(|| {
717 dialoguer::Confirm::with_theme(&ColorfulTheme::default())
718 .with_prompt(message)
719 .default(false)
720 .interact()
721 .unwrap_or(false)
723 })
724 .await;
725 if !have_permission {
726 bail!(
727 "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
728 )
729 }
730 config.client.snapshot_path = Some(url.to_string().into());
731 }
732 };
733
734 Ok(())
735}
736
737async fn propagate_error(
740 services: &mut JoinSet<Result<(), anyhow::Error>>,
741) -> anyhow::Result<std::convert::Infallible> {
742 while !services.is_empty() {
743 select! {
744 option = services.join_next().fuse() => {
745 if let Some(Ok(Err(error_message))) = option {
746 return Err(error_message)
747 }
748 },
749 }
750 }
751 std::future::pending().await
752}
753
754fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
759where
760 T: Send + 'static,
761{
762 tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
763}