Skip to main content

qubit_execution_services/
execution_services.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026.
 *    Haixing Hu, Qubit Co. Ltd.
 *
 *    All rights reserved.
 *
 ******************************************************************************/
// qubit-style: allow multiple-public-types
10use std::{
11    future::Future,
12    pin::Pin,
13    thread,
14    time::Duration,
15};
16
17use qubit_function::{
18    Callable,
19    Runnable,
20};
21use thiserror::Error;
22
23use qubit_executor::TaskHandle;
24
25use super::{
26    BlockingExecutorService,
27    BlockingExecutorServiceBuilder,
28    ExecutorService,
29    RayonExecutorService,
30    RayonExecutorServiceBuildError,
31    RayonExecutorServiceBuilder,
32    RayonTaskHandle,
33    RejectedExecution,
34    ShutdownReport,
35    TokioBlockingExecutorService,
36    TokioIoExecutorService,
37    TokioTaskHandle,
38};
39
40/// Error returned when [`ExecutionServicesBuilder`] cannot build the facade.
41#[derive(Debug, Error)]
42pub enum ExecutionServicesBuildError {
43    /// The blocking executor-service configuration is invalid.
44    #[error("failed to build blocking executor service: {source}")]
45    Blocking {
46        /// Error returned by the underlying blocking executor builder.
47        #[from]
48        source: super::ThreadPoolBuildError,
49    },
50
51    /// The CPU executor-service configuration is invalid.
52    #[error("failed to build cpu executor service: {source}")]
53    Cpu {
54        /// Error returned by the underlying Rayon executor builder.
55        #[from]
56        source: RayonExecutorServiceBuildError,
57    },
58}
59
60/// Aggregate report returned by [`ExecutionServices::shutdown_now`].
61#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
62pub struct ExecutionServicesShutdownReport {
63    /// Shutdown report for the blocking executor domain.
64    pub blocking: ShutdownReport,
65    /// Shutdown report for the CPU executor domain.
66    pub cpu: ShutdownReport,
67    /// Shutdown report for the Tokio blocking executor domain.
68    pub tokio_blocking: ShutdownReport,
69    /// Shutdown report for the Tokio async IO executor domain.
70    pub io: ShutdownReport,
71}
72
73impl ExecutionServicesShutdownReport {
74    /// Returns the total queued task count across all execution domains.
75    ///
76    /// # Returns
77    ///
78    /// The sum of every domain's queued-task count.
79    #[inline]
80    pub const fn total_queued(&self) -> usize {
81        self.blocking.queued + self.cpu.queued + self.tokio_blocking.queued + self.io.queued
82    }
83
84    /// Returns the total running task count across all execution domains.
85    ///
86    /// # Returns
87    ///
88    /// The sum of every domain's running-task count.
89    #[inline]
90    pub const fn total_running(&self) -> usize {
91        self.blocking.running + self.cpu.running + self.tokio_blocking.running + self.io.running
92    }
93
94    /// Returns the total cancellation count across all execution domains.
95    ///
96    /// # Returns
97    ///
98    /// The sum of every domain's cancelled-task count.
99    #[inline]
100    pub const fn total_cancelled(&self) -> usize {
101        self.blocking.cancelled
102            + self.cpu.cancelled
103            + self.tokio_blocking.cancelled
104            + self.io.cancelled
105    }
106}
107
108/// Builder for [`ExecutionServices`].
109///
110/// The builder exposes blocking-pool options by delegating to
111/// [`BlockingExecutorServiceBuilder`] and CPU-pool options by delegating to
112/// [`RayonExecutorServiceBuilder`]. Tokio-backed domains are created with their
113/// default constructors because they do not currently expose custom builders.
114#[derive(Debug, Clone)]
115pub struct ExecutionServicesBuilder {
116    /// Builder for the blocking executor domain.
117    blocking: BlockingExecutorServiceBuilder,
118    /// Builder for the CPU executor domain.
119    cpu: RayonExecutorServiceBuilder,
120}
121
122impl ExecutionServicesBuilder {
123    /// Sets both the blocking core and maximum pool sizes to the same value.
124    ///
125    /// # Parameters
126    ///
127    /// * `pool_size` - Pool size applied as both core and maximum limits.
128    ///
129    /// # Returns
130    ///
131    /// This builder for fluent configuration.
132    #[inline]
133    pub fn blocking_pool_size(mut self, pool_size: usize) -> Self {
134        self.blocking = self.blocking.pool_size(pool_size);
135        self
136    }
137
138    /// Sets the blocking core pool size.
139    ///
140    /// # Parameters
141    ///
142    /// * `core_pool_size` - Core pool size for the blocking domain.
143    ///
144    /// # Returns
145    ///
146    /// This builder for fluent configuration.
147    #[inline]
148    pub fn blocking_core_pool_size(mut self, core_pool_size: usize) -> Self {
149        self.blocking = self.blocking.core_pool_size(core_pool_size);
150        self
151    }
152
153    /// Sets the blocking maximum pool size.
154    ///
155    /// # Parameters
156    ///
157    /// * `maximum_pool_size` - Maximum pool size for the blocking domain.
158    ///
159    /// # Returns
160    ///
161    /// This builder for fluent configuration.
162    #[inline]
163    pub fn blocking_maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
164        self.blocking = self.blocking.maximum_pool_size(maximum_pool_size);
165        self
166    }
167
168    /// Sets a bounded queue capacity for the blocking domain.
169    ///
170    /// # Parameters
171    ///
172    /// * `capacity` - Maximum number of queued blocking tasks.
173    ///
174    /// # Returns
175    ///
176    /// This builder for fluent configuration.
177    #[inline]
178    pub fn blocking_queue_capacity(mut self, capacity: usize) -> Self {
179        self.blocking = self.blocking.queue_capacity(capacity);
180        self
181    }
182
183    /// Configures the blocking domain to use an unbounded queue.
184    ///
185    /// # Returns
186    ///
187    /// This builder for fluent configuration.
188    #[inline]
189    pub fn blocking_unbounded_queue(mut self) -> Self {
190        self.blocking = self.blocking.unbounded_queue();
191        self
192    }
193
194    /// Sets the blocking worker-thread name prefix.
195    ///
196    /// # Parameters
197    ///
198    /// * `prefix` - Prefix appended with the worker index.
199    ///
200    /// # Returns
201    ///
202    /// This builder for fluent configuration.
203    #[inline]
204    pub fn blocking_thread_name_prefix(mut self, prefix: &str) -> Self {
205        self.blocking = self.blocking.thread_name_prefix(prefix);
206        self
207    }
208
209    /// Sets the blocking worker-thread stack size.
210    ///
211    /// # Parameters
212    ///
213    /// * `stack_size` - Stack size in bytes for each blocking worker.
214    ///
215    /// # Returns
216    ///
217    /// This builder for fluent configuration.
218    #[inline]
219    pub fn blocking_stack_size(mut self, stack_size: usize) -> Self {
220        self.blocking = self.blocking.stack_size(stack_size);
221        self
222    }
223
224    /// Sets the blocking worker keep-alive timeout.
225    ///
226    /// # Parameters
227    ///
228    /// * `keep_alive` - Idle timeout for blocking workers allowed to retire.
229    ///
230    /// # Returns
231    ///
232    /// This builder for fluent configuration.
233    #[inline]
234    pub fn blocking_keep_alive(mut self, keep_alive: Duration) -> Self {
235        self.blocking = self.blocking.keep_alive(keep_alive);
236        self
237    }
238
239    /// Allows blocking core workers to retire after keep-alive timeout.
240    ///
241    /// # Parameters
242    ///
243    /// * `allow` - Whether idle blocking core workers may time out.
244    ///
245    /// # Returns
246    ///
247    /// This builder for fluent configuration.
248    #[inline]
249    pub fn blocking_allow_core_thread_timeout(mut self, allow: bool) -> Self {
250        self.blocking = self.blocking.allow_core_thread_timeout(allow);
251        self
252    }
253
254    /// Starts all blocking core workers during build.
255    ///
256    /// # Returns
257    ///
258    /// This builder for fluent configuration.
259    #[inline]
260    pub fn blocking_prestart_core_threads(mut self) -> Self {
261        self.blocking = self.blocking.prestart_core_threads();
262        self
263    }
264
265    /// Sets the number of Rayon worker threads in the CPU domain.
266    ///
267    /// # Parameters
268    ///
269    /// * `num_threads` - Number of Rayon worker threads.
270    ///
271    /// # Returns
272    ///
273    /// This builder for fluent configuration.
274    #[inline]
275    pub fn cpu_threads(mut self, num_threads: usize) -> Self {
276        self.cpu = self.cpu.num_threads(num_threads);
277        self
278    }
279
280    /// Sets the Rayon worker-thread name prefix in the CPU domain.
281    ///
282    /// # Parameters
283    ///
284    /// * `prefix` - Prefix appended with the worker index.
285    ///
286    /// # Returns
287    ///
288    /// This builder for fluent configuration.
289    #[inline]
290    pub fn cpu_thread_name_prefix(mut self, prefix: &str) -> Self {
291        self.cpu = self.cpu.thread_name_prefix(prefix);
292        self
293    }
294
295    /// Sets the Rayon worker-thread stack size in the CPU domain.
296    ///
297    /// # Parameters
298    ///
299    /// * `stack_size` - Stack size in bytes for each Rayon worker.
300    ///
301    /// # Returns
302    ///
303    /// This builder for fluent configuration.
304    #[inline]
305    pub fn cpu_stack_size(mut self, stack_size: usize) -> Self {
306        self.cpu = self.cpu.stack_size(stack_size);
307        self
308    }
309
310    /// Builds the configured execution-services facade.
311    ///
312    /// # Returns
313    ///
314    /// `Ok(ExecutionServices)` if the blocking and CPU domains build
315    /// successfully.
316    ///
317    /// # Errors
318    ///
319    /// Returns [`ExecutionServicesBuildError`] if either the blocking or CPU
320    /// domain rejects its builder configuration.
321    pub fn build(self) -> Result<ExecutionServices, ExecutionServicesBuildError> {
322        let blocking = self
323            .blocking
324            .build()
325            .map_err(|source| ExecutionServicesBuildError::Blocking { source })?;
326        let cpu = self
327            .cpu
328            .build()
329            .map_err(|source| ExecutionServicesBuildError::Cpu { source })?;
330        let tokio_blocking = TokioBlockingExecutorService::new();
331        let io = TokioIoExecutorService::new();
332        Ok(ExecutionServices {
333            blocking,
334            cpu,
335            tokio_blocking,
336            io,
337        })
338    }
339}
340
341impl Default for ExecutionServicesBuilder {
342    /// Creates a builder with CPU-parallelism defaults.
343    ///
344    /// # Returns
345    ///
346    /// A builder configured with available parallelism for both blocking and
347    /// CPU domains.
348    fn default() -> Self {
349        let pool_size = default_pool_size();
350        Self {
351            blocking: BlockingExecutorService::builder().pool_size(pool_size),
352            cpu: RayonExecutorService::builder().num_threads(pool_size),
353        }
354    }
355}
356
357/// Unified facade exposing separate execution domains through one owner.
358///
359/// The facade does not implement a single scheduling core. Instead it routes
360/// work to one of four dedicated execution domains:
361///
362/// - `blocking`: synchronous tasks that may block an OS thread.
363/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
364/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
365/// - `io`: async futures spawned on Tokio's async runtime.
366pub struct ExecutionServices {
367    /// Managed service for synchronous tasks that may block OS threads.
368    blocking: BlockingExecutorService,
369    /// Managed service for CPU-bound synchronous tasks.
370    cpu: RayonExecutorService,
371    /// Tokio-backed blocking service using `spawn_blocking`.
372    tokio_blocking: TokioBlockingExecutorService,
373    /// Tokio-backed async service for Future-based tasks.
374    io: TokioIoExecutorService,
375}
376
377impl ExecutionServices {
378    /// Creates an execution-services facade with default builder settings.
379    ///
380    /// # Returns
381    ///
382    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
383    /// successfully.
384    ///
385    /// # Errors
386    ///
387    /// Returns [`ExecutionServicesBuildError`] if the default builder
388    /// configuration is rejected.
389    #[inline]
390    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
391        Self::builder().build()
392    }
393
394    /// Creates a builder for configuring the execution-services facade.
395    ///
396    /// # Returns
397    ///
398    /// A builder configured with CPU-parallelism defaults.
399    #[inline]
400    pub fn builder() -> ExecutionServicesBuilder {
401        ExecutionServicesBuilder::default()
402    }
403
404    /// Returns the blocking execution domain.
405    ///
406    /// # Returns
407    ///
408    /// A shared reference to the blocking executor service.
409    #[inline]
410    pub fn blocking(&self) -> &BlockingExecutorService {
411        &self.blocking
412    }
413
414    /// Returns the CPU execution domain.
415    ///
416    /// # Returns
417    ///
418    /// A shared reference to the Rayon-backed CPU executor service.
419    #[inline]
420    pub fn cpu(&self) -> &RayonExecutorService {
421        &self.cpu
422    }
423
424    /// Returns the Tokio blocking execution domain.
425    ///
426    /// # Returns
427    ///
428    /// A shared reference to the Tokio blocking executor service.
429    #[inline]
430    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
431        &self.tokio_blocking
432    }
433
434    /// Returns the Tokio async IO execution domain.
435    ///
436    /// # Returns
437    ///
438    /// A shared reference to the Tokio IO executor service.
439    #[inline]
440    pub fn io(&self) -> &TokioIoExecutorService {
441        &self.io
442    }
443
444    /// Submits a blocking runnable task to the blocking domain.
445    ///
446    /// # Parameters
447    ///
448    /// * `task` - Runnable task that may block an OS thread.
449    ///
450    /// # Returns
451    ///
452    /// A [`TaskHandle`] for the accepted blocking task.
453    ///
454    /// # Errors
455    ///
456    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
457    #[inline]
458    pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
459    where
460        T: Runnable<E> + Send + 'static,
461        E: Send + 'static,
462    {
463        self.blocking.submit(task)
464    }
465
466    /// Submits a blocking callable task to the blocking domain.
467    ///
468    /// # Parameters
469    ///
470    /// * `task` - Callable task that may block an OS thread.
471    ///
472    /// # Returns
473    ///
474    /// A [`TaskHandle`] for the accepted blocking task.
475    ///
476    /// # Errors
477    ///
478    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
479    #[inline]
480    pub fn submit_blocking_callable<C, R, E>(
481        &self,
482        task: C,
483    ) -> Result<TaskHandle<R, E>, RejectedExecution>
484    where
485        C: Callable<R, E> + Send + 'static,
486        R: Send + 'static,
487        E: Send + 'static,
488    {
489        self.blocking.submit_callable(task)
490    }
491
492    /// Submits a CPU-bound runnable task to the Rayon domain.
493    ///
494    /// # Parameters
495    ///
496    /// * `task` - Runnable CPU task.
497    ///
498    /// # Returns
499    ///
500    /// A [`RayonTaskHandle`] for the accepted CPU task.
501    ///
502    /// # Errors
503    ///
504    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
505    #[inline]
506    pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
507    where
508        T: Runnable<E> + Send + 'static,
509        E: Send + 'static,
510    {
511        self.cpu.submit(task)
512    }
513
514    /// Submits a CPU-bound callable task to the Rayon domain.
515    ///
516    /// # Parameters
517    ///
518    /// * `task` - Callable CPU task.
519    ///
520    /// # Returns
521    ///
522    /// A [`RayonTaskHandle`] for the accepted CPU task.
523    ///
524    /// # Errors
525    ///
526    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
527    #[inline]
528    pub fn submit_cpu_callable<C, R, E>(
529        &self,
530        task: C,
531    ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
532    where
533        C: Callable<R, E> + Send + 'static,
534        R: Send + 'static,
535        E: Send + 'static,
536    {
537        self.cpu.submit_callable(task)
538    }
539
540    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
541    ///
542    /// # Parameters
543    ///
544    /// * `task` - Runnable task to execute on Tokio's blocking pool.
545    ///
546    /// # Returns
547    ///
548    /// A [`TokioTaskHandle`] for the accepted blocking task.
549    ///
550    /// # Errors
551    ///
552    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
553    /// task.
554    #[inline]
555    pub fn submit_tokio_blocking<T, E>(
556        &self,
557        task: T,
558    ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
559    where
560        T: Runnable<E> + Send + 'static,
561        E: Send + 'static,
562    {
563        self.tokio_blocking.submit(task)
564    }
565
566    /// Submits a blocking callable task to Tokio `spawn_blocking`.
567    ///
568    /// # Parameters
569    ///
570    /// * `task` - Callable task to execute on Tokio's blocking pool.
571    ///
572    /// # Returns
573    ///
574    /// A [`TokioTaskHandle`] for the accepted blocking task.
575    ///
576    /// # Errors
577    ///
578    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
579    /// task.
580    #[inline]
581    pub fn submit_tokio_blocking_callable<C, R, E>(
582        &self,
583        task: C,
584    ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
585    where
586        C: Callable<R, E> + Send + 'static,
587        R: Send + 'static,
588        E: Send + 'static,
589    {
590        self.tokio_blocking.submit_callable(task)
591    }
592
593    /// Spawns an async IO or Future-based task on Tokio's async runtime.
594    ///
595    /// # Parameters
596    ///
597    /// * `future` - Future to execute on Tokio's async scheduler.
598    ///
599    /// # Returns
600    ///
601    /// A [`TokioTaskHandle`] for the accepted async task.
602    ///
603    /// # Errors
604    ///
605    /// Returns [`RejectedExecution`] if the Tokio IO domain refuses the task.
606    #[inline]
607    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
608    where
609        F: Future<Output = Result<R, E>> + Send + 'static,
610        R: Send + 'static,
611        E: Send + 'static,
612    {
613        self.io.spawn(future)
614    }
615
616    /// Requests graceful shutdown for every execution domain.
617    pub fn shutdown(&self) {
618        self.blocking.shutdown();
619        self.cpu.shutdown();
620        self.tokio_blocking.shutdown();
621        self.io.shutdown();
622    }
623
624    /// Requests immediate shutdown for every execution domain.
625    ///
626    /// # Returns
627    ///
628    /// A per-domain aggregate report describing queued, running, and cancelled
629    /// work observed during shutdown.
630    pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
631        ExecutionServicesShutdownReport {
632            blocking: self.blocking.shutdown_now(),
633            cpu: self.cpu.shutdown_now(),
634            tokio_blocking: self.tokio_blocking.shutdown_now(),
635            io: self.io.shutdown_now(),
636        }
637    }
638
639    /// Returns whether every execution domain has been shut down.
640    ///
641    /// # Returns
642    ///
643    /// `true` only if all execution domains no longer accept new tasks.
644    #[inline]
645    pub fn is_shutdown(&self) -> bool {
646        self.blocking.is_shutdown()
647            && self.cpu.is_shutdown()
648            && self.tokio_blocking.is_shutdown()
649            && self.io.is_shutdown()
650    }
651
652    /// Returns whether every execution domain has terminated.
653    ///
654    /// # Returns
655    ///
656    /// `true` only after all execution domains have terminated.
657    #[inline]
658    pub fn is_terminated(&self) -> bool {
659        self.blocking.is_terminated()
660            && self.cpu.is_terminated()
661            && self.tokio_blocking.is_terminated()
662            && self.io.is_terminated()
663    }
664
665    /// Waits until every execution domain has terminated.
666    ///
667    /// # Returns
668    ///
669    /// A future that resolves after all execution domains have terminated.
670    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
671        Box::pin(async move {
672            self.blocking.await_termination().await;
673            self.cpu.await_termination().await;
674            self.tokio_blocking.await_termination().await;
675            self.io.await_termination().await;
676        })
677    }
678}
679
/// Returns the default pool size for blocking and CPU domains.
///
/// # Returns
///
/// The available CPU parallelism reported by
/// [`thread::available_parallelism`], or `1` if it cannot be detected.
fn default_pool_size() -> usize {
    // `available_parallelism` yields a `NonZeroUsize`, so the result is
    // always at least 1 on both the success and the fallback path.
    thread::available_parallelism().map_or(1, usize::from)
}