executor_core/
tokio.rs

1//! Integration with the Tokio async runtime.
2//!
3//! This module provides implementations of the [`Executor`] and [`LocalExecutor`] traits
4//! for the Tokio runtime, along with task wrappers that provide panic safety.
5
6#[cfg(feature = "std")]
7extern crate std;
8
9use crate::{Executor, LocalExecutor, Task};
10use alloc::boxed::Box;
11use core::{
12    future::Future,
13    pin::Pin,
14    task::{Context, Poll},
15};
16
/// Zero-sized handle to whichever Tokio runtime is current at the call site.
///
/// The type carries no state, so it is free to copy around. Its [`Executor`]
/// implementation forwards to `tokio::task::spawn`, which means a Tokio runtime
/// context must already be entered when `spawn` is called.
#[derive(Clone, Copy, Debug)]
pub struct TokioGlobal;
20
21impl Executor for TokioGlobal {
22    type Task<T: Send + 'static> = TokioTask<T>;
23
24    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
25    where
26        Fut: Future<Output: Send> + Send + 'static,
27    {
28        let handle = tokio::task::spawn(fut);
29        TokioTask { handle }
30    }
31}
32
33pub use tokio::{runtime::Runtime, task::JoinHandle, task::LocalSet};
34
35/// Task wrapper for Tokio's `JoinHandle` that implements the [`Task`] trait.
36///
37/// This provides panic safety and proper error handling for tasks spawned
38/// with Tokio's `spawn` function.
39pub struct TokioTask<T> {
40    handle: tokio::task::JoinHandle<T>,
41}
42
43impl<T> core::fmt::Debug for TokioTask<T> {
44    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45        f.debug_struct("TokioTask").finish_non_exhaustive()
46    }
47}
48
49impl<T: Send + 'static> Future for TokioTask<T> {
50    type Output = T;
51
52    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
53        match Pin::new(&mut self.handle).poll(cx) {
54            Poll::Ready(Ok(result)) => Poll::Ready(result),
55            Poll::Ready(Err(err)) => {
56                if err.is_panic() {
57                    std::panic::resume_unwind(err.into_panic());
58                } else {
59                    // Task was cancelled
60                    std::panic::panic_any("Task was cancelled")
61                }
62            }
63            Poll::Pending => Poll::Pending,
64        }
65    }
66}
67
68impl<T: Send + 'static> Task<T> for TokioTask<T> {
69    fn poll_result(
70        mut self: Pin<&mut Self>,
71        cx: &mut Context<'_>,
72    ) -> Poll<Result<T, crate::Error>> {
73        match Pin::new(&mut self.handle).poll(cx) {
74            Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
75            Poll::Ready(Err(err)) => {
76                let error: crate::Error = if err.is_panic() {
77                    err.into_panic()
78                } else {
79                    Box::new("Task was cancelled")
80                };
81                Poll::Ready(Err(error))
82            }
83            Poll::Pending => Poll::Pending,
84        }
85    }
86}
87
88impl<T> Drop for TokioTask<T> {
89    fn drop(&mut self) {
90        self.handle.abort();
91    }
92}
93
94/// Task wrapper for Tokio's local `JoinHandle` (non-Send futures).
95///
96/// This provides panic safety and proper error handling for tasks spawned
97/// with Tokio's `spawn_local` function.
98pub struct TokioLocalTask<T> {
99    handle: tokio::task::JoinHandle<T>,
100}
101
102impl<T> core::fmt::Debug for TokioLocalTask<T> {
103    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
104        f.debug_struct("TokioLocalTask").finish_non_exhaustive()
105    }
106}
107
108impl<T: 'static> Future for TokioLocalTask<T> {
109    type Output = T;
110
111    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112        match Pin::new(&mut self.handle).poll(cx) {
113            Poll::Ready(Ok(result)) => Poll::Ready(result),
114            Poll::Ready(Err(err)) => {
115                if err.is_panic() {
116                    std::panic::resume_unwind(err.into_panic());
117                } else {
118                    // Task was cancelled
119                    std::panic::panic_any("Task was cancelled")
120                }
121            }
122            Poll::Pending => Poll::Pending,
123        }
124    }
125}
126
127impl<T: 'static> Task<T> for TokioLocalTask<T> {
128    fn poll_result(
129        mut self: Pin<&mut Self>,
130        cx: &mut Context<'_>,
131    ) -> Poll<Result<T, crate::Error>> {
132        match Pin::new(&mut self.handle).poll(cx) {
133            Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
134            Poll::Ready(Err(err)) => {
135                let error: crate::Error = if err.is_panic() {
136                    err.into_panic()
137                } else {
138                    Box::new("Task was cancelled")
139                };
140                Poll::Ready(Err(error))
141            }
142            Poll::Pending => Poll::Pending,
143        }
144    }
145}
146
147impl Executor for tokio::runtime::Runtime {
148    type Task<T: Send + 'static> = TokioTask<T>;
149
150    fn spawn<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
151    where
152        Fut: Future<Output: Send> + Send + 'static,
153    {
154        let handle = self.spawn(fut);
155        TokioTask { handle }
156    }
157}
158
159impl LocalExecutor for tokio::task::LocalSet {
160    type Task<T: 'static> = TokioLocalTask<T>;
161
162    fn spawn_local<Fut>(&self, fut: Fut) -> Self::Task<Fut::Output>
163    where
164        Fut: Future + 'static,
165    {
166        let handle = self.spawn_local(fut);
167        TokioLocalTask { handle }
168    }
169}
170
#[cfg(test)]
mod tests {
    //! Tests for the Tokio executor adapters and their task wrappers.
    //!
    //! Covers the three executors (`TokioGlobal`, `Runtime`, `LocalSet`) and both
    //! ways of consuming a task: awaiting it as a `Future` and polling it through
    //! `Task::poll_result` / `Task::result`.

    use super::*;
    use crate::{Executor, LocalExecutor, Task};
    use alloc::task::Wake;
    use alloc::{format, sync::Arc};
    use core::future::Future;
    use core::{
        pin::Pin,
        task::{Context, Poll, Waker},
    };
    use tokio::time::{Duration, sleep};

    /// Waker that does nothing; enough for a single manual `poll` call.
    struct TestWaker;
    impl Wake for TestWaker {
        fn wake(self: Arc<Self>) {}
    }

    /// Builds a no-op [`Waker`] for driving futures by hand.
    fn create_waker() -> Waker {
        Arc::new(TestWaker).into()
    }

    /// `TokioGlobal` spawns onto the runtime that is current at the call site.
    #[tokio::test]
    async fn test_tokio_global_spawn() {
        let task: TokioTask<i32> = Executor::spawn(&TokioGlobal, async { 7 });
        assert_eq!(task.await, 7);
    }

    /// A panic inside a task spawned via `TokioGlobal` is reported as `Err`.
    #[tokio::test]
    async fn test_tokio_global_panic_handling() {
        let task: TokioTask<()> = Executor::spawn(&TokioGlobal, async {
            panic!("global panic");
        });

        let result = task.result().await;
        assert!(result.is_err());
    }

    /// The derived `Debug` output of the unit struct is its name.
    #[test]
    fn test_tokio_global_debug() {
        assert_eq!(format!("{:?}", TokioGlobal), "TokioGlobal");
    }

    /// A trivially ready future spawned on a `Runtime` yields its value.
    #[test]
    fn test_default_executor_spawn() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let task: TokioTask<i32> = Executor::spawn(&executor, async { 42 });
        let result = executor.block_on(task);
        assert_eq!(result, 42);
    }

    /// A future that actually suspends (timer) still completes with its value.
    #[test]
    fn test_default_executor_spawn_async_operation() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let task: TokioTask<&str> = Executor::spawn(&executor, async {
            sleep(Duration::from_millis(10)).await;
            "completed"
        });
        let result = executor.block_on(task);
        assert_eq!(result, "completed");
    }

    /// Manual `Future::poll`: either ready at once or completes when driven.
    #[test]
    fn test_tokio_task_future_impl() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let mut task: TokioTask<i32> = Executor::spawn(&executor, async { 100 });

        let waker = create_waker();
        let mut cx = Context::from_waker(&waker);

        match Pin::new(&mut task).poll(&mut cx) {
            Poll::Ready(result) => assert_eq!(result, 100),
            Poll::Pending => {
                // Not finished yet: let the runtime drive it to completion.
                let result = executor.block_on(task);
                assert_eq!(result, 100);
            }
        }
    }

    /// Manual `Task::poll_result`: success is surfaced as `Ok`.
    #[test]
    fn test_tokio_task_poll_result() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let mut task: TokioTask<&str> = Executor::spawn(&executor, async { "success" });

        let waker = create_waker();
        let mut cx = Context::from_waker(&waker);

        match Pin::new(&mut task).poll_result(&mut cx) {
            Poll::Ready(Ok(result)) => assert_eq!(result, "success"),
            Poll::Ready(Err(_)) => panic!("Task should not fail"),
            Poll::Pending => {
                // Not finished yet: let the runtime drive it to completion.
                let result = executor.block_on(task.result());
                assert!(result.is_ok());
                assert_eq!(result.unwrap(), "success");
            }
        }
    }

    /// A panicking task is reported as `Err` through `Task::result`.
    #[test]
    fn test_tokio_task_panic_handling() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let task: TokioTask<()> = Executor::spawn(&executor, async {
            panic!("test panic");
        });

        let result = executor.block_on(task.result());
        assert!(result.is_err());
    }

    /// Two independent runtimes each run their own task.
    #[test]
    fn test_default_executor_default() {
        let executor1 = Runtime::new().expect("Failed to create Tokio runtime");
        let executor2 = Runtime::new().expect("Failed to create Tokio runtime");

        let task1: TokioTask<i32> = Executor::spawn(&executor1, async { 1 });
        let task2: TokioTask<i32> = Executor::spawn(&executor2, async { 2 });

        assert_eq!(executor1.block_on(task1), 1);
        assert_eq!(executor2.block_on(task2), 2);
    }

    /// `Runtime` named by its full path implements `Executor`.
    #[test]
    fn test_runtime_executor_impl() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let task: TokioTask<&str> = Executor::spawn(&rt, async { "runtime task" });
        let result = rt.block_on(task);
        assert_eq!(result, "runtime task");
    }

    /// A task spawned on a `LocalSet` completes while the set is being run.
    #[tokio::test]
    async fn test_local_set_executor() {
        let local_set = tokio::task::LocalSet::new();

        local_set
            .run_until(async {
                let task: TokioLocalTask<&str> =
                    LocalExecutor::spawn_local(&local_set, async { "local task" });
                let result = task.await;
                assert_eq!(result, "local task");
            })
            .await;
    }

    /// Manual `Future::poll` on a local task.
    #[tokio::test]
    async fn test_tokio_local_task_future_impl() {
        let local_set = tokio::task::LocalSet::new();

        local_set
            .run_until(async {
                let mut task: TokioLocalTask<i32> =
                    LocalExecutor::spawn_local(&local_set, async { 200 });

                let waker = create_waker();
                let mut cx = Context::from_waker(&waker);

                match Pin::new(&mut task).poll(&mut cx) {
                    Poll::Ready(result) => assert_eq!(result, 200),
                    Poll::Pending => {
                        // Not finished yet: await it inside the running local set.
                        let result = task.await;
                        assert_eq!(result, 200);
                    }
                }
            })
            .await;
    }

    /// Manual `Task::poll_result` on a local task.
    #[tokio::test]
    async fn test_tokio_local_task_poll_result() {
        let local_set = tokio::task::LocalSet::new();

        local_set
            .run_until(async {
                let mut task: TokioLocalTask<&str> =
                    LocalExecutor::spawn_local(&local_set, async { "local success" });

                let waker = create_waker();
                let mut cx = Context::from_waker(&waker);

                match Pin::new(&mut task).poll_result(&mut cx) {
                    Poll::Ready(Ok(result)) => assert_eq!(result, "local success"),
                    Poll::Ready(Err(_)) => panic!("Local task should not fail"),
                    Poll::Pending => {
                        // Not finished yet: await it inside the running local set.
                        let result = task.result().await;
                        assert!(result.is_ok());
                        assert_eq!(result.unwrap(), "local success");
                    }
                }
            })
            .await;
    }

    /// A panicking local task is reported as `Err` through `Task::result`.
    #[tokio::test]
    async fn test_tokio_local_task_panic_handling() {
        let local_set = tokio::task::LocalSet::new();

        local_set
            .run_until(async {
                let task: TokioLocalTask<()> = LocalExecutor::spawn_local(&local_set, async {
                    panic!("local panic");
                });

                let result = task.result().await;
                assert!(result.is_err());
            })
            .await;
    }

    /// `Debug` output of `TokioTask` names the type.
    #[test]
    fn test_tokio_task_debug() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let task: TokioTask<i32> = Executor::spawn(&rt, async { 42 });
        let debug_str = format!("{:?}", task);
        assert!(debug_str.contains("TokioTask"));
    }

    /// `Debug` output of `TokioLocalTask` names the type.
    #[test]
    fn test_tokio_local_task_debug() {
        let local_set = tokio::task::LocalSet::new();
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(local_set.run_until(async {
            let task: TokioLocalTask<i32> = LocalExecutor::spawn_local(&local_set, async { 42 });
            let debug_str = format!("{:?}", task);
            assert!(debug_str.contains("TokioLocalTask"));
        }));
    }

    /// `Runtime` produces some non-empty `Debug` output.
    #[test]
    fn test_default_executor_debug() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let debug_str = format!("{:?}", executor);
        assert!(!debug_str.is_empty());
    }

    /// `Task::result` resolves to `Ok` with the task's value on success.
    #[test]
    fn test_task_result_future() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");
        let task: TokioTask<i32> = Executor::spawn(&executor, async { 123 });

        let result = executor.block_on(task.result());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 123);
    }

    /// Several tasks with different delays can be joined together.
    #[test]
    fn test_multiple_tasks_concurrency() {
        let executor = Runtime::new().expect("Failed to create Tokio runtime");

        let task1: TokioTask<i32> = Executor::spawn(&executor, async {
            sleep(Duration::from_millis(50)).await;
            1
        });

        let task2: TokioTask<i32> = Executor::spawn(&executor, async {
            sleep(Duration::from_millis(25)).await;
            2
        });

        let task3: TokioTask<i32> = Executor::spawn(&executor, async { 3 });

        let (r1, r2, r3) = executor.block_on(async { tokio::join!(task1, task2, task3) });
        assert_eq!(r1, 1);
        assert_eq!(r2, 2);
        assert_eq!(r3, 3);
    }
}
413        assert_eq!(r3, 3);
414    }
415}