// qubit_execution_services/execution_services.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14};
15
16use qubit_executor::{
17    TaskHandle,
18    TrackedTask,
19};
20use qubit_function::{
21    Callable,
22    Runnable,
23};
24use qubit_thread_pool::{
25    ThreadPool,
26    ThreadPoolBuilder,
27};
28use qubit_tokio_executor::TokioExecutorService;
29
30use super::{
31    ExecutionServicesBuildError,
32    ExecutionServicesBuilder,
33    ExecutionServicesStopReport,
34    ExecutorService,
35    ExecutorServiceLifecycle,
36    RayonExecutorService,
37    RayonTaskHandle,
38    SubmissionError,
39    TokioIoExecutorService,
40    TokioTaskHandle,
41};
42
43/// Default managed service for synchronous tasks that may block an OS thread.
44pub type BlockingExecutorService = ThreadPool;
45
46/// Builder alias for configuring [`BlockingExecutorService`].
47pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
48
49/// Tokio-backed blocking executor service routed through `spawn_blocking`.
50pub type TokioBlockingExecutorService = TokioExecutorService;
51
52/// Unified facade exposing separate execution domains through one owner.
53///
54/// The facade does not implement a single scheduling core. Instead it routes
55/// work to one of four dedicated execution domains:
56///
57/// - `blocking`: synchronous tasks that may block an OS thread.
58/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
59/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
60/// - `io`: async futures spawned on Tokio's async runtime.
61pub struct ExecutionServices {
62    /// Managed service for synchronous tasks that may block OS threads.
63    blocking: Arc<BlockingExecutorService>,
64    /// Managed service for CPU-bound synchronous tasks.
65    cpu: RayonExecutorService,
66    /// Tokio-backed blocking service using `spawn_blocking`.
67    tokio_blocking: TokioBlockingExecutorService,
68    /// Tokio-backed async service for Future-based tasks.
69    io: TokioIoExecutorService,
70}
71
72impl ExecutionServices {
73    /// Creates an execution-services facade from its four execution domains.
74    ///
75    /// # Parameters
76    ///
77    /// * `blocking` - Blocking executor domain.
78    /// * `cpu` - CPU-bound executor domain.
79    /// * `tokio_blocking` - Tokio blocking executor domain.
80    /// * `io` - Tokio async IO executor domain.
81    ///
82    /// # Returns
83    ///
84    /// An execution-services facade owning all supplied domains.
85    pub(crate) fn from_parts(
86        blocking: BlockingExecutorService,
87        cpu: RayonExecutorService,
88        tokio_blocking: TokioBlockingExecutorService,
89        io: TokioIoExecutorService,
90    ) -> Self {
91        Self {
92            blocking: Arc::new(blocking),
93            cpu,
94            tokio_blocking,
95            io,
96        }
97    }
98
99    /// Creates an execution-services facade with default builder settings.
100    ///
101    /// # Returns
102    ///
103    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
104    /// successfully.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`ExecutionServicesBuildError`] if the default builder
109    /// configuration is rejected.
110    #[inline]
111    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
112        Self::builder().build()
113    }
114
115    /// Creates a builder for configuring the execution-services facade.
116    ///
117    /// # Returns
118    ///
119    /// A builder configured with CPU-parallelism defaults.
120    #[inline]
121    pub fn builder() -> ExecutionServicesBuilder {
122        ExecutionServicesBuilder::default()
123    }
124
125    /// Returns the blocking execution domain.
126    ///
127    /// # Returns
128    ///
129    /// A shared reference to the blocking executor service.
130    #[inline]
131    pub fn blocking(&self) -> &BlockingExecutorService {
132        self.blocking.as_ref()
133    }
134
135    /// Returns the CPU execution domain.
136    ///
137    /// # Returns
138    ///
139    /// A shared reference to the Rayon-backed CPU executor service.
140    #[inline]
141    pub fn cpu(&self) -> &RayonExecutorService {
142        &self.cpu
143    }
144
145    /// Returns the Tokio blocking execution domain.
146    ///
147    /// # Returns
148    ///
149    /// A shared reference to the Tokio blocking executor service.
150    #[inline]
151    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
152        &self.tokio_blocking
153    }
154
155    /// Returns the Tokio async IO execution domain.
156    ///
157    /// # Returns
158    ///
159    /// A shared reference to the Tokio IO executor service.
160    #[inline]
161    pub fn io(&self) -> &TokioIoExecutorService {
162        &self.io
163    }
164
165    /// Submits a blocking runnable task to the blocking domain.
166    ///
167    /// # Parameters
168    ///
169    /// * `task` - Runnable task that may block an OS thread.
170    ///
171    /// # Returns
172    ///
173    /// `Ok(())` if the blocking domain accepts the task.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
178    #[inline]
179    pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
180    where
181        T: Runnable<E> + Send + 'static,
182        E: Send + 'static,
183    {
184        self.blocking.submit(task)
185    }
186
187    /// Submits a blocking runnable task and returns a tracked handle.
188    ///
189    /// # Parameters
190    ///
191    /// * `task` - Runnable task that may block an OS thread.
192    ///
193    /// # Returns
194    ///
195    /// A [`TrackedTask`] for the accepted blocking task.
196    ///
197    /// # Errors
198    ///
199    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
200    #[inline]
201    pub fn submit_tracked_blocking<T, E>(
202        &self,
203        task: T,
204    ) -> Result<TrackedTask<(), E>, SubmissionError>
205    where
206        T: Runnable<E> + Send + 'static,
207        E: Send + 'static,
208    {
209        self.blocking.submit_tracked(task)
210    }
211
212    /// Submits a blocking callable task to the blocking domain.
213    ///
214    /// # Parameters
215    ///
216    /// * `task` - Callable task that may block an OS thread.
217    ///
218    /// # Returns
219    ///
220    /// A [`TaskHandle`] for the accepted blocking task.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
225    #[inline]
226    pub fn submit_blocking_callable<C, R, E>(
227        &self,
228        task: C,
229    ) -> Result<TaskHandle<R, E>, SubmissionError>
230    where
231        C: Callable<R, E> + Send + 'static,
232        R: Send + 'static,
233        E: Send + 'static,
234    {
235        self.blocking.submit_callable(task)
236    }
237
238    /// Submits a blocking callable task and returns a tracked handle.
239    ///
240    /// # Parameters
241    ///
242    /// * `task` - Callable task that may block an OS thread.
243    ///
244    /// # Returns
245    ///
246    /// A [`TrackedTask`] for the accepted blocking task.
247    ///
248    /// # Errors
249    ///
250    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
251    #[inline]
252    pub fn submit_tracked_blocking_callable<C, R, E>(
253        &self,
254        task: C,
255    ) -> Result<TrackedTask<R, E>, SubmissionError>
256    where
257        C: Callable<R, E> + Send + 'static,
258        R: Send + 'static,
259        E: Send + 'static,
260    {
261        self.blocking.submit_tracked_callable(task)
262    }
263
264    /// Submits a CPU-bound runnable task to the Rayon domain.
265    ///
266    /// # Parameters
267    ///
268    /// * `task` - Runnable CPU task.
269    ///
270    /// # Returns
271    ///
272    /// `Ok(())` if the CPU domain accepts the task.
273    ///
274    /// # Errors
275    ///
276    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
277    #[inline]
278    pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
279    where
280        T: Runnable<E> + Send + 'static,
281        E: Send + 'static,
282    {
283        self.cpu.submit(task)
284    }
285
286    /// Submits a CPU-bound runnable task and returns a tracked handle.
287    ///
288    /// # Parameters
289    ///
290    /// * `task` - Runnable CPU task.
291    ///
292    /// # Returns
293    ///
294    /// A [`RayonTaskHandle`] for the accepted CPU task.
295    ///
296    /// # Errors
297    ///
298    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
299    #[inline]
300    pub fn submit_tracked_cpu<T, E>(
301        &self,
302        task: T,
303    ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
304    where
305        T: Runnable<E> + Send + 'static,
306        E: Send + 'static,
307    {
308        self.cpu.submit_tracked(task)
309    }
310
311    /// Submits a CPU-bound callable task to the Rayon domain.
312    ///
313    /// # Parameters
314    ///
315    /// * `task` - Callable CPU task.
316    ///
317    /// # Returns
318    ///
319    /// A [`TaskHandle`] for the accepted CPU task.
320    ///
321    /// # Errors
322    ///
323    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
324    #[inline]
325    pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
326    where
327        C: Callable<R, E> + Send + 'static,
328        R: Send + 'static,
329        E: Send + 'static,
330    {
331        self.cpu.submit_callable(task)
332    }
333
334    /// Submits a CPU-bound callable task and returns a tracked handle.
335    ///
336    /// # Parameters
337    ///
338    /// * `task` - Callable CPU task.
339    ///
340    /// # Returns
341    ///
342    /// A [`RayonTaskHandle`] for the accepted CPU task.
343    ///
344    /// # Errors
345    ///
346    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
347    #[inline]
348    pub fn submit_tracked_cpu_callable<C, R, E>(
349        &self,
350        task: C,
351    ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
352    where
353        C: Callable<R, E> + Send + 'static,
354        R: Send + 'static,
355        E: Send + 'static,
356    {
357        self.cpu.submit_tracked_callable(task)
358    }
359
360    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
361    ///
362    /// # Parameters
363    ///
364    /// * `task` - Runnable task to execute on Tokio's blocking pool.
365    ///
366    /// # Returns
367    ///
368    /// `Ok(())` if the Tokio blocking domain accepts the task.
369    ///
370    /// # Errors
371    ///
372    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
373    /// task.
374    #[inline]
375    pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
376    where
377        T: Runnable<E> + Send + 'static,
378        E: Send + 'static,
379    {
380        self.tokio_blocking.submit(task)
381    }
382
383    /// Submits a blocking runnable task to Tokio and returns a tracked handle.
384    ///
385    /// # Parameters
386    ///
387    /// * `task` - Runnable task to execute on Tokio's blocking pool.
388    ///
389    /// # Returns
390    ///
391    /// A [`TrackedTask`] for the accepted blocking task.
392    ///
393    /// # Errors
394    ///
395    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
396    /// task.
397    #[inline]
398    pub fn submit_tracked_tokio_blocking<T, E>(
399        &self,
400        task: T,
401    ) -> Result<TrackedTask<(), E>, SubmissionError>
402    where
403        T: Runnable<E> + Send + 'static,
404        E: Send + 'static,
405    {
406        self.tokio_blocking.submit_tracked(task)
407    }
408
409    /// Submits a blocking callable task to Tokio `spawn_blocking`.
410    ///
411    /// # Parameters
412    ///
413    /// * `task` - Callable task to execute on Tokio's blocking pool.
414    ///
415    /// # Returns
416    ///
417    /// A [`TaskHandle`] for the accepted blocking task.
418    ///
419    /// # Errors
420    ///
421    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
422    /// task.
423    #[inline]
424    pub fn submit_tokio_blocking_callable<C, R, E>(
425        &self,
426        task: C,
427    ) -> Result<TaskHandle<R, E>, SubmissionError>
428    where
429        C: Callable<R, E> + Send + 'static,
430        R: Send + 'static,
431        E: Send + 'static,
432    {
433        self.tokio_blocking.submit_callable(task)
434    }
435
436    /// Submits a blocking callable task to Tokio and returns a tracked handle.
437    ///
438    /// # Parameters
439    ///
440    /// * `task` - Callable task to execute on Tokio's blocking pool.
441    ///
442    /// # Returns
443    ///
444    /// A [`TrackedTask`] for the accepted blocking task.
445    ///
446    /// # Errors
447    ///
448    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
449    /// task.
450    #[inline]
451    pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
452        &self,
453        task: C,
454    ) -> Result<TrackedTask<R, E>, SubmissionError>
455    where
456        C: Callable<R, E> + Send + 'static,
457        R: Send + 'static,
458        E: Send + 'static,
459    {
460        self.tokio_blocking.submit_tracked_callable(task)
461    }
462
463    /// Spawns an async IO or Future-based task on Tokio's async runtime.
464    ///
465    /// # Parameters
466    ///
467    /// * `future` - Future to execute on Tokio's async scheduler.
468    ///
469    /// # Returns
470    ///
471    /// A [`TokioTaskHandle`] for the accepted async task.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
476    #[inline]
477    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
478    where
479        F: Future<Output = Result<R, E>> + Send + 'static,
480        R: Send + 'static,
481        E: Send + 'static,
482    {
483        self.io.spawn(future)
484    }
485
486    /// Requests graceful shutdown for every execution domain.
487    pub fn shutdown(&self) {
488        self.blocking.shutdown();
489        self.cpu.shutdown();
490        self.tokio_blocking.shutdown();
491        self.io.shutdown();
492    }
493
494    /// Requests abrupt stop for every execution domain.
495    ///
496    /// # Returns
497    ///
498    /// A per-domain aggregate report describing queued, running, and cancelled
499    /// work observed during shutdown.
500    pub fn stop(&self) -> ExecutionServicesStopReport {
501        ExecutionServicesStopReport {
502            blocking: self.blocking.stop(),
503            cpu: self.cpu.stop(),
504            tokio_blocking: self.tokio_blocking.stop(),
505            io: self.io.stop(),
506        }
507    }
508
509    /// Returns the aggregate lifecycle state.
510    ///
511    /// # Returns
512    ///
513    /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
514    /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
515    /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
516    /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
517    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
518        let lifecycles = [
519            self.blocking.lifecycle(),
520            self.cpu.lifecycle(),
521            self.tokio_blocking.lifecycle(),
522            self.io.lifecycle(),
523        ];
524        if lifecycles
525            .iter()
526            .all(|state| *state == ExecutorServiceLifecycle::Terminated)
527        {
528            ExecutorServiceLifecycle::Terminated
529        } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
530            ExecutorServiceLifecycle::Stopping
531        } else if lifecycles
532            .iter()
533            .any(|state| *state != ExecutorServiceLifecycle::Running)
534        {
535            ExecutorServiceLifecycle::ShuttingDown
536        } else {
537            ExecutorServiceLifecycle::Running
538        }
539    }
540
541    /// Returns whether every execution domain is running.
542    ///
543    /// # Returns
544    ///
545    /// `true` only if all execution domains are running.
546    #[inline]
547    pub fn is_running(&self) -> bool {
548        self.lifecycle() == ExecutorServiceLifecycle::Running
549    }
550
551    /// Returns whether any execution domain is gracefully shutting down.
552    ///
553    /// # Returns
554    ///
555    /// `true` when the aggregate lifecycle is
556    /// [`ExecutorServiceLifecycle::ShuttingDown`].
557    #[inline]
558    pub fn is_shutting_down(&self) -> bool {
559        self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
560    }
561
562    /// Returns whether any execution domain is stopping abruptly.
563    ///
564    /// # Returns
565    ///
566    /// `true` when the aggregate lifecycle is
567    /// [`ExecutorServiceLifecycle::Stopping`].
568    #[inline]
569    pub fn is_stopping(&self) -> bool {
570        self.lifecycle() == ExecutorServiceLifecycle::Stopping
571    }
572
573    /// Returns whether the facade is no longer fully running.
574    ///
575    /// # Returns
576    ///
577    /// `true` after any execution domain starts shutdown, stop, or has already
578    /// terminated.
579    #[inline]
580    pub fn is_not_running(&self) -> bool {
581        self.lifecycle() != ExecutorServiceLifecycle::Running
582    }
583
584    /// Returns whether every execution domain has terminated.
585    ///
586    /// # Returns
587    ///
588    /// `true` only after all execution domains have terminated.
589    #[inline]
590    pub fn is_terminated(&self) -> bool {
591        self.lifecycle() == ExecutorServiceLifecycle::Terminated
592    }
593
594    /// Waits until every execution domain has terminated.
595    ///
596    /// # Returns
597    ///
598    /// A future that resolves after all execution domains have terminated.
599    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
600        Box::pin(async move {
601            let blocking = Arc::clone(&self.blocking);
602            let cpu = self.cpu.clone();
603            let tokio_blocking = self.tokio_blocking.clone();
604            let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
605            let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
606            let tokio_blocking_wait =
607                tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
608            self.io.await_termination().await;
609            let _ = blocking_wait.await;
610            let _ = cpu_wait.await;
611            let _ = tokio_blocking_wait.await;
612        })
613    }
614}