1#![forbid(unsafe_code)]
4#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
5#![no_std]
6extern crate alloc;
7
8use alloc::boxed::Box;
9use async_channel::Receiver;
10use core::{
11 fmt,
12 future::Future,
13 pin::Pin,
14 task::{Context, Poll},
15};
16use executor_trait::Executor;
17use once_cell::sync::OnceCell;
18
19static EXECUTOR: OnceCell<Box<dyn Executor + Send + Sync>> = OnceCell::new();
20
21pub fn init(executor: impl Executor + Send + Sync + 'static) {
22 EXECUTOR.set(Box::new(executor)).map_err(|_| ()).unwrap();
23}
24
25pub fn block_on<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
26 let (send, recv) = async_channel::bounded(1);
27 EXECUTOR.get().unwrap().block_on(Box::pin(async move {
28 drop(send.send(future.await).await);
29 }));
30 recv.try_recv().unwrap()
31}
32
33pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
34 let (send, recv) = async_channel::bounded(1);
35 let inner = EXECUTOR.get().unwrap().spawn(Box::pin(async move {
36 drop(send.send(future.await).await);
37 }));
38 Task {
39 inner,
40 recv: recv.into(),
41 }
42}
43
44pub fn spawn_local<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
45 let (send, recv) = async_channel::bounded(1);
46 let inner = EXECUTOR.get().unwrap().spawn_local(Box::pin(async move {
47 drop(send.send(future.await).await);
48 }));
49 Task {
50 inner,
51 recv: recv.into(),
52 }
53}
54
55pub async fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
56 let (send, recv) = async_channel::bounded(1);
57 EXECUTOR
58 .get()
59 .unwrap()
60 .spawn_blocking(Box::new(|| {
61 let res = f();
62 crate::spawn(async move {
63 drop(send.send(res).await);
64 })
65 .detach();
66 }))
67 .await;
68 recv.recv().await.unwrap()
69}
70
71pub struct Task<T> {
72 inner: Box<dyn executor_trait::Task>,
73 recv: ReceiverWrapper<T>,
74}
75
76impl<T: 'static> Task<T> {
77 pub fn detach(self) {
78 self.inner.detach();
79 }
80
81 pub async fn cancel(self) -> Option<T> {
82 self.inner.cancel().await?;
83 Some(self.recv.await)
84 }
85}
86
87impl<T> fmt::Debug for Task<T> {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_struct("Task").finish()
90 }
91}
92
93impl<T: 'static> Future for Task<T> {
94 type Output = T;
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
97 Pin::new(&mut self.recv).poll(cx)
98 }
99}
100
101struct ReceiverWrapper<T> {
102 recv: Receiver<T>,
103 recv_fut: Option<Pin<Box<dyn Future<Output = T>>>>,
104}
105
106impl<T: 'static> Future for ReceiverWrapper<T> {
107 type Output = T;
108
109 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
110 if self.recv_fut.is_none() {
111 let recv = self.recv.clone();
112 self.recv_fut = Some(Box::pin(async move { recv.recv().await.unwrap() }));
113 }
114 match self.recv_fut.as_mut().unwrap().as_mut().poll(cx) {
115 Poll::Pending => Poll::Pending,
116 Poll::Ready(t) => {
117 self.recv_fut = None;
118 Poll::Ready(t)
119 }
120 }
121 }
122}
123
124impl<T> From<Receiver<T>> for ReceiverWrapper<T> {
125 fn from(recv: Receiver<T>) -> Self {
126 Self {
127 recv,
128 recv_fut: None,
129 }
130 }
131}