1use std::sync::OnceLock;
2
3use futures::{
4 executor::ThreadPool, future::BoxFuture, task::SpawnExt, Future, FutureExt, SinkExt, StreamExt,
5};
6
7pub trait FutureSpawner {
9 fn spawn_boxed_future(&self, future: BoxFuture<'static, ()>);
11}
12
13static REGISTER: OnceLock<Box<dyn FutureSpawner + Send + Sync + 'static>> = OnceLock::new();
14
15pub fn register_spawner<S: FutureSpawner + Send + Sync + 'static>(spawner: S) {
17 if REGISTER.set(Box::new(spawner)).is_err() {
18 panic!("Call register_spawner twice.");
19 }
20}
21
22pub fn future_spawn<Fut>(fut: Fut)
24where
25 Fut: Future<Output = ()> + Send + 'static,
26{
27 let spawner = REGISTER.get_or_init(|| {
28 #[cfg(not(feature = "futures-executor"))]
29 panic!("Call register_spawner first");
30
31 #[cfg(feature = "futures-executor")]
32 Box::new(
33 ThreadPool::builder()
34 .pool_size(num_cpus::get())
35 .create()
36 .unwrap(),
37 )
38 });
39
40 spawner.spawn_boxed_future(fut.boxed())
41}
42
43impl FutureSpawner for futures::executor::ThreadPool {
44 fn spawn_boxed_future(&self, future: BoxFuture<'static, ()>) {
45 self.spawn(future)
46 .expect("futures::executor::ThreadPool spawn failed");
47 }
48}
49
50pub fn block_on<Fut, R>(fut: Fut) -> R
51where
52 Fut: Future<Output = R> + Send + 'static,
53 R: Send + 'static,
54{
55 let (mut sender, mut receiver) = futures::channel::mpsc::channel::<R>(0);
56
57 future_spawn(async move {
58 let r = fut.await;
59 _ = sender.send(r).await;
60 });
61
62 futures::executor::block_on(async move { receiver.next().await.unwrap() })
63}