Skip to main content

qubit_tokio_executor/
tokio_blocking_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::future::IntoFuture;
11
12use qubit_executor::task::TaskHandleFuture;
13use qubit_executor::{
14    CancelResult,
15    TaskResult,
16    TaskStatus,
17    TrackedTask,
18    TryGet,
19    task::spi::{
20        TaskResultHandle,
21        TrackedTaskHandle,
22    },
23};
24use tokio::task::AbortHandle;
25
/// Boxed, thread-safe callback that completes the service-side accounting
/// for a queued task.
///
/// The trait object is implicitly `'static`: that is the default object
/// lifetime bound for `Box<dyn Trait>` in a type alias.
type CancelQueuedTask = Box<dyn Fn() + Send + Sync>;
28
29/// Tracked handle for tasks submitted to Tokio's blocking task pool.
30///
31/// This handle wraps the standard [`TrackedTask`] result/status endpoint and
32/// additionally keeps Tokio's [`AbortHandle`] so pre-start cancellation can
33/// remove queued `spawn_blocking` work from the Tokio runtime.
34///
35/// Tokio cannot abort blocking work after the closure has started. In that
36/// case [`Self::cancel`] reports [`CancelResult::AlreadyRunning`] through the
37/// underlying tracked task state.
38pub struct TokioBlockingTaskHandle<R, E> {
39    /// Standard tracked task endpoint used for result and status observation.
40    handle: TrackedTask<R, E>,
41    /// Tokio abort handle used to remove queued blocking work after cancellation.
42    abort_handle: AbortHandle,
43    /// Callback that completes queued-task accounting after cancellation wins.
44    cancel_queued_task: CancelQueuedTask,
45}
46
47impl<R, E> TokioBlockingTaskHandle<R, E> {
48    /// Creates a blocking task handle.
49    ///
50    /// # Parameters
51    ///
52    /// * `handle` - Standard tracked task endpoint.
53    /// * `abort_handle` - Tokio abort handle for the submitted blocking task.
54    /// * `cancel_queued_task` - Callback that finishes service-side queued
55    ///   task accounting when cancellation wins before the task starts.
56    ///
57    /// # Returns
58    ///
59    /// A tracked Tokio blocking task handle.
60    #[inline]
61    pub(crate) fn new<F>(handle: TrackedTask<R, E>, abort_handle: AbortHandle, cancel_queued_task: F) -> Self
62    where
63        F: Fn() + Send + Sync + 'static,
64    {
65        Self {
66            handle,
67            abort_handle,
68            cancel_queued_task: Box::new(cancel_queued_task),
69        }
70    }
71
72    /// Waits for the task to finish and returns its final result.
73    ///
74    /// This method blocks the current thread until a result is available.
75    ///
76    /// # Returns
77    ///
78    /// The final task result.
79    #[inline]
80    pub fn get(self) -> TaskResult<R, E>
81    where
82        R: Send,
83        E: Send,
84    {
85        <Self as TaskResultHandle<R, E>>::get(self)
86    }
87
88    /// Attempts to retrieve the final result without blocking.
89    ///
90    /// # Returns
91    ///
92    /// A ready result or the pending handle.
93    #[inline]
94    pub fn try_get(self) -> TryGet<Self, R, E>
95    where
96        R: Send,
97        E: Send,
98    {
99        <Self as TaskResultHandle<R, E>>::try_get(self)
100    }
101
102    /// Returns whether the task has installed a terminal state.
103    ///
104    /// # Returns
105    ///
106    /// `true` after the task succeeds, fails, panics, is cancelled, or loses
107    /// its runner endpoint.
108    #[inline]
109    pub fn is_done(&self) -> bool
110    where
111        R: Send,
112        E: Send,
113    {
114        <Self as TaskResultHandle<R, E>>::is_done(self)
115    }
116
117    /// Returns the currently observed task status.
118    ///
119    /// # Returns
120    ///
121    /// The current task status.
122    #[inline]
123    pub fn status(&self) -> TaskStatus {
124        self.handle.status()
125    }
126
127    /// Attempts to cancel this task before its blocking closure starts.
128    ///
129    /// When cancellation wins the pending-state race, this method also aborts
130    /// the Tokio `spawn_blocking` task so queued work is dropped without waiting
131    /// for an available blocking thread.
132    ///
133    /// # Returns
134    ///
135    /// The observed cancellation outcome.
136    #[must_use]
137    #[inline]
138    pub fn cancel(&self) -> CancelResult {
139        let result = self.handle.cancel();
140        if result == CancelResult::Cancelled {
141            (self.cancel_queued_task)();
142            self.abort_handle.abort();
143        }
144        result
145    }
146}
147
148impl<R, E> TaskResultHandle<R, E> for TokioBlockingTaskHandle<R, E>
149where
150    R: Send,
151    E: Send,
152{
153    /// Returns whether the tracked state is terminal.
154    #[inline]
155    fn is_done(&self) -> bool {
156        self.handle.is_done()
157    }
158
159    /// Blocks until the underlying result handle yields a result.
160    #[inline]
161    fn get(self) -> TaskResult<R, E> {
162        self.handle.get()
163    }
164
165    /// Attempts to retrieve the underlying result without blocking.
166    #[inline]
167    fn try_get(self) -> TryGet<Self, R, E> {
168        let Self {
169            handle,
170            abort_handle,
171            cancel_queued_task,
172        } = self;
173        match handle.try_get() {
174            TryGet::Ready(result) => TryGet::Ready(result),
175            TryGet::Pending(handle) => TryGet::Pending(Self {
176                handle,
177                abort_handle,
178                cancel_queued_task,
179            }),
180        }
181    }
182}
183
184impl<R, E> TrackedTaskHandle<R, E> for TokioBlockingTaskHandle<R, E>
185where
186    R: Send,
187    E: Send,
188{
189    /// Returns the currently observed task status.
190    #[inline]
191    fn status(&self) -> TaskStatus {
192        self.handle.status()
193    }
194
195    /// Attempts to cancel the task before it starts.
196    #[inline]
197    fn cancel(&self) -> CancelResult {
198        Self::cancel(self)
199    }
200}
201
202impl<R, E> IntoFuture for TokioBlockingTaskHandle<R, E> {
203    type Output = TaskResult<R, E>;
204    type IntoFuture = TaskHandleFuture<R, E>;
205
206    /// Converts this tracked handle into a future resolving to the task result.
207    #[inline]
208    fn into_future(self) -> Self::IntoFuture {
209        self.handle.into_future()
210    }
211}