Skip to main content

qubit_rayon_executor/
rayon_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    sync::Arc,
13};
14
15use qubit_function::Callable;
16use rayon::ThreadPool as RayonThreadPool;
17
18use qubit_executor::{
19    TaskCompletionPair,
20    TaskRunner,
21};
22
23use qubit_executor::service::{
24    ExecutorService,
25    RejectedExecution,
26    ShutdownReport,
27};
28
29use crate::{
30    pending_cancel::PendingCancel,
31    rayon_executor_service_build_error::RayonExecutorServiceBuildError,
32    rayon_executor_service_builder::RayonExecutorServiceBuilder,
33    rayon_executor_service_state::RayonExecutorServiceState,
34    rayon_task_handle::RayonTaskHandle,
35};
36
/// Rayon-backed executor service for CPU-bound synchronous tasks.
///
/// Accepted tasks are executed on a dedicated Rayon thread pool. The service
/// preserves the crate's `ExecutorService` lifecycle semantics and task-handle
/// APIs while delegating scheduling to Rayon.
///
/// # Cloning
///
/// Both fields are reference-counted, so the derived `Clone` produces a new
/// handle that points at the same thread pool and the same lifecycle state
/// rather than an independent service.
#[derive(Clone)]
pub struct RayonExecutorService {
    /// Rayon thread pool used to execute accepted tasks.
    ///
    /// Shared through an `Arc` so that cloned services submit to one pool.
    pub(crate) pool: Arc<RayonThreadPool>,
    /// Shared lifecycle and cancellation state.
    ///
    /// Shared through an `Arc` with every clone of the service, with each task
    /// handle created by `submit_callable`, and with each closure spawned on
    /// the pool.
    pub(crate) state: Arc<RayonExecutorServiceState>,
}
49
50impl RayonExecutorService {
51    /// Creates a Rayon executor service with default builder settings.
52    ///
53    /// # Returns
54    ///
55    /// `Ok(RayonExecutorService)` if the default Rayon thread pool can be
56    /// built.
57    ///
58    /// # Errors
59    ///
60    /// Returns [`RayonExecutorServiceBuildError`] if the default builder
61    /// configuration is rejected.
62    #[inline]
63    pub fn new() -> Result<Self, RayonExecutorServiceBuildError> {
64        Self::builder().build()
65    }
66
67    /// Creates a builder for configuring a Rayon executor service.
68    ///
69    /// # Returns
70    ///
71    /// A builder configured with CPU-parallelism defaults.
72    #[inline]
73    pub fn builder() -> RayonExecutorServiceBuilder {
74        RayonExecutorServiceBuilder::default()
75    }
76}
77
78impl ExecutorService for RayonExecutorService {
79    type Handle<R, E>
80        = RayonTaskHandle<R, E>
81    where
82        R: Send + 'static,
83        E: Send + 'static;
84
85    type Termination<'a>
86        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
87    where
88        Self: 'a;
89
90    /// Accepts a callable and schedules it on the Rayon thread pool.
91    ///
92    /// # Parameters
93    ///
94    /// * `task` - Callable to execute on a Rayon worker.
95    ///
96    /// # Returns
97    ///
98    /// A [`RayonTaskHandle`] for the accepted task.
99    ///
100    /// # Errors
101    ///
102    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
103    /// requested before the task is accepted.
104    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
105    where
106        C: Callable<R, E> + Send + 'static,
107        R: Send + 'static,
108        E: Send + 'static,
109    {
110        let submission_guard = self.state.lock_submission();
111        if self.state.is_shutdown() {
112            return Err(RejectedExecution::Shutdown);
113        }
114        let task_id = self.state.next_task_id();
115        self.state.on_task_accepted();
116        let (handle, completion) = TaskCompletionPair::new().into_parts();
117        let completion_for_cancel = completion.clone();
118        let cancel: PendingCancel = Arc::new(move || completion_for_cancel.cancel());
119        self.state
120            .register_pending_task(task_id, Arc::clone(&cancel));
121        drop(submission_guard);
122
123        let completion_for_run = completion;
124        let state_for_run = Arc::clone(&self.state);
125        self.pool.spawn_fifo(move || {
126            if !state_for_run.start_pending_task(task_id, || completion_for_run.start()) {
127                return;
128            }
129            completion_for_run.complete(TaskRunner::new(task).call());
130            state_for_run.on_task_completed();
131        });
132        Ok(RayonTaskHandle::new(
133            handle,
134            task_id,
135            Arc::clone(&self.state),
136            cancel,
137        ))
138    }
139
140    /// Stops accepting new tasks.
141    ///
142    /// Already accepted Rayon tasks are allowed to finish normally.
143    fn shutdown(&self) {
144        let _guard = self.state.lock_submission();
145        self.state.shutdown();
146        self.state.notify_if_terminated();
147    }
148
149    /// Stops accepting new tasks and cancels tasks that have not started yet.
150    ///
151    /// Running Rayon tasks cannot be preempted. Cancellation therefore applies
152    /// only to tasks that are still pending when the cancellation hook wins the
153    /// race against task start.
154    ///
155    /// # Returns
156    ///
157    /// A count-based report describing the pending and running work observed at
158    /// the time of the shutdown request, plus the number of pending tasks for
159    /// which cancellation succeeded.
160    fn shutdown_now(&self) -> ShutdownReport {
161        let _guard = self.state.lock_submission();
162        self.state.shutdown();
163        let (queued, running, pending) = self.state.drain_pending_tasks_for_shutdown();
164        drop(_guard);
165
166        let cancelled = self.state.cancel_drained_pending_tasks(pending);
167        ShutdownReport::new(queued, running, cancelled)
168    }
169
170    /// Returns whether shutdown has been requested.
171    fn is_shutdown(&self) -> bool {
172        self.state.is_shutdown()
173    }
174
175    /// Returns whether shutdown was requested and no accepted tasks remain.
176    fn is_terminated(&self) -> bool {
177        self.is_shutdown() && self.state.has_no_active_tasks()
178    }
179
180    /// Waits until the service has terminated.
181    ///
182    /// # Returns
183    ///
184    /// A future that resolves after shutdown has been requested and all
185    /// accepted Rayon tasks have completed or been cancelled before start.
186    fn await_termination(&self) -> Self::Termination<'_> {
187        Box::pin(async move {
188            loop {
189                let notified = self.state.notified();
190                if self.is_terminated() {
191                    return;
192                }
193                notified.await;
194            }
195        })
196    }
197}