async_spawner/
lib.rs

1//! Executor agnostic task spawning
2//!
3//! ```rust
4//! #[async_std::main]
5//! async fn main() {
6//!     async_spawner::async_std::register_executor();
7//!     let res = async_spawner::spawn(async {
8//!         println!("executor agnostic spawning");
9//!         1
10//!     })
11//!     .await;
12//!     assert_eq!(res, 1);
13//! }
14//! ```
15//!
16//! ```rust
17//! #[tokio::main]
18//! async fn main() {
19//!     async_spawner::tokio::register_executor();
20//!     let res = async_spawner::spawn(async {
21//!         println!("executor agnostic spawning");
22//!         1
23//!     })
24//!     .await;
25//!     assert_eq!(res, 1);
26//! }
27//! ```
28#![deny(missing_docs)]
29#![deny(warnings)]
30use core::future::Future;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33use futures_util::future::FutureExt;
34use futures_util::stream::{Stream, StreamExt};
35use once_cell::sync::OnceCell;
36
37#[cfg(feature = "async-std")]
38pub mod async_std;
39#[cfg(feature = "tokio")]
40pub mod tokio;
41
42type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
43
44/// Trait abstracting over an executor.
45pub trait Executor: Send + Sync {
46    /// Blocks until the future has finished.
47    fn block_on(&self, future: BoxedFuture);
48
49    /// Spawns an asynchronous task using the underlying executor.
50    fn spawn(&self, future: BoxedFuture) -> BoxedFuture;
51
52    /// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
53    fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send>) -> BoxedFuture;
54
55    /// Spawns a future that doesn't implement [Send].
56    ///
57    /// The spawned future will be executed on the same thread that called `spawn_local`.
58    ///
59    /// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
60    fn spawn_local(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> BoxedFuture;
61}
62
63static EXECUTOR: OnceCell<Box<dyn Executor>> = OnceCell::new();
64
65/// Error returned by `try_register_executor` indicating that an executor was registered.
66#[derive(Debug)]
67pub struct ExecutorRegistered;
68
69impl core::fmt::Display for ExecutorRegistered {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "async_spawner: executor already registered")
72    }
73}
74
75impl std::error::Error for ExecutorRegistered {}
76
77/// Tries registering an executor.
78pub fn try_register_executor(executor: Box<dyn Executor>) -> Result<(), ExecutorRegistered> {
79    EXECUTOR.set(executor).map_err(|_| ExecutorRegistered)
80}
81
82/// Register an executor. Panics if an executor was already registered.
83pub fn register_executor(executor: Box<dyn Executor>) {
84    try_register_executor(executor).unwrap();
85}
86
87/// Returns the registered executor.
88pub fn executor() -> &'static dyn Executor {
89    &**EXECUTOR
90        .get()
91        .expect("async_spawner: no executor registered")
92}
93
94/// Blocks until the future has finished.
95pub fn block_on<F, T>(future: F) -> T
96where
97    F: Future<Output = T> + Send + 'static,
98    T: Send + 'static,
99{
100    let (tx, mut rx) = async_channel::bounded(1);
101    executor().block_on(Box::pin(async move {
102        let res = future.await;
103        tx.try_send(res).ok();
104    }));
105    rx.next().now_or_never().unwrap().unwrap()
106}
107
108/// Executor agnostic join handle.
109pub struct JoinHandle<T> {
110    handle: BoxedFuture,
111    rx: async_channel::Receiver<T>,
112}
113
114impl<T> Future for JoinHandle<T> {
115    type Output = T;
116
117    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
118        if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
119            if let Poll::Ready(Some(res)) = Pin::new(&mut self.rx).poll_next(cx) {
120                Poll::Ready(res)
121            } else {
122                panic!("task paniced");
123            }
124        } else {
125            Poll::Pending
126        }
127    }
128}
129
130/// Spawns an asynchronous task using the underlying executor.
131pub fn spawn<F, T>(future: F) -> JoinHandle<T>
132where
133    F: Future<Output = T> + Send + 'static,
134    T: Send + 'static,
135{
136    let (tx, rx) = async_channel::bounded(1);
137    let handle = executor().spawn(Box::pin(async move {
138        let res = future.await;
139        tx.try_send(res).ok();
140    }));
141    JoinHandle { handle, rx }
142}
143
144/// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
145pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
146where
147    F: FnOnce() -> T + Send + 'static,
148    T: Send + 'static,
149{
150    let (tx, rx) = async_channel::bounded(1);
151    let handle = executor().spawn_blocking(Box::new(move || {
152        let res = task();
153        tx.try_send(res).ok();
154    }));
155    JoinHandle { handle, rx }
156}
157
158/// Spawns a future that doesn't implement [Send].
159///
160/// The spawned future will be executed on the same thread that called `spawn_local`.
161///
162/// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
163pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
164where
165    F: Future<Output = T> + 'static,
166    T: Send + 'static,
167{
168    let (tx, rx) = async_channel::bounded(1);
169    let handle = executor().spawn_local(Box::pin(async move {
170        let res = future.await;
171        tx.try_send(res).ok();
172    }));
173    JoinHandle { handle, rx }
174}