qubit_rayon_executor/
rayon_executor_service.rs1use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14};
15
16use qubit_function::Callable;
17use rayon::ThreadPool as RayonThreadPool;
18
19use qubit_executor::{
20 TaskCompletionPair,
21 TaskRunner,
22};
23
24use qubit_executor::service::{
25 ExecutorService,
26 RejectedExecution,
27 ShutdownReport,
28};
29
30use crate::{
31 pending_cancel::PendingCancel,
32 rayon_executor_service_build_error::RayonExecutorServiceBuildError,
33 rayon_executor_service_builder::RayonExecutorServiceBuilder,
34 rayon_executor_service_state::RayonExecutorServiceState,
35 rayon_task_handle::RayonTaskHandle,
36};
37
38#[derive(Clone)]
44pub struct RayonExecutorService {
45 pub(crate) pool: Arc<RayonThreadPool>,
47 pub(crate) state: Arc<RayonExecutorServiceState>,
49}
50
51impl RayonExecutorService {
52 #[inline]
64 pub fn new() -> Result<Self, RayonExecutorServiceBuildError> {
65 Self::builder().build()
66 }
67
68 #[inline]
74 pub fn builder() -> RayonExecutorServiceBuilder {
75 RayonExecutorServiceBuilder::default()
76 }
77}
78
79impl ExecutorService for RayonExecutorService {
80 type Handle<R, E>
81 = RayonTaskHandle<R, E>
82 where
83 R: Send + 'static,
84 E: Send + 'static;
85
86 type Termination<'a>
87 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
88 where
89 Self: 'a;
90
91 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
106 where
107 C: Callable<R, E> + Send + 'static,
108 R: Send + 'static,
109 E: Send + 'static,
110 {
111 let submission_guard = self.state.lock_submission();
112 if self.state.is_shutdown() {
113 return Err(RejectedExecution::Shutdown);
114 }
115 let task_id = self.state.next_task_id();
116 self.state.on_task_accepted();
117 let (handle, completion) = TaskCompletionPair::new().into_parts();
118 let completion_for_cancel = completion.clone();
119 let cancel: PendingCancel = Arc::new(move || completion_for_cancel.cancel());
120 self.state
121 .register_pending_task(task_id, Arc::clone(&cancel));
122 drop(submission_guard);
123
124 let completion_for_run = completion;
125 let state_for_run = Arc::clone(&self.state);
126 self.pool.spawn_fifo(move || {
127 if !state_for_run.start_pending_task(task_id, || completion_for_run.start()) {
128 return;
129 }
130 completion_for_run.complete(TaskRunner::new(task).call());
131 state_for_run.on_task_completed();
132 });
133 Ok(RayonTaskHandle::new(
134 handle,
135 task_id,
136 Arc::clone(&self.state),
137 cancel,
138 ))
139 }
140
141 fn shutdown(&self) {
145 let _guard = self.state.lock_submission();
146 self.state.shutdown();
147 self.state.notify_if_terminated();
148 }
149
150 fn shutdown_now(&self) -> ShutdownReport {
162 let _guard = self.state.lock_submission();
163 self.state.shutdown();
164 let (queued, running, pending) = self.state.drain_pending_tasks_for_shutdown();
165 drop(_guard);
166
167 let cancelled = self.state.cancel_drained_pending_tasks(pending);
168 ShutdownReport::new(queued, running, cancelled)
169 }
170
171 fn is_shutdown(&self) -> bool {
173 self.state.is_shutdown()
174 }
175
176 fn is_terminated(&self) -> bool {
178 self.is_shutdown() && self.state.has_no_active_tasks()
179 }
180
181 fn await_termination(&self) -> Self::Termination<'_> {
188 Box::pin(async move {
189 loop {
190 let notified = self.state.notified();
191 if self.is_terminated() {
192 return;
193 }
194 notified.await;
195 }
196 })
197 }
198}