Skip to main content

qubit_tokio_executor/
tokio_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{
14        Context,
15        Poll,
16    },
17};
18
19use tokio::task::{
20    JoinError,
21    JoinHandle,
22};
23
24use qubit_executor::{
25    CancelResult,
26    TaskExecutionError,
27    TaskResult,
28};
29
30/// Async handle returned by Tokio-backed executor services.
31///
32/// Awaiting this handle reports the accepted task's final result, including
33/// task failure, panic, or cancellation.
34///
35/// # Type Parameters
36///
37/// * `R` - The task success value.
38/// * `E` - The task error value.
39///
40pub struct TokioTaskHandle<R, E> {
41    /// Tokio task whose output is the accepted task's final result.
42    handle: JoinHandle<TaskResult<R, E>>,
43}
44
45impl<R, E> TokioTaskHandle<R, E> {
46    /// Creates a handle from a Tokio join handle.
47    ///
48    /// # Parameters
49    ///
50    /// * `handle` - The Tokio join handle that resolves to a task result.
51    ///
52    /// # Returns
53    ///
54    /// A task handle that can be awaited.
55    #[inline]
56    pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
57        Self { handle }
58    }
59
60    /// Requests cancellation of the underlying Tokio task.
61    ///
62    /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
63    /// execution only if the task has not started yet. Already running blocking
64    /// work continues until the closure returns.
65    ///
66    /// # Returns
67    ///
68    /// [`CancelResult::Cancelled`] when an abort request was sent, or
69    /// [`CancelResult::AlreadyFinished`] if the Tokio task had already
70    /// completed.
71    #[must_use]
72    #[inline]
73    pub fn cancel(&self) -> CancelResult {
74        if self.handle.is_finished() {
75            return CancelResult::AlreadyFinished;
76        }
77        self.handle.abort();
78        CancelResult::Cancelled
79    }
80
81    /// Returns whether the underlying Tokio task has finished.
82    ///
83    /// # Returns
84    ///
85    /// `true` if the Tokio task is complete.
86    #[inline]
87    pub fn is_done(&self) -> bool {
88        self.handle.is_finished()
89    }
90}
91
92impl<R, E> Future for TokioTaskHandle<R, E> {
93    type Output = TaskResult<R, E>;
94
95    /// Polls the underlying Tokio task.
96    ///
97    /// # Parameters
98    ///
99    /// * `cx` - Async task context used to register the current waker.
100    ///
101    /// # Returns
102    ///
103    /// `Poll::Ready` with the task result when the Tokio task completes, or
104    /// `Poll::Pending` while it is still running. Tokio cancellation and panic
105    /// join errors are converted to [`TaskExecutionError`] values.
106    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        let this = self.get_mut();
108        match Pin::new(&mut this.handle).poll(cx) {
109            Poll::Ready(Ok(result)) => Poll::Ready(result),
110            Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
111            Poll::Pending => Poll::Pending,
112        }
113    }
114}
115
116/// Converts a Tokio join error into a task execution error.
117///
118/// # Parameters
119///
120/// * `error` - Join error returned by Tokio for an aborted or panicked task.
121///
122/// # Returns
123///
124/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
125/// [`TaskExecutionError::Panicked`].
126fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
127    if error.is_cancelled() {
128        TaskExecutionError::Cancelled
129    } else {
130        TaskExecutionError::Panicked
131    }
132}