qubit_tokio_executor/tokio_task_handle.rs
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{
14 Context,
15 Poll,
16 },
17};
18
19use tokio::task::{
20 JoinError,
21 JoinHandle,
22};
23
24use qubit_executor::{
25 TaskExecutionError,
26 TaskResult,
27};
28
29/// Async handle returned by Tokio-backed executor services.
30///
31/// Awaiting this handle reports the accepted task's final result, including
32/// task failure, panic, or cancellation.
33///
34/// # Type Parameters
35///
36/// * `R` - The task success value.
37/// * `E` - The task error value.
38///
39pub struct TokioTaskHandle<R, E> {
40 /// Tokio task whose output is the accepted task's final result.
41 handle: JoinHandle<TaskResult<R, E>>,
42}
43
44impl<R, E> TokioTaskHandle<R, E> {
45 /// Creates a handle from a Tokio join handle.
46 ///
47 /// # Parameters
48 ///
49 /// * `handle` - The Tokio join handle that resolves to a task result.
50 ///
51 /// # Returns
52 ///
53 /// A task handle that can be awaited.
54 #[inline]
55 pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
56 Self { handle }
57 }
58
59 /// Requests cancellation of the underlying Tokio task.
60 ///
61 /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
62 /// execution only if the task has not started yet. Already running blocking
63 /// work continues until the closure returns.
64 ///
65 /// # Returns
66 ///
67 /// `true` after cancellation has been requested.
68 #[inline]
69 pub fn cancel(&self) -> bool {
70 self.handle.abort();
71 true
72 }
73
74 /// Returns whether the underlying Tokio task has finished.
75 ///
76 /// # Returns
77 ///
78 /// `true` if the Tokio task is complete.
79 #[inline]
80 pub fn is_done(&self) -> bool {
81 self.handle.is_finished()
82 }
83}
84
85impl<R, E> Future for TokioTaskHandle<R, E> {
86 type Output = TaskResult<R, E>;
87
88 /// Polls the underlying Tokio task.
89 ///
90 /// # Parameters
91 ///
92 /// * `cx` - Async task context used to register the current waker.
93 ///
94 /// # Returns
95 ///
96 /// `Poll::Ready` with the task result when the Tokio task completes, or
97 /// `Poll::Pending` while it is still running. Tokio cancellation and panic
98 /// join errors are converted to [`TaskExecutionError`] values.
99 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
100 let this = self.get_mut();
101 match Pin::new(&mut this.handle).poll(cx) {
102 Poll::Ready(Ok(result)) => Poll::Ready(result),
103 Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
104 Poll::Pending => Poll::Pending,
105 }
106 }
107}
108
109/// Converts a Tokio join error into a task execution error.
110///
111/// # Parameters
112///
113/// * `error` - Join error returned by Tokio for an aborted or panicked task.
114///
115/// # Returns
116///
117/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
118/// [`TaskExecutionError::Panicked`].
119fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
120 if error.is_cancelled() {
121 TaskExecutionError::Cancelled
122 } else {
123 TaskExecutionError::Panicked
124 }
125}