// qubit_rayon_executor/rayon_executor_service.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14};
15
16use qubit_function::Callable;
17use rayon::ThreadPool as RayonThreadPool;
18
19use qubit_executor::{
20    TaskCompletionPair,
21    TaskRunner,
22};
23
24use qubit_executor::service::{
25    ExecutorService,
26    RejectedExecution,
27    ShutdownReport,
28};
29
30use crate::{
31    pending_cancel::PendingCancel,
32    rayon_executor_service_build_error::RayonExecutorServiceBuildError,
33    rayon_executor_service_builder::RayonExecutorServiceBuilder,
34    rayon_executor_service_state::RayonExecutorServiceState,
35    rayon_task_handle::RayonTaskHandle,
36};
37
38/// Rayon-backed executor service for CPU-bound synchronous tasks.
39///
40/// Accepted tasks are executed on a dedicated Rayon thread pool. The service
41/// preserves the crate's `ExecutorService` lifecycle semantics and task-handle
42/// APIs while delegating scheduling to Rayon.
43#[derive(Clone)]
44pub struct RayonExecutorService {
45    /// Rayon thread pool used to execute accepted tasks.
46    pub(crate) pool: Arc<RayonThreadPool>,
47    /// Shared lifecycle and cancellation state.
48    pub(crate) state: Arc<RayonExecutorServiceState>,
49}
50
51impl RayonExecutorService {
52    /// Creates a Rayon executor service with default builder settings.
53    ///
54    /// # Returns
55    ///
56    /// `Ok(RayonExecutorService)` if the default Rayon thread pool can be
57    /// built.
58    ///
59    /// # Errors
60    ///
61    /// Returns [`RayonExecutorServiceBuildError`] if the default builder
62    /// configuration is rejected.
63    #[inline]
64    pub fn new() -> Result<Self, RayonExecutorServiceBuildError> {
65        Self::builder().build()
66    }
67
68    /// Creates a builder for configuring a Rayon executor service.
69    ///
70    /// # Returns
71    ///
72    /// A builder configured with CPU-parallelism defaults.
73    #[inline]
74    pub fn builder() -> RayonExecutorServiceBuilder {
75        RayonExecutorServiceBuilder::default()
76    }
77}
78
79impl ExecutorService for RayonExecutorService {
80    type Handle<R, E>
81        = RayonTaskHandle<R, E>
82    where
83        R: Send + 'static,
84        E: Send + 'static;
85
86    type Termination<'a>
87        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
88    where
89        Self: 'a;
90
91    /// Accepts a callable and schedules it on the Rayon thread pool.
92    ///
93    /// # Parameters
94    ///
95    /// * `task` - Callable to execute on a Rayon worker.
96    ///
97    /// # Returns
98    ///
99    /// A [`RayonTaskHandle`] for the accepted task.
100    ///
101    /// # Errors
102    ///
103    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
104    /// requested before the task is accepted.
105    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
106    where
107        C: Callable<R, E> + Send + 'static,
108        R: Send + 'static,
109        E: Send + 'static,
110    {
111        let submission_guard = self.state.lock_submission();
112        if self.state.is_shutdown() {
113            return Err(RejectedExecution::Shutdown);
114        }
115        let task_id = self.state.next_task_id();
116        self.state.on_task_accepted();
117        let (handle, completion) = TaskCompletionPair::new().into_parts();
118        let completion_for_cancel = completion.clone();
119        let cancel: PendingCancel = Arc::new(move || completion_for_cancel.cancel());
120        self.state
121            .register_pending_task(task_id, Arc::clone(&cancel));
122        drop(submission_guard);
123
124        let completion_for_run = completion;
125        let state_for_run = Arc::clone(&self.state);
126        self.pool.spawn_fifo(move || {
127            if !state_for_run.start_pending_task(task_id, || completion_for_run.start()) {
128                return;
129            }
130            completion_for_run.complete(TaskRunner::new(task).call());
131            state_for_run.on_task_completed();
132        });
133        Ok(RayonTaskHandle::new(
134            handle,
135            task_id,
136            Arc::clone(&self.state),
137            cancel,
138        ))
139    }
140
141    /// Stops accepting new tasks.
142    ///
143    /// Already accepted Rayon tasks are allowed to finish normally.
144    fn shutdown(&self) {
145        let _guard = self.state.lock_submission();
146        self.state.shutdown();
147        self.state.notify_if_terminated();
148    }
149
150    /// Stops accepting new tasks and cancels tasks that have not started yet.
151    ///
152    /// Running Rayon tasks cannot be preempted. Cancellation therefore applies
153    /// only to tasks that are still pending when the cancellation hook wins the
154    /// race against task start.
155    ///
156    /// # Returns
157    ///
158    /// A count-based report describing the pending and running work observed at
159    /// the time of the shutdown request, plus the number of pending tasks for
160    /// which cancellation succeeded.
161    fn shutdown_now(&self) -> ShutdownReport {
162        let _guard = self.state.lock_submission();
163        self.state.shutdown();
164        let (queued, running, pending) = self.state.drain_pending_tasks_for_shutdown();
165        drop(_guard);
166
167        let cancelled = self.state.cancel_drained_pending_tasks(pending);
168        ShutdownReport::new(queued, running, cancelled)
169    }
170
171    /// Returns whether shutdown has been requested.
172    fn is_shutdown(&self) -> bool {
173        self.state.is_shutdown()
174    }
175
176    /// Returns whether shutdown was requested and no accepted tasks remain.
177    fn is_terminated(&self) -> bool {
178        self.is_shutdown() && self.state.has_no_active_tasks()
179    }
180
181    /// Waits until the service has terminated.
182    ///
183    /// # Returns
184    ///
185    /// A future that resolves after shutdown has been requested and all
186    /// accepted Rayon tasks have completed or been cancelled before start.
187    fn await_termination(&self) -> Self::Termination<'_> {
188        Box::pin(async move {
189            loop {
190                let notified = self.state.notified();
191                if self.is_terminated() {
192                    return;
193                }
194                notified.await;
195            }
196        })
197    }
198}