#![deny(missing_docs)]
#![deny(warnings)]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::future::FutureExt;
use futures_util::stream::{Stream, StreamExt};
use once_cell::sync::OnceCell;
#[cfg(feature = "async-std")]
pub mod async_std;
#[cfg(feature = "tokio")]
pub mod tokio;
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub trait Executor: Send + Sync {
fn block_on(&self, future: BoxedFuture);
fn spawn(&self, future: BoxedFuture) -> BoxedFuture;
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send>) -> BoxedFuture;
fn spawn_local(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> BoxedFuture;
}
static EXECUTOR: OnceCell<Box<dyn Executor>> = OnceCell::new();
#[derive(Debug)]
pub struct ExecutorRegistered;
impl core::fmt::Display for ExecutorRegistered {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "async_spawner: executor already registered")
}
}
impl std::error::Error for ExecutorRegistered {}
pub fn try_register_executor(executor: Box<dyn Executor>) -> Result<(), ExecutorRegistered> {
EXECUTOR.set(executor).map_err(|_| ExecutorRegistered)
}
pub fn register_executor(executor: Box<dyn Executor>) {
try_register_executor(executor).unwrap();
}
pub fn executor() -> &'static dyn Executor {
&**EXECUTOR
.get()
.expect("async_spawner: no executor registered")
}
pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (tx, mut rx) = async_channel::bounded(1);
executor().block_on(Box::pin(async move {
let res = future.await;
tx.try_send(res).ok();
}));
rx.next().now_or_never().unwrap().unwrap()
}
pub struct JoinHandle<T> {
handle: BoxedFuture,
rx: async_channel::Receiver<T>,
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
if let Poll::Ready(Some(res)) = Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(res)
} else {
panic!("task paniced");
}
} else {
Poll::Pending
}
}
}
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = async_channel::bounded(1);
let handle = executor().spawn(Box::pin(async move {
let res = future.await;
tx.try_send(res).ok();
}));
JoinHandle { handle, rx }
}
pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = async_channel::bounded(1);
let handle = executor().spawn_blocking(Box::new(move || {
let res = task();
tx.try_send(res).ok();
}));
JoinHandle { handle, rx }
}
pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + 'static,
T: Send + 'static,
{
let (tx, rx) = async_channel::bounded(1);
let handle = executor().spawn_local(Box::pin(async move {
let res = future.await;
tx.try_send(res).ok();
}));
JoinHandle { handle, rx }
}