Skip to main content

qubit_execution_services/
execution_services.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14    time::Duration,
15};
16
17use qubit_executor::{
18    TaskHandle,
19    TrackedTask,
20};
21use qubit_function::{
22    Callable,
23    Runnable,
24};
25use qubit_thread_pool::{
26    ThreadPool,
27    ThreadPoolBuilder,
28};
29use qubit_tokio_executor::TokioExecutorService;
30
31use super::{
32    ExecutionServicesBuildError,
33    ExecutionServicesBuilder,
34    ExecutionServicesStopReport,
35    ExecutorService,
36    ExecutorServiceLifecycle,
37    RayonExecutorService,
38    RayonTaskHandle,
39    SubmissionError,
40    TokioBlockingTaskHandle,
41    TokioIoExecutorService,
42    TokioTaskHandle,
43};
44
45/// Default managed service for synchronous tasks that may block an OS thread.
46pub type BlockingExecutorService = ThreadPool;
47
48/// Builder alias for configuring [`BlockingExecutorService`].
49pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
50
51/// Tokio-backed blocking executor service routed through `spawn_blocking`.
52pub type TokioBlockingExecutorService = TokioExecutorService;
53
54/// Unified facade exposing separate execution domains through one owner.
55///
56/// The facade does not implement a single scheduling core. Instead it routes
57/// work to one of four dedicated execution domains:
58///
59/// - `blocking`: synchronous tasks that may block an OS thread.
60/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
61/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
62/// - `io`: async futures spawned on Tokio's async runtime.
63pub struct ExecutionServices {
64    /// Managed service for synchronous tasks that may block OS threads.
65    blocking: Arc<BlockingExecutorService>,
66    /// Managed service for CPU-bound synchronous tasks.
67    cpu: RayonExecutorService,
68    /// Tokio-backed blocking service using `spawn_blocking`.
69    tokio_blocking: TokioBlockingExecutorService,
70    /// Tokio-backed async service for Future-based tasks.
71    io: TokioIoExecutorService,
72}
73
74impl ExecutionServices {
75    /// Creates an execution-services facade from its four execution domains.
76    ///
77    /// # Parameters
78    ///
79    /// * `blocking` - Blocking executor domain.
80    /// * `cpu` - CPU-bound executor domain.
81    /// * `tokio_blocking` - Tokio blocking executor domain.
82    /// * `io` - Tokio async IO executor domain.
83    ///
84    /// # Returns
85    ///
86    /// An execution-services facade owning all supplied domains.
87    pub(crate) fn from_parts(
88        blocking: BlockingExecutorService,
89        cpu: RayonExecutorService,
90        tokio_blocking: TokioBlockingExecutorService,
91        io: TokioIoExecutorService,
92    ) -> Self {
93        Self {
94            blocking: Arc::new(blocking),
95            cpu,
96            tokio_blocking,
97            io,
98        }
99    }
100
101    /// Creates an execution-services facade with default builder settings.
102    ///
103    /// # Returns
104    ///
105    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
106    /// successfully.
107    ///
108    /// # Errors
109    ///
110    /// Returns [`ExecutionServicesBuildError`] if the default builder
111    /// configuration is rejected.
112    #[inline]
113    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
114        Self::builder().build()
115    }
116
117    /// Creates a builder for configuring the execution-services facade.
118    ///
119    /// # Returns
120    ///
121    /// A builder configured with CPU-parallelism defaults.
122    #[inline]
123    pub fn builder() -> ExecutionServicesBuilder {
124        ExecutionServicesBuilder::default()
125    }
126
127    /// Returns the blocking execution domain.
128    ///
129    /// # Returns
130    ///
131    /// A shared reference to the blocking executor service.
132    #[inline]
133    pub fn blocking(&self) -> &BlockingExecutorService {
134        self.blocking.as_ref()
135    }
136
137    /// Returns the CPU execution domain.
138    ///
139    /// # Returns
140    ///
141    /// A shared reference to the Rayon-backed CPU executor service.
142    #[inline]
143    pub fn cpu(&self) -> &RayonExecutorService {
144        &self.cpu
145    }
146
147    /// Returns the Tokio blocking execution domain.
148    ///
149    /// # Returns
150    ///
151    /// A shared reference to the Tokio blocking executor service.
152    #[inline]
153    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
154        &self.tokio_blocking
155    }
156
157    /// Returns the Tokio async IO execution domain.
158    ///
159    /// # Returns
160    ///
161    /// A shared reference to the Tokio IO executor service.
162    #[inline]
163    pub fn io(&self) -> &TokioIoExecutorService {
164        &self.io
165    }
166
167    /// Submits a blocking runnable task to the blocking domain.
168    ///
169    /// # Parameters
170    ///
171    /// * `task` - Runnable task that may block an OS thread.
172    ///
173    /// # Returns
174    ///
175    /// `Ok(())` if the blocking domain accepts the task.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
180    #[inline]
181    pub fn submit_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
182    where
183        T: Runnable<E> + Send + 'static,
184        E: Send + 'static,
185    {
186        self.blocking.submit(task)
187    }
188
189    /// Submits a blocking runnable task and returns a tracked handle.
190    ///
191    /// # Parameters
192    ///
193    /// * `task` - Runnable task that may block an OS thread.
194    ///
195    /// # Returns
196    ///
197    /// A [`TrackedTask`] for the accepted blocking task.
198    ///
199    /// # Errors
200    ///
201    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
202    #[inline]
203    pub fn submit_tracked_blocking<T, E>(
204        &self,
205        task: T,
206    ) -> Result<TrackedTask<(), E>, SubmissionError>
207    where
208        T: Runnable<E> + Send + 'static,
209        E: Send + 'static,
210    {
211        self.blocking.submit_tracked(task)
212    }
213
214    /// Submits a blocking callable task to the blocking domain.
215    ///
216    /// # Parameters
217    ///
218    /// * `task` - Callable task that may block an OS thread.
219    ///
220    /// # Returns
221    ///
222    /// A [`TaskHandle`] for the accepted blocking task.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
227    #[inline]
228    pub fn submit_blocking_callable<C, R, E>(
229        &self,
230        task: C,
231    ) -> Result<TaskHandle<R, E>, SubmissionError>
232    where
233        C: Callable<R, E> + Send + 'static,
234        R: Send + 'static,
235        E: Send + 'static,
236    {
237        self.blocking.submit_callable(task)
238    }
239
240    /// Submits a blocking callable task and returns a tracked handle.
241    ///
242    /// # Parameters
243    ///
244    /// * `task` - Callable task that may block an OS thread.
245    ///
246    /// # Returns
247    ///
248    /// A [`TrackedTask`] for the accepted blocking task.
249    ///
250    /// # Errors
251    ///
252    /// Returns [`SubmissionError`] if the blocking domain refuses the task.
253    #[inline]
254    pub fn submit_tracked_blocking_callable<C, R, E>(
255        &self,
256        task: C,
257    ) -> Result<TrackedTask<R, E>, SubmissionError>
258    where
259        C: Callable<R, E> + Send + 'static,
260        R: Send + 'static,
261        E: Send + 'static,
262    {
263        self.blocking.submit_tracked_callable(task)
264    }
265
266    /// Submits a CPU-bound runnable task to the Rayon domain.
267    ///
268    /// # Parameters
269    ///
270    /// * `task` - Runnable CPU task.
271    ///
272    /// # Returns
273    ///
274    /// `Ok(())` if the CPU domain accepts the task.
275    ///
276    /// # Errors
277    ///
278    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
279    #[inline]
280    pub fn submit_cpu<T, E>(&self, task: T) -> Result<(), SubmissionError>
281    where
282        T: Runnable<E> + Send + 'static,
283        E: Send + 'static,
284    {
285        self.cpu.submit(task)
286    }
287
288    /// Submits a CPU-bound runnable task and returns a tracked handle.
289    ///
290    /// # Parameters
291    ///
292    /// * `task` - Runnable CPU task.
293    ///
294    /// # Returns
295    ///
296    /// A [`RayonTaskHandle`] for the accepted CPU task.
297    ///
298    /// # Errors
299    ///
300    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
301    #[inline]
302    pub fn submit_tracked_cpu<T, E>(
303        &self,
304        task: T,
305    ) -> Result<RayonTaskHandle<(), E>, SubmissionError>
306    where
307        T: Runnable<E> + Send + 'static,
308        E: Send + 'static,
309    {
310        self.cpu.submit_tracked(task)
311    }
312
313    /// Submits a CPU-bound callable task to the Rayon domain.
314    ///
315    /// # Parameters
316    ///
317    /// * `task` - Callable CPU task.
318    ///
319    /// # Returns
320    ///
321    /// A [`TaskHandle`] for the accepted CPU task.
322    ///
323    /// # Errors
324    ///
325    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
326    #[inline]
327    pub fn submit_cpu_callable<C, R, E>(&self, task: C) -> Result<TaskHandle<R, E>, SubmissionError>
328    where
329        C: Callable<R, E> + Send + 'static,
330        R: Send + 'static,
331        E: Send + 'static,
332    {
333        self.cpu.submit_callable(task)
334    }
335
336    /// Submits a CPU-bound callable task and returns a tracked handle.
337    ///
338    /// # Parameters
339    ///
340    /// * `task` - Callable CPU task.
341    ///
342    /// # Returns
343    ///
344    /// A [`RayonTaskHandle`] for the accepted CPU task.
345    ///
346    /// # Errors
347    ///
348    /// Returns [`SubmissionError`] if the CPU domain refuses the task.
349    #[inline]
350    pub fn submit_tracked_cpu_callable<C, R, E>(
351        &self,
352        task: C,
353    ) -> Result<RayonTaskHandle<R, E>, SubmissionError>
354    where
355        C: Callable<R, E> + Send + 'static,
356        R: Send + 'static,
357        E: Send + 'static,
358    {
359        self.cpu.submit_tracked_callable(task)
360    }
361
362    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
363    ///
364    /// # Parameters
365    ///
366    /// * `task` - Runnable task to execute on Tokio's blocking pool.
367    ///
368    /// # Returns
369    ///
370    /// `Ok(())` if the Tokio blocking domain accepts the task.
371    ///
372    /// # Errors
373    ///
374    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
375    /// task.
376    #[inline]
377    pub fn submit_tokio_blocking<T, E>(&self, task: T) -> Result<(), SubmissionError>
378    where
379        T: Runnable<E> + Send + 'static,
380        E: Send + 'static,
381    {
382        self.tokio_blocking.submit(task)
383    }
384
385    /// Submits a blocking runnable task to Tokio and returns a tracked handle.
386    ///
387    /// # Parameters
388    ///
389    /// * `task` - Runnable task to execute on Tokio's blocking pool.
390    ///
391    /// # Returns
392    ///
393    /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
398    /// task.
399    #[inline]
400    pub fn submit_tracked_tokio_blocking<T, E>(
401        &self,
402        task: T,
403    ) -> Result<TokioBlockingTaskHandle<(), E>, SubmissionError>
404    where
405        T: Runnable<E> + Send + 'static,
406        E: Send + 'static,
407    {
408        self.tokio_blocking.submit_tracked(task)
409    }
410
411    /// Submits a blocking callable task to Tokio `spawn_blocking`.
412    ///
413    /// # Parameters
414    ///
415    /// * `task` - Callable task to execute on Tokio's blocking pool.
416    ///
417    /// # Returns
418    ///
419    /// A [`TaskHandle`] for the accepted blocking task.
420    ///
421    /// # Errors
422    ///
423    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
424    /// task.
425    #[inline]
426    pub fn submit_tokio_blocking_callable<C, R, E>(
427        &self,
428        task: C,
429    ) -> Result<TaskHandle<R, E>, SubmissionError>
430    where
431        C: Callable<R, E> + Send + 'static,
432        R: Send + 'static,
433        E: Send + 'static,
434    {
435        self.tokio_blocking.submit_callable(task)
436    }
437
438    /// Submits a blocking callable task to Tokio and returns a tracked handle.
439    ///
440    /// # Parameters
441    ///
442    /// * `task` - Callable task to execute on Tokio's blocking pool.
443    ///
444    /// # Returns
445    ///
446    /// A [`TokioBlockingTaskHandle`] for the accepted blocking task.
447    ///
448    /// # Errors
449    ///
450    /// Returns [`SubmissionError`] if the Tokio blocking domain refuses the
451    /// task.
452    #[inline]
453    pub fn submit_tracked_tokio_blocking_callable<C, R, E>(
454        &self,
455        task: C,
456    ) -> Result<TokioBlockingTaskHandle<R, E>, SubmissionError>
457    where
458        C: Callable<R, E> + Send + 'static,
459        R: Send + 'static,
460        E: Send + 'static,
461    {
462        self.tokio_blocking.submit_tracked_callable(task)
463    }
464
465    /// Spawns an async IO or Future-based task on Tokio's async runtime.
466    ///
467    /// # Parameters
468    ///
469    /// * `future` - Future to execute on Tokio's async scheduler.
470    ///
471    /// # Returns
472    ///
473    /// A [`TokioTaskHandle`] for the accepted async task.
474    ///
475    /// # Errors
476    ///
477    /// Returns [`SubmissionError`] if the Tokio IO domain refuses the task.
478    #[inline]
479    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, SubmissionError>
480    where
481        F: Future<Output = Result<R, E>> + Send + 'static,
482        R: Send + 'static,
483        E: Send + 'static,
484    {
485        self.io.spawn(future)
486    }
487
488    /// Requests graceful shutdown for every execution domain.
489    pub fn shutdown(&self) {
490        self.blocking.shutdown();
491        self.cpu.shutdown();
492        self.tokio_blocking.shutdown();
493        self.io.shutdown();
494    }
495
496    /// Requests abrupt stop for every execution domain.
497    ///
498    /// # Returns
499    ///
500    /// A per-domain aggregate report describing queued, running, and cancelled
501    /// work observed during shutdown.
502    pub fn stop(&self) -> ExecutionServicesStopReport {
503        ExecutionServicesStopReport {
504            blocking: self.blocking.stop(),
505            cpu: self.cpu.stop(),
506            tokio_blocking: self.tokio_blocking.stop(),
507            io: self.io.stop(),
508        }
509    }
510
511    /// Returns the aggregate lifecycle state.
512    ///
513    /// # Returns
514    ///
515    /// [`ExecutorServiceLifecycle::Terminated`] if all domains have
516    /// terminated; [`ExecutorServiceLifecycle::Stopping`] if any domain is
517    /// stopping; [`ExecutorServiceLifecycle::ShuttingDown`] if any domain is no
518    /// longer running; otherwise [`ExecutorServiceLifecycle::Running`].
519    pub fn lifecycle(&self) -> ExecutorServiceLifecycle {
520        let lifecycles = [
521            self.blocking.lifecycle(),
522            self.cpu.lifecycle(),
523            self.tokio_blocking.lifecycle(),
524            self.io.lifecycle(),
525        ];
526        if lifecycles
527            .iter()
528            .all(|state| *state == ExecutorServiceLifecycle::Terminated)
529        {
530            ExecutorServiceLifecycle::Terminated
531        } else if lifecycles.contains(&ExecutorServiceLifecycle::Stopping) {
532            ExecutorServiceLifecycle::Stopping
533        } else if lifecycles
534            .iter()
535            .any(|state| *state != ExecutorServiceLifecycle::Running)
536        {
537            ExecutorServiceLifecycle::ShuttingDown
538        } else {
539            ExecutorServiceLifecycle::Running
540        }
541    }
542
543    /// Returns whether every execution domain is running.
544    ///
545    /// # Returns
546    ///
547    /// `true` only if all execution domains are running.
548    #[inline]
549    pub fn is_running(&self) -> bool {
550        self.lifecycle() == ExecutorServiceLifecycle::Running
551    }
552
553    /// Returns whether any execution domain is gracefully shutting down.
554    ///
555    /// # Returns
556    ///
557    /// `true` when the aggregate lifecycle is
558    /// [`ExecutorServiceLifecycle::ShuttingDown`].
559    #[inline]
560    pub fn is_shutting_down(&self) -> bool {
561        self.lifecycle() == ExecutorServiceLifecycle::ShuttingDown
562    }
563
564    /// Returns whether any execution domain is stopping abruptly.
565    ///
566    /// # Returns
567    ///
568    /// `true` when the aggregate lifecycle is
569    /// [`ExecutorServiceLifecycle::Stopping`].
570    #[inline]
571    pub fn is_stopping(&self) -> bool {
572        self.lifecycle() == ExecutorServiceLifecycle::Stopping
573    }
574
575    /// Returns whether the facade is no longer fully running.
576    ///
577    /// # Returns
578    ///
579    /// `true` after any execution domain starts shutdown, stop, or has already
580    /// terminated.
581    #[inline]
582    pub fn is_not_running(&self) -> bool {
583        self.lifecycle() != ExecutorServiceLifecycle::Running
584    }
585
586    /// Returns whether every execution domain has terminated.
587    ///
588    /// # Returns
589    ///
590    /// `true` only after all execution domains have terminated.
591    #[inline]
592    pub fn is_terminated(&self) -> bool {
593        self.lifecycle() == ExecutorServiceLifecycle::Terminated
594    }
595
596    /// Waits until every execution domain has terminated.
597    ///
598    /// # Returns
599    ///
600    /// A future that resolves after all execution domains have terminated.
601    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
602        Box::pin(async move {
603            let blocking = Arc::clone(&self.blocking);
604            let cpu = self.cpu.clone();
605            let tokio_blocking = self.tokio_blocking.clone();
606            let blocking_wait = tokio::task::spawn_blocking(move || blocking.wait_termination());
607            let cpu_wait = tokio::task::spawn_blocking(move || cpu.wait_termination());
608            let tokio_blocking_wait =
609                tokio::task::spawn_blocking(move || tokio_blocking.wait_termination());
610            while !self.io.is_terminated() {
611                tokio::time::sleep(Duration::from_millis(10)).await;
612            }
613            let _ = blocking_wait.await;
614            let _ = cpu_wait.await;
615            let _ = tokio_blocking_wait.await;
616        })
617    }
618}