executor_core/
async_executor.rs

1//! Integration with the `async-executor` crate.
2//!
3//! This module provides implementations of the [`Executor`] and [`LocalExecutor`] traits
4//! for the `async-executor` crate, along with the [`AsyncTask`] wrapper.
5
6use crate::{Executor, LocalExecutor, Task};
7use core::{future::Future, mem::ManuallyDrop, pin::pin, task::Poll};
8
9pub use async_executor::{Executor as AsyncExecutor, LocalExecutor as AsyncLocalExecutor};
10
11#[cfg(feature = "std")]
12use crate::catch_unwind;
13
14#[cfg(not(feature = "std"))]
15fn catch_unwind<F, R>(f: F) -> Result<R, crate::Error>
16where
17    F: FnOnce() -> R,
18{
19    // In no-std environments (like WASM), we can't catch panics
20    // so we just execute the function directly
21    Ok(f())
22}
23
24/// A task wrapper for `async_task::Task` that implements the [`Task`] trait.
25///
26/// This provides panic safety and proper error handling for tasks spawned
27/// with the `async-executor` crate.
28pub struct AsyncTask<T>(ManuallyDrop<Option<async_task::Task<T>>>);
29
30impl<T> core::fmt::Debug for AsyncTask<T> {
31    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
32        f.debug_struct("AsyncTask").finish_non_exhaustive()
33    }
34}
35
36impl<T> From<async_task::Task<T>> for AsyncTask<T> {
37    fn from(task: async_task::Task<T>) -> Self {
38        Self(ManuallyDrop::new(Some(task)))
39    }
40}
41
42impl<T> Future for AsyncTask<T> {
43    type Output = T;
44
45    fn poll(
46        mut self: core::pin::Pin<&mut Self>,
47        cx: &mut core::task::Context<'_>,
48    ) -> core::task::Poll<Self::Output> {
49        self.as_mut()
50            .poll_result(cx)
51            .map(|res| res.expect("Task panicked"))
52    }
53}
54
55impl<T> Task<T> for AsyncTask<T> {
56    fn poll_result(
57        mut self: core::pin::Pin<&mut Self>,
58        cx: &mut core::task::Context<'_>,
59    ) -> core::task::Poll<Result<T, crate::Error>> {
60        let mut this = self.as_mut();
61
62        let task = this.0.as_mut().expect("Task has already been cancelled");
63        let result = catch_unwind(|| pin!(task).poll(cx));
64
65        match result {
66            Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
67            Ok(Poll::Pending) => Poll::Pending,
68            Err(error) => Poll::Ready(Err(error)),
69        }
70    }
71    fn poll_cancel(
72        mut self: core::pin::Pin<&mut Self>,
73        cx: &mut core::task::Context<'_>,
74    ) -> core::task::Poll<()> {
75        let task = self.0.take().expect("Task has already been cancelled");
76        let cancel_fut = task.cancel();
77        pin!(cancel_fut).poll(cx).map(|_| {})
78    }
79}
80
81impl Executor for async_executor::Executor<'static> {
82    type Task<T: Send + 'static> = AsyncTask<T>;
83
84    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
85    where
86        Fut: Future<Output: Send> + Send + 'static,
87    {
88        async_executor::Executor::spawn(self, fut).into()
89    }
90}
91
92impl LocalExecutor for async_executor::LocalExecutor<'static> {
93    type Task<T: 'static> = AsyncTask<T>;
94
95    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
96    where
97        Fut: Future + 'static,
98    {
99        async_executor::LocalExecutor::spawn(self, fut).into()
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    #[cfg(feature = "std")]
106    extern crate std;
107    
108    use super::*;
109    use crate::{Executor, LocalExecutor, Task};
110    use core::{pin::Pin, task::{Context, Poll, Waker}};
111    use core::future::Future;
112    use alloc::{sync::Arc, format};
113    use alloc::task::Wake;
114
115    struct TestWaker;
116    impl Wake for TestWaker {
117        fn wake(self: Arc<Self>) {}
118    }
119
120    fn create_waker() -> Waker {
121        Arc::new(TestWaker).into()
122    }
123
124    async fn sleep_ms(ms: u64) {
125        #[cfg(feature = "std")]
126        {
127            use std::time::{Duration, Instant};
128            let start = Instant::now();
129            while start.elapsed() < Duration::from_millis(ms) {
130                futures_lite::future::yield_now().await;
131                if start.elapsed() >= Duration::from_millis(ms) {
132                    break;
133                }
134            }
135        }
136        #[cfg(not(feature = "std"))]
137        {
138            for _ in 0..ms {
139                futures_lite::future::yield_now().await;
140            }
141        }
142    }
143
144    #[test]
145    fn test_async_executor_spawn() {
146        let ex = async_executor::Executor::new();
147        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
148        
149        let result = futures_lite::future::block_on(ex.run(task));
150        assert_eq!(result, 42);
151    }
152
153    #[test]
154    fn test_async_executor_spawn_async_operation() {
155        let ex = async_executor::Executor::new();
156        let task: AsyncTask<&str> = Executor::spawn(&ex, async {
157            sleep_ms(1).await;
158            "completed"
159        });
160        
161        let result = futures_lite::future::block_on(ex.run(task));
162        assert_eq!(result, "completed");
163    }
164
165    #[test]
166    fn test_async_task_future_impl() {
167        let ex = async_executor::Executor::new();
168        let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 100 });
169        
170        let waker = create_waker();
171        let mut cx = Context::from_waker(&waker);
172        
173        match Pin::new(&mut task).poll(&mut cx) {
174            Poll::Ready(result) => assert_eq!(result, 100),
175            Poll::Pending => {
176                let result = futures_lite::future::block_on(ex.run(task));
177                assert_eq!(result, 100);
178            }
179        }
180    }
181
182    #[test]
183    fn test_async_task_poll_result() {
184        let ex = async_executor::Executor::new();
185        let mut task: AsyncTask<&str> = Executor::spawn(&ex, async { "success" });
186        
187        let waker = create_waker();
188        let mut cx = Context::from_waker(&waker);
189        
190        match Pin::new(&mut task).poll_result(&mut cx) {
191            Poll::Ready(Ok(result)) => assert_eq!(result, "success"),
192            Poll::Ready(Err(_)) => panic!("Task should not fail"),
193            Poll::Pending => {
194                let result = futures_lite::future::block_on(ex.run(task.result()));
195                assert!(result.is_ok());
196                assert_eq!(result.unwrap(), "success");
197            }
198        }
199    }
200
201
202    #[test]
203    fn test_async_task_panic_handling() {
204        let ex = async_executor::Executor::new();
205        let task: AsyncTask<()> = Executor::spawn(&ex, async {
206            panic!("test panic");
207        });
208        
209        let result = futures_lite::future::block_on(ex.run(task.result()));
210        assert!(result.is_err());
211    }
212
213    #[test]
214    fn test_async_task_from_impl() {
215        let ex = async_executor::Executor::new();
216        let async_task = async_executor::Executor::spawn(&ex, async { 42 });
217        let wrapped_task: AsyncTask<i32> = async_task.into();
218        
219        let result = futures_lite::future::block_on(ex.run(wrapped_task));
220        assert_eq!(result, 42);
221    }
222
223    #[test]
224    fn test_local_executor_spawn() {
225        let local_ex = async_executor::LocalExecutor::new();
226        let task: AsyncTask<&str> = LocalExecutor::spawn(&local_ex, async { "local task" });
227        
228        let result = futures_lite::future::block_on(local_ex.run(task));
229        assert_eq!(result, "local task");
230    }
231
232    #[test]
233    fn test_local_executor_spawn_non_send() {
234        use alloc::rc::Rc;
235        
236        let local_ex = async_executor::LocalExecutor::new();
237        let non_send_data = Rc::new(42);
238        
239        let task: AsyncTask<i32> = LocalExecutor::spawn(&local_ex, async move {
240            *non_send_data
241        });
242        
243        let result = futures_lite::future::block_on(local_ex.run(task));
244        assert_eq!(result, 42);
245    }
246
247    #[test]
248    fn test_async_task_poll_result_local() {
249        let local_ex = async_executor::LocalExecutor::new();
250        let mut task: AsyncTask<&str> = LocalExecutor::spawn(&local_ex, async { "local success" });
251        
252        let waker = create_waker();
253        let mut cx = Context::from_waker(&waker);
254        
255        match Pin::new(&mut task).poll_result(&mut cx) {
256            Poll::Ready(Ok(result)) => assert_eq!(result, "local success"),
257            Poll::Ready(Err(_)) => panic!("Local task should not fail"),
258            Poll::Pending => {
259                let result = futures_lite::future::block_on(local_ex.run(task.result()));
260                assert!(result.is_ok());
261                assert_eq!(result.unwrap(), "local success");
262            }
263        }
264    }
265
266
267    #[test]
268    fn test_async_task_panic_handling_local() {
269        let local_ex = async_executor::LocalExecutor::new();
270        let task: AsyncTask<()> = LocalExecutor::spawn(&local_ex, async {
271            panic!("local panic");
272        });
273        
274        let result = futures_lite::future::block_on(local_ex.run(task.result()));
275        assert!(result.is_err());
276    }
277
278    #[test]
279    fn test_async_task_debug() {
280        let ex = async_executor::Executor::new();
281        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
282        let debug_str = format!("{:?}", task);
283        assert!(debug_str.contains("AsyncTask"));
284    }
285
286    #[test]
287    fn test_async_task_result_future() {
288        let ex = async_executor::Executor::new();
289        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 123 });
290        
291        let result = futures_lite::future::block_on(ex.run(task.result()));
292        assert!(result.is_ok());
293        assert_eq!(result.unwrap(), 123);
294    }
295
296
297    #[test]
298    fn test_multiple_tasks_concurrency() {
299        let ex = async_executor::Executor::new();
300        
301        let task1: AsyncTask<i32> = Executor::spawn(&ex, async {
302            sleep_ms(10).await;
303            1
304        });
305        
306        let task2: AsyncTask<i32> = Executor::spawn(&ex, async {
307            sleep_ms(5).await;
308            2
309        });
310        
311        let task3: AsyncTask<i32> = Executor::spawn(&ex, async { 3 });
312        
313        let result = futures_lite::future::block_on(ex.run(async {
314            let r1 = task1.await;
315            let r2 = task2.await;
316            let r3 = task3.await;
317            (r1, r2, r3)
318        }));
319        
320        assert_eq!(result, (1, 2, 3));
321    }
322
323    #[test]
324    fn test_async_task_manually_drop_safety() {
325        let ex = async_executor::Executor::new();
326        let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
327        
328        let waker = create_waker();
329        let mut cx = Context::from_waker(&waker);
330        
331        let _poll_result = Pin::new(&mut task).poll_result(&mut cx);
332        
333        #[allow(clippy::drop_non_drop)]
334        drop(task);
335    }
336
337    #[test] 
338    fn test_catch_unwind_no_std() {
339        use super::catch_unwind;
340        
341        let result = catch_unwind(|| {
342            42
343        });
344        
345        assert!(result.is_ok());
346        assert_eq!(result.unwrap(), 42);
347    }
348}