executor_core/
async_task.rs

1//! Integration with the `async-task` crate.
2//!
3//! This module provides a unified wrapper around the `async-task` crate that can be used
4//! by different executor implementations. It offers task spawning utilities and a
5//! task wrapper that implements the [`Task`] trait.
6
7use crate::{Error, Task};
8use core::{
9    future::Future,
10    mem::ManuallyDrop,
11    pin::{Pin, pin},
12    task::{Context, Poll},
13};
14
15pub use async_task::{Runnable, Task as RawTask};
16
17#[cfg(feature = "std")]
18use crate::catch_unwind;
19
20#[cfg(not(feature = "std"))]
21fn catch_unwind<F, R>(f: F) -> Result<R, Error>
22where
23    F: FnOnce() -> R,
24{
25    // In no-std environments (like WASM), we can't catch panics
26    // so we just execute the function directly
27    Ok(f())
28}
29
30/// A task wrapper that implements the [`Task`] trait.
31///
32/// This provides panic safety and proper error handling for tasks created
33/// with the `async-task` crate.
34pub struct AsyncTask<T>(ManuallyDrop<Option<async_task::Task<T>>>);
35
36impl<T> core::fmt::Debug for AsyncTask<T> {
37    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
38        f.debug_struct("AsyncTask").finish_non_exhaustive()
39    }
40}
41
42impl<T> From<async_task::Task<T>> for AsyncTask<T> {
43    fn from(task: async_task::Task<T>) -> Self {
44        Self(ManuallyDrop::new(Some(task)))
45    }
46}
47
48impl<T> Future for AsyncTask<T> {
49    type Output = T;
50
51    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52        self.as_mut()
53            .poll_result(cx)
54            .map(|res| res.expect("Task panicked"))
55    }
56}
57
58impl<T> Task<T> for AsyncTask<T> {
59    fn poll_result(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
60        let mut this = self.as_mut();
61
62        let task = this.0.as_mut().expect("Task has already been cancelled");
63        let result = catch_unwind(|| pin!(task).poll(cx));
64
65        match result {
66            Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
67            Ok(Poll::Pending) => Poll::Pending,
68            Err(error) => Poll::Ready(Err(error)),
69        }
70    }
71
72    fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
73        let task = self.0.take().expect("Task has already been cancelled");
74        let cancel_fut = task.cancel();
75        pin!(cancel_fut).poll(cx).map(|_| ())
76    }
77}
78
79/// Spawn a future with a custom scheduler using `async_task`.
80///
81/// This function creates a task that will be scheduled using the provided scheduler function.
82/// The scheduler receives a [`Runnable`] that should be executed to make progress on the task.
83///
84/// Returns a tuple of (runnable, task) where:
85/// - `runnable` should be scheduled immediately to start the task
86/// - `task` is an [`AsyncTask`] that can be awaited for the result
87pub fn spawn<F, S>(future: F, scheduler: S) -> (Runnable, AsyncTask<F::Output>)
88where
89    F: Future + Send + 'static,
90    F::Output: Send + 'static,
91    S: Fn(Runnable) + Send + Sync + 'static,
92{
93    let (runnable, task) = async_task::spawn(future, scheduler);
94    (runnable, AsyncTask::from(task))
95}
96
97/// Spawn a local (non-Send) future with a custom scheduler using `async_task`.
98///
99/// This is similar to [`spawn`] but works with futures that are not `Send`.
100/// It uses `async_task::spawn_local` internally.
101///
102/// This function is only available when the `std` feature is enabled.
103#[cfg(feature = "std")]
104pub fn spawn_local<F, S>(future: F, scheduler: S) -> (Runnable, AsyncTask<F::Output>)
105where
106    F: Future + 'static,
107    S: Fn(Runnable) + Send + Sync + 'static,
108{
109    let (runnable, task) = async_task::spawn_local(future, scheduler);
110    (runnable, AsyncTask::from(task))
111}