qubit_rayon_executor/rayon_task_handle.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10 future::Future,
11 pin::Pin,
12 sync::Arc,
13 task::{
14 Context,
15 Poll,
16 },
17};
18
19use qubit_executor::{
20 TaskHandle,
21 TaskResult,
22};
23
24use crate::{
25 pending_cancel::PendingCancel,
26 rayon_executor_service_state::RayonExecutorServiceState,
27};
28
29/// Handle returned by [`crate::RayonExecutorService`] for accepted tasks.
30///
31/// This handle supports blocking [`Self::get`], asynchronous `.await`, and
32/// best-effort cancellation before a Rayon worker starts the task.
33pub struct RayonTaskHandle<R, E> {
34 /// Shared task result observed through blocking and async APIs.
35 inner: TaskHandle<R, E>,
36 /// Stable identifier assigned by the owning executor service.
37 task_id: usize,
38 /// Shared service state used to keep cancellation counters consistent.
39 state: Arc<RayonExecutorServiceState>,
40 /// Cancellation hook that wins only before task start.
41 cancel: PendingCancel,
42}
43
44impl<R, E> RayonTaskHandle<R, E> {
45 /// Creates a Rayon task handle from a shared task handle and cancel hook.
46 ///
47 /// # Parameters
48 ///
49 /// * `inner` - Shared task handle used for result observation.
50 /// * `task_id` - Stable identifier assigned to the accepted task.
51 /// * `state` - Shared service state that owns lifecycle counters.
52 /// * `cancel` - Cancellation hook that may cancel the task before start.
53 ///
54 /// # Returns
55 ///
56 /// A handle for the accepted Rayon task.
57 pub(crate) fn new(
58 inner: TaskHandle<R, E>,
59 task_id: usize,
60 state: Arc<RayonExecutorServiceState>,
61 cancel: PendingCancel,
62 ) -> Self {
63 Self {
64 inner,
65 task_id,
66 state,
67 cancel,
68 }
69 }
70
71 /// Waits for the task to finish and returns its final result.
72 ///
73 /// # Returns
74 ///
75 /// The final task result reported through the underlying [`TaskHandle`].
76 #[inline]
77 pub fn get(self) -> TaskResult<R, E> {
78 self.inner.get()
79 }
80
81 /// Attempts to cancel the task before any Rayon worker starts it.
82 ///
83 /// # Returns
84 ///
85 /// `true` if the task was cancelled before start, or `false` if it had
86 /// already started or completed.
87 #[inline]
88 pub fn cancel(&self) -> bool {
89 self.state.cancel_pending_task(self.task_id, &self.cancel)
90 }
91
92 /// Returns whether the task has reported completion.
93 ///
94 /// # Returns
95 ///
96 /// `true` after the task has finished or has been cancelled.
97 #[inline]
98 pub fn is_done(&self) -> bool {
99 self.inner.is_done()
100 }
101}
102
103impl<R, E> Future for RayonTaskHandle<R, E> {
104 type Output = TaskResult<R, E>;
105
106 /// Polls the accepted Rayon task for completion.
107 ///
108 /// # Parameters
109 ///
110 /// * `cx` - Async task context used to register the current waker.
111 ///
112 /// # Returns
113 ///
114 /// `Poll::Ready` with the task result after completion, or
115 /// `Poll::Pending` while the task is still running or queued.
116 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117 let this = self.get_mut();
118 Pin::new(&mut this.inner).poll(cx)
119 }
120}