qubit_tokio_executor/tokio_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{
13 Context,
14 Poll,
15 },
16};
17
18use tokio::task::{
19 JoinError,
20 JoinHandle,
21};
22
23use qubit_executor::{
24 TaskExecutionError,
25 TaskResult,
26};
27
28/// Async handle returned by Tokio-backed executor services.
29///
30/// Awaiting this handle reports the accepted task's final result, including
31/// task failure, panic, or cancellation.
32///
33/// # Type Parameters
34///
35/// * `R` - The task success value.
36/// * `E` - The task error value.
37///
38/// # Author
39///
40/// Haixing Hu
41pub struct TokioTaskHandle<R, E> {
42 /// Tokio task whose output is the accepted task's final result.
43 handle: JoinHandle<TaskResult<R, E>>,
44}
45
46impl<R, E> TokioTaskHandle<R, E> {
47 /// Creates a handle from a Tokio join handle.
48 ///
49 /// # Parameters
50 ///
51 /// * `handle` - The Tokio join handle that resolves to a task result.
52 ///
53 /// # Returns
54 ///
55 /// A task handle that can be awaited.
56 #[inline]
57 pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
58 Self { handle }
59 }
60
61 /// Requests cancellation of the underlying Tokio task.
62 ///
63 /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
64 /// execution only if the task has not started yet. Already running blocking
65 /// work continues until the closure returns.
66 ///
67 /// # Returns
68 ///
69 /// `true` after cancellation has been requested.
70 #[inline]
71 pub fn cancel(&self) -> bool {
72 self.handle.abort();
73 true
74 }
75
76 /// Returns whether the underlying Tokio task has finished.
77 ///
78 /// # Returns
79 ///
80 /// `true` if the Tokio task is complete.
81 #[inline]
82 pub fn is_done(&self) -> bool {
83 self.handle.is_finished()
84 }
85}
86
87impl<R, E> Future for TokioTaskHandle<R, E> {
88 type Output = TaskResult<R, E>;
89
90 /// Polls the underlying Tokio task.
91 ///
92 /// # Parameters
93 ///
94 /// * `cx` - Async task context used to register the current waker.
95 ///
96 /// # Returns
97 ///
98 /// `Poll::Ready` with the task result when the Tokio task completes, or
99 /// `Poll::Pending` while it is still running. Tokio cancellation and panic
100 /// join errors are converted to [`TaskExecutionError`] values.
101 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102 let this = self.get_mut();
103 match Pin::new(&mut this.handle).poll(cx) {
104 Poll::Ready(Ok(result)) => Poll::Ready(result),
105 Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
106 Poll::Pending => Poll::Pending,
107 }
108 }
109}
110
111/// Converts a Tokio join error into a task execution error.
112///
113/// # Parameters
114///
115/// * `error` - Join error returned by Tokio for an aborted or panicked task.
116///
117/// # Returns
118///
119/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
120/// [`TaskExecutionError::Panicked`].
121fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
122 if error.is_cancelled() {
123 TaskExecutionError::Cancelled
124 } else {
125 TaskExecutionError::Panicked
126 }
127}