Skip to main content

qubit_tokio_executor/
tokio_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{
13        Context,
14        Poll,
15    },
16};
17
18use tokio::task::{
19    JoinError,
20    JoinHandle,
21};
22
23use qubit_executor::{
24    TaskExecutionError,
25    TaskResult,
26};
27
28/// Async handle returned by Tokio-backed executor services.
29///
30/// Awaiting this handle reports the accepted task's final result, including
31/// task failure, panic, or cancellation.
32///
33/// # Type Parameters
34///
35/// * `R` - The task success value.
36/// * `E` - The task error value.
37///
38/// # Author
39///
40/// Haixing Hu
41pub struct TokioTaskHandle<R, E> {
42    /// Tokio task whose output is the accepted task's final result.
43    handle: JoinHandle<TaskResult<R, E>>,
44}
45
46impl<R, E> TokioTaskHandle<R, E> {
47    /// Creates a handle from a Tokio join handle.
48    ///
49    /// # Parameters
50    ///
51    /// * `handle` - The Tokio join handle that resolves to a task result.
52    ///
53    /// # Returns
54    ///
55    /// A task handle that can be awaited.
56    #[inline]
57    pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
58        Self { handle }
59    }
60
61    /// Requests cancellation of the underlying Tokio task.
62    ///
63    /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
64    /// execution only if the task has not started yet. Already running blocking
65    /// work continues until the closure returns.
66    ///
67    /// # Returns
68    ///
69    /// `true` after cancellation has been requested.
70    #[inline]
71    pub fn cancel(&self) -> bool {
72        self.handle.abort();
73        true
74    }
75
76    /// Returns whether the underlying Tokio task has finished.
77    ///
78    /// # Returns
79    ///
80    /// `true` if the Tokio task is complete.
81    #[inline]
82    pub fn is_done(&self) -> bool {
83        self.handle.is_finished()
84    }
85}
86
87impl<R, E> Future for TokioTaskHandle<R, E> {
88    type Output = TaskResult<R, E>;
89
90    /// Polls the underlying Tokio task.
91    ///
92    /// # Parameters
93    ///
94    /// * `cx` - Async task context used to register the current waker.
95    ///
96    /// # Returns
97    ///
98    /// `Poll::Ready` with the task result when the Tokio task completes, or
99    /// `Poll::Pending` while it is still running. Tokio cancellation and panic
100    /// join errors are converted to [`TaskExecutionError`] values.
101    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102        let this = self.get_mut();
103        match Pin::new(&mut this.handle).poll(cx) {
104            Poll::Ready(Ok(result)) => Poll::Ready(result),
105            Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
106            Poll::Pending => Poll::Pending,
107        }
108    }
109}
110
111/// Converts a Tokio join error into a task execution error.
112///
113/// # Parameters
114///
115/// * `error` - Join error returned by Tokio for an aborted or panicked task.
116///
117/// # Returns
118///
119/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
120/// [`TaskExecutionError::Panicked`].
121fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
122    if error.is_cancelled() {
123        TaskExecutionError::Cancelled
124    } else {
125        TaskExecutionError::Panicked
126    }
127}