qubit_execution_services/execution_services.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13};
14
15use qubit_executor::TaskHandle;
16use qubit_function::{
17 Callable,
18 Runnable,
19};
20use qubit_thread_pool::{
21 ThreadPool,
22 ThreadPoolBuilder,
23};
24use qubit_tokio_executor::TokioExecutorService;
25
26use super::{
27 ExecutionServicesBuildError,
28 ExecutionServicesBuilder,
29 ExecutionServicesShutdownReport,
30 ExecutorService,
31 RayonExecutorService,
32 RayonTaskHandle,
33 RejectedExecution,
34 TokioIoExecutorService,
35 TokioTaskHandle,
36};
37
38/// Default managed service for synchronous tasks that may block an OS thread.
39pub type BlockingExecutorService = ThreadPool;
40
41/// Builder alias for configuring [`BlockingExecutorService`].
42pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
43
44/// Tokio-backed blocking executor service routed through `spawn_blocking`.
45pub type TokioBlockingExecutorService = TokioExecutorService;
46
47/// Unified facade exposing separate execution domains through one owner.
48///
49/// The facade does not implement a single scheduling core. Instead it routes
50/// work to one of four dedicated execution domains:
51///
52/// - `blocking`: synchronous tasks that may block an OS thread.
53/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
54/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
55/// - `io`: async futures spawned on Tokio's async runtime.
56pub struct ExecutionServices {
57 /// Managed service for synchronous tasks that may block OS threads.
58 blocking: BlockingExecutorService,
59 /// Managed service for CPU-bound synchronous tasks.
60 cpu: RayonExecutorService,
61 /// Tokio-backed blocking service using `spawn_blocking`.
62 tokio_blocking: TokioBlockingExecutorService,
63 /// Tokio-backed async service for Future-based tasks.
64 io: TokioIoExecutorService,
65}
66
67impl ExecutionServices {
68 /// Creates an execution-services facade from its four execution domains.
69 ///
70 /// # Parameters
71 ///
72 /// * `blocking` - Blocking executor domain.
73 /// * `cpu` - CPU-bound executor domain.
74 /// * `tokio_blocking` - Tokio blocking executor domain.
75 /// * `io` - Tokio async IO executor domain.
76 ///
77 /// # Returns
78 ///
79 /// An execution-services facade owning all supplied domains.
80 pub(crate) fn from_parts(
81 blocking: BlockingExecutorService,
82 cpu: RayonExecutorService,
83 tokio_blocking: TokioBlockingExecutorService,
84 io: TokioIoExecutorService,
85 ) -> Self {
86 Self {
87 blocking,
88 cpu,
89 tokio_blocking,
90 io,
91 }
92 }
93
94 /// Creates an execution-services facade with default builder settings.
95 ///
96 /// # Returns
97 ///
98 /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
99 /// successfully.
100 ///
101 /// # Errors
102 ///
103 /// Returns [`ExecutionServicesBuildError`] if the default builder
104 /// configuration is rejected.
105 #[inline]
106 pub fn new() -> Result<Self, ExecutionServicesBuildError> {
107 Self::builder().build()
108 }
109
110 /// Creates a builder for configuring the execution-services facade.
111 ///
112 /// # Returns
113 ///
114 /// A builder configured with CPU-parallelism defaults.
115 #[inline]
116 pub fn builder() -> ExecutionServicesBuilder {
117 ExecutionServicesBuilder::default()
118 }
119
120 /// Returns the blocking execution domain.
121 ///
122 /// # Returns
123 ///
124 /// A shared reference to the blocking executor service.
125 #[inline]
126 pub fn blocking(&self) -> &BlockingExecutorService {
127 &self.blocking
128 }
129
130 /// Returns the CPU execution domain.
131 ///
132 /// # Returns
133 ///
134 /// A shared reference to the Rayon-backed CPU executor service.
135 #[inline]
136 pub fn cpu(&self) -> &RayonExecutorService {
137 &self.cpu
138 }
139
140 /// Returns the Tokio blocking execution domain.
141 ///
142 /// # Returns
143 ///
144 /// A shared reference to the Tokio blocking executor service.
145 #[inline]
146 pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
147 &self.tokio_blocking
148 }
149
150 /// Returns the Tokio async IO execution domain.
151 ///
152 /// # Returns
153 ///
154 /// A shared reference to the Tokio IO executor service.
155 #[inline]
156 pub fn io(&self) -> &TokioIoExecutorService {
157 &self.io
158 }
159
160 /// Submits a blocking runnable task to the blocking domain.
161 ///
162 /// # Parameters
163 ///
164 /// * `task` - Runnable task that may block an OS thread.
165 ///
166 /// # Returns
167 ///
168 /// A [`TaskHandle`] for the accepted blocking task.
169 ///
170 /// # Errors
171 ///
172 /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
173 #[inline]
174 pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
175 where
176 T: Runnable<E> + Send + 'static,
177 E: Send + 'static,
178 {
179 self.blocking.submit(task)
180 }
181
182 /// Submits a blocking callable task to the blocking domain.
183 ///
184 /// # Parameters
185 ///
186 /// * `task` - Callable task that may block an OS thread.
187 ///
188 /// # Returns
189 ///
190 /// A [`TaskHandle`] for the accepted blocking task.
191 ///
192 /// # Errors
193 ///
194 /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
195 #[inline]
196 pub fn submit_blocking_callable<C, R, E>(
197 &self,
198 task: C,
199 ) -> Result<TaskHandle<R, E>, RejectedExecution>
200 where
201 C: Callable<R, E> + Send + 'static,
202 R: Send + 'static,
203 E: Send + 'static,
204 {
205 self.blocking.submit_callable(task)
206 }
207
208 /// Submits a CPU-bound runnable task to the Rayon domain.
209 ///
210 /// # Parameters
211 ///
212 /// * `task` - Runnable CPU task.
213 ///
214 /// # Returns
215 ///
216 /// A [`RayonTaskHandle`] for the accepted CPU task.
217 ///
218 /// # Errors
219 ///
220 /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
221 #[inline]
222 pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
223 where
224 T: Runnable<E> + Send + 'static,
225 E: Send + 'static,
226 {
227 self.cpu.submit(task)
228 }
229
230 /// Submits a CPU-bound callable task to the Rayon domain.
231 ///
232 /// # Parameters
233 ///
234 /// * `task` - Callable CPU task.
235 ///
236 /// # Returns
237 ///
238 /// A [`RayonTaskHandle`] for the accepted CPU task.
239 ///
240 /// # Errors
241 ///
242 /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
243 #[inline]
244 pub fn submit_cpu_callable<C, R, E>(
245 &self,
246 task: C,
247 ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
248 where
249 C: Callable<R, E> + Send + 'static,
250 R: Send + 'static,
251 E: Send + 'static,
252 {
253 self.cpu.submit_callable(task)
254 }
255
256 /// Submits a blocking runnable task to Tokio `spawn_blocking`.
257 ///
258 /// # Parameters
259 ///
260 /// * `task` - Runnable task to execute on Tokio's blocking pool.
261 ///
262 /// # Returns
263 ///
264 /// A [`TokioTaskHandle`] for the accepted blocking task.
265 ///
266 /// # Errors
267 ///
268 /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
269 /// task.
270 #[inline]
271 pub fn submit_tokio_blocking<T, E>(
272 &self,
273 task: T,
274 ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
275 where
276 T: Runnable<E> + Send + 'static,
277 E: Send + 'static,
278 {
279 self.tokio_blocking.submit(task)
280 }
281
282 /// Submits a blocking callable task to Tokio `spawn_blocking`.
283 ///
284 /// # Parameters
285 ///
286 /// * `task` - Callable task to execute on Tokio's blocking pool.
287 ///
288 /// # Returns
289 ///
290 /// A [`TokioTaskHandle`] for the accepted blocking task.
291 ///
292 /// # Errors
293 ///
294 /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
295 /// task.
296 #[inline]
297 pub fn submit_tokio_blocking_callable<C, R, E>(
298 &self,
299 task: C,
300 ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
301 where
302 C: Callable<R, E> + Send + 'static,
303 R: Send + 'static,
304 E: Send + 'static,
305 {
306 self.tokio_blocking.submit_callable(task)
307 }
308
309 /// Spawns an async IO or Future-based task on Tokio's async runtime.
310 ///
311 /// # Parameters
312 ///
313 /// * `future` - Future to execute on Tokio's async scheduler.
314 ///
315 /// # Returns
316 ///
317 /// A [`TokioTaskHandle`] for the accepted async task.
318 ///
319 /// # Errors
320 ///
321 /// Returns [`RejectedExecution`] if the Tokio IO domain refuses the task.
322 #[inline]
323 pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
324 where
325 F: Future<Output = Result<R, E>> + Send + 'static,
326 R: Send + 'static,
327 E: Send + 'static,
328 {
329 self.io.spawn(future)
330 }
331
332 /// Requests graceful shutdown for every execution domain.
333 pub fn shutdown(&self) {
334 self.blocking.shutdown();
335 self.cpu.shutdown();
336 self.tokio_blocking.shutdown();
337 self.io.shutdown();
338 }
339
340 /// Requests immediate shutdown for every execution domain.
341 ///
342 /// # Returns
343 ///
344 /// A per-domain aggregate report describing queued, running, and cancelled
345 /// work observed during shutdown.
346 pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
347 ExecutionServicesShutdownReport {
348 blocking: self.blocking.shutdown_now(),
349 cpu: self.cpu.shutdown_now(),
350 tokio_blocking: self.tokio_blocking.shutdown_now(),
351 io: self.io.shutdown_now(),
352 }
353 }
354
355 /// Returns whether every execution domain has been shut down.
356 ///
357 /// # Returns
358 ///
359 /// `true` only if all execution domains no longer accept new tasks.
360 #[inline]
361 pub fn is_shutdown(&self) -> bool {
362 self.blocking.is_shutdown()
363 && self.cpu.is_shutdown()
364 && self.tokio_blocking.is_shutdown()
365 && self.io.is_shutdown()
366 }
367
368 /// Returns whether every execution domain has terminated.
369 ///
370 /// # Returns
371 ///
372 /// `true` only after all execution domains have terminated.
373 #[inline]
374 pub fn is_terminated(&self) -> bool {
375 self.blocking.is_terminated()
376 && self.cpu.is_terminated()
377 && self.tokio_blocking.is_terminated()
378 && self.io.is_terminated()
379 }
380
381 /// Waits until every execution domain has terminated.
382 ///
383 /// # Returns
384 ///
385 /// A future that resolves after all execution domains have terminated.
386 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
387 Box::pin(async move {
388 self.blocking.await_termination().await;
389 self.cpu.await_termination().await;
390 self.tokio_blocking.await_termination().await;
391 self.io.await_termination().await;
392 })
393 }
394}