executor_core/
async_executor.rs1use crate::{Executor, LocalExecutor, Task};
7use core::{future::Future, mem::ManuallyDrop, pin::pin, task::Poll};
8
9pub use async_executor::{Executor as AsyncExecutor, LocalExecutor as AsyncLocalExecutor};
10
11#[cfg(feature = "std")]
12use crate::catch_unwind;
13
14#[cfg(not(feature = "std"))]
15fn catch_unwind<F, R>(f: F) -> Result<R, crate::Error>
16where
17 F: FnOnce() -> R,
18{
19 Ok(f())
22}
23
24pub struct AsyncTask<T>(ManuallyDrop<Option<async_task::Task<T>>>);
29
30impl<T> core::fmt::Debug for AsyncTask<T> {
31 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
32 f.debug_struct("AsyncTask").finish_non_exhaustive()
33 }
34}
35
36impl<T> From<async_task::Task<T>> for AsyncTask<T> {
37 fn from(task: async_task::Task<T>) -> Self {
38 Self(ManuallyDrop::new(Some(task)))
39 }
40}
41
42impl<T> Future for AsyncTask<T> {
43 type Output = T;
44
45 fn poll(
46 mut self: core::pin::Pin<&mut Self>,
47 cx: &mut core::task::Context<'_>,
48 ) -> core::task::Poll<Self::Output> {
49 self.as_mut()
50 .poll_result(cx)
51 .map(|res| res.expect("Task panicked"))
52 }
53}
54
55impl<T> Task<T> for AsyncTask<T> {
56 fn poll_result(
57 mut self: core::pin::Pin<&mut Self>,
58 cx: &mut core::task::Context<'_>,
59 ) -> core::task::Poll<Result<T, crate::Error>> {
60 let mut this = self.as_mut();
61
62 let task = this.0.as_mut().expect("Task has already been cancelled");
63 let result = catch_unwind(|| pin!(task).poll(cx));
64
65 match result {
66 Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
67 Ok(Poll::Pending) => Poll::Pending,
68 Err(error) => Poll::Ready(Err(error)),
69 }
70 }
71 fn poll_cancel(
72 mut self: core::pin::Pin<&mut Self>,
73 cx: &mut core::task::Context<'_>,
74 ) -> core::task::Poll<()> {
75 let task = self.0.take().expect("Task has already been cancelled");
76 let cancel_fut = task.cancel();
77 pin!(cancel_fut).poll(cx).map(|_| {})
78 }
79}
80
81impl Executor for async_executor::Executor<'static> {
82 type Task<T: Send + 'static> = AsyncTask<T>;
83
84 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
85 where
86 Fut: Future<Output: Send> + Send + 'static,
87 {
88 async_executor::Executor::spawn(self, fut).into()
89 }
90}
91
92impl LocalExecutor for async_executor::LocalExecutor<'static> {
93 type Task<T: 'static> = AsyncTask<T>;
94
95 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
96 where
97 Fut: Future + 'static,
98 {
99 async_executor::LocalExecutor::spawn(self, fut).into()
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 #[cfg(feature = "std")]
106 extern crate std;
107
108 use super::*;
109 use crate::{Executor, LocalExecutor, Task};
110 use core::{pin::Pin, task::{Context, Poll, Waker}};
111 use core::future::Future;
112 use alloc::{sync::Arc, format};
113 use alloc::task::Wake;
114
115 struct TestWaker;
116 impl Wake for TestWaker {
117 fn wake(self: Arc<Self>) {}
118 }
119
120 fn create_waker() -> Waker {
121 Arc::new(TestWaker).into()
122 }
123
124 async fn sleep_ms(ms: u64) {
125 #[cfg(feature = "std")]
126 {
127 use std::time::{Duration, Instant};
128 let start = Instant::now();
129 while start.elapsed() < Duration::from_millis(ms) {
130 futures_lite::future::yield_now().await;
131 if start.elapsed() >= Duration::from_millis(ms) {
132 break;
133 }
134 }
135 }
136 #[cfg(not(feature = "std"))]
137 {
138 for _ in 0..ms {
139 futures_lite::future::yield_now().await;
140 }
141 }
142 }
143
144 #[test]
145 fn test_async_executor_spawn() {
146 let ex = async_executor::Executor::new();
147 let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
148
149 let result = futures_lite::future::block_on(ex.run(task));
150 assert_eq!(result, 42);
151 }
152
153 #[test]
154 fn test_async_executor_spawn_async_operation() {
155 let ex = async_executor::Executor::new();
156 let task: AsyncTask<&str> = Executor::spawn(&ex, async {
157 sleep_ms(1).await;
158 "completed"
159 });
160
161 let result = futures_lite::future::block_on(ex.run(task));
162 assert_eq!(result, "completed");
163 }
164
165 #[test]
166 fn test_async_task_future_impl() {
167 let ex = async_executor::Executor::new();
168 let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 100 });
169
170 let waker = create_waker();
171 let mut cx = Context::from_waker(&waker);
172
173 match Pin::new(&mut task).poll(&mut cx) {
174 Poll::Ready(result) => assert_eq!(result, 100),
175 Poll::Pending => {
176 let result = futures_lite::future::block_on(ex.run(task));
177 assert_eq!(result, 100);
178 }
179 }
180 }
181
182 #[test]
183 fn test_async_task_poll_result() {
184 let ex = async_executor::Executor::new();
185 let mut task: AsyncTask<&str> = Executor::spawn(&ex, async { "success" });
186
187 let waker = create_waker();
188 let mut cx = Context::from_waker(&waker);
189
190 match Pin::new(&mut task).poll_result(&mut cx) {
191 Poll::Ready(Ok(result)) => assert_eq!(result, "success"),
192 Poll::Ready(Err(_)) => panic!("Task should not fail"),
193 Poll::Pending => {
194 let result = futures_lite::future::block_on(ex.run(task.result()));
195 assert!(result.is_ok());
196 assert_eq!(result.unwrap(), "success");
197 }
198 }
199 }
200
201
202 #[test]
203 fn test_async_task_panic_handling() {
204 let ex = async_executor::Executor::new();
205 let task: AsyncTask<()> = Executor::spawn(&ex, async {
206 panic!("test panic");
207 });
208
209 let result = futures_lite::future::block_on(ex.run(task.result()));
210 assert!(result.is_err());
211 }
212
213 #[test]
214 fn test_async_task_from_impl() {
215 let ex = async_executor::Executor::new();
216 let async_task = async_executor::Executor::spawn(&ex, async { 42 });
217 let wrapped_task: AsyncTask<i32> = async_task.into();
218
219 let result = futures_lite::future::block_on(ex.run(wrapped_task));
220 assert_eq!(result, 42);
221 }
222
223 #[test]
224 fn test_local_executor_spawn() {
225 let local_ex = async_executor::LocalExecutor::new();
226 let task: AsyncTask<&str> = LocalExecutor::spawn(&local_ex, async { "local task" });
227
228 let result = futures_lite::future::block_on(local_ex.run(task));
229 assert_eq!(result, "local task");
230 }
231
232 #[test]
233 fn test_local_executor_spawn_non_send() {
234 use alloc::rc::Rc;
235
236 let local_ex = async_executor::LocalExecutor::new();
237 let non_send_data = Rc::new(42);
238
239 let task: AsyncTask<i32> = LocalExecutor::spawn(&local_ex, async move {
240 *non_send_data
241 });
242
243 let result = futures_lite::future::block_on(local_ex.run(task));
244 assert_eq!(result, 42);
245 }
246
247 #[test]
248 fn test_async_task_poll_result_local() {
249 let local_ex = async_executor::LocalExecutor::new();
250 let mut task: AsyncTask<&str> = LocalExecutor::spawn(&local_ex, async { "local success" });
251
252 let waker = create_waker();
253 let mut cx = Context::from_waker(&waker);
254
255 match Pin::new(&mut task).poll_result(&mut cx) {
256 Poll::Ready(Ok(result)) => assert_eq!(result, "local success"),
257 Poll::Ready(Err(_)) => panic!("Local task should not fail"),
258 Poll::Pending => {
259 let result = futures_lite::future::block_on(local_ex.run(task.result()));
260 assert!(result.is_ok());
261 assert_eq!(result.unwrap(), "local success");
262 }
263 }
264 }
265
266
267 #[test]
268 fn test_async_task_panic_handling_local() {
269 let local_ex = async_executor::LocalExecutor::new();
270 let task: AsyncTask<()> = LocalExecutor::spawn(&local_ex, async {
271 panic!("local panic");
272 });
273
274 let result = futures_lite::future::block_on(local_ex.run(task.result()));
275 assert!(result.is_err());
276 }
277
278 #[test]
279 fn test_async_task_debug() {
280 let ex = async_executor::Executor::new();
281 let task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
282 let debug_str = format!("{:?}", task);
283 assert!(debug_str.contains("AsyncTask"));
284 }
285
286 #[test]
287 fn test_async_task_result_future() {
288 let ex = async_executor::Executor::new();
289 let task: AsyncTask<i32> = Executor::spawn(&ex, async { 123 });
290
291 let result = futures_lite::future::block_on(ex.run(task.result()));
292 assert!(result.is_ok());
293 assert_eq!(result.unwrap(), 123);
294 }
295
296
297 #[test]
298 fn test_multiple_tasks_concurrency() {
299 let ex = async_executor::Executor::new();
300
301 let task1: AsyncTask<i32> = Executor::spawn(&ex, async {
302 sleep_ms(10).await;
303 1
304 });
305
306 let task2: AsyncTask<i32> = Executor::spawn(&ex, async {
307 sleep_ms(5).await;
308 2
309 });
310
311 let task3: AsyncTask<i32> = Executor::spawn(&ex, async { 3 });
312
313 let result = futures_lite::future::block_on(ex.run(async {
314 let r1 = task1.await;
315 let r2 = task2.await;
316 let r3 = task3.await;
317 (r1, r2, r3)
318 }));
319
320 assert_eq!(result, (1, 2, 3));
321 }
322
323 #[test]
324 fn test_async_task_manually_drop_safety() {
325 let ex = async_executor::Executor::new();
326 let mut task: AsyncTask<i32> = Executor::spawn(&ex, async { 42 });
327
328 let waker = create_waker();
329 let mut cx = Context::from_waker(&waker);
330
331 let _poll_result = Pin::new(&mut task).poll_result(&mut cx);
332
333 #[allow(clippy::drop_non_drop)]
334 drop(task);
335 }
336
337 #[test]
338 fn test_catch_unwind_no_std() {
339 use super::catch_unwind;
340
341 let result = catch_unwind(|| {
342 42
343 });
344
345 assert!(result.is_ok());
346 assert_eq!(result.unwrap(), 42);
347 }
348}