executor_core/
lib.rs

1//! # executor-core
2//!
3//! A flexible task executor abstraction layer for Rust async runtimes.
4//!
5//! This crate provides unified traits and type-erased wrappers for different async executors,
6//! allowing you to write code that's agnostic to the underlying executor implementation.
7//!
8//! ## Overview
9//!
10//! The crate is built around two main traits:
11//! - [`Executor`]: For spawning `Send + 'static` futures
12//! - [`LocalExecutor`]: For spawning `'static` futures (not necessarily `Send`)
13//!
14//! Both traits produce tasks that implement the [`Task`] trait, providing:
15//! - [`Future`] implementation for awaiting results
16//! - [`Task::poll_result`] for explicit error handling
17//! - [`Task::poll_cancel`] for task cancellation
18//!
19//! ## Quick Start
20//!
21//! ```
22//! use executor_core::{Executor, init_global_executor, spawn};
23//! use executor_core::tokio::DefaultExecutor;
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Initialize the global executor
28//!     init_global_executor(DefaultExecutor::new());
29//!
30//!     // Spawn a task
31//!     let task = spawn(async {
32//!         println!("Hello from spawned task!");
33//!         42
34//!     });
35//!
36//!     // The task can be awaited to get the result
37//!     let result = task.await;
38//!     println!("Task result: {}", result);
39//! }
40//! ```
41//!
42//! ## Features
43//!
44//! - **Zero-cost Executor Abstraction**: Unified [`Executor`] and [`LocalExecutor`] traits
45//!   using Generic Associated Types (GAT) to prevent unnecessary heap allocation and dynamic dispatch
46//! - **Type Erasure**: [`AnyExecutor`] and [`AnyLocalExecutor`] for runtime flexibility
47//! - **Multiple Runtime Support**: Tokio, async-executor, Web/WASM
48//! - **Task Management**: Rich task API with cancellation and error handling
49//! - **No-std Compatible**: Core functionality works in no-std environments
50//! - **Panic Safety**: Proper panic handling and propagation
51//!
52//! ## Lifetime Constraints
53//!
54//! The current API requires `'static` lifetimes for both futures and their outputs.
55//! This constraint comes from the underlying async runtimes and ensures memory safety
56//! when tasks may outlive their spawning scope. While this limits flexibility, it
57//! matches the constraints of most async runtime implementations in Rust.
58
59#![no_std]
60#![cfg_attr(docsrs, feature(doc_cfg))]
61#![warn(missing_docs, missing_debug_implementations)]
62
63#[cfg(feature = "async-executor")]
64#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
65pub mod async_executor;
66
67#[cfg(feature = "tokio")]
68#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
69pub mod tokio;
70
71#[cfg(feature = "web")]
72#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
73pub mod web;
74
75use core::{
76    any::Any,
77    fmt::Debug,
78    future::Future,
79    marker::PhantomData,
80    panic::AssertUnwindSafe,
81    pin::Pin,
82    task::{Context, Poll},
83};
84
85use alloc::boxed::Box;
86use async_channel::Receiver;
87
88extern crate alloc;
89
90/// A trait for spawning `Send + 'static` futures.
91///
92/// This trait is implemented by runtime-agnostic executors that can spawn futures
93/// across thread boundaries. The spawned futures must be `Send` and `'static`.
94///
95/// The `'static` lifetime requirements come from the underlying async runtimes
96/// (like Tokio) which need to ensure memory safety when tasks are moved across
97/// threads and may outlive their spawning scope.
98///
99/// See [AnyExecutor] for a type-erased executor.
100#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
101pub trait Executor: Send + Sync {
102    /// The task type returned by [`spawn`](Self::spawn).
103    ///
104    /// The `T: Send + 'static` constraint ensures the task output can be safely
105    /// sent across thread boundaries and doesn't contain any borrowed data.
106    type Task<T: Send + 'static>: Task<T> + Send;
107
108    /// Spawn a future that will run to completion.
109    ///
110    /// The future must be `Send + 'static` to ensure it can be moved across threads.
111    /// Returns a [`Task`] that can be awaited to get the result.
112    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
113    where
114        Fut: Future<Output: Send> + Send + 'static;
115}
116
117/// A trait for spawning `'static` futures that may not be `Send`.
118///
119/// This trait is for executors that can spawn futures that don't need to be `Send`,
120/// typically single-threaded executors or local task spawners.
121///
122/// The `'static` lifetime requirements come from the underlying async runtimes
123/// which need to ensure memory safety when tasks may outlive their spawning scope,
124/// even in single-threaded contexts.
125///
126/// See [AnyLocalExecutor] for a type-erased local executor.
127#[cfg_attr(feature = "web", doc = "- [`web::WebExecutor`] for WASM environments")]
128pub trait LocalExecutor {
129    /// The task type returned by [`spawn`](Self::spawn).
130    ///
131    /// The `T: 'static` constraint ensures the task output doesn't contain
132    /// any borrowed data that could become invalid.
133    type Task<T: 'static>: Task<T>;
134
135    /// Spawn a future that will run to completion on the local executor.
136    ///
137    /// The future must be `'static` but does not need to be `Send`.
138    /// Returns a [`Task`] that can be awaited to get the result.
139    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
140    where
141        Fut: Future + 'static;
142}
143
144trait AnyLocalExecutorImpl: 'static + Any {
145    fn spawn_boxed(
146        &self,
147        fut: Pin<Box<dyn Future<Output = ()>>>,
148    ) -> Pin<Box<dyn Task<()> + 'static>>;
149}
150
151impl<E> AnyLocalExecutorImpl for E
152where
153    E: LocalExecutor + 'static,
154{
155    fn spawn_boxed(
156        &self,
157        fut: Pin<Box<dyn Future<Output = ()>>>,
158    ) -> Pin<Box<dyn Task<()> + 'static>> {
159        let task = self.spawn(fut);
160        Box::pin(task)
161    }
162}
163
164/// A type-erased [`LocalExecutor`] that can hold any local executor implementation.
165///
166/// This allows for runtime selection of executors and storing different executor
167/// types in the same collection.
168pub struct AnyLocalExecutor(Box<dyn AnyLocalExecutorImpl>);
169
170impl Debug for AnyLocalExecutor {
171    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
172        f.debug_struct("AnyLocalExecutor").finish()
173    }
174}
175
176impl Debug for AnyExecutor {
177    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
178        f.debug_struct("AnyExecutor").finish()
179    }
180}
181
182impl AnyExecutor {
183    /// Create a new [`AnyExecutor`] wrapping the given executor.
184    pub fn new(executor: impl Executor + 'static) -> Self {
185        Self(Box::new(executor))
186    }
187
188    /// Attempt to downcast to a concrete executor type by reference.
189    ///
190    /// Returns `Some(&E)` if the underlying executor is of type `E`, `None` otherwise.
191    pub fn downcast_ref<E: Executor + 'static>(&self) -> Option<&E> {
192        let any: &dyn Any = self.0.as_ref();
193
194        any.downcast_ref()
195    }
196
197    /// Attempt to downcast to a concrete executor type by value.
198    ///
199    /// Returns `Ok(Box<E>)` if the underlying executor is of type `E`,
200    /// `Err(Self)` otherwise (returning the original `AnyExecutor`).
201    pub fn downcast<E: Executor + 'static>(self) -> Result<Box<E>, Self> {
202        if (&self.0 as &dyn Any).is::<E>() {
203            Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
204        } else {
205            Err(self)
206        }
207    }
208}
209
210impl AnyLocalExecutor {
211    /// Create a new [`AnyLocalExecutor`] wrapping the given local executor.
212    pub fn new(executor: impl LocalExecutor + 'static) -> Self {
213        Self(Box::new(executor))
214    }
215
216    /// Attempt to downcast to a concrete local executor type by reference.
217    ///
218    /// Returns `Some(&E)` if the underlying executor is of type `E`, `None` otherwise.
219    pub fn downcast_ref<E: LocalExecutor + 'static>(&self) -> Option<&E> {
220        let any: &dyn Any = self.0.as_ref();
221
222        any.downcast_ref()
223    }
224
225    /// Attempt to downcast to a concrete local executor type by value.
226    ///
227    /// Returns `Ok(Box<E>)` if the underlying executor is of type `E`,
228    /// `Err(Self)` otherwise (returning the original `AnyLocalExecutor`).
229    pub fn downcast<E: LocalExecutor + 'static>(self) -> Result<Box<E>, Self> {
230        if (&self.0 as &dyn Any).is::<E>() {
231            Ok((self.0 as Box<dyn Any>).downcast().ok().unwrap())
232        } else {
233            Err(self)
234        }
235    }
236}
237
238/// Task type returned by [`AnyLocalExecutor`].
239///
240/// This task can be awaited like any other task and provides the same
241/// cancellation and error handling capabilities as other task implementations.
242/// It wraps tasks from any [`LocalExecutor`] implementation in a type-erased manner.
243pub struct AnyLocalExecutorTask<T> {
244    inner: Pin<Box<dyn Task<()> + 'static>>,
245    receiver: Receiver<Result<T, Error>>,
246}
247
248impl<T> core::fmt::Debug for AnyLocalExecutorTask<T> {
249    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
250        f.debug_struct("AnyLocalExecutorTask")
251            .finish_non_exhaustive()
252    }
253}
254
255impl<T> Future for AnyLocalExecutorTask<T> {
256    type Output = T;
257
258    fn poll(
259        self: Pin<&mut Self>,
260        cx: &mut core::task::Context<'_>,
261    ) -> core::task::Poll<Self::Output> {
262        self.poll_result(cx).map(|res| res.unwrap())
263    }
264}
265
266impl<T> Task<T> for AnyLocalExecutorTask<T> {
267    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
268        // First, ensure the underlying task is being polled
269        let this = unsafe { self.get_unchecked_mut() };
270        let _ = this.inner.as_mut().poll(cx);
271
272        // Then poll the receiver
273        let mut recv = this.receiver.recv();
274        unsafe {
275            Pin::new_unchecked(&mut recv)
276                .poll(cx)
277                .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
278        }
279    }
280    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
281        let this = unsafe { self.get_unchecked_mut() };
282        this.inner.as_mut().poll_cancel(cx)
283    }
284}
285
286impl LocalExecutor for AnyLocalExecutor {
287    type Task<T: 'static> = AnyLocalExecutorTask<T>;
288
289    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
290    where
291        Fut: Future + 'static,
292    {
293        let (sender, receiver) = async_channel::bounded(1);
294        let fut = async move {
295            let res = AssertUnwindSafe(fut).await;
296            let _ = sender.send(Ok(res)).await;
297        };
298        let inner = self.0.spawn_boxed(Box::pin(fut));
299        AnyLocalExecutorTask { inner, receiver }
300    }
301}
302
303/// Type alias for errors that can occur during task execution.
304///
305/// This represents panics or other unrecoverable errors from spawned tasks.
306type Error = Box<dyn core::any::Any + Send>;
307
308/// A trait representing a spawned task that can be awaited, cancelled, or queried for results.
309///
310/// This trait extends [`Future`] with additional capabilities for task management:
311/// - Explicit error handling via [`poll_result`](Self::poll_result)
312/// - Task cancellation via [`poll_cancel`](Self::poll_cancel)
313/// - Convenience methods for getting results and cancelling
314///
315pub trait Task<T>: Future<Output = T> {
316    /// Poll the task for completion, returning a [`Result`] that can contain errors.
317    ///
318    /// Unlike the [`Future::poll`] implementation, this method allows you to handle
319    /// task panics and other errors explicitly rather than propagating them.
320    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>>;
321
322    /// Poll for task cancellation.
323    ///
324    /// This method attempts to cancel the task and returns [`Poll::Ready`] when
325    /// the cancellation is complete.
326    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
327
328    /// Get the result of the task, including any errors that occurred.
329    ///
330    /// This is equivalent to awaiting the task but returns a [`Result`] that
331    /// allows you to handle panics and other errors explicitly.
332    ///
333    fn result(self) -> impl Future<Output = Result<T, Error>>
334    where
335        Self: Sized,
336    {
337        ResultFuture {
338            task: self,
339            _phantom: PhantomData,
340        }
341    }
342
343    /// Cancel the task.
344    ///
345    /// This method requests cancellation of the task and returns a future that
346    /// completes when the cancellation is finished.
347    ///
348    fn cancel(self) -> impl Future<Output = ()>
349    where
350        Self: Sized,
351    {
352        CancelFuture {
353            task: self,
354            _phantom: PhantomData,
355        }
356    }
357}
358
359/// Future returned by [`Task::result()`].
360///
361/// This future resolves to a `Result<T, Error>` when the underlying task completes,
362/// allowing explicit handling of task panics and other errors without propagating them.
363pub struct ResultFuture<T: Task<U>, U> {
364    task: T,
365    _phantom: PhantomData<U>,
366}
367
368impl<T: Task<U>, U> core::fmt::Debug for ResultFuture<T, U> {
369    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
370        f.debug_struct("ResultFuture").finish_non_exhaustive()
371    }
372}
373
374impl<T: Task<U>, U> Future for ResultFuture<T, U> {
375    type Output = Result<U, Error>;
376
377    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378        let this = unsafe { self.get_unchecked_mut() };
379        unsafe { Pin::new_unchecked(&mut this.task) }.poll_result(cx)
380    }
381}
382
383/// Future returned by [`Task::cancel()`].
384///
385/// This future resolves when the underlying task cancellation is complete.
386/// The future completes regardless of whether the task was successfully cancelled
387/// or had already finished.
388pub struct CancelFuture<T: Task<U>, U> {
389    task: T,
390    _phantom: PhantomData<U>,
391}
392
393impl<T: Task<U>, U> core::fmt::Debug for CancelFuture<T, U> {
394    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
395        f.debug_struct("CancelFuture").finish_non_exhaustive()
396    }
397}
398
399impl<T: Task<U>, U> Future for CancelFuture<T, U> {
400    type Output = ();
401
402    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
403        let this = unsafe { self.get_unchecked_mut() };
404        unsafe { Pin::new_unchecked(&mut this.task) }.poll_cancel(cx)
405    }
406}
407
408/// A type-erased [`Executor`] that can hold any executor implementation.
409///
410/// This allows for runtime selection of executors and storing different executor
411/// types in the same collection.
412///
413pub struct AnyExecutor(Box<dyn AnyExecutorImpl>);
414
415/// Task type returned by [`AnyExecutor`].
416///
417/// This task can be awaited like any other task and provides the same
418/// cancellation and error handling capabilities.
419pub struct AnyExecutorTask<T> {
420    inner: Pin<Box<dyn Task<()> + Send>>,
421    receiver: Receiver<Result<T, Error>>,
422}
423
424impl<T> core::fmt::Debug for AnyExecutorTask<T> {
425    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
426        f.debug_struct("AnyExecutorTask").finish_non_exhaustive()
427    }
428}
429
430impl<T: Send> Future for AnyExecutorTask<T> {
431    type Output = T;
432
433    fn poll(
434        self: Pin<&mut Self>,
435        cx: &mut core::task::Context<'_>,
436    ) -> core::task::Poll<Self::Output> {
437        self.poll_result(cx).map(|res| res.unwrap())
438    }
439}
440
441impl<T: Send> Task<T> for AnyExecutorTask<T> {
442    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
443        // First, ensure the underlying task is being polled
444        let this = unsafe { self.get_unchecked_mut() };
445        let _ = this.inner.as_mut().poll(cx);
446
447        // Then poll the receiver
448        let mut recv = this.receiver.recv();
449        unsafe {
450            Pin::new_unchecked(&mut recv)
451                .poll(cx)
452                .map(|res| res.unwrap_or_else(|_| Err(Box::new("Channel closed"))))
453        }
454    }
455    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
456        let this = unsafe { self.get_unchecked_mut() };
457        this.inner.as_mut().poll_cancel(cx)
458    }
459}
460
461impl Executor for AnyExecutor {
462    type Task<T: Send + 'static> = AnyExecutorTask<T>;
463
464    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
465    where
466        Fut: Future<Output: Send> + Send + 'static,
467    {
468        let (sender, receiver) = async_channel::bounded(1);
469        let fut = async move {
470            let res = AssertUnwindSafe(fut).await;
471            let _ = sender.send(Ok(res)).await;
472        };
473        let inner = self.0.spawn_boxed(Box::pin(fut));
474        AnyExecutorTask { inner, receiver }
475    }
476}
477
478trait AnyExecutorImpl: Send + Sync + Any {
479    fn spawn_boxed(
480        &self,
481        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
482    ) -> Pin<Box<dyn Task<()> + Send>>;
483}
484
485impl<T: Task<T>> Task<T> for Pin<Box<T>> {
486    fn poll_result(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
487        let this = unsafe { self.get_unchecked_mut() };
488        this.as_mut().poll_result(cx)
489    }
490    fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
491        let this = unsafe { self.get_unchecked_mut() };
492
493        this.as_mut().poll_cancel(cx)
494    }
495}
496
497impl<E> AnyExecutorImpl for E
498where
499    E: Executor + 'static,
500{
501    fn spawn_boxed(
502        &self,
503        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
504    ) -> Pin<Box<dyn Task<()> + Send>> {
505        let task = self.spawn(fut);
506        Box::pin(task)
507    }
508}
509
510mod std_on {
511    use alloc::boxed::Box;
512
513    use crate::{
514        AnyExecutor, AnyExecutorTask, AnyLocalExecutor, AnyLocalExecutorTask, Executor,
515        LocalExecutor,
516    };
517
518    extern crate std;
519
520    use core::{cell::OnceCell, future::Future, panic::AssertUnwindSafe};
521    use std::sync::OnceLock;
522    std::thread_local! {
523        static LOCAL_EXECUTOR: OnceCell<AnyLocalExecutor> = const { OnceCell::new() };
524    }
525
526    /// Initialize the thread-local executor for spawning non-Send futures.
527    ///
528    /// This must be called before using [`spawn_local`]. The executor will be used
529    /// for all [`spawn_local`] calls on the current thread.
530    ///
531    /// # Panics
532    ///
533    /// Panics if a local executor has already been set for this thread.
534    ///
535    pub fn init_local_executor(executor: impl LocalExecutor + 'static) {
536        if try_init_local_executor(executor).is_err() {
537            panic!("Local executor already set for this thread");
538        }
539    }
540
541    /// Try to initialize the thread-local executor for spawning non-Send futures.
542    ///
543    /// This is a non-panicking version of [`init_local_executor`].
544    pub fn try_init_local_executor<E>(executor: E) -> Result<(), E>
545    where
546        E: LocalExecutor + 'static,
547    {
548        LOCAL_EXECUTOR.with(|cell| {
549            cell.set(AnyLocalExecutor::new(executor))
550                .map_err(|e| *e.downcast().unwrap())
551        })
552    }
553
554    static GLOBAL_EXECUTOR: OnceLock<AnyExecutor> = OnceLock::new();
555
556    /// Initialize the global executor for spawning Send futures.
557    ///
558    /// This must be called before using [`spawn`]. The executor will be used
559    /// for all [`spawn`] calls across all threads.
560    ///
561    /// # Panics
562    ///
563    /// Panics if a global executor has already been set.
564    ///
565    pub fn init_global_executor(executor: impl crate::Executor + 'static) {
566        if GLOBAL_EXECUTOR.set(AnyExecutor::new(executor)).is_err() {
567            panic!("Global executor already set");
568        }
569    }
570
571    /// Try to initialize the global executor for spawning Send futures.
572    ///
573    /// This is a non-panicking version of [`init_global_executor`].
574    pub fn try_init_global_executor<E>(executor: E) -> Result<(), E>
575    where
576        E: crate::Executor + 'static,
577    {
578        GLOBAL_EXECUTOR
579            .set(AnyExecutor::new(executor))
580            .map_err(|e| *e.downcast().unwrap())
581    }
582
583    /// Spawn a `Send` future on the global executor.
584    ///
585    /// The global executor must be initialized with [`init_global_executor`] before
586    /// calling this function.
587    ///
588    /// # Panics
589    ///
590    /// Panics if the global executor has not been set.
591    ///
592    pub fn spawn<Fut>(fut: Fut) -> AnyExecutorTask<Fut::Output>
593    where
594        Fut: Future<Output: Send> + Send + 'static,
595    {
596        let executor = GLOBAL_EXECUTOR.get().expect("Global executor not set");
597        executor.spawn(fut)
598    }
599
600    /// Spawn a future on the thread-local executor.
601    ///
602    /// The local executor must be initialized with [`init_local_executor`] before
603    /// calling this function. Unlike [`spawn`], this can handle futures that are
604    /// not `Send`.
605    ///
606    /// # Panics
607    ///
608    /// Panics if the local executor has not been set for this thread.
609    ///
610    pub fn spawn_local<Fut>(fut: Fut) -> AnyLocalExecutorTask<Fut::Output>
611    where
612        Fut: Future + 'static,
613    {
614        LOCAL_EXECUTOR.with(|cell| {
615            let executor = cell.get().expect("Local executor not set");
616            executor.spawn(fut)
617        })
618    }
619
620    #[allow(unused)]
621    pub(crate) fn catch_unwind<F, R>(f: F) -> Result<R, Box<dyn std::any::Any + Send>>
622    where
623        F: FnOnce() -> R,
624    {
625        std::panic::catch_unwind(AssertUnwindSafe(f))
626    }
627}
628
629pub use std_on::*;
630
631// Re-export async-executor types
632#[cfg(feature = "async-executor")]
633#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
634pub use async_executor::AsyncTask;
635
636// Re-export tokio types
637#[cfg(feature = "tokio")]
638#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
639pub use tokio::{DefaultExecutor, TokioLocalTask, TokioTask};
640
641// Re-export web types
642#[cfg(feature = "web")]
643#[cfg_attr(docsrs, doc(cfg(feature = "web")))]
644pub use web::{WebExecutor, WebTask};