1#![deny(missing_docs)]
29#![deny(warnings)]
30use core::future::Future;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33use futures_util::future::FutureExt;
34use futures_util::stream::{Stream, StreamExt};
35use once_cell::sync::OnceCell;
36
37#[cfg(feature = "async-std")]
38pub mod async_std;
39#[cfg(feature = "tokio")]
40pub mod tokio;
41
/// Type-erased, heap-allocated future with unit output that can be sent
/// across threads.
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Backend that runs futures and blocking closures on behalf of this crate.
///
/// An executor is stored in a process-wide static, which is why implementors
/// must be `Send + Sync`.
///
/// Every spawning method returns a *completion future*. [`JoinHandle`] polls
/// that future until it is ready and only then reads the task's output, and it
/// panics if the output is missing at that point. Implementations should
/// therefore not resolve the completion future before the submitted work has
/// finished.
pub trait Executor: Send + Sync {
    /// Runs `future` and returns once it has completed.
    ///
    /// [`crate::block_on`] reads the future's output immediately after this
    /// call returns and panics if it is not available yet.
    fn block_on(&self, future: BoxedFuture);

    /// Starts running `future` and returns its completion future.
    fn spawn(&self, future: BoxedFuture) -> BoxedFuture;

    /// Starts running the blocking closure `task` and returns its completion
    /// future.
    fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send>) -> BoxedFuture;

    /// Starts running `future`, which is not required to be `Send`, and
    /// returns its completion future.
    fn spawn_local(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> BoxedFuture;
}
62
63static EXECUTOR: OnceCell<Box<dyn Executor>> = OnceCell::new();
64
/// Error indicating that a global executor has already been registered.
///
/// Returned by [`try_register_executor`] when the executor slot is already
/// occupied.
#[derive(Debug)]
pub struct ExecutorRegistered;

impl std::fmt::Display for ExecutorRegistered {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("async_spawner: executor already registered")
    }
}

impl std::error::Error for ExecutorRegistered {}
76
77pub fn try_register_executor(executor: Box<dyn Executor>) -> Result<(), ExecutorRegistered> {
79 EXECUTOR.set(executor).map_err(|_| ExecutorRegistered)
80}
81
82pub fn register_executor(executor: Box<dyn Executor>) {
84 try_register_executor(executor).unwrap();
85}
86
87pub fn executor() -> &'static dyn Executor {
89 &**EXECUTOR
90 .get()
91 .expect("async_spawner: no executor registered")
92}
93
94pub fn block_on<F, T>(future: F) -> T
96where
97 F: Future<Output = T> + Send + 'static,
98 T: Send + 'static,
99{
100 let (tx, mut rx) = async_channel::bounded(1);
101 executor().block_on(Box::pin(async move {
102 let res = future.await;
103 tx.try_send(res).ok();
104 }));
105 rx.next().now_or_never().unwrap().unwrap()
106}
107
108pub struct JoinHandle<T> {
110 handle: BoxedFuture,
111 rx: async_channel::Receiver<T>,
112}
113
114impl<T> Future for JoinHandle<T> {
115 type Output = T;
116
117 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
118 if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
119 if let Poll::Ready(Some(res)) = Pin::new(&mut self.rx).poll_next(cx) {
120 Poll::Ready(res)
121 } else {
122 panic!("task paniced");
123 }
124 } else {
125 Poll::Pending
126 }
127 }
128}
129
130pub fn spawn<F, T>(future: F) -> JoinHandle<T>
132where
133 F: Future<Output = T> + Send + 'static,
134 T: Send + 'static,
135{
136 let (tx, rx) = async_channel::bounded(1);
137 let handle = executor().spawn(Box::pin(async move {
138 let res = future.await;
139 tx.try_send(res).ok();
140 }));
141 JoinHandle { handle, rx }
142}
143
144pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
146where
147 F: FnOnce() -> T + Send + 'static,
148 T: Send + 'static,
149{
150 let (tx, rx) = async_channel::bounded(1);
151 let handle = executor().spawn_blocking(Box::new(move || {
152 let res = task();
153 tx.try_send(res).ok();
154 }));
155 JoinHandle { handle, rx }
156}
157
158pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
164where
165 F: Future<Output = T> + 'static,
166 T: Send + 'static,
167{
168 let (tx, rx) = async_channel::bounded(1);
169 let handle = executor().spawn_local(Box::pin(async move {
170 let res = future.await;
171 tx.try_send(res).ok();
172 }));
173 JoinHandle { handle, rx }
174}