Skip to main content

qubit_tokio_executor/
tokio_blocking_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::future::IntoFuture;
11
12use qubit_executor::task::TaskHandleFuture;
13use qubit_executor::{
14    CancelResult,
15    TaskResult,
16    TaskStatus,
17    TrackedTask,
18    TryGet,
19    task::spi::{
20        TaskResultHandle,
21        TrackedTaskHandle,
22    },
23};
24use tokio::task::AbortHandle;
25
26/// Callback used to finish service-side queued-task accounting.
27type CancelQueuedTask = Box<dyn Fn() + Send + Sync + 'static>;
28
29/// Tracked handle for tasks submitted to Tokio's blocking task pool.
30///
31/// This handle wraps the standard [`TrackedTask`] result/status endpoint and
32/// additionally keeps Tokio's [`AbortHandle`] so pre-start cancellation can
33/// remove queued `spawn_blocking` work from the Tokio runtime.
34///
35/// Tokio cannot abort blocking work after the closure has started. In that
36/// case [`Self::cancel`] reports [`CancelResult::AlreadyRunning`] through the
37/// underlying tracked task state.
38pub struct TokioBlockingTaskHandle<R, E> {
39    /// Standard tracked task endpoint used for result and status observation.
40    handle: TrackedTask<R, E>,
41    /// Tokio abort handle used to remove queued blocking work after cancellation.
42    abort_handle: AbortHandle,
43    /// Callback that completes queued-task accounting after cancellation wins.
44    cancel_queued_task: CancelQueuedTask,
45}
46
47impl<R, E> TokioBlockingTaskHandle<R, E> {
48    /// Creates a blocking task handle.
49    ///
50    /// # Parameters
51    ///
52    /// * `handle` - Standard tracked task endpoint.
53    /// * `abort_handle` - Tokio abort handle for the submitted blocking task.
54    /// * `cancel_queued_task` - Callback that finishes service-side queued
55    ///   task accounting when cancellation wins before the task starts.
56    ///
57    /// # Returns
58    ///
59    /// A tracked Tokio blocking task handle.
60    #[inline]
61    pub(crate) fn new<F>(
62        handle: TrackedTask<R, E>,
63        abort_handle: AbortHandle,
64        cancel_queued_task: F,
65    ) -> Self
66    where
67        F: Fn() + Send + Sync + 'static,
68    {
69        Self {
70            handle,
71            abort_handle,
72            cancel_queued_task: Box::new(cancel_queued_task),
73        }
74    }
75
76    /// Waits for the task to finish and returns its final result.
77    ///
78    /// This method blocks the current thread until a result is available.
79    ///
80    /// # Returns
81    ///
82    /// The final task result.
83    #[inline]
84    pub fn get(self) -> TaskResult<R, E>
85    where
86        R: Send,
87        E: Send,
88    {
89        <Self as TaskResultHandle<R, E>>::get(self)
90    }
91
92    /// Attempts to retrieve the final result without blocking.
93    ///
94    /// # Returns
95    ///
96    /// A ready result or the pending handle.
97    #[inline]
98    pub fn try_get(self) -> TryGet<Self, R, E>
99    where
100        R: Send,
101        E: Send,
102    {
103        <Self as TaskResultHandle<R, E>>::try_get(self)
104    }
105
106    /// Returns whether the task has installed a terminal state.
107    ///
108    /// # Returns
109    ///
110    /// `true` after the task succeeds, fails, panics, is cancelled, or loses
111    /// its runner endpoint.
112    #[inline]
113    pub fn is_done(&self) -> bool
114    where
115        R: Send,
116        E: Send,
117    {
118        <Self as TaskResultHandle<R, E>>::is_done(self)
119    }
120
121    /// Returns the currently observed task status.
122    ///
123    /// # Returns
124    ///
125    /// The current task status.
126    #[inline]
127    pub fn status(&self) -> TaskStatus {
128        self.handle.status()
129    }
130
131    /// Attempts to cancel this task before its blocking closure starts.
132    ///
133    /// When cancellation wins the pending-state race, this method also aborts
134    /// the Tokio `spawn_blocking` task so queued work is dropped without waiting
135    /// for an available blocking thread.
136    ///
137    /// # Returns
138    ///
139    /// The observed cancellation outcome.
140    #[must_use]
141    #[inline]
142    pub fn cancel(&self) -> CancelResult {
143        let result = self.handle.cancel();
144        if result == CancelResult::Cancelled {
145            (self.cancel_queued_task)();
146            self.abort_handle.abort();
147        }
148        result
149    }
150}
151
152impl<R, E> TaskResultHandle<R, E> for TokioBlockingTaskHandle<R, E>
153where
154    R: Send,
155    E: Send,
156{
157    /// Returns whether the tracked state is terminal.
158    #[inline]
159    fn is_done(&self) -> bool {
160        self.handle.is_done()
161    }
162
163    /// Blocks until the underlying result handle yields a result.
164    #[inline]
165    fn get(self) -> TaskResult<R, E> {
166        self.handle.get()
167    }
168
169    /// Attempts to retrieve the underlying result without blocking.
170    #[inline]
171    fn try_get(self) -> TryGet<Self, R, E> {
172        let Self {
173            handle,
174            abort_handle,
175            cancel_queued_task,
176        } = self;
177        match handle.try_get() {
178            TryGet::Ready(result) => TryGet::Ready(result),
179            TryGet::Pending(handle) => TryGet::Pending(Self {
180                handle,
181                abort_handle,
182                cancel_queued_task,
183            }),
184        }
185    }
186}
187
188impl<R, E> TrackedTaskHandle<R, E> for TokioBlockingTaskHandle<R, E>
189where
190    R: Send,
191    E: Send,
192{
193    /// Returns the currently observed task status.
194    #[inline]
195    fn status(&self) -> TaskStatus {
196        self.handle.status()
197    }
198
199    /// Attempts to cancel the task before it starts.
200    #[inline]
201    fn cancel(&self) -> CancelResult {
202        Self::cancel(self)
203    }
204}
205
206impl<R, E> IntoFuture for TokioBlockingTaskHandle<R, E> {
207    type Output = TaskResult<R, E>;
208    type IntoFuture = TaskHandleFuture<R, E>;
209
210    /// Converts this tracked handle into a future resolving to the task result.
211    #[inline]
212    fn into_future(self) -> Self::IntoFuture {
213        self.handle.into_future()
214    }
215}