executor_core/
async_executor.rs1use crate::{Executor, LocalExecutor, async_task::AsyncTask};
7use core::future::Future;
8
9pub use async_executor::{Executor as AsyncExecutor, LocalExecutor as AsyncLocalExecutor};
10
11impl Executor for async_executor::Executor<'static> {
12 type Task<T: Send + 'static> = AsyncTask<T>;
13
14 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
15 where
16 Fut: Future<Output: Send> + Send + 'static,
17 {
18 async_executor::Executor::spawn(self, fut).into()
19 }
20}
21
22impl LocalExecutor for async_executor::LocalExecutor<'static> {
23 type Task<T: 'static> = AsyncTask<T>;
24
25 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
26 where
27 Fut: Future + 'static,
28 {
29 async_executor::LocalExecutor::spawn(self, fut).into()
30 }
31}
32
33#[cfg(test)]
34mod tests {
35 #[cfg(feature = "std")]
36 extern crate std;
37
38 use crate::{Executor, LocalExecutor, Task, async_task::AsyncTask};
39 use alloc::task::Wake;
40 use alloc::{format, sync::Arc};
41 use core::future::Future;
42 use core::{
43 pin::Pin,
44 task::{Context, Poll, Waker},
45 };
46
/// Waker backend for the tests: waking it does nothing.
///
/// It exists so that a `Context` can be built for polling task handles by hand.
struct TestWaker;

impl Wake for TestWaker {
    fn wake(self: Arc<Self>) {
        // Intentionally empty: a wake-up schedules nothing.
    }
}
51
52 fn create_waker() -> Waker {
53 Arc::new(TestWaker).into()
54 }
55
56 async fn sleep_ms(ms: u64) {
57 #[cfg(feature = "std")]
58 {
59 use std::time::{Duration, Instant};
60 let start = Instant::now();
61 while start.elapsed() < Duration::from_millis(ms) {
62 futures_lite::future::yield_now().await;
63 if start.elapsed() >= Duration::from_millis(ms) {
64 break;
65 }
66 }
67 }
68 #[cfg(not(feature = "std"))]
69 {
70 for _ in 0..ms {
71 futures_lite::future::yield_now().await;
72 }
73 }
74 }
75
/// Spawning through the `Executor` trait yields a handle that resolves to the
/// future's output once the executor is driven.
#[test]
fn test_async_executor_spawn() {
    let executor = async_executor::Executor::new();
    let handle: AsyncTask<i32> = Executor::spawn(&executor, async { 42 });

    let output = futures_lite::future::block_on(executor.run(handle));

    assert_eq!(output, 42);
}
84
/// A spawned future that awaits `sleep_ms(1)` before returning still resolves
/// to its final value.
#[test]
fn test_async_executor_spawn_async_operation() {
    let executor = async_executor::Executor::new();
    let delayed = async {
        sleep_ms(1).await;
        "completed"
    };
    let handle: AsyncTask<&str> = Executor::spawn(&executor, delayed);

    let output = futures_lite::future::block_on(executor.run(handle));

    assert_eq!(output, "completed");
}
96
/// `AsyncTask` can be polled directly as a `Future`; whichever way the first
/// manual poll turns out, the task must end up producing `100`.
#[test]
fn test_async_task_future_impl() {
    let executor = async_executor::Executor::new();
    let mut handle: AsyncTask<i32> = Executor::spawn(&executor, async { 100 });

    let waker = create_waker();
    let mut cx = Context::from_waker(&waker);

    // One manual poll first; if the task is not ready yet, let the executor
    // drive it to completion instead.
    let first_poll = Pin::new(&mut handle).poll(&mut cx);
    let value = match first_poll {
        Poll::Ready(ready) => ready,
        Poll::Pending => futures_lite::future::block_on(executor.run(handle)),
    };

    assert_eq!(value, 100);
}
113
/// `poll_result` either reports `Ok("success")` straight away or, when still
/// pending, the `result()` future resolves to `Ok("success")` once the
/// executor is driven.
#[test]
fn test_async_task_poll_result() {
    let executor = async_executor::Executor::new();
    let mut handle: AsyncTask<&str> = Executor::spawn(&executor, async { "success" });

    let waker = create_waker();
    let mut cx = Context::from_waker(&waker);

    let first_poll = Pin::new(&mut handle).poll_result(&mut cx);
    match first_poll {
        Poll::Ready(Ok(value)) => assert_eq!(value, "success"),
        Poll::Ready(Err(_)) => panic!("Task should not fail"),
        Poll::Pending => {
            // Not finished yet: drive the executor until the result arrives.
            let outcome = futures_lite::future::block_on(executor.run(handle.result()));
            assert!(outcome.is_ok());
            let value = outcome.unwrap();
            assert_eq!(value, "success");
        }
    }
}
132
/// Asserts that a task whose future panics is reported as an `Err` by
/// `result()`.
#[test]
fn test_async_task_panic_handling() {
    let executor = async_executor::Executor::new();
    let handle: AsyncTask<()> = Executor::spawn(&executor, async {
        panic!("test panic");
    });

    let outcome = futures_lite::future::block_on(executor.run(handle.result()));

    assert!(outcome.is_err());
}
143
/// A task spawned directly on `async_executor::Executor` converts into
/// `AsyncTask` and still resolves to the future's output.
#[test]
fn test_async_task_from_impl() {
    let executor = async_executor::Executor::new();

    // Spawn with the executor's own `spawn`, then convert explicitly.
    let raw_handle = async_executor::Executor::spawn(&executor, async { 42 });
    let wrapped: AsyncTask<i32> = raw_handle.into();

    let output = futures_lite::future::block_on(executor.run(wrapped));

    assert_eq!(output, 42);
}
153
/// Spawning through the `LocalExecutor` trait yields a handle that resolves
/// to the future's output once the local executor is driven.
#[test]
fn test_local_executor_spawn() {
    let local = async_executor::LocalExecutor::new();
    let handle: AsyncTask<&str> = LocalExecutor::spawn_local(&local, async { "local task" });

    let output = futures_lite::future::block_on(local.run(handle));

    assert_eq!(output, "local task");
}
162
/// A future that captures an `Rc` (which is not `Send`) can be spawned on the
/// local executor and resolves to the value behind the `Rc`.
#[test]
fn test_local_executor_spawn_non_send() {
    use alloc::rc::Rc;

    let local = async_executor::LocalExecutor::new();
    let shared = Rc::new(42);

    let reads_rc = async move { *shared };
    let handle: AsyncTask<i32> = LocalExecutor::spawn_local(&local, reads_rc);

    let output = futures_lite::future::block_on(local.run(handle));

    assert_eq!(output, 42);
}
176
/// Local-executor counterpart of the `poll_result` check: the task reports
/// `Ok("local success")` either on the first manual poll or after the local
/// executor has been driven.
#[test]
fn test_async_task_poll_result_local() {
    let local = async_executor::LocalExecutor::new();
    let mut handle: AsyncTask<&str> =
        LocalExecutor::spawn_local(&local, async { "local success" });

    let waker = create_waker();
    let mut cx = Context::from_waker(&waker);

    let first_poll = Pin::new(&mut handle).poll_result(&mut cx);
    match first_poll {
        Poll::Ready(Ok(value)) => assert_eq!(value, "local success"),
        Poll::Ready(Err(_)) => panic!("Local task should not fail"),
        Poll::Pending => {
            // Not finished yet: drive the executor until the result arrives.
            let outcome = futures_lite::future::block_on(local.run(handle.result()));
            assert!(outcome.is_ok());
            let value = outcome.unwrap();
            assert_eq!(value, "local success");
        }
    }
}
196
/// Asserts that a panicking future spawned on the local executor is reported
/// as an `Err` by `result()`.
#[test]
fn test_async_task_panic_handling_local() {
    let local = async_executor::LocalExecutor::new();
    let handle: AsyncTask<()> = LocalExecutor::spawn_local(&local, async {
        panic!("local panic");
    });

    let outcome = futures_lite::future::block_on(local.run(handle.result()));

    assert!(outcome.is_err());
}
207
/// The `Debug` rendering of an `AsyncTask` mentions the type name.
#[test]
fn test_async_task_debug() {
    let executor = async_executor::Executor::new();
    let handle: AsyncTask<i32> = Executor::spawn(&executor, async { 42 });

    let rendered = format!("{handle:?}");

    assert!(rendered.contains("AsyncTask"));
}
215
/// The future returned by `result()` resolves to `Ok` carrying the spawned
/// future's output.
#[test]
fn test_async_task_result_future() {
    let executor = async_executor::Executor::new();
    let handle: AsyncTask<i32> = Executor::spawn(&executor, async { 123 });

    let outcome = futures_lite::future::block_on(executor.run(handle.result()));

    assert!(outcome.is_ok());
    let value = outcome.unwrap();
    assert_eq!(value, 123);
}
225
/// Three tasks with different delays are spawned on one executor and awaited
/// in spawn order; each must yield its own value.
#[test]
fn test_multiple_tasks_concurrency() {
    let executor = async_executor::Executor::new();

    let slow: AsyncTask<i32> = Executor::spawn(&executor, async {
        sleep_ms(10).await;
        1
    });
    let medium: AsyncTask<i32> = Executor::spawn(&executor, async {
        sleep_ms(5).await;
        2
    });
    let immediate: AsyncTask<i32> = Executor::spawn(&executor, async { 3 });

    // Tuple fields are evaluated left to right, so the handles are awaited in
    // the order they were spawned.
    let gathered = async { (slow.await, medium.await, immediate.await) };
    let outputs = futures_lite::future::block_on(executor.run(gathered));

    assert_eq!(outputs, (1, 2, 3));
}
251
/// Polls a task handle once by hand and then drops it explicitly; the test
/// passes as long as neither step panics.
#[test]
fn test_async_task_manually_drop_safety() {
    let executor = async_executor::Executor::new();
    let mut handle: AsyncTask<i32> = Executor::spawn(&executor, async { 42 });

    let waker = create_waker();
    let mut cx = Context::from_waker(&waker);

    // The outcome of this single poll is deliberately ignored.
    let _first_poll = Pin::new(&mut handle).poll_result(&mut cx);

    // The explicit drop is the point of the test, so the lint is silenced.
    #[allow(clippy::drop_non_drop)]
    drop(handle);
}
265}