Skip to main content

qubit_executor/schedule/
scheduled_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::IntoFuture,
12    sync::Arc,
13};
14
15use qubit_atomic::Atomic;
16
17use crate::{
18    CancelResult,
19    TaskResult,
20    TaskStatus,
21    TrackedTask,
22    TryGet,
23    hook::TaskId,
24    task::{
25        TaskHandleFuture,
26        spi::{
27            TaskResultHandle,
28            TrackedTaskHandle,
29        },
30    },
31};
32
33/// Tracked handle for a task accepted by a scheduled executor service.
34///
35/// The handle delegates result and status observation to the standard
36/// [`TrackedTask`] while also waking the scheduler when pre-start cancellation
37/// wins. This prevents a cancelled timer entry from keeping the single scheduler
38/// thread asleep until the original deadline.
39pub struct ScheduledTaskHandle<R, E> {
40    /// Standard tracked task handle.
41    inner: TrackedTask<R, E>,
42    /// Shared marker observed by the scheduler heap.
43    cancellation_marker: Arc<Atomic<bool>>,
44    /// Callback invoked after this handle cancels the pending task.
45    on_cancelled: Arc<dyn Fn() + Send + Sync + 'static>,
46}
47
48impl<R, E> ScheduledTaskHandle<R, E> {
49    /// Creates a scheduled task handle.
50    ///
51    /// # Parameters
52    ///
53    /// * `inner` - Standard tracked task handle.
54    /// * `cancellation_marker` - Shared marker observed by the scheduler heap.
55    /// * `on_cancelled` - Callback invoked when this handle cancels the task.
56    ///
57    /// # Returns
58    ///
59    /// A scheduled task handle.
60    pub(crate) const fn new(
61        inner: TrackedTask<R, E>,
62        cancellation_marker: Arc<Atomic<bool>>,
63        on_cancelled: Arc<dyn Fn() + Send + Sync + 'static>,
64    ) -> Self {
65        Self {
66            inner,
67            cancellation_marker,
68            on_cancelled,
69        }
70    }
71
72    /// Waits for the task to finish and returns its final result.
73    ///
74    /// # Returns
75    ///
76    /// The final task result.
77    #[inline]
78    pub fn get(self) -> TaskResult<R, E>
79    where
80        R: Send,
81        E: Send,
82    {
83        <Self as TaskResultHandle<R, E>>::get(self)
84    }
85
86    /// Attempts to retrieve the final result without blocking.
87    ///
88    /// # Returns
89    ///
90    /// A ready result or the pending scheduled handle.
91    #[inline]
92    pub fn try_get(self) -> TryGet<Self, R, E>
93    where
94        R: Send,
95        E: Send,
96    {
97        <Self as TaskResultHandle<R, E>>::try_get(self)
98    }
99
100    /// Returns whether the tracked task has installed a terminal state.
101    ///
102    /// # Returns
103    ///
104    /// `true` after the task succeeds, fails, panics, is cancelled, or loses its
105    /// completion endpoint.
106    #[inline]
107    pub fn is_done(&self) -> bool
108    where
109        R: Send,
110        E: Send,
111    {
112        <Self as TaskResultHandle<R, E>>::is_done(self)
113    }
114
115    /// Returns the currently observed task status.
116    ///
117    /// # Returns
118    ///
119    /// The current task status.
120    #[inline]
121    pub fn status(&self) -> TaskStatus {
122        self.inner.status()
123    }
124
125    /// Returns the identifier assigned to this task.
126    ///
127    /// # Returns
128    ///
129    /// The task id stored in the shared task state.
130    #[inline]
131    pub fn task_id(&self) -> TaskId {
132        self.inner.task_id()
133    }
134
135    /// Attempts to cancel this task before it starts.
136    ///
137    /// # Returns
138    ///
139    /// The observed cancellation outcome.
140    #[inline]
141    pub fn cancel(&self) -> CancelResult {
142        self.cancel_inner()
143    }
144
145    /// Performs cancellation and wakes the scheduler if this handle won.
146    ///
147    /// # Returns
148    ///
149    /// The observed cancellation outcome.
150    fn cancel_inner(&self) -> CancelResult {
151        let result = self.inner.cancel();
152        if result == CancelResult::Cancelled {
153            self.cancellation_marker.store(true);
154            (self.on_cancelled)();
155        }
156        result
157    }
158}
159
160impl<R, E> TaskResultHandle<R, E> for ScheduledTaskHandle<R, E>
161where
162    R: Send,
163    E: Send,
164{
165    /// Returns whether the tracked state is terminal.
166    #[inline]
167    fn is_done(&self) -> bool {
168        self.inner.is_done()
169    }
170
171    /// Blocks until the underlying result handle yields a result.
172    #[inline]
173    fn get(self) -> TaskResult<R, E> {
174        self.inner.get()
175    }
176
177    /// Attempts to retrieve the underlying result without blocking.
178    #[inline]
179    fn try_get(self) -> TryGet<Self, R, E> {
180        let Self {
181            inner,
182            cancellation_marker,
183            on_cancelled,
184        } = self;
185        match inner.try_get() {
186            TryGet::Ready(result) => TryGet::Ready(result),
187            TryGet::Pending(inner) => TryGet::Pending(Self {
188                inner,
189                cancellation_marker,
190                on_cancelled,
191            }),
192        }
193    }
194}
195
196impl<R, E> TrackedTaskHandle<R, E> for ScheduledTaskHandle<R, E>
197where
198    R: Send,
199    E: Send,
200{
201    /// Returns the currently observed task status.
202    #[inline]
203    fn status(&self) -> TaskStatus {
204        self.inner.status()
205    }
206
207    /// Attempts to publish a cancellation result while the task is pending.
208    #[inline]
209    fn cancel(&self) -> CancelResult {
210        self.cancel_inner()
211    }
212}
213
214impl<R, E> IntoFuture for ScheduledTaskHandle<R, E> {
215    type Output = TaskResult<R, E>;
216    type IntoFuture = TaskHandleFuture<R, E>;
217
218    /// Converts this scheduled handle into a future resolving to the task result.
219    #[inline]
220    fn into_future(self) -> Self::IntoFuture {
221        self.inner.into_future()
222    }
223}