qubit_execution_services/execution_services.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14};
15
16use qubit_executor::{
17 TaskHandle,
18 TrackedTask,
19};
20use qubit_function::{
21 Callable,
22 Runnable,
23};
24use qubit_thread_pool::{
25 ThreadPool,
26 ThreadPoolBuilder,
27};
28use qubit_tokio_executor::TokioExecutorService;
29
30use super::{
31 ExecutionServicesBuildError,
32 ExecutionServicesBuilder,
33 ExecutionServicesStopReport,
34 ExecutorService,
35 ExecutorServiceLifecycle,
36 RayonExecutorService,
37 RayonTaskHandle,
38 SubmissionError,
39 TokioBlockingTaskHandle,
40 TokioIoExecutorService,
41 TokioTaskHandle,
42};
43
44/// Default managed service for synchronous tasks that may block an OS thread.
45pub type BlockingExecutorService = ThreadPool;
46
47/// Builder alias for configuring [`BlockingExecutorService`].
48pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
49
50/// Tokio-backed blocking executor service routed through `spawn_blocking`.
51pub type TokioBlockingExecutorService = TokioExecutorService;
52
53/// Unified facade exposing separate execution domains through one owner.
54///
55/// The facade does not implement a single scheduling core. Instead it routes
56/// work to one of four dedicated execution domains:
57///
58/// - `blocking`: synchronous tasks that may block an OS thread.
59/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
60/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
61/// - `io`: async futures spawned on Tokio's async runtime.
62pub struct ExecutionServices {
63 /// Managed service for synchronous tasks that may block OS threads.
64 blocking: Arc<BlockingExecutorService>,
65 /// Managed service for CPU-bound synchronous tasks.
66 cpu: RayonExecutorService,
67 /// Tokio-backed blocking service using `spawn_blocking`.
68 tokio_blocking: TokioBlockingExecutorService,
69 /// Tokio-backed async service for Future-based tasks.
70 io: TokioIoExecutorService,
71}
72
73impl ExecutionServices {
74 /// Creates an execution-services facade from its four execution domains.
75 ///
76 /// # Parameters
77 ///
78 /// * `blocking` - Blocking executor domain.
79 /// * `cpu` - CPU-bound executor domain.
80 /// * `tokio_blocking` - Tokio blocking executor domain.
81 /// * `io` - Tokio async IO executor domain.
82 ///
83 /// # Returns
84 ///
85 /// An execution-services facade owning all supplied domains.
86 pub(crate) fn from_parts(
87 blocking: BlockingExecutorService,
88 cpu: RayonExecutorService,
89 tokio_blocking: TokioBlockingExecutorService,
90 io: TokioIoExecutorService,
91 ) -> Self {
92 Self {
93 blocking: Arc::new(blocking),
94 cpu,
95 tokio_blocking,
96 io,
97 }
98 }
99
100 /// Creates an execution-services facade with default builder settings.
101 ///
102 /// # Returns
103 ///
104 /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
105 /// successfully.
106 ///
107 /// # Errors
108 ///
109 /// Returns [`ExecutionServicesBuildError`] if the default builder
110 /// configuration is rejected.
111 #[inline]
112 pub fn new() -> Result<Self, ExecutionServicesBuildError> {
113 Self::builder().build()
114 }
115
116 /// Creates a builder for configuring the execution-services facade.
117 ///
118 /// # Returns
119 ///
120 /// A builder configured with CPU-parallelism defaults.
121 #[inline]
122 pub fn builder() -> ExecutionServicesBuilder {
123 ExecutionServicesBuilder::default()
124 }
125
126 /// Returns the blocking execution domain.
127 ///
128 /// # Returns
129 ///
130 /// A shared reference to the blocking executor service.
131 #[inline]
132 pub fn blocking(&self) -> &BlockingExecutorService {
133 self.blocking.as_ref()
134 }
135
136 /// Returns the CPU execution domain.
137 ///
138 /// # Returns
139 ///
140 /// A shared reference to the Rayon-backed CPU executor service.
141 #[inline]
142 pub fn cpu(&self) -> &RayonExecutorService {
143 &self.cpu
144 }
145
146 /// Returns the Tokio blocking execution domain.
147 ///
148 /// # Returns
149 ///
150 /// A shared reference to the Tokio blocking executor service.
151 #[inline]
152 pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
153 &self.tokio_blocking
154 }
155
156 /// Returns the Tokio async IO execution domain.
157 ///
158 /// # Returns
159 ///
160 /// A shared reference to the Tokio IO executor service.
161 #[inline]
162 pub fn io(&self) -> &TokioIoExecutorService {
163 &self.io
164 }
165
166 /// Submits a blocking runnable task to the blocking domain.
167 ///
168 /// # Parameters
169 ///
170 /// * `task` - Runnable task that may block an OS thread.
171 ///
172 /// # Returns
173 ///
174 /// `Ok(())` if the blocking domain accepts the task.
175 ///
176 /// # Errors
177 ///
178 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
179 #[inline]
180 pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
181 where
182 T: Runnable<E> + Send + 'static,
183 E: Send + 'static,
184 {
185 self.blocking.submit(task)
186 }
187
188 /// Submits a blocking runnable task and returns a tracked handle.
189 ///
190 /// # Parameters
191 ///
192 /// * `task` - Runnable task that may block an OS thread.
193 ///
194 /// # Returns
195 ///
196 /// A [`TrackedTask`] for the accepted blocking task.
197 ///
198 /// # Errors
199 ///
200 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
201 #[inline]
202 pub fn submit_tracked_blocking<T, E>(
203 &self,
204 task: T,
205 ) -> Result<TrackedTask<(), E>, SubmissionError>
206 where
207 T: Runnable<E> + Send + 'static,
208 E: Send + 'static,
209 {
210 self.blocking.submit_tracked(task)
211 }
212
213 /// Submits a blocking callable task to the blocking domain.
214 ///
215 /// # Parameters
216 ///
217 /// * `task` - Callable task that may block an OS thread.
218 ///
219 /// # Returns
220 ///
221 /// A [`TaskHandle`] for the accepted blocking task.
222 ///
223 /// # Errors
224 ///
225 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
226 #[inline]
227 pub fn submit_blocking_callable<C, R, E>(
228 &self,
229 task: C,
230 ) -> Result<TaskHandle<R, E>, SubmissionError>
231 where
232 C: Callable<R, E> + Send + 'static,
233 R: Send + 'static,
234 E: Send + 'static,
235 {
236 self.blocking.submit_callable(task)
237 }
238
239 /// Submits a blocking callable task and returns a tracked handle.
240 ///
241 /// # Parameters
242 ///
243 /// * `task` - Callable task that may block an OS thread.
244 ///
245 /// # Returns
246 ///
247 /// A [`TrackedTask`] for the accepted blocking task.
248 ///
249 /// # Errors
250 ///
251 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
252 #[inline]
253 pub fn submit_tracked_blocking_callable<C, R, E>(
254 &self,
255 task: C,
256 ) -> Result<TrackedTask<R, E>, SubmissionError>
257 where
258 C: Callable<R, E> + Send + 'static,
259 R: Send + 'static,
260 E: Send + 'static,
261 {
262 self.blocking.submit_tracked_callable(task)
263 }
264
265 /// Submits a CPU-bound runnable task to the Rayon domain.
266 ///
267 /// # Parameters
268 ///
269 /// * `task` - Runnable CPU task.
270 ///
271 /// # Returns
272 ///
273 /// `Ok(())` if the CPU domain accepts the task.
274 ///
275 /// # Errors
276 ///
277 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
278 #[inline]
279 pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
280 where
281 T: Runnable<E> + Send + 'static,
282 E: Send + 'static,
283 {
284 self.cpu.submit(task)
285 }
286
287 /// Submits a CPU-bound runnable task and returns a tracked handle.
288 ///
289 /// # Parameters
290 ///
291 /// * `task` - Runnable CPU task.
292 ///
293 /// # Returns
294 ///
295 /// A [`RayonTaskHandle`] for the accepted CPU task.
296 ///
297 /// # Errors
298 ///
299 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
300 #[inline]
301 pub fn submit_tracked_cpu<T, E>(
302 &self,
303 task: T,
304 ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
305 where
306 T: Runnable<E> + Send + 'static,
307 E: Send + 'static,
308 {
309 self.cpu.submit_tracked(task)
310 }
311
312 /// Submits a CPU-bound callable task to the Rayon domain.
313 ///
314 /// # Parameters
315 ///
316 /// * `task` - Callable CPU task.
317 ///
318 /// # Returns
319 ///
320 /// A [`TaskHandle`] for the accepted CPU task.
321 ///
322 /// # Errors
323 ///
324 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
325 #[inline]
326 pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
327 where
328 C: Callable<R, E> + Send + 'static,
329 R: Send + 'static,
330 E: Send + 'static,
331 {
332 self.cpu.submit_callable(task)
333 }
334
335 /// Submits a CPU-bound callable task and returns a tracked handle.
336 ///
337 /// # Parameters
338 ///
339 /// * `task` - Callable CPU task.
340 ///
341 /// # Returns
342 ///
343 /// A [`RayonTaskHandle`] for the accepted CPU task.
344 ///
345 /// # Errors
346 ///
347 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
348 #[inline]
349 pub fn submit_tracked_cpu_callable<C, R, E>(
350 &self,
351 task: C,
352 ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
353 where
354 C: Callable<R, E> + Send + 'static,
355 R: Send + 'static,
356 E: Send + 'static,
357 {
358 self.cpu.submit_tracked_callable(task)
359 }
360
361 /// Submits a blocking runnable task to Tokio `spawn_blocking`.
362 ///
363 /// # Parameters
364 ///
365 /// * `task` - Runnable task to execute on Tokio's blocking pool.
366 ///
367 /// # Returns
368 ///
369 /// `Ok(())` if the Tokio blocking domain accepts the task.
370 ///
371 /// # Errors
372 ///
373 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
374 /// task.
375 #[inline]
376 pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
377 where
378 T: Runnable<E> + Send + 'static,
379 E: Send + 'static,
380 {
381 self.tokio_blocking.submit(task)
382 }
383
384 /// Submits a blocking runnable task to Tokio and returns a tracked handle.
385 ///
386 /// # Parameters
387 ///
388 /// * `task` - Runnable task to execute on Tokio's blocking pool.
389 ///
390 /// # Returns
391 ///
392 /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
393 ///
394 /// # Errors
395 ///
396 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
397 /// task.
398 #[inline]
399 pub fn submit_tracked_tokio_blocking<T, E>(
400 &self,
401 task: T,
402 ) -> Result<TokioBlockingTaskHandle<(), E>, SubmissionError>
403 where
404 T: Runnable<E> + Send + 'static,
405 E: Send + 'static,
406 {
407 self.tokio_blocking.submit_tracked(task)
408 }
409
410 /// Submits a blocking callable task to Tokio `spawn_blocking`.
411 ///
412 /// # Parameters
413 ///
414 /// * `task` - Callable task to execute on Tokio's blocking pool.
415 ///
416 /// # Returns
417 ///
418 /// A [`TaskHandle`] for the accepted blocking task.
419 ///
420 /// # Errors
421 ///
422 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
423 /// task.
424 #[inline]
425 pub fn submit_tokio_blocking_callable<C, R, E>(
426 &self,
427 task: C,
428 ) -> Result<TaskHandle<R, E>, SubmissionError>
429 where
430 C: Callable<R, E> + Send + 'static,
431 R: Send + 'static,
432 E: Send + 'static,
433 {
434 self.tokio_blocking.submit_callable(task)
435 }
436
437 /// Submits a blocking callable task to Tokio and returns a tracked handle.
438 ///
439 /// # Parameters
440 ///
441 /// * `task` - Callable task to execute on Tokio's blocking pool.
442 ///
443 /// # Returns
444 ///
445 /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
446 ///
447 /// # Errors
448 ///
449 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
450 /// task.
451 #[inline]
452 pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
453 &self,
454 task: C,
455 ) -> Result<TokioBlockingTaskHandle<R, E>, SubmissionError>
456 where
457 C: Callable<R, E> + Send + 'static,
458 R: Send + 'static,
459 E: Send + 'static,
460 {
461 self.tokio_blocking.submit_tracked_callable(task)
462 }
463
464 /// Spawns an async IO or Future-based task on Tokio's async runtime.
465 ///
466 /// # Parameters
467 ///
468 /// * `future` - Future to execute on Tokio's async scheduler.
469 ///
470 /// # Returns
471 ///
472 /// A [`TokioTaskHandle`] for the accepted async task.
473 ///
474 /// # Errors
475 ///
476 /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
477 #[inline]
478 pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
479 where
480 F: Future<Output = Result<R, E>> + Send + 'static,
481 R: Send + 'static,
482 E: Send + 'static,
483 {
484 self.io.spawn(future)
485 }
486
487 /// Requests graceful shutdown for every execution domain.
488 pub fn shutdown(&self) {
489 self.blocking.shutdown();
490 self.cpu.shutdown();
491 self.tokio_blocking.shutdown();
492 self.io.shutdown();
493 }
494
495 /// Requests abrupt stop for every execution domain.
496 ///
497 /// # Returns
498 ///
499 /// A per-domain aggregate report describing queued, running, and cancelled
500 /// work observed during shutdown.
501 pub fn stop(&self) -> ExecutionServicesStopReport {
502 ExecutionServicesStopReport {
503 blocking: self.blocking.stop(),
504 cpu: self.cpu.stop(),
505 tokio_blocking: self.tokio_blocking.stop(),
506 io: self.io.stop(),
507 }
508 }
509
510 /// Returns the aggregate lifecycle state.
511 ///
512 /// # Returns
513 ///
514 /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
515 /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
516 /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
517 /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
518 pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
519 let lifecycles = [
520 self.blocking.lifecycle(),
521 self.cpu.lifecycle(),
522 self.tokio_blocking.lifecycle(),
523 self.io.lifecycle(),
524 ];
525 if lifecycles
526 .iter()
527 .all(|state| *state == ExecutorServiceLifecycle::Terminated)
528 {
529 ExecutorServiceLifecycle::Terminated
530 } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
531 ExecutorServiceLifecycle::Stopping
532 } else if lifecycles
533 .iter()
534 .any(|state| *state != ExecutorServiceLifecycle::Running)
535 {
536 ExecutorServiceLifecycle::ShuttingDown
537 } else {
538 ExecutorServiceLifecycle::Running
539 }
540 }
541
542 /// Returns whether every execution domain is running.
543 ///
544 /// # Returns
545 ///
546 /// `true` only if all execution domains are running.
547 #[inline]
548 pub fn is_running(&self) -> bool {
549 self.lifecycle() == ExecutorServiceLifecycle::Running
550 }
551
552 /// Returns whether any execution domain is gracefully shutting down.
553 ///
554 /// # Returns
555 ///
556 /// `true` when the aggregate lifecycle is
557 /// [`ExecutorServiceLifecycle::ShuttingDown`].
558 #[inline]
559 pub fn is_shutting_down(&self) -> bool {
560 self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
561 }
562
563 /// Returns whether any execution domain is stopping abruptly.
564 ///
565 /// # Returns
566 ///
567 /// `true` when the aggregate lifecycle is
568 /// [`ExecutorServiceLifecycle::Stopping`].
569 #[inline]
570 pub fn is_stopping(&self) -> bool {
571 self.lifecycle() == ExecutorServiceLifecycle::Stopping
572 }
573
574 /// Returns whether the facade is no longer fully running.
575 ///
576 /// # Returns
577 ///
578 /// `true` after any execution domain starts shutdown, stop, or has already
579 /// terminated.
580 #[inline]
581 pub fn is_not_running(&self) -> bool {
582 self.lifecycle() != ExecutorServiceLifecycle::Running
583 }
584
585 /// Returns whether every execution domain has terminated.
586 ///
587 /// # Returns
588 ///
589 /// `true` only after all execution domains have terminated.
590 #[inline]
591 pub fn is_terminated(&self) -> bool {
592 self.lifecycle() == ExecutorServiceLifecycle::Terminated
593 }
594
595 /// Waits until every execution domain has terminated.
596 ///
597 /// # Returns
598 ///
599 /// A future that resolves after all execution domains have terminated.
600 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
601 Box::pin(async move {
602 let blocking = Arc::clone(&self.blocking);
603 let cpu = self.cpu.clone();
604 let tokio_blocking = self.tokio_blocking.clone();
605 let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
606 let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
607 let tokio_blocking_wait =
608 tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
609 self.io.await_termination().await;
610 let _ = blocking_wait.await;
611 let _ = cpu_wait.await;
612 let _ = tokio_blocking_wait.await;
613 })
614 }
615}