qubit_tokio_executor/tokio_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 task::{
14 Context,
15 Poll,
16 },
17};
18
19use tokio::task::{
20 JoinError,
21 JoinHandle,
22};
23
24use qubit_executor::{
25 CancelResult,
26 TaskExecutionError,
27 TaskResult,
28};
29
30/// Async handle returned by Tokio-backed executor services.
31///
32/// Awaiting this handle reports the accepted task's final result, including
33/// task failure, panic, or cancellation.
34///
35/// # Type Parameters
36///
37/// * `R` - The task success value.
38/// * `E` - The task error value.
39///
40pub struct TokioTaskHandle<R, E> {
41 /// Tokio task whose output is the accepted task's final result.
42 handle: JoinHandle<TaskResult<R, E>>,
43}
44
45impl<R, E> TokioTaskHandle<R, E> {
46 /// Creates a handle from a Tokio join handle.
47 ///
48 /// # Parameters
49 ///
50 /// * `handle` - The Tokio join handle that resolves to a task result.
51 ///
52 /// # Returns
53 ///
54 /// A task handle that can be awaited.
55 #[inline]
56 pub(crate) fn new(handle: JoinHandle<TaskResult<R, E>>) -> Self {
57 Self { handle }
58 }
59
60 /// Requests cancellation of the underlying Tokio task.
61 ///
62 /// For blocking tasks submitted through `spawn_blocking`, Tokio can prevent
63 /// execution only if the task has not started yet. Already running blocking
64 /// work continues until the closure returns.
65 ///
66 /// # Returns
67 ///
68 /// [`CancelResult::Cancelled`] when an abort request was sent, or
69 /// [`CancelResult::AlreadyFinished`] if the Tokio task had already
70 /// completed.
71 #[must_use]
72 #[inline]
73 pub fn cancel(&self) -> CancelResult {
74 if self.handle.is_finished() {
75 return CancelResult::AlreadyFinished;
76 }
77 self.handle.abort();
78 CancelResult::Cancelled
79 }
80
81 /// Returns whether the underlying Tokio task has finished.
82 ///
83 /// # Returns
84 ///
85 /// `true` if the Tokio task is complete.
86 #[inline]
87 pub fn is_done(&self) -> bool {
88 self.handle.is_finished()
89 }
90}
91
92impl<R, E> Future for TokioTaskHandle<R, E> {
93 type Output = TaskResult<R, E>;
94
95 /// Polls the underlying Tokio task.
96 ///
97 /// # Parameters
98 ///
99 /// * `cx` - Async task context used to register the current waker.
100 ///
101 /// # Returns
102 ///
103 /// `Poll::Ready` with the task result when the Tokio task completes, or
104 /// `Poll::Pending` while it is still running. Tokio cancellation and panic
105 /// join errors are converted to [`TaskExecutionError`] values.
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 let this = self.get_mut();
108 match Pin::new(&mut this.handle).poll(cx) {
109 Poll::Ready(Ok(result)) => Poll::Ready(result),
110 Poll::Ready(Err(error)) => Poll::Ready(Err(join_error_to_task_error(error))),
111 Poll::Pending => Poll::Pending,
112 }
113 }
114}
115
116/// Converts a Tokio join error into a task execution error.
117///
118/// # Parameters
119///
120/// * `error` - Join error returned by Tokio for an aborted or panicked task.
121///
122/// # Returns
123///
124/// [`TaskExecutionError::Cancelled`] for aborted tasks, otherwise
125/// [`TaskExecutionError::Panicked`].
126fn join_error_to_task_error<E>(error: JoinError) -> TaskExecutionError<E> {
127 if error.is_cancelled() {
128 TaskExecutionError::Cancelled
129 } else {
130 TaskExecutionError::Panicked
131 }
132}