qubit_rayon_executor/rayon_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::IntoFuture,
12 sync::Arc,
13};
14
15use qubit_executor::{
16 CancelResult,
17 TaskResult,
18 TaskStatus,
19 TrackedTask,
20 TryGet,
21 task::{
22 TaskHandleFuture,
23 spi::{
24 TaskResultHandle,
25 TrackedTaskHandle,
26 },
27 },
28};
29
30use crate::{
31 pending_cancel::PendingCancel,
32 rayon_executor_service_state::RayonExecutorServiceState,
33};
34
/// Tracked handle returned by [`crate::RayonExecutorService`] for accepted tasks.
///
/// The handle bundles the tracked task with the identifier, shared service
/// state, and cancel hook needed to act on that task. It supports blocking
/// [`Self::get`], non-blocking [`Self::try_get`], asynchronous `.await`,
/// status inspection, and best-effort cancellation before a Rayon worker
/// starts the task.
pub struct RayonTaskHandle<R, E> {
    /// Shared task result and status observed through blocking and async APIs.
    inner: TrackedTask<R, E>,
    /// Stable identifier assigned by the owning executor service; handed to
    /// the service state when cancellation is requested.
    task_id: usize,
    /// Shared service state used to keep cancellation counters consistent.
    state: Arc<RayonExecutorServiceState>,
    /// Cancellation hook that wins only before task start; passed by
    /// reference to the service state on every cancel attempt.
    cancel: PendingCancel,
}
50
51impl<R, E> RayonTaskHandle<R, E> {
52 /// Creates a Rayon task handle from a tracked task and cancel hook.
53 ///
54 /// # Parameters
55 ///
56 /// * `inner` - Tracked task used for result and status observation.
57 /// * `task_id` - Stable identifier assigned to the accepted task.
58 /// * `state` - Shared service state that owns lifecycle counters.
59 /// * `cancel` - Cancellation hook that may cancel the task before start.
60 ///
61 /// # Returns
62 ///
63 /// A tracked handle for the accepted Rayon task.
64 pub(crate) fn new(
65 inner: TrackedTask<R, E>,
66 task_id: usize,
67 state: Arc<RayonExecutorServiceState>,
68 cancel: PendingCancel,
69 ) -> Self {
70 Self {
71 inner,
72 task_id,
73 state,
74 cancel,
75 }
76 }
77
78 /// Waits for the task to finish and returns its final result.
79 ///
80 /// # Returns
81 ///
82 /// The final task result reported through the underlying tracked task.
83 #[inline]
84 pub fn get(self) -> TaskResult<R, E>
85 where
86 R: Send,
87 E: Send,
88 {
89 self.inner.get()
90 }
91
92 /// Attempts to retrieve the final result without blocking.
93 ///
94 /// # Returns
95 ///
96 /// A ready result or the pending Rayon task handle.
97 #[inline]
98 pub fn try_get(self) -> TryGet<Self, R, E>
99 where
100 R: Send,
101 E: Send,
102 {
103 <Self as TaskResultHandle<R, E>>::try_get(self)
104 }
105
106 /// Attempts to cancel the task before any Rayon worker starts it.
107 ///
108 /// # Returns
109 ///
110 /// The observed cancellation outcome.
111 #[inline]
112 pub fn cancel(&self) -> CancelResult
113 where
114 R: Send,
115 E: Send,
116 {
117 <Self as TrackedTaskHandle<R, E>>::cancel(self)
118 }
119
120 /// Returns whether the task has reported completion.
121 ///
122 /// # Returns
123 ///
124 /// `true` after the task has finished or has been cancelled.
125 #[inline]
126 pub fn is_done(&self) -> bool
127 where
128 R: Send,
129 E: Send,
130 {
131 <Self as TaskResultHandle<R, E>>::is_done(self)
132 }
133
134 /// Returns the currently observed task status.
135 ///
136 /// # Returns
137 ///
138 /// The task's pending, running, or terminal status.
139 #[inline]
140 pub fn status(&self) -> TaskStatus {
141 self.inner.status()
142 }
143}
144
145impl<R, E> TaskResultHandle<R, E> for RayonTaskHandle<R, E>
146where
147 R: Send,
148 E: Send,
149{
150 /// Returns whether the inner tracked task is done.
151 #[inline]
152 fn is_done(&self) -> bool {
153 self.inner.is_done()
154 }
155
156 /// Blocks until the inner tracked task yields a final result.
157 #[inline]
158 fn get(self) -> TaskResult<R, E> {
159 self.inner.get()
160 }
161
162 /// Attempts to retrieve the inner tracked task result without blocking.
163 #[inline]
164 fn try_get(self) -> TryGet<Self, R, E> {
165 let Self {
166 inner,
167 task_id,
168 state,
169 cancel,
170 } = self;
171 match inner.try_get() {
172 TryGet::Ready(result) => TryGet::Ready(result),
173 TryGet::Pending(inner) => TryGet::Pending(Self {
174 inner,
175 task_id,
176 state,
177 cancel,
178 }),
179 }
180 }
181}
182
183impl<R, E> TrackedTaskHandle<R, E> for RayonTaskHandle<R, E>
184where
185 R: Send,
186 E: Send,
187{
188 /// Returns the currently observed task status.
189 #[inline]
190 fn status(&self) -> TaskStatus {
191 self.inner.status()
192 }
193
194 /// Cancels the task through the owning service state.
195 #[inline]
196 fn cancel(&self) -> CancelResult {
197 if self.state.cancel_pending_task(self.task_id, &self.cancel) {
198 return CancelResult::Cancelled;
199 }
200 match self.status() {
201 TaskStatus::Pending | TaskStatus::Running => CancelResult::AlreadyRunning,
202 _ => CancelResult::AlreadyFinished,
203 }
204 }
205}
206
207impl<R, E> IntoFuture for RayonTaskHandle<R, E> {
208 type Output = TaskResult<R, E>;
209 type IntoFuture = TaskHandleFuture<R, E>;
210
211 /// Converts this handle into a future resolving to the task result.
212 #[inline]
213 fn into_future(self) -> Self::IntoFuture {
214 self.inner.into_future()
215 }
216}