Skip to main content

qubit_tokio_executor/
tokio_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{
14        Context,
15        Poll,
16    },
17};
18
19use tokio::task::{
20    JoinError,
21    JoinHandle,
22};
23
24use qubit_executor::{
25    TaskExecutionError,
26    TaskResult,
27};
28
29/// Async handle returned by Tokio-backed executor services.
30///
31/// Awaiting this handle reports the accepted task's final result, including
32/// task failure, panic, or cancellation.
33///
34/// # Type Parameters
35///
36/// * `R` - The task success value.
37/// * `E` - The task error value.
38///
39pub struct TokioTaskHandle<R, E> {
40    /// Tokio task whose output is the accepted task's final result.
41    handle: JoinHandle<TaskResult<R, E>>,
42}
43
44impl<R, E> TokioTaskHandle<R, E> {
45    /// Creates a handle from a Tokio join handle.
46    ///
47    /// # Parameters
48    ///
49    /// * `handle` - The Tokio join handle that resolves to a task result.
50    ///
51    /// # Returns
52    ///
53    /// A task handle that can be awaited.
54    #[inline]
55    pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
56        Self { handle }
57    }
58
59    /// Requests cancellation of the underlying Tokio task.
60    ///
61    /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
62    /// execution only if the task has not started yet. Already running blocking
63    /// work continues until the closure returns.
64    ///
65    /// # Returns
66    ///
67    /// `true` after cancellation has been requested.
68    #[inline]
69    pub fn cancel(&self) -> bool {
70        self.handle.abort();
71        true
72    }
73
74    /// Returns whether the underlying Tokio task has finished.
75    ///
76    /// # Returns
77    ///
78    /// `true` if the Tokio task is complete.
79    #[inline]
80    pub fn is_done(&self) -> bool {
81        self.handle.is_finished()
82    }
83}
84
85impl<R, E> Future for TokioTaskHandle<R, E> {
86    type Output = TaskResult<R, E>;
87
88    /// Polls the underlying Tokio task.
89    ///
90    /// # Parameters
91    ///
92    /// * `cx` - Async task context used to register the current waker.
93    ///
94    /// # Returns
95    ///
96    /// `Poll::Ready` with the task result when the Tokio task completes, or
97    /// `Poll::Pending` while it is still running. Tokio cancellation and panic
98    /// join errors are converted to [`TaskExecutionError`] values.
99    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100        let this = self.get_mut();
101        match Pin::new(&mut this.handle).poll(cx) {
102            Poll::Ready(Ok(result)) => Poll::Ready(result),
103            Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
104            Poll::Pending => Poll::Pending,
105        }
106    }
107}
108
109/// Converts a Tokio join error into a task execution error.
110///
111/// # Parameters
112///
113/// * `error` - Join error returned by Tokio for an aborted or panicked task.
114///
115/// # Returns
116///
117/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
118/// [`TaskExecutionError::Panicked`].
119fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
120    if error.is_cancelled() {
121        TaskExecutionError::Cancelled
122    } else {
123        TaskExecutionError::Panicked
124    }
125}