1#[cfg(feature = "std")]
7extern crate std;
8
9use crate::{Executor, LocalExecutor, Task};
10use alloc::boxed::Box;
11use core::{
12 future::Future,
13 pin::Pin,
14 task::{Context, Poll},
15};
16
17#[derive(Debug, Clone, Copy)]
19pub struct TokioGlobal;
20
21impl Executor for TokioGlobal {
22 type Task<T: Send + 'static> = TokioTask<T>;
23
24 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
25 where
26 Fut: Future<Output: Send> + Send + 'static,
27 {
28 let handle = tokio::task::spawn(fut);
29 TokioTask { handle }
30 }
31}
32
33pub use tokio::{runtime::Runtime, task::JoinHandle, task::LocalSet};
34
35pub struct TokioTask<T> {
40 handle: tokio::task::JoinHandle<T>,
41}
42
43impl<T> core::fmt::Debug for TokioTask<T> {
44 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45 f.debug_struct("TokioTask").finish_non_exhaustive()
46 }
47}
48
49impl<T: Send + 'static> Future for TokioTask<T> {
50 type Output = T;
51
52 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
53 match Pin::new(&mut self.handle).poll(cx) {
54 Poll::Ready(Ok(result)) => Poll::Ready(result),
55 Poll::Ready(Err(err)) => {
56 if err.is_panic() {
57 std::panic::resume_unwind(err.into_panic());
58 } else {
59 std::panic::panic_any("Task was cancelled")
61 }
62 }
63 Poll::Pending => Poll::Pending,
64 }
65 }
66}
67
68impl<T: Send + 'static> Task<T> for TokioTask<T> {
69 fn poll_result(
70 mut self: Pin<&mut Self>,
71 cx: &mut Context<'_>,
72 ) -> Poll<Result<T, crate::Error>> {
73 match Pin::new(&mut self.handle).poll(cx) {
74 Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
75 Poll::Ready(Err(err)) => {
76 let error: crate::Error = if err.is_panic() {
77 err.into_panic()
78 } else {
79 Box::new("Task was cancelled")
80 };
81 Poll::Ready(Err(error))
82 }
83 Poll::Pending => Poll::Pending,
84 }
85 }
86}
87
88impl<T> Drop for TokioTask<T> {
89 fn drop(&mut self) {
90 self.handle.abort();
91 }
92}
93
94pub struct TokioLocalTask<T> {
99 handle: tokio::task::JoinHandle<T>,
100}
101
102impl<T> core::fmt::Debug for TokioLocalTask<T> {
103 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
104 f.debug_struct("TokioLocalTask").finish_non_exhaustive()
105 }
106}
107
108impl<T: 'static> Future for TokioLocalTask<T> {
109 type Output = T;
110
111 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112 match Pin::new(&mut self.handle).poll(cx) {
113 Poll::Ready(Ok(result)) => Poll::Ready(result),
114 Poll::Ready(Err(err)) => {
115 if err.is_panic() {
116 std::panic::resume_unwind(err.into_panic());
117 } else {
118 std::panic::panic_any("Task was cancelled")
120 }
121 }
122 Poll::Pending => Poll::Pending,
123 }
124 }
125}
126
127impl<T: 'static> Task<T> for TokioLocalTask<T> {
128 fn poll_result(
129 mut self: Pin<&mut Self>,
130 cx: &mut Context<'_>,
131 ) -> Poll<Result<T, crate::Error>> {
132 match Pin::new(&mut self.handle).poll(cx) {
133 Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
134 Poll::Ready(Err(err)) => {
135 let error: crate::Error = if err.is_panic() {
136 err.into_panic()
137 } else {
138 Box::new("Task was cancelled")
139 };
140 Poll::Ready(Err(error))
141 }
142 Poll::Pending => Poll::Pending,
143 }
144 }
145}
146
147impl Executor for tokio::runtime::Runtime {
148 type Task<T: Send + 'static> = TokioTask<T>;
149
150 fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
151 where
152 Fut: Future<Output: Send> + Send + 'static,
153 {
154 let handle = self.spawn(fut);
155 TokioTask { handle }
156 }
157}
158
159impl LocalExecutor for tokio::task::LocalSet {
160 type Task<T: 'static> = TokioLocalTask<T>;
161
162 fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
163 where
164 Fut: Future + 'static,
165 {
166 let handle = self.spawn_local(fut);
167 TokioLocalTask { handle }
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use crate::{Executor, LocalExecutor, Task};
175 use alloc::task::Wake;
176 use alloc::{format, sync::Arc};
177 use core::future::Future;
178 use core::{
179 pin::Pin,
180 task::{Context, Poll, Waker},
181 };
182 use tokio::time::{Duration, sleep};
183
184 struct TestWaker;
185 impl Wake for TestWaker {
186 fn wake(self: Arc<Self>) {}
187 }
188
189 fn create_waker() -> Waker {
190 Arc::new(TestWaker).into()
191 }
192
193 #[test]
194 fn test_default_executor_spawn() {
195 let executor = Runtime::new().expect("Failed to create Tokio runtime");
196 let task: TokioTask<i32> = Executor::spawn(&executor, async { 42 });
197 let result = executor.block_on(task);
198 assert_eq!(result, 42);
199 }
200
201 #[test]
202 fn test_default_executor_spawn_async_operation() {
203 let executor = Runtime::new().expect("Failed to create Tokio runtime");
204 let task: TokioTask<&str> = Executor::spawn(&executor, async {
205 sleep(Duration::from_millis(10)).await;
206 "completed"
207 });
208 let result = executor.block_on(task);
209 assert_eq!(result, "completed");
210 }
211
212 #[test]
213 fn test_tokio_task_future_impl() {
214 let executor = Runtime::new().expect("Failed to create Tokio runtime");
215 let mut task: TokioTask<i32> = Executor::spawn(&executor, async { 100 });
216
217 let waker = create_waker();
218 let mut cx = Context::from_waker(&waker);
219
220 match Pin::new(&mut task).poll(&mut cx) {
221 Poll::Ready(result) => assert_eq!(result, 100),
222 Poll::Pending => {
223 let result = executor.block_on(task);
224 assert_eq!(result, 100);
225 }
226 }
227 }
228
229 #[test]
230 fn test_tokio_task_poll_result() {
231 let executor = Runtime::new().expect("Failed to create Tokio runtime");
232 let mut task: TokioTask<&str> = Executor::spawn(&executor, async { "success" });
233
234 let waker = create_waker();
235 let mut cx = Context::from_waker(&waker);
236
237 match Pin::new(&mut task).poll_result(&mut cx) {
238 Poll::Ready(Ok(result)) => assert_eq!(result, "success"),
239 Poll::Ready(Err(_)) => panic!("Task should not fail"),
240 Poll::Pending => {
241 let result = executor.block_on(task.result());
242 assert!(result.is_ok());
243 assert_eq!(result.unwrap(), "success");
244 }
245 }
246 }
247
248 #[test]
249 fn test_tokio_task_panic_handling() {
250 let executor = Runtime::new().expect("Failed to create Tokio runtime");
251 let task: TokioTask<()> = Executor::spawn(&executor, async {
252 panic!("test panic");
253 });
254
255 let result = executor.block_on(task.result());
256 assert!(result.is_err());
257 }
258
259 #[test]
260 fn test_default_executor_default() {
261 let executor1 = Runtime::new().expect("Failed to create Tokio runtime");
262 let executor2 = Runtime::new().expect("Failed to create Tokio runtime");
263
264 let task1: TokioTask<i32> = Executor::spawn(&executor1, async { 1 });
265 let task2: TokioTask<i32> = Executor::spawn(&executor2, async { 2 });
266
267 assert_eq!(executor1.block_on(task1), 1);
268 assert_eq!(executor2.block_on(task2), 2);
269 }
270
271 #[test]
272 fn test_runtime_executor_impl() {
273 let rt = tokio::runtime::Runtime::new().unwrap();
274 let task: TokioTask<&str> = Executor::spawn(&rt, async { "runtime task" });
275 let result = rt.block_on(task);
276 assert_eq!(result, "runtime task");
277 }
278
279 #[tokio::test]
280 async fn test_local_set_executor() {
281 let local_set = tokio::task::LocalSet::new();
282
283 local_set
284 .run_until(async {
285 let task: TokioLocalTask<&str> =
286 LocalExecutor::spawn_local(&local_set, async { "local task" });
287 let result = task.await;
288 assert_eq!(result, "local task");
289 })
290 .await;
291 }
292
293 #[tokio::test]
294 async fn test_tokio_local_task_future_impl() {
295 let local_set = tokio::task::LocalSet::new();
296
297 local_set
298 .run_until(async {
299 let mut task: TokioLocalTask<i32> =
300 LocalExecutor::spawn_local(&local_set, async { 200 });
301
302 let waker = create_waker();
303 let mut cx = Context::from_waker(&waker);
304
305 match Pin::new(&mut task).poll(&mut cx) {
306 Poll::Ready(result) => assert_eq!(result, 200),
307 Poll::Pending => {
308 let result = task.await;
309 assert_eq!(result, 200);
310 }
311 }
312 })
313 .await;
314 }
315
316 #[tokio::test]
317 async fn test_tokio_local_task_poll_result() {
318 let local_set = tokio::task::LocalSet::new();
319
320 local_set
321 .run_until(async {
322 let mut task: TokioLocalTask<&str> =
323 LocalExecutor::spawn_local(&local_set, async { "local success" });
324
325 let waker = create_waker();
326 let mut cx = Context::from_waker(&waker);
327
328 match Pin::new(&mut task).poll_result(&mut cx) {
329 Poll::Ready(Ok(result)) => assert_eq!(result, "local success"),
330 Poll::Ready(Err(_)) => panic!("Local task should not fail"),
331 Poll::Pending => {
332 let result = task.result().await;
333 assert!(result.is_ok());
334 assert_eq!(result.unwrap(), "local success");
335 }
336 }
337 })
338 .await;
339 }
340
341 #[tokio::test]
342 async fn test_tokio_local_task_panic_handling() {
343 let local_set = tokio::task::LocalSet::new();
344
345 local_set
346 .run_until(async {
347 let task: TokioLocalTask<()> = LocalExecutor::spawn_local(&local_set, async {
348 panic!("local panic");
349 });
350
351 let result = task.result().await;
352 assert!(result.is_err());
353 })
354 .await;
355 }
356
357 #[test]
358 fn test_tokio_task_debug() {
359 let rt = tokio::runtime::Runtime::new().unwrap();
360 let task: TokioTask<i32> = Executor::spawn(&rt, async { 42 });
361 let debug_str = format!("{:?}", task);
362 assert!(debug_str.contains("TokioTask"));
363 }
364
365 #[test]
366 fn test_tokio_local_task_debug() {
367 let local_set = tokio::task::LocalSet::new();
368 let rt = tokio::runtime::Runtime::new().unwrap();
369
370 rt.block_on(local_set.run_until(async {
371 let task: TokioLocalTask<i32> = LocalExecutor::spawn_local(&local_set, async { 42 });
372 let debug_str = format!("{:?}", task);
373 assert!(debug_str.contains("TokioLocalTask"));
374 }));
375 }
376
377 #[test]
378 fn test_default_executor_debug() {
379 let executor = Runtime::new().expect("Failed to create Tokio runtime");
380 let debug_str = format!("{:?}", executor);
381 assert!(!debug_str.is_empty());
382 }
383
384 #[test]
385 fn test_task_result_future() {
386 let executor = Runtime::new().expect("Failed to create Tokio runtime");
387 let task: TokioTask<i32> = Executor::spawn(&executor, async { 123 });
388
389 let result = executor.block_on(task.result());
390 assert!(result.is_ok());
391 assert_eq!(result.unwrap(), 123);
392 }
393
394 #[test]
395 fn test_multiple_tasks_concurrency() {
396 let executor = Runtime::new().expect("Failed to create Tokio runtime");
397
398 let task1: TokioTask<i32> = Executor::spawn(&executor, async {
399 sleep(Duration::from_millis(50)).await;
400 1
401 });
402
403 let task2: TokioTask<i32> = Executor::spawn(&executor, async {
404 sleep(Duration::from_millis(25)).await;
405 2
406 });
407
408 let task3: TokioTask<i32> = Executor::spawn(&executor, async { 3 });
409
410 let (r1, r2, r3) = executor.block_on(async { tokio::join!(task1, task2, task3) });
411 assert_eq!(r1, 1);
412 assert_eq!(r2, 2);
413 assert_eq!(r3, 3);
414 }
415}