qubit_tokio_executor/tokio_blocking_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::future::IntoFuture;
11
12use qubit_executor::task::TaskHandleFuture;
13use qubit_executor::{
14 CancelResult,
15 TaskResult,
16 TaskStatus,
17 TrackedTask,
18 TryGet,
19 task::spi::{
20 TaskResultHandle,
21 TrackedTaskHandle,
22 },
23};
24use tokio::task::AbortHandle;
25
/// Boxed, thread-safe hook invoked to settle the service's queued-task
/// accounting once a cancellation has won.
type CancelQueuedTask = Box<dyn Fn() + Send + Sync + 'static>;
28
29/// Tracked handle for tasks submitted to Tokio's blocking task pool.
30///
31/// This handle wraps the standard [`TrackedTask`] result/status endpoint and
32/// additionally keeps Tokio's [`AbortHandle`] so pre-start cancellation can
33/// remove queued `spawn_blocking` work from the Tokio runtime.
34///
35/// Tokio cannot abort blocking work after the closure has started. In that
36/// case [`Self::cancel`] reports [`CancelResult::AlreadyRunning`] through the
37/// underlying tracked task state.
38pub struct TokioBlockingTaskHandle<R, E> {
39 /// Standard tracked task endpoint used for result and status observation.
40 handle: TrackedTask<R, E>,
41 /// Tokio abort handle used to remove queued blocking work after cancellation.
42 abort_handle: AbortHandle,
43 /// Callback that completes queued-task accounting after cancellation wins.
44 cancel_queued_task: CancelQueuedTask,
45}
46
47impl<R, E> TokioBlockingTaskHandle<R, E> {
48 /// Creates a blocking task handle.
49 ///
50 /// # Parameters
51 ///
52 /// * `handle` - Standard tracked task endpoint.
53 /// * `abort_handle` - Tokio abort handle for the submitted blocking task.
54 /// * `cancel_queued_task` - Callback that finishes service-side queued
55 /// task accounting when cancellation wins before the task starts.
56 ///
57 /// # Returns
58 ///
59 /// A tracked Tokio blocking task handle.
60 #[inline]
61 pub(crate) fn new<F>(
62 handle: TrackedTask<R, E>,
63 abort_handle: AbortHandle,
64 cancel_queued_task: F,
65 ) -> Self
66 where
67 F: Fn() + Send + Sync + 'static,
68 {
69 Self {
70 handle,
71 abort_handle,
72 cancel_queued_task: Box::new(cancel_queued_task),
73 }
74 }
75
76 /// Waits for the task to finish and returns its final result.
77 ///
78 /// This method blocks the current thread until a result is available.
79 ///
80 /// # Returns
81 ///
82 /// The final task result.
83 #[inline]
84 pub fn get(self) -> TaskResult<R, E>
85 where
86 R: Send,
87 E: Send,
88 {
89 <Self as TaskResultHandle<R, E>>::get(self)
90 }
91
92 /// Attempts to retrieve the final result without blocking.
93 ///
94 /// # Returns
95 ///
96 /// A ready result or the pending handle.
97 #[inline]
98 pub fn try_get(self) -> TryGet<Self, R, E>
99 where
100 R: Send,
101 E: Send,
102 {
103 <Self as TaskResultHandle<R, E>>::try_get(self)
104 }
105
106 /// Returns whether the task has installed a terminal state.
107 ///
108 /// # Returns
109 ///
110 /// `true` after the task succeeds, fails, panics, is cancelled, or loses
111 /// its runner endpoint.
112 #[inline]
113 pub fn is_done(&self) -> bool
114 where
115 R: Send,
116 E: Send,
117 {
118 <Self as TaskResultHandle<R, E>>::is_done(self)
119 }
120
121 /// Returns the currently observed task status.
122 ///
123 /// # Returns
124 ///
125 /// The current task status.
126 #[inline]
127 pub fn status(&self) -> TaskStatus {
128 self.handle.status()
129 }
130
131 /// Attempts to cancel this task before its blocking closure starts.
132 ///
133 /// When cancellation wins the pending-state race, this method also aborts
134 /// the Tokio `spawn_blocking` task so queued work is dropped without waiting
135 /// for an available blocking thread.
136 ///
137 /// # Returns
138 ///
139 /// The observed cancellation outcome.
140 #[must_use]
141 #[inline]
142 pub fn cancel(&self) -> CancelResult {
143 let result = self.handle.cancel();
144 if result == CancelResult::Cancelled {
145 self.abort_handle.abort();
146 (self.cancel_queued_task)();
147 }
148 result
149 }
150}
151
152impl<R, E> TaskResultHandle<R, E> for TokioBlockingTaskHandle<R, E>
153where
154 R: Send,
155 E: Send,
156{
157 /// Returns whether the tracked state is terminal.
158 #[inline]
159 fn is_done(&self) -> bool {
160 self.handle.is_done()
161 }
162
163 /// Blocks until the underlying result handle yields a result.
164 #[inline]
165 fn get(self) -> TaskResult<R, E> {
166 self.handle.get()
167 }
168
169 /// Attempts to retrieve the underlying result without blocking.
170 #[inline]
171 fn try_get(self) -> TryGet<Self, R, E> {
172 let Self {
173 handle,
174 abort_handle,
175 cancel_queued_task,
176 } = self;
177 match handle.try_get() {
178 TryGet::Ready(result) => TryGet::Ready(result),
179 TryGet::Pending(handle) => TryGet::Pending(Self {
180 handle,
181 abort_handle,
182 cancel_queued_task,
183 }),
184 }
185 }
186}
187
188impl<R, E> TrackedTaskHandle<R, E> for TokioBlockingTaskHandle<R, E>
189where
190 R: Send,
191 E: Send,
192{
193 /// Returns the currently observed task status.
194 #[inline]
195 fn status(&self) -> TaskStatus {
196 self.handle.status()
197 }
198
199 /// Attempts to cancel the task before it starts.
200 #[inline]
201 fn cancel(&self) -> CancelResult {
202 Self::cancel(self)
203 }
204}
205
206impl<R, E> IntoFuture for TokioBlockingTaskHandle<R, E> {
207 type Output = TaskResult<R, E>;
208 type IntoFuture = TaskHandleFuture<R, E>;
209
210 /// Converts this tracked handle into a future resolving to the task result.
211 #[inline]
212 fn into_future(self) -> Self::IntoFuture {
213 self.handle.into_future()
214 }
215}