qubit_execution_services/execution_services.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14};
15
16use qubit_executor::{
17 TaskHandle,
18 TrackedTask,
19};
20use qubit_function::{
21 Callable,
22 Runnable,
23};
24use qubit_thread_pool::{
25 ThreadPool,
26 ThreadPoolBuilder,
27};
28use qubit_tokio_executor::TokioExecutorService;
29
30use super::{
31 ExecutionServicesBuildError,
32 ExecutionServicesBuilder,
33 ExecutionServicesStopReport,
34 ExecutorService,
35 ExecutorServiceLifecycle,
36 RayonExecutorService,
37 RayonTaskHandle,
38 SubmissionError,
39 TokioIoExecutorService,
40 TokioTaskHandle,
41};
42
43/// Default managed service for synchronous tasks that may block an OS thread.
44pub type BlockingExecutorService = ThreadPool;
45
46/// Builder alias for configuring [`BlockingExecutorService`].
47pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
48
49/// Tokio-backed blocking executor service routed through `spawn_blocking`.
50pub type TokioBlockingExecutorService = TokioExecutorService;
51
52/// Unified facade exposing separate execution domains through one owner.
53///
54/// The facade does not implement a single scheduling core. Instead it routes
55/// work to one of four dedicated execution domains:
56///
57/// - `blocking`: synchronous tasks that may block an OS thread.
58/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
59/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
60/// - `io`: async futures spawned on Tokio's async runtime.
61pub struct ExecutionServices {
62 /// Managed service for synchronous tasks that may block OS threads.
63 blocking: Arc<BlockingExecutorService>,
64 /// Managed service for CPU-bound synchronous tasks.
65 cpu: RayonExecutorService,
66 /// Tokio-backed blocking service using `spawn_blocking`.
67 tokio_blocking: TokioBlockingExecutorService,
68 /// Tokio-backed async service for Future-based tasks.
69 io: TokioIoExecutorService,
70}
71
72impl ExecutionServices {
73 /// Creates an execution-services facade from its four execution domains.
74 ///
75 /// # Parameters
76 ///
77 /// * `blocking` - Blocking executor domain.
78 /// * `cpu` - CPU-bound executor domain.
79 /// * `tokio_blocking` - Tokio blocking executor domain.
80 /// * `io` - Tokio async IO executor domain.
81 ///
82 /// # Returns
83 ///
84 /// An execution-services facade owning all supplied domains.
85 pub(crate) fn from_parts(
86 blocking: BlockingExecutorService,
87 cpu: RayonExecutorService,
88 tokio_blocking: TokioBlockingExecutorService,
89 io: TokioIoExecutorService,
90 ) -> Self {
91 Self {
92 blocking: Arc::new(blocking),
93 cpu,
94 tokio_blocking,
95 io,
96 }
97 }
98
99 /// Creates an execution-services facade with default builder settings.
100 ///
101 /// # Returns
102 ///
103 /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
104 /// successfully.
105 ///
106 /// # Errors
107 ///
108 /// Returns [`ExecutionServicesBuildError`] if the default builder
109 /// configuration is rejected.
110 #[inline]
111 pub fn new() -> Result<Self, ExecutionServicesBuildError> {
112 Self::builder().build()
113 }
114
115 /// Creates a builder for configuring the execution-services facade.
116 ///
117 /// # Returns
118 ///
119 /// A builder configured with CPU-parallelism defaults.
120 #[inline]
121 pub fn builder() -> ExecutionServicesBuilder {
122 ExecutionServicesBuilder::default()
123 }
124
125 /// Returns the blocking execution domain.
126 ///
127 /// # Returns
128 ///
129 /// A shared reference to the blocking executor service.
130 #[inline]
131 pub fn blocking(&self) -> &BlockingExecutorService {
132 self.blocking.as_ref()
133 }
134
135 /// Returns the CPU execution domain.
136 ///
137 /// # Returns
138 ///
139 /// A shared reference to the Rayon-backed CPU executor service.
140 #[inline]
141 pub fn cpu(&self) -> &RayonExecutorService {
142 &self.cpu
143 }
144
145 /// Returns the Tokio blocking execution domain.
146 ///
147 /// # Returns
148 ///
149 /// A shared reference to the Tokio blocking executor service.
150 #[inline]
151 pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
152 &self.tokio_blocking
153 }
154
155 /// Returns the Tokio async IO execution domain.
156 ///
157 /// # Returns
158 ///
159 /// A shared reference to the Tokio IO executor service.
160 #[inline]
161 pub fn io(&self) -> &TokioIoExecutorService {
162 &self.io
163 }
164
165 /// Submits a blocking runnable task to the blocking domain.
166 ///
167 /// # Parameters
168 ///
169 /// * `task` - Runnable task that may block an OS thread.
170 ///
171 /// # Returns
172 ///
173 /// `Ok(())` if the blocking domain accepts the task.
174 ///
175 /// # Errors
176 ///
177 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
178 #[inline]
179 pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
180 where
181 T: Runnable<E> + Send + 'static,
182 E: Send + 'static,
183 {
184 self.blocking.submit(task)
185 }
186
187 /// Submits a blocking runnable task and returns a tracked handle.
188 ///
189 /// # Parameters
190 ///
191 /// * `task` - Runnable task that may block an OS thread.
192 ///
193 /// # Returns
194 ///
195 /// A [`TrackedTask`] for the accepted blocking task.
196 ///
197 /// # Errors
198 ///
199 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
200 #[inline]
201 pub fn submit_tracked_blocking<T, E>(
202 &self,
203 task: T,
204 ) -> Result<TrackedTask<(), E>, SubmissionError>
205 where
206 T: Runnable<E> + Send + 'static,
207 E: Send + 'static,
208 {
209 self.blocking.submit_tracked(task)
210 }
211
212 /// Submits a blocking callable task to the blocking domain.
213 ///
214 /// # Parameters
215 ///
216 /// * `task` - Callable task that may block an OS thread.
217 ///
218 /// # Returns
219 ///
220 /// A [`TaskHandle`] for the accepted blocking task.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
225 #[inline]
226 pub fn submit_blocking_callable<C, R, E>(
227 &self,
228 task: C,
229 ) -> Result<TaskHandle<R, E>, SubmissionError>
230 where
231 C: Callable<R, E> + Send + 'static,
232 R: Send + 'static,
233 E: Send + 'static,
234 {
235 self.blocking.submit_callable(task)
236 }
237
238 /// Submits a blocking callable task and returns a tracked handle.
239 ///
240 /// # Parameters
241 ///
242 /// * `task` - Callable task that may block an OS thread.
243 ///
244 /// # Returns
245 ///
246 /// A [`TrackedTask`] for the accepted blocking task.
247 ///
248 /// # Errors
249 ///
250 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
251 #[inline]
252 pub fn submit_tracked_blocking_callable<C, R, E>(
253 &self,
254 task: C,
255 ) -> Result<TrackedTask<R, E>, SubmissionError>
256 where
257 C: Callable<R, E> + Send + 'static,
258 R: Send + 'static,
259 E: Send + 'static,
260 {
261 self.blocking.submit_tracked_callable(task)
262 }
263
264 /// Submits a CPU-bound runnable task to the Rayon domain.
265 ///
266 /// # Parameters
267 ///
268 /// * `task` - Runnable CPU task.
269 ///
270 /// # Returns
271 ///
272 /// `Ok(())` if the CPU domain accepts the task.
273 ///
274 /// # Errors
275 ///
276 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
277 #[inline]
278 pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
279 where
280 T: Runnable<E> + Send + 'static,
281 E: Send + 'static,
282 {
283 self.cpu.submit(task)
284 }
285
286 /// Submits a CPU-bound runnable task and returns a tracked handle.
287 ///
288 /// # Parameters
289 ///
290 /// * `task` - Runnable CPU task.
291 ///
292 /// # Returns
293 ///
294 /// A [`RayonTaskHandle`] for the accepted CPU task.
295 ///
296 /// # Errors
297 ///
298 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
299 #[inline]
300 pub fn submit_tracked_cpu<T, E>(
301 &self,
302 task: T,
303 ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
304 where
305 T: Runnable<E> + Send + 'static,
306 E: Send + 'static,
307 {
308 self.cpu.submit_tracked(task)
309 }
310
311 /// Submits a CPU-bound callable task to the Rayon domain.
312 ///
313 /// # Parameters
314 ///
315 /// * `task` - Callable CPU task.
316 ///
317 /// # Returns
318 ///
319 /// A [`TaskHandle`] for the accepted CPU task.
320 ///
321 /// # Errors
322 ///
323 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
324 #[inline]
325 pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
326 where
327 C: Callable<R, E> + Send + 'static,
328 R: Send + 'static,
329 E: Send + 'static,
330 {
331 self.cpu.submit_callable(task)
332 }
333
334 /// Submits a CPU-bound callable task and returns a tracked handle.
335 ///
336 /// # Parameters
337 ///
338 /// * `task` - Callable CPU task.
339 ///
340 /// # Returns
341 ///
342 /// A [`RayonTaskHandle`] for the accepted CPU task.
343 ///
344 /// # Errors
345 ///
346 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
347 #[inline]
348 pub fn submit_tracked_cpu_callable<C, R, E>(
349 &self,
350 task: C,
351 ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
352 where
353 C: Callable<R, E> + Send + 'static,
354 R: Send + 'static,
355 E: Send + 'static,
356 {
357 self.cpu.submit_tracked_callable(task)
358 }
359
360 /// Submits a blocking runnable task to Tokio `spawn_blocking`.
361 ///
362 /// # Parameters
363 ///
364 /// * `task` - Runnable task to execute on Tokio's blocking pool.
365 ///
366 /// # Returns
367 ///
368 /// `Ok(())` if the Tokio blocking domain accepts the task.
369 ///
370 /// # Errors
371 ///
372 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
373 /// task.
374 #[inline]
375 pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
376 where
377 T: Runnable<E> + Send + 'static,
378 E: Send + 'static,
379 {
380 self.tokio_blocking.submit(task)
381 }
382
383 /// Submits a blocking runnable task to Tokio and returns a tracked handle.
384 ///
385 /// # Parameters
386 ///
387 /// * `task` - Runnable task to execute on Tokio's blocking pool.
388 ///
389 /// # Returns
390 ///
391 /// A [`TrackedTask`] for the accepted blocking task.
392 ///
393 /// # Errors
394 ///
395 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
396 /// task.
397 #[inline]
398 pub fn submit_tracked_tokio_blocking<T, E>(
399 &self,
400 task: T,
401 ) -> Result<TrackedTask<(), E>, SubmissionError>
402 where
403 T: Runnable<E> + Send + 'static,
404 E: Send + 'static,
405 {
406 self.tokio_blocking.submit_tracked(task)
407 }
408
409 /// Submits a blocking callable task to Tokio `spawn_blocking`.
410 ///
411 /// # Parameters
412 ///
413 /// * `task` - Callable task to execute on Tokio's blocking pool.
414 ///
415 /// # Returns
416 ///
417 /// A [`TaskHandle`] for the accepted blocking task.
418 ///
419 /// # Errors
420 ///
421 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
422 /// task.
423 #[inline]
424 pub fn submit_tokio_blocking_callable<C, R, E>(
425 &self,
426 task: C,
427 ) -> Result<TaskHandle<R, E>, SubmissionError>
428 where
429 C: Callable<R, E> + Send + 'static,
430 R: Send + 'static,
431 E: Send + 'static,
432 {
433 self.tokio_blocking.submit_callable(task)
434 }
435
436 /// Submits a blocking callable task to Tokio and returns a tracked handle.
437 ///
438 /// # Parameters
439 ///
440 /// * `task` - Callable task to execute on Tokio's blocking pool.
441 ///
442 /// # Returns
443 ///
444 /// A [`TrackedTask`] for the accepted blocking task.
445 ///
446 /// # Errors
447 ///
448 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
449 /// task.
450 #[inline]
451 pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
452 &self,
453 task: C,
454 ) -> Result<TrackedTask<R, E>, SubmissionError>
455 where
456 C: Callable<R, E> + Send + 'static,
457 R: Send + 'static,
458 E: Send + 'static,
459 {
460 self.tokio_blocking.submit_tracked_callable(task)
461 }
462
463 /// Spawns an async IO or Future-based task on Tokio's async runtime.
464 ///
465 /// # Parameters
466 ///
467 /// * `future` - Future to execute on Tokio's async scheduler.
468 ///
469 /// # Returns
470 ///
471 /// A [`TokioTaskHandle`] for the accepted async task.
472 ///
473 /// # Errors
474 ///
475 /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
476 #[inline]
477 pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
478 where
479 F: Future<Output = Result<R, E>> + Send + 'static,
480 R: Send + 'static,
481 E: Send + 'static,
482 {
483 self.io.spawn(future)
484 }
485
486 /// Requests graceful shutdown for every execution domain.
487 pub fn shutdown(&self) {
488 self.blocking.shutdown();
489 self.cpu.shutdown();
490 self.tokio_blocking.shutdown();
491 self.io.shutdown();
492 }
493
494 /// Requests abrupt stop for every execution domain.
495 ///
496 /// # Returns
497 ///
498 /// A per-domain aggregate report describing queued, running, and cancelled
499 /// work observed during shutdown.
500 pub fn stop(&self) -> ExecutionServicesStopReport {
501 ExecutionServicesStopReport {
502 blocking: self.blocking.stop(),
503 cpu: self.cpu.stop(),
504 tokio_blocking: self.tokio_blocking.stop(),
505 io: self.io.stop(),
506 }
507 }
508
509 /// Returns the aggregate lifecycle state.
510 ///
511 /// # Returns
512 ///
513 /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
514 /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
515 /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
516 /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
517 pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
518 let lifecycles = [
519 self.blocking.lifecycle(),
520 self.cpu.lifecycle(),
521 self.tokio_blocking.lifecycle(),
522 self.io.lifecycle(),
523 ];
524 if lifecycles
525 .iter()
526 .all(|state| *state == ExecutorServiceLifecycle::Terminated)
527 {
528 ExecutorServiceLifecycle::Terminated
529 } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
530 ExecutorServiceLifecycle::Stopping
531 } else if lifecycles
532 .iter()
533 .any(|state| *state != ExecutorServiceLifecycle::Running)
534 {
535 ExecutorServiceLifecycle::ShuttingDown
536 } else {
537 ExecutorServiceLifecycle::Running
538 }
539 }
540
541 /// Returns whether every execution domain is running.
542 ///
543 /// # Returns
544 ///
545 /// `true` only if all execution domains are running.
546 #[inline]
547 pub fn is_running(&self) -> bool {
548 self.lifecycle() == ExecutorServiceLifecycle::Running
549 }
550
551 /// Returns whether any execution domain is gracefully shutting down.
552 ///
553 /// # Returns
554 ///
555 /// `true` when the aggregate lifecycle is
556 /// [`ExecutorServiceLifecycle::ShuttingDown`].
557 #[inline]
558 pub fn is_shutting_down(&self) -> bool {
559 self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
560 }
561
562 /// Returns whether any execution domain is stopping abruptly.
563 ///
564 /// # Returns
565 ///
566 /// `true` when the aggregate lifecycle is
567 /// [`ExecutorServiceLifecycle::Stopping`].
568 #[inline]
569 pub fn is_stopping(&self) -> bool {
570 self.lifecycle() == ExecutorServiceLifecycle::Stopping
571 }
572
573 /// Returns whether the facade is no longer fully running.
574 ///
575 /// # Returns
576 ///
577 /// `true` after any execution domain starts shutdown, stop, or has already
578 /// terminated.
579 #[inline]
580 pub fn is_not_running(&self) -> bool {
581 self.lifecycle() != ExecutorServiceLifecycle::Running
582 }
583
584 /// Returns whether every execution domain has terminated.
585 ///
586 /// # Returns
587 ///
588 /// `true` only after all execution domains have terminated.
589 #[inline]
590 pub fn is_terminated(&self) -> bool {
591 self.lifecycle() == ExecutorServiceLifecycle::Terminated
592 }
593
594 /// Waits until every execution domain has terminated.
595 ///
596 /// # Returns
597 ///
598 /// A future that resolves after all execution domains have terminated.
599 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
600 Box::pin(async move {
601 let blocking = Arc::clone(&self.blocking);
602 let cpu = self.cpu.clone();
603 let tokio_blocking = self.tokio_blocking.clone();
604 let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
605 let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
606 let tokio_blocking_wait =
607 tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
608 self.io.await_termination().await;
609 let _ = blocking_wait.await;
610 let _ = cpu_wait.await;
611 let _ = tokio_blocking_wait.await;
612 })
613 }
614}