qubit_execution_services/execution_services.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9// qubit-style: allow multiple-public-types
10use std::{
11 future::Future,
12 pin::Pin,
13 thread,
14 time::Duration,
15};
16
17use qubit_function::{
18 Callable,
19 Runnable,
20};
21use thiserror::Error;
22
23use qubit_executor::TaskHandle;
24
25use super::{
26 BlockingExecutorService,
27 BlockingExecutorServiceBuilder,
28 ExecutorService,
29 RayonExecutorService,
30 RayonExecutorServiceBuildError,
31 RayonExecutorServiceBuilder,
32 RayonTaskHandle,
33 RejectedExecution,
34 ShutdownReport,
35 TokioBlockingExecutorService,
36 TokioIoExecutorService,
37 TokioTaskHandle,
38};
39
40/// Error returned when [`ExecutionServicesBuilder`] cannot build the facade.
41#[derive(Debug, Error)]
42pub enum ExecutionServicesBuildError {
43 /// The blocking executor-service configuration is invalid.
44 #[error("failed to build blocking executor service: {source}")]
45 Blocking {
46 /// Error returned by the underlying blocking executor builder.
47 #[from]
48 source: super::ThreadPoolBuildError,
49 },
50
51 /// The CPU executor-service configuration is invalid.
52 #[error("failed to build cpu executor service: {source}")]
53 Cpu {
54 /// Error returned by the underlying Rayon executor builder.
55 #[from]
56 source: RayonExecutorServiceBuildError,
57 },
58}
59
60/// Aggregate report returned by [`ExecutionServices::shutdown_now`].
61#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
62pub struct ExecutionServicesShutdownReport {
63 /// Shutdown report for the blocking executor domain.
64 pub blocking: ShutdownReport,
65 /// Shutdown report for the CPU executor domain.
66 pub cpu: ShutdownReport,
67 /// Shutdown report for the Tokio blocking executor domain.
68 pub tokio_blocking: ShutdownReport,
69 /// Shutdown report for the Tokio async IO executor domain.
70 pub io: ShutdownReport,
71}
72
73impl ExecutionServicesShutdownReport {
74 /// Returns the total queued task count across all execution domains.
75 ///
76 /// # Returns
77 ///
78 /// The sum of every domain's queued-task count.
79 #[inline]
80 pub const fn total_queued(&self) -> usize {
81 self.blocking.queued + self.cpu.queued + self.tokio_blocking.queued + self.io.queued
82 }
83
84 /// Returns the total running task count across all execution domains.
85 ///
86 /// # Returns
87 ///
88 /// The sum of every domain's running-task count.
89 #[inline]
90 pub const fn total_running(&self) -> usize {
91 self.blocking.running + self.cpu.running + self.tokio_blocking.running + self.io.running
92 }
93
94 /// Returns the total cancellation count across all execution domains.
95 ///
96 /// # Returns
97 ///
98 /// The sum of every domain's cancelled-task count.
99 #[inline]
100 pub const fn total_cancelled(&self) -> usize {
101 self.blocking.cancelled
102 + self.cpu.cancelled
103 + self.tokio_blocking.cancelled
104 + self.io.cancelled
105 }
106}
107
108/// Builder for [`ExecutionServices`].
109///
110/// The builder exposes blocking-pool options by delegating to
111/// [`BlockingExecutorServiceBuilder`] and CPU-pool options by delegating to
112/// [`RayonExecutorServiceBuilder`]. Tokio-backed domains are created with their
113/// default constructors because they do not currently expose custom builders.
114#[derive(Debug, Clone)]
115pub struct ExecutionServicesBuilder {
116 /// Builder for the blocking executor domain.
117 blocking: BlockingExecutorServiceBuilder,
118 /// Builder for the CPU executor domain.
119 cpu: RayonExecutorServiceBuilder,
120}
121
122impl ExecutionServicesBuilder {
123 /// Sets both the blocking core and maximum pool sizes to the same value.
124 ///
125 /// # Parameters
126 ///
127 /// * `pool_size` - Pool size applied as both core and maximum limits.
128 ///
129 /// # Returns
130 ///
131 /// This builder for fluent configuration.
132 #[inline]
133 pub fn blocking_pool_size(mut self, pool_size: usize) -> Self {
134 self.blocking = self.blocking.pool_size(pool_size);
135 self
136 }
137
138 /// Sets the blocking core pool size.
139 ///
140 /// # Parameters
141 ///
142 /// * `core_pool_size` - Core pool size for the blocking domain.
143 ///
144 /// # Returns
145 ///
146 /// This builder for fluent configuration.
147 #[inline]
148 pub fn blocking_core_pool_size(mut self, core_pool_size: usize) -> Self {
149 self.blocking = self.blocking.core_pool_size(core_pool_size);
150 self
151 }
152
153 /// Sets the blocking maximum pool size.
154 ///
155 /// # Parameters
156 ///
157 /// * `maximum_pool_size` - Maximum pool size for the blocking domain.
158 ///
159 /// # Returns
160 ///
161 /// This builder for fluent configuration.
162 #[inline]
163 pub fn blocking_maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
164 self.blocking = self.blocking.maximum_pool_size(maximum_pool_size);
165 self
166 }
167
168 /// Sets a bounded queue capacity for the blocking domain.
169 ///
170 /// # Parameters
171 ///
172 /// * `capacity` - Maximum number of queued blocking tasks.
173 ///
174 /// # Returns
175 ///
176 /// This builder for fluent configuration.
177 #[inline]
178 pub fn blocking_queue_capacity(mut self, capacity: usize) -> Self {
179 self.blocking = self.blocking.queue_capacity(capacity);
180 self
181 }
182
183 /// Configures the blocking domain to use an unbounded queue.
184 ///
185 /// # Returns
186 ///
187 /// This builder for fluent configuration.
188 #[inline]
189 pub fn blocking_unbounded_queue(mut self) -> Self {
190 self.blocking = self.blocking.unbounded_queue();
191 self
192 }
193
194 /// Sets the blocking worker-thread name prefix.
195 ///
196 /// # Parameters
197 ///
198 /// * `prefix` - Prefix appended with the worker index.
199 ///
200 /// # Returns
201 ///
202 /// This builder for fluent configuration.
203 #[inline]
204 pub fn blocking_thread_name_prefix(mut self, prefix: &str) -> Self {
205 self.blocking = self.blocking.thread_name_prefix(prefix);
206 self
207 }
208
209 /// Sets the blocking worker-thread stack size.
210 ///
211 /// # Parameters
212 ///
213 /// * `stack_size` - Stack size in bytes for each blocking worker.
214 ///
215 /// # Returns
216 ///
217 /// This builder for fluent configuration.
218 #[inline]
219 pub fn blocking_stack_size(mut self, stack_size: usize) -> Self {
220 self.blocking = self.blocking.stack_size(stack_size);
221 self
222 }
223
224 /// Sets the blocking worker keep-alive timeout.
225 ///
226 /// # Parameters
227 ///
228 /// * `keep_alive` - Idle timeout for blocking workers allowed to retire.
229 ///
230 /// # Returns
231 ///
232 /// This builder for fluent configuration.
233 #[inline]
234 pub fn blocking_keep_alive(mut self, keep_alive: Duration) -> Self {
235 self.blocking = self.blocking.keep_alive(keep_alive);
236 self
237 }
238
239 /// Allows blocking core workers to retire after keep-alive timeout.
240 ///
241 /// # Parameters
242 ///
243 /// * `allow` - Whether idle blocking core workers may time out.
244 ///
245 /// # Returns
246 ///
247 /// This builder for fluent configuration.
248 #[inline]
249 pub fn blocking_allow_core_thread_timeout(mut self, allow: bool) -> Self {
250 self.blocking = self.blocking.allow_core_thread_timeout(allow);
251 self
252 }
253
254 /// Starts all blocking core workers during build.
255 ///
256 /// # Returns
257 ///
258 /// This builder for fluent configuration.
259 #[inline]
260 pub fn blocking_prestart_core_threads(mut self) -> Self {
261 self.blocking = self.blocking.prestart_core_threads();
262 self
263 }
264
265 /// Sets the number of Rayon worker threads in the CPU domain.
266 ///
267 /// # Parameters
268 ///
269 /// * `num_threads` - Number of Rayon worker threads.
270 ///
271 /// # Returns
272 ///
273 /// This builder for fluent configuration.
274 #[inline]
275 pub fn cpu_threads(mut self, num_threads: usize) -> Self {
276 self.cpu = self.cpu.num_threads(num_threads);
277 self
278 }
279
280 /// Sets the Rayon worker-thread name prefix in the CPU domain.
281 ///
282 /// # Parameters
283 ///
284 /// * `prefix` - Prefix appended with the worker index.
285 ///
286 /// # Returns
287 ///
288 /// This builder for fluent configuration.
289 #[inline]
290 pub fn cpu_thread_name_prefix(mut self, prefix: &str) -> Self {
291 self.cpu = self.cpu.thread_name_prefix(prefix);
292 self
293 }
294
295 /// Sets the Rayon worker-thread stack size in the CPU domain.
296 ///
297 /// # Parameters
298 ///
299 /// * `stack_size` - Stack size in bytes for each Rayon worker.
300 ///
301 /// # Returns
302 ///
303 /// This builder for fluent configuration.
304 #[inline]
305 pub fn cpu_stack_size(mut self, stack_size: usize) -> Self {
306 self.cpu = self.cpu.stack_size(stack_size);
307 self
308 }
309
310 /// Builds the configured execution-services facade.
311 ///
312 /// # Returns
313 ///
314 /// `Ok(ExecutionServices)` if the blocking and CPU domains build
315 /// successfully.
316 ///
317 /// # Errors
318 ///
319 /// Returns [`ExecutionServicesBuildError`] if either the blocking or CPU
320 /// domain rejects its builder configuration.
321 pub fn build(self) -> Result<ExecutionServices, ExecutionServicesBuildError> {
322 let blocking = self
323 .blocking
324 .build()
325 .map_err(|source| ExecutionServicesBuildError::Blocking { source })?;
326 let cpu = self
327 .cpu
328 .build()
329 .map_err(|source| ExecutionServicesBuildError::Cpu { source })?;
330 let tokio_blocking = TokioBlockingExecutorService::new();
331 let io = TokioIoExecutorService::new();
332 Ok(ExecutionServices {
333 blocking,
334 cpu,
335 tokio_blocking,
336 io,
337 })
338 }
339}
340
341impl Default for ExecutionServicesBuilder {
342 /// Creates a builder with CPU-parallelism defaults.
343 ///
344 /// # Returns
345 ///
346 /// A builder configured with available parallelism for both blocking and
347 /// CPU domains.
348 fn default() -> Self {
349 let pool_size = default_pool_size();
350 Self {
351 blocking: BlockingExecutorService::builder().pool_size(pool_size),
352 cpu: RayonExecutorService::builder().num_threads(pool_size),
353 }
354 }
355}
356
357/// Unified facade exposing separate execution domains through one owner.
358///
359/// The facade does not implement a single scheduling core. Instead it routes
360/// work to one of four dedicated execution domains:
361///
362/// - `blocking`: synchronous tasks that may block an OS thread.
363/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
364/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
365/// - `io`: async futures spawned on Tokio's async runtime.
366pub struct ExecutionServices {
367 /// Managed service for synchronous tasks that may block OS threads.
368 blocking: BlockingExecutorService,
369 /// Managed service for CPU-bound synchronous tasks.
370 cpu: RayonExecutorService,
371 /// Tokio-backed blocking service using `spawn_blocking`.
372 tokio_blocking: TokioBlockingExecutorService,
373 /// Tokio-backed async service for Future-based tasks.
374 io: TokioIoExecutorService,
375}
376
377impl ExecutionServices {
378 /// Creates an execution-services facade with default builder settings.
379 ///
380 /// # Returns
381 ///
382 /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
383 /// successfully.
384 ///
385 /// # Errors
386 ///
387 /// Returns [`ExecutionServicesBuildError`] if the default builder
388 /// configuration is rejected.
389 #[inline]
390 pub fn new() -> Result<Self, ExecutionServicesBuildError> {
391 Self::builder().build()
392 }
393
394 /// Creates a builder for configuring the execution-services facade.
395 ///
396 /// # Returns
397 ///
398 /// A builder configured with CPU-parallelism defaults.
399 #[inline]
400 pub fn builder() -> ExecutionServicesBuilder {
401 ExecutionServicesBuilder::default()
402 }
403
404 /// Returns the blocking execution domain.
405 ///
406 /// # Returns
407 ///
408 /// A shared reference to the blocking executor service.
409 #[inline]
410 pub fn blocking(&self) -> &BlockingExecutorService {
411 &self.blocking
412 }
413
414 /// Returns the CPU execution domain.
415 ///
416 /// # Returns
417 ///
418 /// A shared reference to the Rayon-backed CPU executor service.
419 #[inline]
420 pub fn cpu(&self) -> &RayonExecutorService {
421 &self.cpu
422 }
423
424 /// Returns the Tokio blocking execution domain.
425 ///
426 /// # Returns
427 ///
428 /// A shared reference to the Tokio blocking executor service.
429 #[inline]
430 pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
431 &self.tokio_blocking
432 }
433
434 /// Returns the Tokio async IO execution domain.
435 ///
436 /// # Returns
437 ///
438 /// A shared reference to the Tokio IO executor service.
439 #[inline]
440 pub fn io(&self) -> &TokioIoExecutorService {
441 &self.io
442 }
443
444 /// Submits a blocking runnable task to the blocking domain.
445 ///
446 /// # Parameters
447 ///
448 /// * `task` - Runnable task that may block an OS thread.
449 ///
450 /// # Returns
451 ///
452 /// A [`TaskHandle`] for the accepted blocking task.
453 ///
454 /// # Errors
455 ///
456 /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
457 #[inline]
458 pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
459 where
460 T: Runnable<E> + Send + 'static,
461 E: Send + 'static,
462 {
463 self.blocking.submit(task)
464 }
465
466 /// Submits a blocking callable task to the blocking domain.
467 ///
468 /// # Parameters
469 ///
470 /// * `task` - Callable task that may block an OS thread.
471 ///
472 /// # Returns
473 ///
474 /// A [`TaskHandle`] for the accepted blocking task.
475 ///
476 /// # Errors
477 ///
478 /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
479 #[inline]
480 pub fn submit_blocking_callable<C, R, E>(
481 &self,
482 task: C,
483 ) -> Result<TaskHandle<R, E>, RejectedExecution>
484 where
485 C: Callable<R, E> + Send + 'static,
486 R: Send + 'static,
487 E: Send + 'static,
488 {
489 self.blocking.submit_callable(task)
490 }
491
492 /// Submits a CPU-bound runnable task to the Rayon domain.
493 ///
494 /// # Parameters
495 ///
496 /// * `task` - Runnable CPU task.
497 ///
498 /// # Returns
499 ///
500 /// A [`RayonTaskHandle`] for the accepted CPU task.
501 ///
502 /// # Errors
503 ///
504 /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
505 #[inline]
506 pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
507 where
508 T: Runnable<E> + Send + 'static,
509 E: Send + 'static,
510 {
511 self.cpu.submit(task)
512 }
513
514 /// Submits a CPU-bound callable task to the Rayon domain.
515 ///
516 /// # Parameters
517 ///
518 /// * `task` - Callable CPU task.
519 ///
520 /// # Returns
521 ///
522 /// A [`RayonTaskHandle`] for the accepted CPU task.
523 ///
524 /// # Errors
525 ///
526 /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
527 #[inline]
528 pub fn submit_cpu_callable<C, R, E>(
529 &self,
530 task: C,
531 ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
532 where
533 C: Callable<R, E> + Send + 'static,
534 R: Send + 'static,
535 E: Send + 'static,
536 {
537 self.cpu.submit_callable(task)
538 }
539
540 /// Submits a blocking runnable task to Tokio `spawn_blocking`.
541 ///
542 /// # Parameters
543 ///
544 /// * `task` - Runnable task to execute on Tokio's blocking pool.
545 ///
546 /// # Returns
547 ///
548 /// A [`TokioTaskHandle`] for the accepted blocking task.
549 ///
550 /// # Errors
551 ///
552 /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
553 /// task.
554 #[inline]
555 pub fn submit_tokio_blocking<T, E>(
556 &self,
557 task: T,
558 ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
559 where
560 T: Runnable<E> + Send + 'static,
561 E: Send + 'static,
562 {
563 self.tokio_blocking.submit(task)
564 }
565
566 /// Submits a blocking callable task to Tokio `spawn_blocking`.
567 ///
568 /// # Parameters
569 ///
570 /// * `task` - Callable task to execute on Tokio's blocking pool.
571 ///
572 /// # Returns
573 ///
574 /// A [`TokioTaskHandle`] for the accepted blocking task.
575 ///
576 /// # Errors
577 ///
578 /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
579 /// task.
580 #[inline]
581 pub fn submit_tokio_blocking_callable<C, R, E>(
582 &self,
583 task: C,
584 ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
585 where
586 C: Callable<R, E> + Send + 'static,
587 R: Send + 'static,
588 E: Send + 'static,
589 {
590 self.tokio_blocking.submit_callable(task)
591 }
592
593 /// Spawns an async IO or Future-based task on Tokio's async runtime.
594 ///
595 /// # Parameters
596 ///
597 /// * `future` - Future to execute on Tokio's async scheduler.
598 ///
599 /// # Returns
600 ///
601 /// A [`TokioTaskHandle`] for the accepted async task.
602 ///
603 /// # Errors
604 ///
605 /// Returns [`RejectedExecution`] if the Tokio IO domain refuses the task.
606 #[inline]
607 pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
608 where
609 F: Future<Output = Result<R, E>> + Send + 'static,
610 R: Send + 'static,
611 E: Send + 'static,
612 {
613 self.io.spawn(future)
614 }
615
616 /// Requests graceful shutdown for every execution domain.
617 pub fn shutdown(&self) {
618 self.blocking.shutdown();
619 self.cpu.shutdown();
620 self.tokio_blocking.shutdown();
621 self.io.shutdown();
622 }
623
624 /// Requests immediate shutdown for every execution domain.
625 ///
626 /// # Returns
627 ///
628 /// A per-domain aggregate report describing queued, running, and cancelled
629 /// work observed during shutdown.
630 pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
631 ExecutionServicesShutdownReport {
632 blocking: self.blocking.shutdown_now(),
633 cpu: self.cpu.shutdown_now(),
634 tokio_blocking: self.tokio_blocking.shutdown_now(),
635 io: self.io.shutdown_now(),
636 }
637 }
638
639 /// Returns whether every execution domain has been shut down.
640 ///
641 /// # Returns
642 ///
643 /// `true` only if all execution domains no longer accept new tasks.
644 #[inline]
645 pub fn is_shutdown(&self) -> bool {
646 self.blocking.is_shutdown()
647 && self.cpu.is_shutdown()
648 && self.tokio_blocking.is_shutdown()
649 && self.io.is_shutdown()
650 }
651
652 /// Returns whether every execution domain has terminated.
653 ///
654 /// # Returns
655 ///
656 /// `true` only after all execution domains have terminated.
657 #[inline]
658 pub fn is_terminated(&self) -> bool {
659 self.blocking.is_terminated()
660 && self.cpu.is_terminated()
661 && self.tokio_blocking.is_terminated()
662 && self.io.is_terminated()
663 }
664
665 /// Waits until every execution domain has terminated.
666 ///
667 /// # Returns
668 ///
669 /// A future that resolves after all execution domains have terminated.
670 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
671 Box::pin(async move {
672 self.blocking.await_termination().await;
673 self.cpu.await_termination().await;
674 self.tokio_blocking.await_termination().await;
675 self.io.await_termination().await;
676 })
677 }
678}
679
/// Computes the pool size used by default for the blocking and CPU domains.
///
/// # Returns
///
/// The value reported by [`thread::available_parallelism`], or `1` when that
/// query fails.
fn default_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}