#![no_std]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs, missing_debug_implementations)]
#[cfg(feature = "async-task")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-task")))]
pub mod async_task;
#[cfg(feature = "async-executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
pub mod async_executor;
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod tokio;
// The docs.rs badge must name the same feature that actually gates the module.
#[cfg(feature = "smol")]
#[cfg_attr(docsrs, doc(cfg(feature = "smol")))]
pub mod smol;
use core::{
any::Any,
fmt::Debug,
future::{Future, poll_fn},
marker::PhantomData,
panic::AssertUnwindSafe,
pin::Pin,
task::{Context, Poll},
};
pub mod mailbox;
use alloc::boxed::Box;
use async_channel::Receiver;
extern crate alloc;
/// An executor that accepts `Send` futures.
///
/// Implementors must themselves be `Send + Sync`, so a single executor value
/// can be shared between threads.
pub trait Executor: Send + Sync {
/// Handle type returned by [`Executor::spawn`] for a task producing `T`.
type Task<T: Send + 'static>: Task<T> + Send;
/// Hands `fut` to this executor and returns the handle of the spawned task.
fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
where
Fut: Future<Output: Send> + Send + 'static;
}
/// A shared reference to an executor is itself an executor that forwards to
/// the referenced value.
impl<E: Executor> Executor for &E {
    type Task<T: Send + 'static> = E::Task<T>;

    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        // Name the target impl explicitly so the call can never resolve back
        // to this forwarding impl.
        <E as Executor>::spawn(self, fut)
    }
}
/// A mutable reference to an executor forwards spawning to the referenced
/// value.
impl<E: Executor> Executor for &mut E {
    type Task<T: Send + 'static> = E::Task<T>;

    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        // `&&mut E` deref-coerces to the `&E` receiver expected by `E::spawn`.
        <E as Executor>::spawn(self, fut)
    }
}
/// A boxed executor forwards spawning to the value it owns.
impl<E: Executor> Executor for Box<E> {
    type Task<T: Send + 'static> = E::Task<T>;

    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        // `&Box<E>` deref-coerces to the `&E` receiver expected by `E::spawn`.
        <E as Executor>::spawn(self, fut)
    }
}
/// A reference-counted executor forwards spawning to the shared value.
impl<E: Executor> Executor for alloc::sync::Arc<E> {
    type Task<T: Send + 'static> = E::Task<T>;

    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        // `&Arc<E>` deref-coerces to the `&E` receiver expected by `E::spawn`.
        <E as Executor>::spawn(self, fut)
    }
}
/// An executor that accepts futures without requiring them to be `Send`.
pub trait LocalExecutor {
/// Handle type returned by [`LocalExecutor::spawn_local`] for a task producing `T`.
type Task<T: 'static>: Task<T>;
/// Hands `fut` to this executor and returns the handle of the spawned task.
fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
where
Fut: Future + 'static;
}
/// A shared reference to a local executor forwards to the referenced value.
impl<E: LocalExecutor> LocalExecutor for &E {
    type Task<T: 'static> = E::Task<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        // Name the target impl explicitly so the call can never resolve back
        // to this forwarding impl.
        <E as LocalExecutor>::spawn_local(self, fut)
    }
}
/// A mutable reference to a local executor forwards to the referenced value.
impl<E: LocalExecutor> LocalExecutor for &mut E {
    type Task<T: 'static> = E::Task<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        // `&&mut E` deref-coerces to the `&E` receiver of `E::spawn_local`.
        <E as LocalExecutor>::spawn_local(self, fut)
    }
}
/// A boxed local executor forwards spawning to the value it owns.
impl<E: LocalExecutor> LocalExecutor for Box<E> {
    type Task<T: 'static> = E::Task<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        // `&Box<E>` deref-coerces to the `&E` receiver of `E::spawn_local`.
        <E as LocalExecutor>::spawn_local(self, fut)
    }
}
/// A single-threaded reference-counted local executor forwards spawning to
/// the shared value.
impl<E: LocalExecutor> LocalExecutor for alloc::rc::Rc<E> {
    type Task<T: 'static> = E::Task<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        // `&Rc<E>` deref-coerces to the `&E` receiver of `E::spawn_local`.
        <E as LocalExecutor>::spawn_local(self, fut)
    }
}
/// An atomically reference-counted local executor forwards spawning to the
/// shared value.
impl<E: LocalExecutor> LocalExecutor for alloc::sync::Arc<E> {
    type Task<T: 'static> = E::Task<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        // `&Arc<E>` deref-coerces to the `&E` receiver of `E::spawn_local`.
        <E as LocalExecutor>::spawn_local(self, fut)
    }
}
// Object-safe counterpart of `LocalExecutor`, used as the trait object stored
// inside `AnyLocalExecutor`.
trait AnyLocalExecutorImpl: 'static + Any {
// Spawns an already boxed, unit-returning future and returns the boxed task handle.
fn spawn_local_boxed(
&self,
fut: Pin<Box<dyn Future<Output = ()>>>,
) -> Pin<Box<dyn Task<()> + 'static>>;
// Exposes the concrete executor as `dyn Any` so it can be downcast.
fn as_any(&self) -> &dyn Any;
}
// Every `'static` local executor can be used through the object-safe
// `AnyLocalExecutorImpl` interface.
impl<E> AnyLocalExecutorImpl for E
where
    E: LocalExecutor + 'static,
{
    fn spawn_local_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()>>>,
    ) -> Pin<Box<dyn Task<()> + 'static>> {
        // Spawn on the concrete executor, then erase the handle type by
        // pinning it on the heap.
        Box::pin(<E as LocalExecutor>::spawn_local(self, fut))
    }

    fn as_any(&self) -> &dyn Any {
        let concrete: &E = self;
        concrete
    }
}
/// A type-erased [`LocalExecutor`]: owns a boxed executor of any concrete type.
pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
// The wrapped executor is opaque, so only the type name is printed.
impl Debug for AnyLocalExecutor {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name.
        f.write_str("AnyLocalExecutor")
    }
}
// The wrapped executor is opaque, so only the type name is printed.
impl Debug for AnyExecutor {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name.
        f.write_str("AnyExecutor")
    }
}
impl<T> dyn Task<T> {
    /// Drives a boxed, type-erased task to completion and yields what
    /// [`Task::poll_result`] reports.
    pub async fn result(self: Box<Self>) -> Result<T, Error> {
        // The box has to be pinned before the task may be polled.
        let mut task: Pin<Box<Self>> = Box::into_pin(self);
        let outcome = poll_fn(move |cx| task.as_mut().poll_result(cx));
        outcome.await
    }
}
impl AnyExecutor {
pub fn new(executor: impl Executor + 'static) -> Self {
Self(Box::new(executor))
}
pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
self.0.as_any().downcast_ref()
}
pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
let ptr: *const E = executor;
let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
core::mem::forget(self);
Ok(boxed)
} else {
Err(self)
}
}
}
impl AnyLocalExecutor {
pub fn new(executor: impl LocalExecutor + 'static) -> Self {
Self(Box::new(executor))
}
pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
self.0.as_any().downcast_ref()
}
pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
if let Some(executor) = self.0.as_any().downcast_ref::<E>() {
let ptr: *const E = executor;
let boxed: Box<E> = unsafe { Box::from_raw(ptr as *mut E) };
core::mem::forget(self);
Ok(boxed)
} else {
Err(self)
}
}
}
/// Task handle returned by [`AnyLocalExecutor`].
pub struct AnyLocalExecutorTask<T> {
// Type-erased handle of the task spawned on the wrapped executor.
inner: Pin<Box<dyn Task<()> + 'static>>,
// Channel end on which the spawned future delivers its output.
receiver: Receiver<Result<T, Error>>,
}
impl<T> AnyLocalExecutorTask<T> {
fn new(inner: Pin<Box<dyn Task<()> + 'static>>, receiver: Receiver<Result<T, Error>>) -> Self {
Self { inner, receiver }
}
pub async fn result(self) -> Result<T, Error> {
<Self as Task<T>>::result(self).await
}
pub fn detach(self) {
<Self as Task<T>>::detach(self)
}
}
// Neither field is printable in a useful way, so the output is the type name
// followed by the non-exhaustive marker.
impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("AnyLocalExecutorTask");
        builder.finish_non_exhaustive()
    }
}
impl<T> Future for AnyLocalExecutorTask<T> {
type Output = T;
fn poll(
self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
self.poll_result(cx).map(|res| res.unwrap())
}
}
impl<T> Task<T> for AnyLocalExecutorTask<T> {
    /// Polls the erased task, then checks the result channel.
    ///
    /// Resolves to the value sent by the spawned future, or to an
    /// `Err("Channel closed")` payload when the channel was closed without a
    /// value being delivered.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
        // SAFETY: `this` is only used to re-pin `inner` (already a
        // `Pin<Box<_>>`) and to borrow `receiver`; nothing is moved out of
        // the pinned value.
        let this = unsafe { self.get_unchecked_mut() };

        // Drive the underlying task; its own output is `()` and is ignored
        // because the real output arrives through the channel.
        let _ = this.inner.as_mut().poll(cx);

        // NOTE(review): a fresh `recv` future is created on every poll and
        // dropped afterwards; wake-ups presumably rely on the inner task poll
        // above having registered `cx`'s waker -- confirm against
        // async_channel's listener semantics.
        // `pin!` pins the short-lived future on the stack without `unsafe`.
        let recv = core::pin::pin!(this.receiver.recv());
        recv.poll(cx)
            .map(|received| received.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
    }
}
/// Spawning through the erased executor: the future's output is routed back
/// over a one-slot channel because the boxed executor only accepts
/// unit-returning futures.
impl LocalExecutor for AnyLocalExecutor {
    type Task<T: 'static> = AnyLocalExecutorTask<T>;

    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future + 'static,
    {
        let (tx, rx) = async_channel::bounded(1);

        // Wrapper that runs `fut` and forwards its output; a failed send
        // (no receiver left) is deliberately ignored.
        let forwarder = Box::pin(async move {
            let output = AssertUnwindSafe(fut).await;
            let _ = tx.send(Ok(output)).await;
        });

        AnyLocalExecutorTask::new(self.0.spawn_local_boxed(forwarder), rx)
    }
}
// Error payload reported by `Task::poll_result` / `Task::result`: an arbitrary
// boxed `Send` value.
type Error = Box<dyn core::any::Any + Send>;
/// Handle to a spawned task that produces a `T`.
///
/// Every task is also a [`Future`] resolving to `T`; the methods here add a
/// fallible view of the outcome and a way to let go of the handle.
pub trait Task<T>: Future<Output = T> {
    /// Polls the task, reporting its outcome as a `Result` instead of a bare `T`.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;

    /// Converts the handle into a future that resolves to whatever
    /// [`Task::poll_result`] reports.
    fn result(self) -> impl Future<Output = Result<T, Error>>
    where
        Self: Sized,
    {
        let _phantom = PhantomData;
        ResultFuture {
            task: self,
            _phantom,
        }
    }

    /// Gives up the handle.
    ///
    /// The default implementation discards the handle without running its
    /// destructor.
    fn detach(self)
    where
        Self: Sized,
    {
        // Wrapping in `ManuallyDrop` and discarding the wrapper skips `Drop`.
        let _ = core::mem::ManuallyDrop::new(self);
    }
}
/// Future returned by the default [`Task::result`]; resolves to the task's
/// outcome as a `Result`.
pub struct ResultFuture<T: Task<U>, U> {
// The task being driven.
task: T,
// Ties the output type `U` to the struct without storing a value of it.
_phantom: PhantomData<U>,
}
// The wrapped task need not be `Debug`, so only the type name and the
// non-exhaustive marker are printed.
impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("ResultFuture");
        builder.finish_non_exhaustive()
    }
}
impl<T: Task<U>, U> Future for ResultFuture<T, U> {
    type Output = Result<U, Error>;

    /// Forwards to [`Task::poll_result`] on the wrapped task.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `task` is only ever accessed through this pinned
        // projection and is never moved out of the pinned `ResultFuture`.
        let task: Pin<&mut T> = unsafe { self.map_unchecked_mut(|this| &mut this.task) };
        T::poll_result(task, cx)
    }
}
/// A type-erased [`Executor`]: owns a boxed executor of any concrete type.
pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
/// Task handle returned by [`AnyExecutor`].
pub struct AnyExecutorTask<T> {
// Type-erased handle of the task spawned on the wrapped executor.
inner: Pin<Box<dyn Task<()> + Send>>,
// Channel end on which the spawned future delivers its output.
receiver: Receiver<Result<T, Error>>,
}
impl<T: Send> AnyExecutorTask<T> {
fn new(inner: Pin<Box<dyn Task<()> + Send>>, receiver: Receiver<Result<T, Error>>) -> Self {
Self { inner, receiver }
}
pub async fn result(self) -> Result<T, Error> {
<Self as Task<T>>::result(self).await
}
pub fn detach(self) {
<Self as Task<T>>::detach(self)
}
}
// Neither field is printable in a useful way, so the output is the type name
// followed by the non-exhaustive marker.
impl<T> core::fmt::Debug for AnyExecutorTask<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("AnyExecutorTask");
        builder.finish_non_exhaustive()
    }
}
impl<T: Send> Future for AnyExecutorTask<T> {
type Output = T;
fn poll(
self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
self.poll_result(cx).map(|res| res.unwrap())
}
}
impl<T: Send> Task<T> for AnyExecutorTask<T> {
    /// Polls the erased task, then checks the result channel.
    ///
    /// Resolves to the value sent by the spawned future, or to an
    /// `Err("Channel closed")` payload when the channel was closed without a
    /// value being delivered.
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
        // SAFETY: `this` is only used to re-pin `inner` (already a
        // `Pin<Box<_>>`) and to borrow `receiver`; nothing is moved out of
        // the pinned value.
        let this = unsafe { self.get_unchecked_mut() };

        // Drive the underlying task; its own output is `()` and is ignored
        // because the real output arrives through the channel.
        let _ = this.inner.as_mut().poll(cx);

        // NOTE(review): a fresh `recv` future is created on every poll and
        // dropped afterwards; wake-ups presumably rely on the inner task poll
        // above having registered `cx`'s waker -- confirm against
        // async_channel's listener semantics.
        // `pin!` pins the short-lived future on the stack without `unsafe`.
        let recv = core::pin::pin!(this.receiver.recv());
        recv.poll(cx)
            .map(|received| received.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
    }
}
/// Spawning through the erased executor: the future's output is routed back
/// over a one-slot channel because the boxed executor only accepts
/// unit-returning futures.
impl Executor for AnyExecutor {
    type Task<T: Send + 'static> = AnyExecutorTask<T>;

    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
    where
        Fut: Future<Output: Send> + Send + 'static,
    {
        let (tx, rx) = async_channel::bounded(1);

        // Wrapper that runs `fut` and forwards its output; a failed send
        // (no receiver left) is deliberately ignored.
        let forwarder = Box::pin(async move {
            let output = AssertUnwindSafe(fut).await;
            let _ = tx.send(Ok(output)).await;
        });

        AnyExecutorTask::new(self.0.spawn_boxed(forwarder), rx)
    }
}
// Object-safe counterpart of `Executor`, used as the trait object stored
// inside `AnyExecutor`.
trait AnyExecutorImpl: Send + Sync + Any {
// Spawns an already boxed, unit-returning `Send` future and returns the boxed task handle.
fn spawn_boxed(
&self,
fut: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Pin<Box<dyn Task<()> + Send>>;
// Exposes the concrete executor as `dyn Any` so it can be downcast.
fn as_any(&self) -> &dyn Any;
}
/// A pinned, boxed task is itself a task that forwards to the boxed value.
///
/// The task type `U` and its output type `T` are separate parameters, and `U`
/// may be unsized, so `Pin<Box<dyn Task<T>>>` is covered as well.
impl<T, U> Task<T> for Pin<Box<U>>
where
    U: Task<T> + ?Sized,
{
    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
        // `Pin<Box<U>>` is `Unpin` regardless of `U`, so the outer pin can be
        // removed safely; the inner pin is what keeps `U` in place.
        let boxed: &mut Pin<Box<U>> = self.get_mut();
        <U as Task<T>>::poll_result(boxed.as_mut(), cx)
    }
}
// Every `'static` executor can be used through the object-safe
// `AnyExecutorImpl` interface.
impl<E> AnyExecutorImpl for E
where
    E: Executor + 'static,
{
    fn spawn_boxed(
        &self,
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Pin<Box<dyn Task<()> + Send>> {
        // Spawn on the concrete executor, then erase the handle type by
        // pinning it on the heap.
        Box::pin(<E as Executor>::spawn(self, fut))
    }

    fn as_any(&self) -> &dyn Any {
        let concrete: &E = self;
        concrete
    }
}
#[cfg(feature = "std")]
mod std_on {
use alloc::boxed::Box;
use crate::{
AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
LocalExecutor,
};
extern crate std;
use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
use std::sync::OnceLock;
// Per-thread slot for the local executor; it starts empty and is filled via
// `OnceCell::set` in `try_init_local_executor`.
std::thread_local! {
static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
}
/// Installs `executor` as the local executor of the current thread.
///
/// # Panics
///
/// Panics if a local executor has already been set for this thread.
pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
    assert!(
        try_init_local_executor(executor).is_ok(),
        "Local executor already set for this thread"
    );
}
/// Tries to install `executor` as the local executor of the current thread.
///
/// # Errors
///
/// Hands `executor` back if a local executor has already been set for this
/// thread.
pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
where
    E: LocalExecutor + 'static,
{
    LOCAL_EXECUTOR.with(|cell| match cell.set(AnyLocalExecutor::new(executor)) {
        Ok(()) => Ok(()),
        // The rejected value is the wrapper built just above, so converting
        // it back to `E` cannot fail.
        Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
    })
}
// Process-wide slot for the global executor; it starts empty and is filled
// via `OnceLock::set` by the `init_global_executor` functions.
static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();
/// Installs `executor` as the process-wide global executor.
///
/// # Panics
///
/// Panics if a global executor has already been set.
pub fn init_global_executor(executor: impl crate::Executor + 'static) {
    assert!(
        GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_ok(),
        "Global executor already set"
    );
}
/// Tries to install `executor` as the process-wide global executor.
///
/// # Errors
///
/// Hands `executor` back if a global executor has already been set.
pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
where
    E: crate::Executor + 'static,
{
    match GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)) {
        Ok(()) => Ok(()),
        // The rejected value is the wrapper built just above, so converting
        // it back to `E` cannot fail.
        Err(rejected) => Err(*rejected.downcast::<E>().unwrap()),
    }
}
pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
where
Fut: Future<Output: Send> + Send + 'static,
{
let executor = GLOBAL_EXECUTOR.get().expect("Global executor not set");
executor.spawn(fut)
}
pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
where
Fut: Future + 'static,
{
LOCAL_EXECUTOR.with(|cell| {
let executor = cell.get().expect("Local executor not set");
executor.spawn_local(fut)
})
}
/// Runs `f` and captures a panic raised inside it.
///
/// Returns `Ok` with the closure's value, or `Err` with the panic payload.
/// The closure is wrapped in `AssertUnwindSafe`, so no `UnwindSafe` bound is
/// required of callers.
#[allow(unused)]
pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
where
    F: FnOnce() -> R,
{
    let shielded = AssertUnwindSafe(f);
    std::panic::catch_unwind(shielded)
}
/// Zero-sized executor that forwards to the free functions [`spawn`] and
/// [`spawn_local`] of this module.
#[derive(Clone, Copy, Debug)]
pub struct DefaultExecutor;
impl Executor for DefaultExecutor {
type Task<T: Send + 'static> = AnyExecutorTask<T>;
fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
where
Fut: core::future::Future<Output: Send> + Send + 'static,
{
spawn(fut)
}
}
impl LocalExecutor for DefaultExecutor {
type Task<T: 'static> = AnyLocalExecutorTask<T>;
fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
where
Fut: core::future::Future + 'static,
{
spawn_local(fut)
}
}
}
#[cfg(feature = "std")]
pub use std_on::*;