Skip to main content

qubit_execution_services/
execution_services.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13};
14
15use qubit_executor::TaskHandle;
16use qubit_function::{
17    Callable,
18    Runnable,
19};
20
21use super::{
22    BlockingExecutorService,
23    ExecutionServicesBuildError,
24    ExecutionServicesBuilder,
25    ExecutionServicesShutdownReport,
26    ExecutorService,
27    RayonExecutorService,
28    RayonTaskHandle,
29    RejectedExecution,
30    TokioBlockingExecutorService,
31    TokioIoExecutorService,
32    TokioTaskHandle,
33};
34
35/// Unified facade exposing separate execution domains through one owner.
36///
37/// The facade does not implement a single scheduling core. Instead it routes
38/// work to one of four dedicated execution domains:
39///
40/// - `blocking`: synchronous tasks that may block an OS thread.
41/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
42/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
43/// - `io`: async futures spawned on Tokio's async runtime.
44pub struct ExecutionServices {
45    /// Managed service for synchronous tasks that may block OS threads.
46    blocking: BlockingExecutorService,
47    /// Managed service for CPU-bound synchronous tasks.
48    cpu: RayonExecutorService,
49    /// Tokio-backed blocking service using `spawn_blocking`.
50    tokio_blocking: TokioBlockingExecutorService,
51    /// Tokio-backed async service for Future-based tasks.
52    io: TokioIoExecutorService,
53}
54
55impl ExecutionServices {
56    /// Creates an execution-services facade from its four execution domains.
57    ///
58    /// # Parameters
59    ///
60    /// * `blocking` - Blocking executor domain.
61    /// * `cpu` - CPU-bound executor domain.
62    /// * `tokio_blocking` - Tokio blocking executor domain.
63    /// * `io` - Tokio async IO executor domain.
64    ///
65    /// # Returns
66    ///
67    /// An execution-services facade owning all supplied domains.
68    pub(crate) fn from_parts(
69        blocking: BlockingExecutorService,
70        cpu: RayonExecutorService,
71        tokio_blocking: TokioBlockingExecutorService,
72        io: TokioIoExecutorService,
73    ) -> Self {
74        Self {
75            blocking,
76            cpu,
77            tokio_blocking,
78            io,
79        }
80    }
81
82    /// Creates an execution-services facade with default builder settings.
83    ///
84    /// # Returns
85    ///
86    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
87    /// successfully.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`ExecutionServicesBuildError`] if the default builder
92    /// configuration is rejected.
93    #[inline]
94    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
95        Self::builder().build()
96    }
97
98    /// Creates a builder for configuring the execution-services facade.
99    ///
100    /// # Returns
101    ///
102    /// A builder configured with CPU-parallelism defaults.
103    #[inline]
104    pub fn builder() -> ExecutionServicesBuilder {
105        ExecutionServicesBuilder::default()
106    }
107
108    /// Returns the blocking execution domain.
109    ///
110    /// # Returns
111    ///
112    /// A shared reference to the blocking executor service.
113    #[inline]
114    pub fn blocking(&self) -> &BlockingExecutorService {
115        &self.blocking
116    }
117
118    /// Returns the CPU execution domain.
119    ///
120    /// # Returns
121    ///
122    /// A shared reference to the Rayon-backed CPU executor service.
123    #[inline]
124    pub fn cpu(&self) -> &RayonExecutorService {
125        &self.cpu
126    }
127
128    /// Returns the Tokio blocking execution domain.
129    ///
130    /// # Returns
131    ///
132    /// A shared reference to the Tokio blocking executor service.
133    #[inline]
134    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
135        &self.tokio_blocking
136    }
137
138    /// Returns the Tokio async IO execution domain.
139    ///
140    /// # Returns
141    ///
142    /// A shared reference to the Tokio IO executor service.
143    #[inline]
144    pub fn io(&self) -> &TokioIoExecutorService {
145        &self.io
146    }
147
148    /// Submits a blocking runnable task to the blocking domain.
149    ///
150    /// # Parameters
151    ///
152    /// * `task` - Runnable task that may block an OS thread.
153    ///
154    /// # Returns
155    ///
156    /// A [`TaskHandle`] for the accepted blocking task.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
161    #[inline]
162    pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
163    where
164        T: Runnable<E> + Send + 'static,
165        E: Send + 'static,
166    {
167        self.blocking.submit(task)
168    }
169
170    /// Submits a blocking callable task to the blocking domain.
171    ///
172    /// # Parameters
173    ///
174    /// * `task` - Callable task that may block an OS thread.
175    ///
176    /// # Returns
177    ///
178    /// A [`TaskHandle`] for the accepted blocking task.
179    ///
180    /// # Errors
181    ///
182    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
183    #[inline]
184    pub fn submit_blocking_callable<C, R, E>(
185        &self,
186        task: C,
187    ) -> Result<TaskHandle<R, E>, RejectedExecution>
188    where
189        C: Callable<R, E> + Send + 'static,
190        R: Send + 'static,
191        E: Send + 'static,
192    {
193        self.blocking.submit_callable(task)
194    }
195
196    /// Submits a CPU-bound runnable task to the Rayon domain.
197    ///
198    /// # Parameters
199    ///
200    /// * `task` - Runnable CPU task.
201    ///
202    /// # Returns
203    ///
204    /// A [`RayonTaskHandle`] for the accepted CPU task.
205    ///
206    /// # Errors
207    ///
208    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
209    #[inline]
210    pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
211    where
212        T: Runnable<E> + Send + 'static,
213        E: Send + 'static,
214    {
215        self.cpu.submit(task)
216    }
217
218    /// Submits a CPU-bound callable task to the Rayon domain.
219    ///
220    /// # Parameters
221    ///
222    /// * `task` - Callable CPU task.
223    ///
224    /// # Returns
225    ///
226    /// A [`RayonTaskHandle`] for the accepted CPU task.
227    ///
228    /// # Errors
229    ///
230    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
231    #[inline]
232    pub fn submit_cpu_callable<C, R, E>(
233        &self,
234        task: C,
235    ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
236    where
237        C: Callable<R, E> + Send + 'static,
238        R: Send + 'static,
239        E: Send + 'static,
240    {
241        self.cpu.submit_callable(task)
242    }
243
244    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
245    ///
246    /// # Parameters
247    ///
248    /// * `task` - Runnable task to execute on Tokio's blocking pool.
249    ///
250    /// # Returns
251    ///
252    /// A [`TokioTaskHandle`] for the accepted blocking task.
253    ///
254    /// # Errors
255    ///
256    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
257    /// task.
258    #[inline]
259    pub fn submit_tokio_blocking<T, E>(
260        &self,
261        task: T,
262    ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
263    where
264        T: Runnable<E> + Send + 'static,
265        E: Send + 'static,
266    {
267        self.tokio_blocking.submit(task)
268    }
269
270    /// Submits a blocking callable task to Tokio `spawn_blocking`.
271    ///
272    /// # Parameters
273    ///
274    /// * `task` - Callable task to execute on Tokio's blocking pool.
275    ///
276    /// # Returns
277    ///
278    /// A [`TokioTaskHandle`] for the accepted blocking task.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
283    /// task.
284    #[inline]
285    pub fn submit_tokio_blocking_callable<C, R, E>(
286        &self,
287        task: C,
288    ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
289    where
290        C: Callable<R, E> + Send + 'static,
291        R: Send + 'static,
292        E: Send + 'static,
293    {
294        self.tokio_blocking.submit_callable(task)
295    }
296
297    /// Spawns an async IO or Future-based task on Tokio's async runtime.
298    ///
299    /// # Parameters
300    ///
301    /// * `future` - Future to execute on Tokio's async scheduler.
302    ///
303    /// # Returns
304    ///
305    /// A [`TokioTaskHandle`] for the accepted async task.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`RejectedExecution`] if the Tokio IO domain refuses the task.
310    #[inline]
311    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
312    where
313        F: Future<Output = Result<R, E>> + Send + 'static,
314        R: Send + 'static,
315        E: Send + 'static,
316    {
317        self.io.spawn(future)
318    }
319
320    /// Requests graceful shutdown for every execution domain.
321    pub fn shutdown(&self) {
322        self.blocking.shutdown();
323        self.cpu.shutdown();
324        self.tokio_blocking.shutdown();
325        self.io.shutdown();
326    }
327
328    /// Requests immediate shutdown for every execution domain.
329    ///
330    /// # Returns
331    ///
332    /// A per-domain aggregate report describing queued, running, and cancelled
333    /// work observed during shutdown.
334    pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
335        ExecutionServicesShutdownReport {
336            blocking: self.blocking.shutdown_now(),
337            cpu: self.cpu.shutdown_now(),
338            tokio_blocking: self.tokio_blocking.shutdown_now(),
339            io: self.io.shutdown_now(),
340        }
341    }
342
343    /// Returns whether every execution domain has been shut down.
344    ///
345    /// # Returns
346    ///
347    /// `true` only if all execution domains no longer accept new tasks.
348    #[inline]
349    pub fn is_shutdown(&self) -> bool {
350        self.blocking.is_shutdown()
351            && self.cpu.is_shutdown()
352            && self.tokio_blocking.is_shutdown()
353            && self.io.is_shutdown()
354    }
355
356    /// Returns whether every execution domain has terminated.
357    ///
358    /// # Returns
359    ///
360    /// `true` only after all execution domains have terminated.
361    #[inline]
362    pub fn is_terminated(&self) -> bool {
363        self.blocking.is_terminated()
364            && self.cpu.is_terminated()
365            && self.tokio_blocking.is_terminated()
366            && self.io.is_terminated()
367    }
368
369    /// Waits until every execution domain has terminated.
370    ///
371    /// # Returns
372    ///
373    /// A future that resolves after all execution domains have terminated.
374    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
375        Box::pin(async move {
376            self.blocking.await_termination().await;
377            self.cpu.await_termination().await;
378            self.tokio_blocking.await_termination().await;
379            self.io.await_termination().await;
380        })
381    }
382}