Skip to main content

qubit_rayon_executor/
rayon_task_handle.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    sync::Arc,
13    task::{
14        Context,
15        Poll,
16    },
17};
18
19use qubit_executor::{
20    TaskHandle,
21    TaskResult,
22};
23
24use crate::{
25    pending_cancel::PendingCancel,
26    rayon_executor_service_state::RayonExecutorServiceState,
27};
28
29/// Handle returned by [`crate::RayonExecutorService`] for accepted tasks.
30///
31/// This handle supports blocking [`Self::get`], asynchronous `.await`, and
32/// best-effort cancellation before a Rayon worker starts the task.
33pub struct RayonTaskHandle<R, E> {
34    /// Shared task result observed through blocking and async APIs.
35    inner: TaskHandle<R, E>,
36    /// Stable identifier assigned by the owning executor service.
37    task_id: usize,
38    /// Shared service state used to keep cancellation counters consistent.
39    state: Arc<RayonExecutorServiceState>,
40    /// Cancellation hook that wins only before task start.
41    cancel: PendingCancel,
42}
43
44impl<R, E> RayonTaskHandle<R, E> {
45    /// Creates a Rayon task handle from a shared task handle and cancel hook.
46    ///
47    /// # Parameters
48    ///
49    /// * `inner` - Shared task handle used for result observation.
50    /// * `task_id` - Stable identifier assigned to the accepted task.
51    /// * `state` - Shared service state that owns lifecycle counters.
52    /// * `cancel` - Cancellation hook that may cancel the task before start.
53    ///
54    /// # Returns
55    ///
56    /// A handle for the accepted Rayon task.
57    pub(crate) fn new(
58        inner: TaskHandle<R, E>,
59        task_id: usize,
60        state: Arc<RayonExecutorServiceState>,
61        cancel: PendingCancel,
62    ) -> Self {
63        Self {
64            inner,
65            task_id,
66            state,
67            cancel,
68        }
69    }
70
71    /// Waits for the task to finish and returns its final result.
72    ///
73    /// # Returns
74    ///
75    /// The final task result reported through the underlying [`TaskHandle`].
76    #[inline]
77    pub fn get(self) -> TaskResult<R, E> {
78        self.inner.get()
79    }
80
81    /// Attempts to cancel the task before any Rayon worker starts it.
82    ///
83    /// # Returns
84    ///
85    /// `true` if the task was cancelled before start, or `false` if it had
86    /// already started or completed.
87    #[inline]
88    pub fn cancel(&self) -> bool {
89        self.state.cancel_pending_task(self.task_id, &self.cancel)
90    }
91
92    /// Returns whether the task has reported completion.
93    ///
94    /// # Returns
95    ///
96    /// `true` after the task has finished or has been cancelled.
97    #[inline]
98    pub fn is_done(&self) -> bool {
99        self.inner.is_done()
100    }
101}
102
103impl<R, E> Future for RayonTaskHandle<R, E> {
104    type Output = TaskResult<R, E>;
105
106    /// Polls the accepted Rayon task for completion.
107    ///
108    /// # Parameters
109    ///
110    /// * `cx` - Async task context used to register the current waker.
111    ///
112    /// # Returns
113    ///
114    /// `Poll::Ready` with the task result after completion, or
115    /// `Poll::Pending` while the task is still running or queued.
116    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117        let this = self.get_mut();
118        Pin::new(&mut this.inner).poll(cx)
119    }
120}