hala_future/
executor.rs

1use std::sync::OnceLock;
2
3use futures::{
4    executor::ThreadPool, future::BoxFuture, task::SpawnExt, Future, FutureExt, SinkExt, StreamExt,
5};
6
7/// Future executor must implement this trait to support register to hala register system.
8pub trait FutureSpawner {
9    /// The implementation must panic if this function spawn future failed.
10    fn spawn_boxed_future(&self, future: BoxFuture<'static, ()>);
11}
12
13static REGISTER: OnceLock<Box<dyn FutureSpawner + Send + Sync + 'static>> = OnceLock::new();
14
15/// Register global spawner implementation.
16pub fn register_spawner<S: FutureSpawner + Send + Sync + 'static>(spawner: S) {
17    if REGISTER.set(Box::new(spawner)).is_err() {
18        panic!("Call register_spawner twice.");
19    }
20}
21
22/// Using global register [`FutureSpawner`] to start a new future task.
23pub fn future_spawn<Fut>(fut: Fut)
24where
25    Fut: Future<Output = ()> + Send + 'static,
26{
27    let spawner = REGISTER.get_or_init(|| {
28        #[cfg(not(feature = "futures-executor"))]
29        panic!("Call register_spawner first");
30
31        #[cfg(feature = "futures-executor")]
32        Box::new(
33            ThreadPool::builder()
34                .pool_size(num_cpus::get())
35                .create()
36                .unwrap(),
37        )
38    });
39
40    spawner.spawn_boxed_future(fut.boxed())
41}
42
43impl FutureSpawner for futures::executor::ThreadPool {
44    fn spawn_boxed_future(&self, future: BoxFuture<'static, ()>) {
45        self.spawn(future)
46            .expect("futures::executor::ThreadPool spawn failed");
47    }
48}
49
50pub fn block_on<Fut, R>(fut: Fut) -> R
51where
52    Fut: Future<Output = R> + Send + 'static,
53    R: Send + 'static,
54{
55    let (mut sender, mut receiver) = futures::channel::mpsc::channel::<R>(0);
56
57    future_spawn(async move {
58        let r = fut.await;
59        _ = sender.send(r).await;
60    });
61
62    futures::executor::block_on(async move { receiver.next().await.unwrap() })
63}