Skip to main content

qubit_rayon_executor/
rayon_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::IntoFuture,
12    sync::Arc,
13};
14
15use qubit_executor::{
16    CancelResult,
17    TaskResult,
18    TaskStatus,
19    TrackedTask,
20    TryGet,
21    task::{
22        TaskHandleFuture,
23        spi::{
24            TaskResultHandle,
25            TrackedTaskHandle,
26        },
27    },
28};
29
30use crate::{
31    pending_cancel::PendingCancel,
32    rayon_executor_service_state::RayonExecutorServiceState,
33};
34
35/// Tracked handle returned by [`crate::RayonExecutorService`] for accepted tasks.
36///
37/// This handle supports blocking [`Self::get`], asynchronous `.await`, status
38/// inspection, and best-effort cancellation before a Rayon worker starts the
39/// task.
40pub struct RayonTaskHandle<R, E> {
41    /// Shared task result and status observed through blocking and async APIs.
42    inner: TrackedTask<R, E>,
43    /// Stable identifier assigned by the owning executor service.
44    task_id: usize,
45    /// Shared service state used to keep cancellation counters consistent.
46    state: Arc<RayonExecutorServiceState>,
47    /// Cancellation hook that wins only before task start.
48    cancel: PendingCancel,
49}
50
51impl<R, E> RayonTaskHandle<R, E> {
52    /// Creates a Rayon task handle from a tracked task and cancel hook.
53    ///
54    /// # Parameters
55    ///
56    /// * `inner` - Tracked task used for result and status observation.
57    /// * `task_id` - Stable identifier assigned to the accepted task.
58    /// * `state` - Shared service state that owns lifecycle counters.
59    /// * `cancel` - Cancellation hook that may cancel the task before start.
60    ///
61    /// # Returns
62    ///
63    /// A tracked handle for the accepted Rayon task.
64    pub(crate) fn new(
65        inner: TrackedTask<R, E>,
66        task_id: usize,
67        state: Arc<RayonExecutorServiceState>,
68        cancel: PendingCancel,
69    ) -> Self {
70        Self {
71            inner,
72            task_id,
73            state,
74            cancel,
75        }
76    }
77
78    /// Waits for the task to finish and returns its final result.
79    ///
80    /// # Returns
81    ///
82    /// The final task result reported through the underlying tracked task.
83    #[inline]
84    pub fn get(self) -> TaskResult<R, E>
85    where
86        R: Send,
87        E: Send,
88    {
89        self.inner.get()
90    }
91
92    /// Attempts to retrieve the final result without blocking.
93    ///
94    /// # Returns
95    ///
96    /// A ready result or the pending Rayon task handle.
97    #[inline]
98    pub fn try_get(self) -> TryGet<Self, R, E>
99    where
100        R: Send,
101        E: Send,
102    {
103        <Self as TaskResultHandle<R, E>>::try_get(self)
104    }
105
106    /// Attempts to cancel the task before any Rayon worker starts it.
107    ///
108    /// # Returns
109    ///
110    /// The observed cancellation outcome.
111    #[inline]
112    pub fn cancel(&self) -> CancelResult
113    where
114        R: Send,
115        E: Send,
116    {
117        <Self as TrackedTaskHandle<R, E>>::cancel(self)
118    }
119
120    /// Returns whether the task has reported completion.
121    ///
122    /// # Returns
123    ///
124    /// `true` after the task has finished or has been cancelled.
125    #[inline]
126    pub fn is_done(&self) -> bool
127    where
128        R: Send,
129        E: Send,
130    {
131        <Self as TaskResultHandle<R, E>>::is_done(self)
132    }
133
134    /// Returns the currently observed task status.
135    ///
136    /// # Returns
137    ///
138    /// The task's pending, running, or terminal status.
139    #[inline]
140    pub fn status(&self) -> TaskStatus {
141        self.inner.status()
142    }
143}
144
145impl<R, E> TaskResultHandle<R, E> for RayonTaskHandle<R, E>
146where
147    R: Send,
148    E: Send,
149{
150    /// Returns whether the inner tracked task is done.
151    #[inline]
152    fn is_done(&self) -> bool {
153        self.inner.is_done()
154    }
155
156    /// Blocks until the inner tracked task yields a final result.
157    #[inline]
158    fn get(self) -> TaskResult<R, E> {
159        self.inner.get()
160    }
161
162    /// Attempts to retrieve the inner tracked task result without blocking.
163    #[inline]
164    fn try_get(self) -> TryGet<Self, R, E> {
165        let Self {
166            inner,
167            task_id,
168            state,
169            cancel,
170        } = self;
171        match inner.try_get() {
172            TryGet::Ready(result) => TryGet::Ready(result),
173            TryGet::Pending(inner) => TryGet::Pending(Self {
174                inner,
175                task_id,
176                state,
177                cancel,
178            }),
179        }
180    }
181}
182
183impl<R, E> TrackedTaskHandle<R, E> for RayonTaskHandle<R, E>
184where
185    R: Send,
186    E: Send,
187{
188    /// Returns the currently observed task status.
189    #[inline]
190    fn status(&self) -> TaskStatus {
191        self.inner.status()
192    }
193
194    /// Cancels the task through the owning service state.
195    #[inline]
196    fn cancel(&self) -> CancelResult {
197        if self.state.cancel_pending_task(self.task_id, &self.cancel) {
198            return CancelResult::Cancelled;
199        }
200        match self.status() {
201            TaskStatus::Pending | TaskStatus::Running => CancelResult::AlreadyRunning,
202            _ => CancelResult::AlreadyFinished,
203        }
204    }
205}
206
207impl<R, E> IntoFuture for RayonTaskHandle<R, E> {
208    type Output = TaskResult<R, E>;
209    type IntoFuture = TaskHandleFuture<R, E>;
210
211    /// Converts this handle into a future resolving to the task result.
212    #[inline]
213    fn into_future(self) -> Self::IntoFuture {
214        self.inner.into_future()
215    }
216}