executor_core/
async_executor.rs

1//! Integration with the `async-executor` crate.
2//!
3//! This module provides implementations of the [`Executor`] and [`LocalExecutor`] traits
4//! for the `async-executor` crate, using the [`AsyncTask`] wrapper from the `async_task` module.
5
6use crate::{Executor, LocalExecutor, async_task::AsyncTask};
7use core::future::Future;
8
9pub use async_executor::{Executor as AsyncExecutor, LocalExecutor as AsyncLocalExecutor};
10
11impl Executor for async_executor::Executor<'static> {
12    type Task<T: Send + 'static> = AsyncTask<T>;
13
14    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
15    where
16        Fut: Future<Output: Send> + Send + 'static,
17    {
18        async_executor::Executor::spawn(self, fut).into()
19    }
20}
21
22impl LocalExecutor for async_executor::LocalExecutor<'static> {
23    type Task<T: 'static> = AsyncTask<T>;
24
25    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
26    where
27        Fut: Future + 'static,
28    {
29        async_executor::LocalExecutor::spawn(self, fut).into()
30    }
31}
32
33#[cfg(test)]
34mod tests {
35    #[cfg(feature = "std")]
36    extern crate std;
37
38    use crate::{Executor, LocalExecutor, Task, async_task::AsyncTask};
39    use alloc::task::Wake;
40    use alloc::{format, sync::Arc};
41    use core::future::Future;
42    use core::{
43        pin::Pin,
44        task::{Context, Poll, Waker},
45    };
46
47    struct TestWaker;
48    impl Wake for TestWaker {
49        fn wake(self: Arc<Self>) {}
50    }
51
52    fn create_waker() -> Waker {
53        Arc::new(TestWaker).into()
54    }
55
56    async fn sleep_ms(ms: u64) {
57        #[cfg(feature = "std")]
58        {
59            use std::time::{Duration, Instant};
60            let start = Instant::now();
61            while start.elapsed() < Duration::from_millis(ms) {
62                futures_lite::future::yield_now().await;
63                if start.elapsed() >= Duration::from_millis(ms) {
64                    break;
65                }
66            }
67        }
68        #[cfg(not(feature = "std"))]
69        {
70            for _ in 0..ms {
71                futures_lite::future::yield_now().await;
72            }
73        }
74    }
75
76    #[test]
77    fn test_async_executor_spawn() {
78        let ex = async_executor::Executor::new();
79        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
80
81        let result = futures_lite::future::block_on(ex.run(task));
82        assert_eq!(result, 42);
83    }
84
85    #[test]
86    fn test_async_executor_spawn_async_operation() {
87        let ex = async_executor::Executor::new();
88        let task: AsyncTask<&str> = Executor::spawn(&ex, async {
89            sleep_ms(1).await;
90            "completed"
91        });
92
93        let result = futures_lite::future::block_on(ex.run(task));
94        assert_eq!(result, "completed");
95    }
96
97    #[test]
98    fn test_async_task_future_impl() {
99        let ex = async_executor::Executor::new();
100        let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 100 });
101
102        let waker = create_waker();
103        let mut cx = Context::from_waker(&waker);
104
105        match Pin::new(&mut task).poll(&mut cx) {
106            Poll::Ready(result) => assert_eq!(result, 100),
107            Poll::Pending => {
108                let result = futures_lite::future::block_on(ex.run(task));
109                assert_eq!(result, 100);
110            }
111        }
112    }
113
114    #[test]
115    fn test_async_task_poll_result() {
116        let ex = async_executor::Executor::new();
117        let mut task: AsyncTask<&str> = Executor::spawn(&ex, async { "success" });
118
119        let waker = create_waker();
120        let mut cx = Context::from_waker(&waker);
121
122        match Pin::new(&mut task).poll_result(&mut cx) {
123            Poll::Ready(Ok(result)) => assert_eq!(result, "success"),
124            Poll::Ready(Err(_)) => panic!("Task should not fail"),
125            Poll::Pending => {
126                let result = futures_lite::future::block_on(ex.run(task.result()));
127                assert!(result.is_ok());
128                assert_eq!(result.unwrap(), "success");
129            }
130        }
131    }
132
133    #[test]
134    fn test_async_task_panic_handling() {
135        let ex = async_executor::Executor::new();
136        let task: AsyncTask<()> = Executor::spawn(&ex, async {
137            panic!("test panic");
138        });
139
140        let result = futures_lite::future::block_on(ex.run(task.result()));
141        assert!(result.is_err());
142    }
143
144    #[test]
145    fn test_async_task_from_impl() {
146        let ex = async_executor::Executor::new();
147        let async_task = async_executor::Executor::spawn(&ex, async { 42 });
148        let wrapped_task: AsyncTask<i32> = async_task.into();
149
150        let result = futures_lite::future::block_on(ex.run(wrapped_task));
151        assert_eq!(result, 42);
152    }
153
154    #[test]
155    fn test_local_executor_spawn() {
156        let local_ex = async_executor::LocalExecutor::new();
157        let task: AsyncTask<&str> = LocalExecutor::spawn_local(&local_ex, async { "local task" });
158
159        let result = futures_lite::future::block_on(local_ex.run(task));
160        assert_eq!(result, "local task");
161    }
162
163    #[test]
164    fn test_local_executor_spawn_non_send() {
165        use alloc::rc::Rc;
166
167        let local_ex = async_executor::LocalExecutor::new();
168        let non_send_data = Rc::new(42);
169
170        let task: AsyncTask<i32> =
171            LocalExecutor::spawn_local(&local_ex, async move { *non_send_data });
172
173        let result = futures_lite::future::block_on(local_ex.run(task));
174        assert_eq!(result, 42);
175    }
176
177    #[test]
178    fn test_async_task_poll_result_local() {
179        let local_ex = async_executor::LocalExecutor::new();
180        let mut task: AsyncTask<&str> =
181            LocalExecutor::spawn_local(&local_ex, async { "local success" });
182
183        let waker = create_waker();
184        let mut cx = Context::from_waker(&waker);
185
186        match Pin::new(&mut task).poll_result(&mut cx) {
187            Poll::Ready(Ok(result)) => assert_eq!(result, "local success"),
188            Poll::Ready(Err(_)) => panic!("Local task should not fail"),
189            Poll::Pending => {
190                let result = futures_lite::future::block_on(local_ex.run(task.result()));
191                assert!(result.is_ok());
192                assert_eq!(result.unwrap(), "local success");
193            }
194        }
195    }
196
197    #[test]
198    fn test_async_task_panic_handling_local() {
199        let local_ex = async_executor::LocalExecutor::new();
200        let task: AsyncTask<()> = LocalExecutor::spawn_local(&local_ex, async {
201            panic!("local panic");
202        });
203
204        let result = futures_lite::future::block_on(local_ex.run(task.result()));
205        assert!(result.is_err());
206    }
207
208    #[test]
209    fn test_async_task_debug() {
210        let ex = async_executor::Executor::new();
211        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
212        let debug_str = format!("{:?}", task);
213        assert!(debug_str.contains("AsyncTask"));
214    }
215
216    #[test]
217    fn test_async_task_result_future() {
218        let ex = async_executor::Executor::new();
219        let task: AsyncTask<i32> = Executor::spawn(&ex, async { 123 });
220
221        let result = futures_lite::future::block_on(ex.run(task.result()));
222        assert!(result.is_ok());
223        assert_eq!(result.unwrap(), 123);
224    }
225
226    #[test]
227    fn test_multiple_tasks_concurrency() {
228        let ex = async_executor::Executor::new();
229
230        let task1: AsyncTask<i32> = Executor::spawn(&ex, async {
231            sleep_ms(10).await;
232            1
233        });
234
235        let task2: AsyncTask<i32> = Executor::spawn(&ex, async {
236            sleep_ms(5).await;
237            2
238        });
239
240        let task3: AsyncTask<i32> = Executor::spawn(&ex, async { 3 });
241
242        let result = futures_lite::future::block_on(ex.run(async {
243            let r1 = task1.await;
244            let r2 = task2.await;
245            let r3 = task3.await;
246            (r1, r2, r3)
247        }));
248
249        assert_eq!(result, (1, 2, 3));
250    }
251
252    #[test]
253    fn test_async_task_manually_drop_safety() {
254        let ex = async_executor::Executor::new();
255        let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
256
257        let waker = create_waker();
258        let mut cx = Context::from_waker(&waker);
259
260        let _poll_result = Pin::new(&mut task).poll_result(&mut cx);
261
262        #[allow(clippy::drop_non_drop)]
263        drop(task);
264    }
265}