Skip to main content

qubit_execution_services/
execution_services.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14};
15
16use qubit_executor::{
17    TaskHandle,
18    TrackedTask,
19};
20use qubit_function::{
21    Callable,
22    Runnable,
23};
24use qubit_thread_pool::{
25    ThreadPool,
26    ThreadPoolBuilder,
27};
28use qubit_tokio_executor::TokioExecutorService;
29
30use super::{
31    ExecutionServicesBuildError,
32    ExecutionServicesBuilder,
33    ExecutionServicesStopReport,
34    ExecutorService,
35    ExecutorServiceLifecycle,
36    RayonExecutorService,
37    RayonTaskHandle,
38    SubmissionError,
39    TokioBlockingTaskHandle,
40    TokioIoExecutorService,
41    TokioTaskHandle,
42};
43
/// Default managed service for synchronous tasks that may block an OS thread.
///
/// This is a plain alias of [`ThreadPool`] from `qubit_thread_pool`.
pub type BlockingExecutorService = ThreadPool;

/// Builder alias for configuring [`BlockingExecutorService`].
///
/// This is a plain alias of [`ThreadPoolBuilder`] from `qubit_thread_pool`.
pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;

/// Tokio-backed blocking executor service routed through `spawn_blocking`.
///
/// This is a plain alias of [`TokioExecutorService`] from
/// `qubit_tokio_executor`.
pub type TokioBlockingExecutorService = TokioExecutorService;
52
/// Unified facade exposing separate execution domains through one owner.
///
/// The facade does not implement a single scheduling core. Instead it routes
/// work to one of four dedicated execution domains:
///
/// - `blocking`: synchronous tasks that may block an OS thread.
/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
/// - `io`: async futures spawned on Tokio's async runtime.
///
/// Each domain is reachable through an accessor of the same name, and the
/// `submit_*` / `spawn_io` methods forward directly to the matching domain.
pub struct ExecutionServices {
    /// Managed service for synchronous tasks that may block OS threads.
    ///
    /// Held behind an `Arc` so that `await_termination` can move a shared
    /// handle into a `spawn_blocking` closure.
    blocking: Arc<BlockingExecutorService>,
    /// Managed service for CPU-bound synchronous tasks.
    cpu: RayonExecutorService,
    /// Tokio-backed blocking service using `spawn_blocking`.
    tokio_blocking: TokioBlockingExecutorService,
    /// Tokio-backed async service for Future-based tasks.
    io: TokioIoExecutorService,
}
72
73impl ExecutionServices {
74    /// Creates an execution-services facade from its four execution domains.
75    ///
76    /// # Parameters
77    ///
78    /// * `blocking` - Blocking executor domain.
79    /// * `cpu` - CPU-bound executor domain.
80    /// * `tokio_blocking` - Tokio blocking executor domain.
81    /// * `io` - Tokio async IO executor domain.
82    ///
83    /// # Returns
84    ///
85    /// An execution-services facade owning all supplied domains.
86    pub(crate) fn from_parts(
87        blocking: BlockingExecutorService,
88        cpu: RayonExecutorService,
89        tokio_blocking: TokioBlockingExecutorService,
90        io: TokioIoExecutorService,
91    ) -> Self {
92        Self {
93            blocking: Arc::new(blocking),
94            cpu,
95            tokio_blocking,
96            io,
97        }
98    }
99
100    /// Creates an execution-services facade with default builder settings.
101    ///
102    /// # Returns
103    ///
104    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
105    /// successfully.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`ExecutionServicesBuildError`] if the default builder
110    /// configuration is rejected.
111    #[inline]
112    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
113        Self::builder().build()
114    }
115
116    /// Creates a builder for configuring the execution-services facade.
117    ///
118    /// # Returns
119    ///
120    /// A builder configured with CPU-parallelism defaults.
121    #[inline]
122    pub fn builder() -> ExecutionServicesBuilder {
123        ExecutionServicesBuilder::default()
124    }
125
126    /// Returns the blocking execution domain.
127    ///
128    /// # Returns
129    ///
130    /// A shared reference to the blocking executor service.
131    #[inline]
132    pub fn blocking(&self) -> &BlockingExecutorService {
133        self.blocking.as_ref()
134    }
135
136    /// Returns the CPU execution domain.
137    ///
138    /// # Returns
139    ///
140    /// A shared reference to the Rayon-backed CPU executor service.
141    #[inline]
142    pub fn cpu(&self) -> &RayonExecutorService {
143        &self.cpu
144    }
145
146    /// Returns the Tokio blocking execution domain.
147    ///
148    /// # Returns
149    ///
150    /// A shared reference to the Tokio blocking executor service.
151    #[inline]
152    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
153        &self.tokio_blocking
154    }
155
156    /// Returns the Tokio async IO execution domain.
157    ///
158    /// # Returns
159    ///
160    /// A shared reference to the Tokio IO executor service.
161    #[inline]
162    pub fn io(&self) -> &TokioIoExecutorService {
163        &self.io
164    }
165
166    /// Submits a blocking runnable task to the blocking domain.
167    ///
168    /// # Parameters
169    ///
170    /// * `task` - Runnable task that may block an OS thread.
171    ///
172    /// # Returns
173    ///
174    /// `Ok(())` if the blocking domain accepts the task.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
179    #[inline]
180    pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
181    where
182        T: Runnable<E> + Send + 'static,
183        E: Send + 'static,
184    {
185        self.blocking.submit(task)
186    }
187
188    /// Submits a blocking runnable task and returns a tracked handle.
189    ///
190    /// # Parameters
191    ///
192    /// * `task` - Runnable task that may block an OS thread.
193    ///
194    /// # Returns
195    ///
196    /// A [`TrackedTask`] for the accepted blocking task.
197    ///
198    /// # Errors
199    ///
200    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
201    #[inline]
202    pub fn submit_tracked_blocking<T, E>(
203        &self,
204        task: T,
205    ) -> Result<TrackedTask<(), E>, SubmissionError>
206    where
207        T: Runnable<E> + Send + 'static,
208        E: Send + 'static,
209    {
210        self.blocking.submit_tracked(task)
211    }
212
213    /// Submits a blocking callable task to the blocking domain.
214    ///
215    /// # Parameters
216    ///
217    /// * `task` - Callable task that may block an OS thread.
218    ///
219    /// # Returns
220    ///
221    /// A [`TaskHandle`] for the accepted blocking task.
222    ///
223    /// # Errors
224    ///
225    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
226    #[inline]
227    pub fn submit_blocking_callable<C, R, E>(
228        &self,
229        task: C,
230    ) -> Result<TaskHandle<R, E>, SubmissionError>
231    where
232        C: Callable<R, E> + Send + 'static,
233        R: Send + 'static,
234        E: Send + 'static,
235    {
236        self.blocking.submit_callable(task)
237    }
238
239    /// Submits a blocking callable task and returns a tracked handle.
240    ///
241    /// # Parameters
242    ///
243    /// * `task` - Callable task that may block an OS thread.
244    ///
245    /// # Returns
246    ///
247    /// A [`TrackedTask`] for the accepted blocking task.
248    ///
249    /// # Errors
250    ///
251    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
252    #[inline]
253    pub fn submit_tracked_blocking_callable<C, R, E>(
254        &self,
255        task: C,
256    ) -> Result<TrackedTask<R, E>, SubmissionError>
257    where
258        C: Callable<R, E> + Send + 'static,
259        R: Send + 'static,
260        E: Send + 'static,
261    {
262        self.blocking.submit_tracked_callable(task)
263    }
264
265    /// Submits a CPU-bound runnable task to the Rayon domain.
266    ///
267    /// # Parameters
268    ///
269    /// * `task` - Runnable CPU task.
270    ///
271    /// # Returns
272    ///
273    /// `Ok(())` if the CPU domain accepts the task.
274    ///
275    /// # Errors
276    ///
277    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
278    #[inline]
279    pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
280    where
281        T: Runnable<E> + Send + 'static,
282        E: Send + 'static,
283    {
284        self.cpu.submit(task)
285    }
286
287    /// Submits a CPU-bound runnable task and returns a tracked handle.
288    ///
289    /// # Parameters
290    ///
291    /// * `task` - Runnable CPU task.
292    ///
293    /// # Returns
294    ///
295    /// A [`RayonTaskHandle`] for the accepted CPU task.
296    ///
297    /// # Errors
298    ///
299    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
300    #[inline]
301    pub fn submit_tracked_cpu<T, E>(
302        &self,
303        task: T,
304    ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
305    where
306        T: Runnable<E> + Send + 'static,
307        E: Send + 'static,
308    {
309        self.cpu.submit_tracked(task)
310    }
311
312    /// Submits a CPU-bound callable task to the Rayon domain.
313    ///
314    /// # Parameters
315    ///
316    /// * `task` - Callable CPU task.
317    ///
318    /// # Returns
319    ///
320    /// A [`TaskHandle`] for the accepted CPU task.
321    ///
322    /// # Errors
323    ///
324    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
325    #[inline]
326    pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
327    where
328        C: Callable<R, E> + Send + 'static,
329        R: Send + 'static,
330        E: Send + 'static,
331    {
332        self.cpu.submit_callable(task)
333    }
334
335    /// Submits a CPU-bound callable task and returns a tracked handle.
336    ///
337    /// # Parameters
338    ///
339    /// * `task` - Callable CPU task.
340    ///
341    /// # Returns
342    ///
343    /// A [`RayonTaskHandle`] for the accepted CPU task.
344    ///
345    /// # Errors
346    ///
347    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
348    #[inline]
349    pub fn submit_tracked_cpu_callable<C, R, E>(
350        &self,
351        task: C,
352    ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
353    where
354        C: Callable<R, E> + Send + 'static,
355        R: Send + 'static,
356        E: Send + 'static,
357    {
358        self.cpu.submit_tracked_callable(task)
359    }
360
361    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
362    ///
363    /// # Parameters
364    ///
365    /// * `task` - Runnable task to execute on Tokio's blocking pool.
366    ///
367    /// # Returns
368    ///
369    /// `Ok(())` if the Tokio blocking domain accepts the task.
370    ///
371    /// # Errors
372    ///
373    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
374    /// task.
375    #[inline]
376    pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
377    where
378        T: Runnable<E> + Send + 'static,
379        E: Send + 'static,
380    {
381        self.tokio_blocking.submit(task)
382    }
383
384    /// Submits a blocking runnable task to Tokio and returns a tracked handle.
385    ///
386    /// # Parameters
387    ///
388    /// * `task` - Runnable task to execute on Tokio's blocking pool.
389    ///
390    /// # Returns
391    ///
392    /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
393    ///
394    /// # Errors
395    ///
396    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
397    /// task.
398    #[inline]
399    pub fn submit_tracked_tokio_blocking<T, E>(
400        &self,
401        task: T,
402    ) -> Result<TokioBlockingTaskHandle<(), E>, SubmissionError>
403    where
404        T: Runnable<E> + Send + 'static,
405        E: Send + 'static,
406    {
407        self.tokio_blocking.submit_tracked(task)
408    }
409
410    /// Submits a blocking callable task to Tokio `spawn_blocking`.
411    ///
412    /// # Parameters
413    ///
414    /// * `task` - Callable task to execute on Tokio's blocking pool.
415    ///
416    /// # Returns
417    ///
418    /// A [`TaskHandle`] for the accepted blocking task.
419    ///
420    /// # Errors
421    ///
422    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
423    /// task.
424    #[inline]
425    pub fn submit_tokio_blocking_callable<C, R, E>(
426        &self,
427        task: C,
428    ) -> Result<TaskHandle<R, E>, SubmissionError>
429    where
430        C: Callable<R, E> + Send + 'static,
431        R: Send + 'static,
432        E: Send + 'static,
433    {
434        self.tokio_blocking.submit_callable(task)
435    }
436
437    /// Submits a blocking callable task to Tokio and returns a tracked handle.
438    ///
439    /// # Parameters
440    ///
441    /// * `task` - Callable task to execute on Tokio's blocking pool.
442    ///
443    /// # Returns
444    ///
445    /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
446    ///
447    /// # Errors
448    ///
449    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
450    /// task.
451    #[inline]
452    pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
453        &self,
454        task: C,
455    ) -> Result<TokioBlockingTaskHandle<R, E>, SubmissionError>
456    where
457        C: Callable<R, E> + Send + 'static,
458        R: Send + 'static,
459        E: Send + 'static,
460    {
461        self.tokio_blocking.submit_tracked_callable(task)
462    }
463
464    /// Spawns an async IO or Future-based task on Tokio's async runtime.
465    ///
466    /// # Parameters
467    ///
468    /// * `future` - Future to execute on Tokio's async scheduler.
469    ///
470    /// # Returns
471    ///
472    /// A [`TokioTaskHandle`] for the accepted async task.
473    ///
474    /// # Errors
475    ///
476    /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
477    #[inline]
478    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
479    where
480        F: Future<Output = Result<R, E>> + Send + 'static,
481        R: Send + 'static,
482        E: Send + 'static,
483    {
484        self.io.spawn(future)
485    }
486
    /// Requests graceful shutdown for every execution domain.
    ///
    /// Calls `shutdown` on the blocking, CPU, Tokio blocking, and IO domains,
    /// in that order. Nothing is returned; use [`Self::lifecycle`] or
    /// [`Self::await_termination`] to observe progress afterwards.
    pub fn shutdown(&self) {
        self.blocking.shutdown();
        self.cpu.shutdown();
        self.tokio_blocking.shutdown();
        self.io.shutdown();
    }
494
495    /// Requests abrupt stop for every execution domain.
496    ///
497    /// # Returns
498    ///
499    /// A per-domain aggregate report describing queued, running, and cancelled
500    /// work observed during shutdown.
501    pub fn stop(&self) -> ExecutionServicesStopReport {
502        ExecutionServicesStopReport {
503            blocking: self.blocking.stop(),
504            cpu: self.cpu.stop(),
505            tokio_blocking: self.tokio_blocking.stop(),
506            io: self.io.stop(),
507        }
508    }
509
510    /// Returns the aggregate lifecycle state.
511    ///
512    /// # Returns
513    ///
514    /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
515    /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
516    /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
517    /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
518    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
519        let lifecycles = [
520            self.blocking.lifecycle(),
521            self.cpu.lifecycle(),
522            self.tokio_blocking.lifecycle(),
523            self.io.lifecycle(),
524        ];
525        if lifecycles
526            .iter()
527            .all(|state| *state == ExecutorServiceLifecycle::Terminated)
528        {
529            ExecutorServiceLifecycle::Terminated
530        } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
531            ExecutorServiceLifecycle::Stopping
532        } else if lifecycles
533            .iter()
534            .any(|state| *state != ExecutorServiceLifecycle::Running)
535        {
536            ExecutorServiceLifecycle::ShuttingDown
537        } else {
538            ExecutorServiceLifecycle::Running
539        }
540    }
541
542    /// Returns whether every execution domain is running.
543    ///
544    /// # Returns
545    ///
546    /// `true` only if all execution domains are running.
547    #[inline]
548    pub fn is_running(&self) -> bool {
549        self.lifecycle() == ExecutorServiceLifecycle::Running
550    }
551
552    /// Returns whether any execution domain is gracefully shutting down.
553    ///
554    /// # Returns
555    ///
556    /// `true` when the aggregate lifecycle is
557    /// [`ExecutorServiceLifecycle::ShuttingDown`].
558    #[inline]
559    pub fn is_shutting_down(&self) -> bool {
560        self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
561    }
562
563    /// Returns whether any execution domain is stopping abruptly.
564    ///
565    /// # Returns
566    ///
567    /// `true` when the aggregate lifecycle is
568    /// [`ExecutorServiceLifecycle::Stopping`].
569    #[inline]
570    pub fn is_stopping(&self) -> bool {
571        self.lifecycle() == ExecutorServiceLifecycle::Stopping
572    }
573
574    /// Returns whether the facade is no longer fully running.
575    ///
576    /// # Returns
577    ///
578    /// `true` after any execution domain starts shutdown, stop, or has already
579    /// terminated.
580    #[inline]
581    pub fn is_not_running(&self) -> bool {
582        self.lifecycle() != ExecutorServiceLifecycle::Running
583    }
584
585    /// Returns whether every execution domain has terminated.
586    ///
587    /// # Returns
588    ///
589    /// `true` only after all execution domains have terminated.
590    #[inline]
591    pub fn is_terminated(&self) -> bool {
592        self.lifecycle() == ExecutorServiceLifecycle::Terminated
593    }
594
595    /// Waits until every execution domain has terminated.
596    ///
597    /// # Returns
598    ///
599    /// A future that resolves after all execution domains have terminated.
600    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
601        Box::pin(async move {
602            let blocking = Arc::clone(&self.blocking);
603            let cpu = self.cpu.clone();
604            let tokio_blocking = self.tokio_blocking.clone();
605            let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
606            let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
607            let tokio_blocking_wait =
608                tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
609            self.io.await_termination().await;
610            let _ = blocking_wait.await;
611            let _ = cpu_wait.await;
612            let _ = tokio_blocking_wait.await;
613        })
614    }
615}