global_executor/
lib.rs

1//! Configure a global executor you can reuse everywhere
2
3#![forbid(unsafe_code)]
4#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
5#![no_std]
6extern crate alloc;
7
8use alloc::boxed::Box;
9use async_channel::Receiver;
10use core::{
11    fmt,
12    future::Future,
13    pin::Pin,
14    task::{Context, Poll},
15};
16use executor_trait::Executor;
17use once_cell::sync::OnceCell;
18
// Process-wide executor slot: written once by `init`, read by `block_on`,
// `spawn`, `spawn_local` and `spawn_blocking`.
19static EXECUTOR: OnceCell<Box<dyn Executor + Send + Sync>> = OnceCell::new();
20
21pub fn init(executor: impl Executor + Send + Sync + 'static) {
22    EXECUTOR.set(Box::new(executor)).map_err(|_| ()).unwrap();
23}
24
25pub fn block_on<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
26    let (send, recv) = async_channel::bounded(1);
27    EXECUTOR.get().unwrap().block_on(Box::pin(async move {
28        drop(send.send(future.await).await);
29    }));
30    recv.try_recv().unwrap()
31}
32
33pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
34    let (send, recv) = async_channel::bounded(1);
35    let inner = EXECUTOR.get().unwrap().spawn(Box::pin(async move {
36        drop(send.send(future.await).await);
37    }));
38    Task {
39        inner,
40        recv: recv.into(),
41    }
42}
43
44pub fn spawn_local<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
45    let (send, recv) = async_channel::bounded(1);
46    let inner = EXECUTOR.get().unwrap().spawn_local(Box::pin(async move {
47        drop(send.send(future.await).await);
48    }));
49    Task {
50        inner,
51        recv: recv.into(),
52    }
53}
54
55pub async fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
56    let (send, recv) = async_channel::bounded(1);
57    EXECUTOR
58        .get()
59        .unwrap()
60        .spawn_blocking(Box::new(|| {
61            let res = f();
62            crate::spawn(async move {
63                drop(send.send(res).await);
64            })
65            .detach();
66        }))
67        .await;
68    recv.recv().await.unwrap()
69}
70
/// Handle returned by `spawn` and `spawn_local`.
///
/// Awaiting it yields the output of the spawned future.
71pub struct Task<T> {
    /// Handle returned by the executor's own `spawn`/`spawn_local`; used for
    /// `detach` and `cancel`.
72    inner: Box<dyn executor_trait::Task>,
    /// Receiving side of the one-slot channel the spawned future sends its output on.
73    recv: ReceiverWrapper<T>,
74}
75
76impl<T: 'static> Task<T> {
77    pub fn detach(self) {
78        self.inner.detach();
79    }
80
81    pub async fn cancel(self) -> Option<T> {
82        self.inner.cancel().await?;
83        Some(self.recv.await)
84    }
85}
86
87impl<T> fmt::Debug for Task<T> {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("Task").finish()
90    }
91}
92
93impl<T: 'static> Future for Task<T> {
94    type Output = T;
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
97        Pin::new(&mut self.recv).poll(cx)
98    }
99}
100
// Turns a channel `Receiver` into a `Future` that resolves to the next received value.
101struct ReceiverWrapper<T> {
    // Channel end the value is read from.
102    recv: Receiver<T>,
    // In-flight `recv()` future: created on the first poll that finds it empty and
    // cleared again once it has produced a value.
103    recv_fut: Option<Pin<Box<dyn Future<Output = T>>>>,
104}
105
106impl<T: 'static> Future for ReceiverWrapper<T> {
107    type Output = T;
108
109    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
110        if self.recv_fut.is_none() {
111            let recv = self.recv.clone();
112            self.recv_fut = Some(Box::pin(async move { recv.recv().await.unwrap() }));
113        }
114        match self.recv_fut.as_mut().unwrap().as_mut().poll(cx) {
115            Poll::Pending => Poll::Pending,
116            Poll::Ready(t) => {
117                self.recv_fut = None;
118                Poll::Ready(t)
119            }
120        }
121    }
122}
123
124impl<T> From<Receiver<T>> for ReceiverWrapper<T> {
125    fn from(recv: Receiver<T>) -> Self {
126        Self {
127            recv,
128            recv_fut: None,
129        }
130    }
131}