executor_core/
async_task.rs1use crate::{Error, Task};
8use core::{
9 future::Future,
10 mem::ManuallyDrop,
11 pin::{Pin, pin},
12 task::{Context, Poll},
13};
14
15pub use async_task::{Runnable, Task as RawTask};
16
17#[cfg(feature = "std")]
18use crate::catch_unwind;
19
20#[cfg(not(feature = "std"))]
21fn catch_unwind<F, R>(f: F) -> Result<R, Error>
22where
23 F: FnOnce() -> R,
24{
25 Ok(f())
28}
29
30pub struct AsyncTask<T>(ManuallyDrop<Option<async_task::Task<T>>>);
35
36impl<T> core::fmt::Debug for AsyncTask<T> {
37 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
38 f.debug_struct("AsyncTask").finish_non_exhaustive()
39 }
40}
41
42impl<T> From<async_task::Task<T>> for AsyncTask<T> {
43 fn from(task: async_task::Task<T>) -> Self {
44 Self(ManuallyDrop::new(Some(task)))
45 }
46}
47
48impl<T> Future for AsyncTask<T> {
49 type Output = T;
50
51 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52 self.as_mut()
53 .poll_result(cx)
54 .map(|res| res.expect("Task panicked"))
55 }
56}
57
58impl<T> Task<T> for AsyncTask<T> {
59 fn poll_result(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Error>> {
60 let mut this = self.as_mut();
61
62 let task = this.0.as_mut().expect("Task has already been cancelled");
63 let result = catch_unwind(|| pin!(task).poll(cx));
64
65 match result {
66 Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
67 Ok(Poll::Pending) => Poll::Pending,
68 Err(error) => Poll::Ready(Err(error)),
69 }
70 }
71
72 fn poll_cancel(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
73 let task = self.0.take().expect("Task has already been cancelled");
74 let cancel_fut = task.cancel();
75 pin!(cancel_fut).poll(cx).map(|_| ())
76 }
77}
78
79pub fn spawn<F, S>(future: F, scheduler: S) -> (Runnable, AsyncTask<F::Output>)
88where
89 F: Future + Send + 'static,
90 F::Output: Send + 'static,
91 S: Fn(Runnable) + Send + Sync + 'static,
92{
93 let (runnable, task) = async_task::spawn(future, scheduler);
94 (runnable, AsyncTask::from(task))
95}
96
97#[cfg(feature = "std")]
104pub fn spawn_local<F, S>(future: F, scheduler: S) -> (Runnable, AsyncTask<F::Output>)
105where
106 F: Future + 'static,
107 S: Fn(Runnable) + Send + Sync + 'static,
108{
109 let (runnable, task) = async_task::spawn_local(future, scheduler);
110 (runnable, AsyncTask::from(task))
111}