qubit_rayon_executor/
rayon_executor_service.rs1use std::{
10 future::Future,
11 pin::Pin,
12 sync::Arc,
13};
14
15use qubit_function::Callable;
16use rayon::ThreadPool as RayonThreadPool;
17
18use qubit_executor::{
19 TaskCompletionPair,
20 TaskRunner,
21};
22
23use qubit_executor::service::{
24 ExecutorService,
25 RejectedExecution,
26 ShutdownReport,
27};
28
29use crate::{
30 pending_cancel::PendingCancel,
31 rayon_executor_service_build_error::RayonExecutorServiceBuildError,
32 rayon_executor_service_builder::RayonExecutorServiceBuilder,
33 rayon_executor_service_state::RayonExecutorServiceState,
34 rayon_task_handle::RayonTaskHandle,
35};
36
37#[derive(Clone)]
43pub struct RayonExecutorService {
44 pub(crate) pool: Arc<RayonThreadPool>,
46 pub(crate) state: Arc<RayonExecutorServiceState>,
48}
49
50impl RayonExecutorService {
51 #[inline]
63 pub fn new() -> Result<Self, RayonExecutorServiceBuildError> {
64 Self::builder().build()
65 }
66
67 #[inline]
73 pub fn builder() -> RayonExecutorServiceBuilder {
74 RayonExecutorServiceBuilder::default()
75 }
76}
77
78impl ExecutorService for RayonExecutorService {
79 type Handle<R, E>
80 = RayonTaskHandle<R, E>
81 where
82 R: Send + 'static,
83 E: Send + 'static;
84
85 type Termination<'a>
86 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
87 where
88 Self: 'a;
89
90 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
105 where
106 C: Callable<R, E> + Send + 'static,
107 R: Send + 'static,
108 E: Send + 'static,
109 {
110 let submission_guard = self.state.lock_submission();
111 if self.state.is_shutdown() {
112 return Err(RejectedExecution::Shutdown);
113 }
114 let task_id = self.state.next_task_id();
115 self.state.on_task_accepted();
116 let (handle, completion) = TaskCompletionPair::new().into_parts();
117 let completion_for_cancel = completion.clone();
118 let cancel: PendingCancel = Arc::new(move || completion_for_cancel.cancel());
119 self.state
120 .register_pending_task(task_id, Arc::clone(&cancel));
121 drop(submission_guard);
122
123 let completion_for_run = completion;
124 let state_for_run = Arc::clone(&self.state);
125 self.pool.spawn_fifo(move || {
126 if !state_for_run.start_pending_task(task_id, || completion_for_run.start()) {
127 return;
128 }
129 completion_for_run.complete(TaskRunner::new(task).call());
130 state_for_run.on_task_completed();
131 });
132 Ok(RayonTaskHandle::new(
133 handle,
134 task_id,
135 Arc::clone(&self.state),
136 cancel,
137 ))
138 }
139
140 fn shutdown(&self) {
144 let _guard = self.state.lock_submission();
145 self.state.shutdown();
146 self.state.notify_if_terminated();
147 }
148
149 fn shutdown_now(&self) -> ShutdownReport {
161 let _guard = self.state.lock_submission();
162 self.state.shutdown();
163 let (queued, running, pending) = self.state.drain_pending_tasks_for_shutdown();
164 drop(_guard);
165
166 let cancelled = self.state.cancel_drained_pending_tasks(pending);
167 ShutdownReport::new(queued, running, cancelled)
168 }
169
170 fn is_shutdown(&self) -> bool {
172 self.state.is_shutdown()
173 }
174
175 fn is_terminated(&self) -> bool {
177 self.is_shutdown() && self.state.has_no_active_tasks()
178 }
179
180 fn await_termination(&self) -> Self::Termination<'_> {
187 Box::pin(async move {
188 loop {
189 let notified = self.state.notified();
190 if self.is_terminated() {
191 return;
192 }
193 notified.await;
194 }
195 })
196 }
197}