qubit_execution_services/execution_services.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14 time::Duration,
15};
16
17use qubit_executor::{
18 TaskHandle,
19 TrackedTask,
20};
21use qubit_function::{
22 Callable,
23 Runnable,
24};
25use qubit_thread_pool::{
26 ThreadPool,
27 ThreadPoolBuilder,
28};
29use qubit_tokio_executor::TokioExecutorService;
30
31use super::{
32 ExecutionServicesBuildError,
33 ExecutionServicesBuilder,
34 ExecutionServicesStopReport,
35 ExecutorService,
36 ExecutorServiceLifecycle,
37 RayonExecutorService,
38 RayonTaskHandle,
39 SubmissionError,
40 TokioBlockingTaskHandle,
41 TokioIoExecutorService,
42 TokioTaskHandle,
43};
44
/// Default managed executor for synchronous tasks that may block an OS thread.
///
/// This is a plain alias of [`ThreadPool`] from the `qubit_thread_pool` crate.
pub type BlockingExecutorService = ThreadPool;
47
/// Builder used to configure a [`BlockingExecutorService`].
///
/// This is a plain alias of [`ThreadPoolBuilder`] from the `qubit_thread_pool`
/// crate.
pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
50
/// Tokio-backed blocking executor service routed through `spawn_blocking`.
///
/// This is a plain alias of [`TokioExecutorService`] from the
/// `qubit_tokio_executor` crate.
pub type TokioBlockingExecutorService = TokioExecutorService;
53
/// Unified facade exposing separate execution domains through one owner.
///
/// The facade does not implement a single scheduling core. Instead it routes
/// work to one of four dedicated execution domains:
///
/// - `blocking`: synchronous tasks that may block an OS thread.
/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
/// - `io`: async futures spawned on Tokio's async runtime.
///
/// Lifecycle operations (`shutdown`, `stop`, `lifecycle`, `await_termination`)
/// are applied to all four domains together.
pub struct ExecutionServices {
    /// Managed service for synchronous tasks that may block OS threads.
    ///
    /// Held in an [`Arc`] so that `await_termination` can move a shared handle
    /// into the closure that waits for this domain to terminate.
    blocking: Arc<BlockingExecutorService>,
    /// Managed service for CPU-bound synchronous tasks.
    cpu: RayonExecutorService,
    /// Tokio-backed blocking service using `spawn_blocking`.
    tokio_blocking: TokioBlockingExecutorService,
    /// Tokio-backed async service for Future-based tasks.
    io: TokioIoExecutorService,
}
73
74impl ExecutionServices {
75 /// Creates an execution-services facade from its four execution domains.
76 ///
77 /// # Parameters
78 ///
79 /// * `blocking` - Blocking executor domain.
80 /// * `cpu` - CPU-bound executor domain.
81 /// * `tokio_blocking` - Tokio blocking executor domain.
82 /// * `io` - Tokio async IO executor domain.
83 ///
84 /// # Returns
85 ///
86 /// An execution-services facade owning all supplied domains.
87 pub(crate) fn from_parts(
88 blocking: BlockingExecutorService,
89 cpu: RayonExecutorService,
90 tokio_blocking: TokioBlockingExecutorService,
91 io: TokioIoExecutorService,
92 ) -> Self {
93 Self {
94 blocking: Arc::new(blocking),
95 cpu,
96 tokio_blocking,
97 io,
98 }
99 }
100
101 /// Creates an execution-services facade with default builder settings.
102 ///
103 /// # Returns
104 ///
105 /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
106 /// successfully.
107 ///
108 /// # Errors
109 ///
110 /// Returns [`ExecutionServicesBuildError`] if the default builder
111 /// configuration is rejected.
112 #[inline]
113 pub fn new() -> Result<Self, ExecutionServicesBuildError> {
114 Self::builder().build()
115 }
116
/// Creates a builder for configuring the execution-services facade.
///
/// # Returns
///
/// The value produced by `ExecutionServicesBuilder::default()`. The original
/// documentation describes these as CPU-parallelism defaults; confirm the
/// exact values against the builder's `Default` implementation.
#[inline]
pub fn builder() -> ExecutionServicesBuilder {
    ExecutionServicesBuilder::default()
}
126
127 /// Returns the blocking execution domain.
128 ///
129 /// # Returns
130 ///
131 /// A shared reference to the blocking executor service.
132 #[inline]
133 pub fn blocking(&self) -> &BlockingExecutorService {
134 self.blocking.as_ref()
135 }
136
137 /// Returns the CPU execution domain.
138 ///
139 /// # Returns
140 ///
141 /// A shared reference to the Rayon-backed CPU executor service.
142 #[inline]
143 pub fn cpu(&self) -> &RayonExecutorService {
144 &self.cpu
145 }
146
147 /// Returns the Tokio blocking execution domain.
148 ///
149 /// # Returns
150 ///
151 /// A shared reference to the Tokio blocking executor service.
152 #[inline]
153 pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
154 &self.tokio_blocking
155 }
156
157 /// Returns the Tokio async IO execution domain.
158 ///
159 /// # Returns
160 ///
161 /// A shared reference to the Tokio IO executor service.
162 #[inline]
163 pub fn io(&self) -> &TokioIoExecutorService {
164 &self.io
165 }
166
167 /// Submits a blocking runnable task to the blocking domain.
168 ///
169 /// # Parameters
170 ///
171 /// * `task` - Runnable task that may block an OS thread.
172 ///
173 /// # Returns
174 ///
175 /// `Ok(())` if the blocking domain accepts the task.
176 ///
177 /// # Errors
178 ///
179 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
180 #[inline]
181 pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
182 where
183 T: Runnable<E> + Send + 'static,
184 E: Send + 'static,
185 {
186 self.blocking.submit(task)
187 }
188
189 /// Submits a blocking runnable task and returns a tracked handle.
190 ///
191 /// # Parameters
192 ///
193 /// * `task` - Runnable task that may block an OS thread.
194 ///
195 /// # Returns
196 ///
197 /// A [`TrackedTask`] for the accepted blocking task.
198 ///
199 /// # Errors
200 ///
201 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
202 #[inline]
203 pub fn submit_tracked_blocking<T, E>(
204 &self,
205 task: T,
206 ) -> Result<TrackedTask<(), E>, SubmissionError>
207 where
208 T: Runnable<E> + Send + 'static,
209 E: Send + 'static,
210 {
211 self.blocking.submit_tracked(task)
212 }
213
214 /// Submits a blocking callable task to the blocking domain.
215 ///
216 /// # Parameters
217 ///
218 /// * `task` - Callable task that may block an OS thread.
219 ///
220 /// # Returns
221 ///
222 /// A [`TaskHandle`] for the accepted blocking task.
223 ///
224 /// # Errors
225 ///
226 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
227 #[inline]
228 pub fn submit_blocking_callable<C, R, E>(
229 &self,
230 task: C,
231 ) -> Result<TaskHandle<R, E>, SubmissionError>
232 where
233 C: Callable<R, E> + Send + 'static,
234 R: Send + 'static,
235 E: Send + 'static,
236 {
237 self.blocking.submit_callable(task)
238 }
239
240 /// Submits a blocking callable task and returns a tracked handle.
241 ///
242 /// # Parameters
243 ///
244 /// * `task` - Callable task that may block an OS thread.
245 ///
246 /// # Returns
247 ///
248 /// A [`TrackedTask`] for the accepted blocking task.
249 ///
250 /// # Errors
251 ///
252 /// Returns [`SubmissionError`] if the blocking domain refuses the task.
253 #[inline]
254 pub fn submit_tracked_blocking_callable<C, R, E>(
255 &self,
256 task: C,
257 ) -> Result<TrackedTask<R, E>, SubmissionError>
258 where
259 C: Callable<R, E> + Send + 'static,
260 R: Send + 'static,
261 E: Send + 'static,
262 {
263 self.blocking.submit_tracked_callable(task)
264 }
265
266 /// Submits a CPU-bound runnable task to the Rayon domain.
267 ///
268 /// # Parameters
269 ///
270 /// * `task` - Runnable CPU task.
271 ///
272 /// # Returns
273 ///
274 /// `Ok(())` if the CPU domain accepts the task.
275 ///
276 /// # Errors
277 ///
278 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
279 #[inline]
280 pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
281 where
282 T: Runnable<E> + Send + 'static,
283 E: Send + 'static,
284 {
285 self.cpu.submit(task)
286 }
287
288 /// Submits a CPU-bound runnable task and returns a tracked handle.
289 ///
290 /// # Parameters
291 ///
292 /// * `task` - Runnable CPU task.
293 ///
294 /// # Returns
295 ///
296 /// A [`RayonTaskHandle`] for the accepted CPU task.
297 ///
298 /// # Errors
299 ///
300 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
301 #[inline]
302 pub fn submit_tracked_cpu<T, E>(
303 &self,
304 task: T,
305 ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
306 where
307 T: Runnable<E> + Send + 'static,
308 E: Send + 'static,
309 {
310 self.cpu.submit_tracked(task)
311 }
312
313 /// Submits a CPU-bound callable task to the Rayon domain.
314 ///
315 /// # Parameters
316 ///
317 /// * `task` - Callable CPU task.
318 ///
319 /// # Returns
320 ///
321 /// A [`TaskHandle`] for the accepted CPU task.
322 ///
323 /// # Errors
324 ///
325 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
326 #[inline]
327 pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
328 where
329 C: Callable<R, E> + Send + 'static,
330 R: Send + 'static,
331 E: Send + 'static,
332 {
333 self.cpu.submit_callable(task)
334 }
335
336 /// Submits a CPU-bound callable task and returns a tracked handle.
337 ///
338 /// # Parameters
339 ///
340 /// * `task` - Callable CPU task.
341 ///
342 /// # Returns
343 ///
344 /// A [`RayonTaskHandle`] for the accepted CPU task.
345 ///
346 /// # Errors
347 ///
348 /// Returns [`SubmissionError`] if the CPU domain refuses the task.
349 #[inline]
350 pub fn submit_tracked_cpu_callable<C, R, E>(
351 &self,
352 task: C,
353 ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
354 where
355 C: Callable<R, E> + Send + 'static,
356 R: Send + 'static,
357 E: Send + 'static,
358 {
359 self.cpu.submit_tracked_callable(task)
360 }
361
362 /// Submits a blocking runnable task to Tokio `spawn_blocking`.
363 ///
364 /// # Parameters
365 ///
366 /// * `task` - Runnable task to execute on Tokio's blocking pool.
367 ///
368 /// # Returns
369 ///
370 /// `Ok(())` if the Tokio blocking domain accepts the task.
371 ///
372 /// # Errors
373 ///
374 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
375 /// task.
376 #[inline]
377 pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
378 where
379 T: Runnable<E> + Send + 'static,
380 E: Send + 'static,
381 {
382 self.tokio_blocking.submit(task)
383 }
384
385 /// Submits a blocking runnable task to Tokio and returns a tracked handle.
386 ///
387 /// # Parameters
388 ///
389 /// * `task` - Runnable task to execute on Tokio's blocking pool.
390 ///
391 /// # Returns
392 ///
393 /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
394 ///
395 /// # Errors
396 ///
397 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
398 /// task.
399 #[inline]
400 pub fn submit_tracked_tokio_blocking<T, E>(
401 &self,
402 task: T,
403 ) -> Result<TokioBlockingTaskHandle<(), E>, SubmissionError>
404 where
405 T: Runnable<E> + Send + 'static,
406 E: Send + 'static,
407 {
408 self.tokio_blocking.submit_tracked(task)
409 }
410
411 /// Submits a blocking callable task to Tokio `spawn_blocking`.
412 ///
413 /// # Parameters
414 ///
415 /// * `task` - Callable task to execute on Tokio's blocking pool.
416 ///
417 /// # Returns
418 ///
419 /// A [`TaskHandle`] for the accepted blocking task.
420 ///
421 /// # Errors
422 ///
423 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
424 /// task.
425 #[inline]
426 pub fn submit_tokio_blocking_callable<C, R, E>(
427 &self,
428 task: C,
429 ) -> Result<TaskHandle<R, E>, SubmissionError>
430 where
431 C: Callable<R, E> + Send + 'static,
432 R: Send + 'static,
433 E: Send + 'static,
434 {
435 self.tokio_blocking.submit_callable(task)
436 }
437
438 /// Submits a blocking callable task to Tokio and returns a tracked handle.
439 ///
440 /// # Parameters
441 ///
442 /// * `task` - Callable task to execute on Tokio's blocking pool.
443 ///
444 /// # Returns
445 ///
446 /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
447 ///
448 /// # Errors
449 ///
450 /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
451 /// task.
452 #[inline]
453 pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
454 &self,
455 task: C,
456 ) -> Result<TokioBlockingTaskHandle<R, E>, SubmissionError>
457 where
458 C: Callable<R, E> + Send + 'static,
459 R: Send + 'static,
460 E: Send + 'static,
461 {
462 self.tokio_blocking.submit_tracked_callable(task)
463 }
464
465 /// Spawns an async IO or Future-based task on Tokio's async runtime.
466 ///
467 /// # Parameters
468 ///
469 /// * `future` - Future to execute on Tokio's async scheduler.
470 ///
471 /// # Returns
472 ///
473 /// A [`TokioTaskHandle`] for the accepted async task.
474 ///
475 /// # Errors
476 ///
477 /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
478 #[inline]
479 pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
480 where
481 F: Future<Output = Result<R, E>> + Send + 'static,
482 R: Send + 'static,
483 E: Send + 'static,
484 {
485 self.io.spawn(future)
486 }
487
488 /// Requests graceful shutdown for every execution domain.
489 pub fn shutdown(&self) {
490 self.blocking.shutdown();
491 self.cpu.shutdown();
492 self.tokio_blocking.shutdown();
493 self.io.shutdown();
494 }
495
496 /// Requests abrupt stop for every execution domain.
497 ///
498 /// # Returns
499 ///
500 /// A per-domain aggregate report describing queued, running, and cancelled
501 /// work observed during shutdown.
502 pub fn stop(&self) -> ExecutionServicesStopReport {
503 ExecutionServicesStopReport {
504 blocking: self.blocking.stop(),
505 cpu: self.cpu.stop(),
506 tokio_blocking: self.tokio_blocking.stop(),
507 io: self.io.stop(),
508 }
509 }
510
511 /// Returns the aggregate lifecycle state.
512 ///
513 /// # Returns
514 ///
515 /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
516 /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
517 /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
518 /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
519 pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
520 let lifecycles = [
521 self.blocking.lifecycle(),
522 self.cpu.lifecycle(),
523 self.tokio_blocking.lifecycle(),
524 self.io.lifecycle(),
525 ];
526 if lifecycles
527 .iter()
528 .all(|state| *state == ExecutorServiceLifecycle::Terminated)
529 {
530 ExecutorServiceLifecycle::Terminated
531 } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
532 ExecutorServiceLifecycle::Stopping
533 } else if lifecycles
534 .iter()
535 .any(|state| *state != ExecutorServiceLifecycle::Running)
536 {
537 ExecutorServiceLifecycle::ShuttingDown
538 } else {
539 ExecutorServiceLifecycle::Running
540 }
541 }
542
543 /// Returns whether every execution domain is running.
544 ///
545 /// # Returns
546 ///
547 /// `true` only if all execution domains are running.
548 #[inline]
549 pub fn is_running(&self) -> bool {
550 self.lifecycle() == ExecutorServiceLifecycle::Running
551 }
552
553 /// Returns whether any execution domain is gracefully shutting down.
554 ///
555 /// # Returns
556 ///
557 /// `true` when the aggregate lifecycle is
558 /// [`ExecutorServiceLifecycle::ShuttingDown`].
559 #[inline]
560 pub fn is_shutting_down(&self) -> bool {
561 self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
562 }
563
564 /// Returns whether any execution domain is stopping abruptly.
565 ///
566 /// # Returns
567 ///
568 /// `true` when the aggregate lifecycle is
569 /// [`ExecutorServiceLifecycle::Stopping`].
570 #[inline]
571 pub fn is_stopping(&self) -> bool {
572 self.lifecycle() == ExecutorServiceLifecycle::Stopping
573 }
574
575 /// Returns whether the facade is no longer fully running.
576 ///
577 /// # Returns
578 ///
579 /// `true` after any execution domain starts shutdown, stop, or has already
580 /// terminated.
581 #[inline]
582 pub fn is_not_running(&self) -> bool {
583 self.lifecycle() != ExecutorServiceLifecycle::Running
584 }
585
586 /// Returns whether every execution domain has terminated.
587 ///
588 /// # Returns
589 ///
590 /// `true` only after all execution domains have terminated.
591 #[inline]
592 pub fn is_terminated(&self) -> bool {
593 self.lifecycle() == ExecutorServiceLifecycle::Terminated
594 }
595
596 /// Waits until every execution domain has terminated.
597 ///
598 /// # Returns
599 ///
600 /// A future that resolves after all execution domains have terminated.
601 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
602 Box::pin(async move {
603 let blocking = Arc::clone(&self.blocking);
604 let cpu = self.cpu.clone();
605 let tokio_blocking = self.tokio_blocking.clone();
606 let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
607 let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
608 let tokio_blocking_wait =
609 tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
610 while !self.io.is_terminated() {
611 tokio::time::sleep(Duration::from_millis(10)).await;
612 }
613 let _ = blocking_wait.await;
614 let _ = cpu_wait.await;
615 let _ = tokio_blocking_wait.await;
616 })
617 }
618}