qubit_tokio_executor/tokio_blocking_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::future::IntoFuture;
11
12use qubit_executor::task::TaskHandleFuture;
13use qubit_executor::{
14 CancelResult,
15 TaskResult,
16 TaskStatus,
17 TrackedTask,
18 TryGet,
19 task::spi::{
20 TaskResultHandle,
21 TrackedTaskHandle,
22 },
23};
24use tokio::task::AbortHandle;
25
26/// Callback used to finish service-side queued-task accounting.
27type CancelQueuedTask = Box<dyn Fn() + Send + Sync + 'static>;
28
29/// Tracked handle for tasks submitted to Tokio's blocking task pool.
30///
31/// This handle wraps the standard [`TrackedTask`] result/status endpoint and
32/// additionally keeps Tokio's [`AbortHandle`] so pre-start cancellation can
33/// remove queued `spawn_blocking` work from the Tokio runtime.
34///
35/// Tokio cannot abort blocking work after the closure has started. In that
36/// case [`Self::cancel`] reports [`CancelResult::AlreadyRunning`] through the
37/// underlying tracked task state.
38pub struct TokioBlockingTaskHandle<R, E> {
39 /// Standard tracked task endpoint used for result and status observation.
40 handle: TrackedTask<R, E>,
41 /// Tokio abort handle used to remove queued blocking work after cancellation.
42 abort_handle: AbortHandle,
43 /// Callback that completes queued-task accounting after cancellation wins.
44 cancel_queued_task: CancelQueuedTask,
45}
46
47impl<R, E> TokioBlockingTaskHandle<R, E> {
48 /// Creates a blocking task handle.
49 ///
50 /// # Parameters
51 ///
52 /// * `handle` - Standard tracked task endpoint.
53 /// * `abort_handle` - Tokio abort handle for the submitted blocking task.
54 /// * `cancel_queued_task` - Callback that finishes service-side queued
55 /// task accounting when cancellation wins before the task starts.
56 ///
57 /// # Returns
58 ///
59 /// A tracked Tokio blocking task handle.
60 #[inline]
61 pub(crate) fn new<F>(handle: TrackedTask<R, E>, abort_handle: AbortHandle, cancel_queued_task: F) -> Self
62 where
63 F: Fn() + Send + Sync + 'static,
64 {
65 Self {
66 handle,
67 abort_handle,
68 cancel_queued_task: Box::new(cancel_queued_task),
69 }
70 }
71
72 /// Waits for the task to finish and returns its final result.
73 ///
74 /// This method blocks the current thread until a result is available.
75 ///
76 /// # Returns
77 ///
78 /// The final task result.
79 #[inline]
80 pub fn get(self) -> TaskResult<R, E>
81 where
82 R: Send,
83 E: Send,
84 {
85 <Self as TaskResultHandle<R, E>>::get(self)
86 }
87
88 /// Attempts to retrieve the final result without blocking.
89 ///
90 /// # Returns
91 ///
92 /// A ready result or the pending handle.
93 #[inline]
94 pub fn try_get(self) -> TryGet<Self, R, E>
95 where
96 R: Send,
97 E: Send,
98 {
99 <Self as TaskResultHandle<R, E>>::try_get(self)
100 }
101
102 /// Returns whether the task has installed a terminal state.
103 ///
104 /// # Returns
105 ///
106 /// `true` after the task succeeds, fails, panics, is cancelled, or loses
107 /// its runner endpoint.
108 #[inline]
109 pub fn is_done(&self) -> bool
110 where
111 R: Send,
112 E: Send,
113 {
114 <Self as TaskResultHandle<R, E>>::is_done(self)
115 }
116
117 /// Returns the currently observed task status.
118 ///
119 /// # Returns
120 ///
121 /// The current task status.
122 #[inline]
123 pub fn status(&self) -> TaskStatus {
124 self.handle.status()
125 }
126
127 /// Attempts to cancel this task before its blocking closure starts.
128 ///
129 /// When cancellation wins the pending-state race, this method also aborts
130 /// the Tokio `spawn_blocking` task so queued work is dropped without waiting
131 /// for an available blocking thread.
132 ///
133 /// # Returns
134 ///
135 /// The observed cancellation outcome.
136 #[must_use]
137 #[inline]
138 pub fn cancel(&self) -> CancelResult {
139 let result = self.handle.cancel();
140 if result == CancelResult::Cancelled {
141 (self.cancel_queued_task)();
142 self.abort_handle.abort();
143 }
144 result
145 }
146}
147
148impl<R, E> TaskResultHandle<R, E> for TokioBlockingTaskHandle<R, E>
149where
150 R: Send,
151 E: Send,
152{
153 /// Returns whether the tracked state is terminal.
154 #[inline]
155 fn is_done(&self) -> bool {
156 self.handle.is_done()
157 }
158
159 /// Blocks until the underlying result handle yields a result.
160 #[inline]
161 fn get(self) -> TaskResult<R, E> {
162 self.handle.get()
163 }
164
165 /// Attempts to retrieve the underlying result without blocking.
166 #[inline]
167 fn try_get(self) -> TryGet<Self, R, E> {
168 let Self {
169 handle,
170 abort_handle,
171 cancel_queued_task,
172 } = self;
173 match handle.try_get() {
174 TryGet::Ready(result) => TryGet::Ready(result),
175 TryGet::Pending(handle) => TryGet::Pending(Self {
176 handle,
177 abort_handle,
178 cancel_queued_task,
179 }),
180 }
181 }
182}
183
184impl<R, E> TrackedTaskHandle<R, E> for TokioBlockingTaskHandle<R, E>
185where
186 R: Send,
187 E: Send,
188{
189 /// Returns the currently observed task status.
190 #[inline]
191 fn status(&self) -> TaskStatus {
192 self.handle.status()
193 }
194
195 /// Attempts to cancel the task before it starts.
196 #[inline]
197 fn cancel(&self) -> CancelResult {
198 Self::cancel(self)
199 }
200}
201
202impl<R, E> IntoFuture for TokioBlockingTaskHandle<R, E> {
203 type Output = TaskResult<R, E>;
204 type IntoFuture = TaskHandleFuture<R, E>;
205
206 /// Converts this tracked handle into a future resolving to the task result.
207 #[inline]
208 fn into_future(self) -> Self::IntoFuture {
209 self.handle.into_future()
210 }
211}