Skip to main content

qubit_execution_services/
execution_services.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13};
14
15use qubit_executor::TaskHandle;
16use qubit_function::{
17    Callable,
18    Runnable,
19};
20use qubit_thread_pool::{
21    ThreadPool,
22    ThreadPoolBuilder,
23};
24use qubit_tokio_executor::TokioExecutorService;
25
26use super::{
27    ExecutionServicesBuildError,
28    ExecutionServicesBuilder,
29    ExecutionServicesShutdownReport,
30    ExecutorService,
31    RayonExecutorService,
32    RayonTaskHandle,
33    RejectedExecution,
34    TokioIoExecutorService,
35    TokioTaskHandle,
36};
37
38/// Default managed service for synchronous tasks that may block an OS thread.
39pub type BlockingExecutorService = ThreadPool;
40
41/// Builder alias for configuring [`BlockingExecutorService`].
42pub type BlockingExecutorServiceBuilder = ThreadPoolBuilder;
43
44/// Tokio-backed blocking executor service routed through `spawn_blocking`.
45pub type TokioBlockingExecutorService = TokioExecutorService;
46
47/// Unified facade exposing separate execution domains through one owner.
48///
49/// The facade does not implement a single scheduling core. Instead it routes
50/// work to one of four dedicated execution domains:
51///
52/// - `blocking`: synchronous tasks that may block an OS thread.
53/// - `cpu`: CPU-bound synchronous tasks backed by Rayon.
54/// - `tokio_blocking`: blocking tasks routed through Tokio `spawn_blocking`.
55/// - `io`: async futures spawned on Tokio's async runtime.
56pub struct ExecutionServices {
57    /// Managed service for synchronous tasks that may block OS threads.
58    blocking: BlockingExecutorService,
59    /// Managed service for CPU-bound synchronous tasks.
60    cpu: RayonExecutorService,
61    /// Tokio-backed blocking service using `spawn_blocking`.
62    tokio_blocking: TokioBlockingExecutorService,
63    /// Tokio-backed async service for Future-based tasks.
64    io: TokioIoExecutorService,
65}
66
67impl ExecutionServices {
68    /// Creates an execution-services facade from its four execution domains.
69    ///
70    /// # Parameters
71    ///
72    /// * `blocking` - Blocking executor domain.
73    /// * `cpu` - CPU-bound executor domain.
74    /// * `tokio_blocking` - Tokio blocking executor domain.
75    /// * `io` - Tokio async IO executor domain.
76    ///
77    /// # Returns
78    ///
79    /// An execution-services facade owning all supplied domains.
80    pub(crate) fn from_parts(
81        blocking: BlockingExecutorService,
82        cpu: RayonExecutorService,
83        tokio_blocking: TokioBlockingExecutorService,
84        io: TokioIoExecutorService,
85    ) -> Self {
86        Self {
87            blocking,
88            cpu,
89            tokio_blocking,
90            io,
91        }
92    }
93
94    /// Creates an execution-services facade with default builder settings.
95    ///
96    /// # Returns
97    ///
98    /// `Ok(ExecutionServices)` if the default blocking and CPU domains build
99    /// successfully.
100    ///
101    /// # Errors
102    ///
103    /// Returns [`ExecutionServicesBuildError`] if the default builder
104    /// configuration is rejected.
105    #[inline]
106    pub fn new() -> Result<Self, ExecutionServicesBuildError> {
107        Self::builder().build()
108    }
109
110    /// Creates a builder for configuring the execution-services facade.
111    ///
112    /// # Returns
113    ///
114    /// A builder configured with CPU-parallelism defaults.
115    #[inline]
116    pub fn builder() -> ExecutionServicesBuilder {
117        ExecutionServicesBuilder::default()
118    }
119
120    /// Returns the blocking execution domain.
121    ///
122    /// # Returns
123    ///
124    /// A shared reference to the blocking executor service.
125    #[inline]
126    pub fn blocking(&self) -> &BlockingExecutorService {
127        &self.blocking
128    }
129
130    /// Returns the CPU execution domain.
131    ///
132    /// # Returns
133    ///
134    /// A shared reference to the Rayon-backed CPU executor service.
135    #[inline]
136    pub fn cpu(&self) -> &RayonExecutorService {
137        &self.cpu
138    }
139
140    /// Returns the Tokio blocking execution domain.
141    ///
142    /// # Returns
143    ///
144    /// A shared reference to the Tokio blocking executor service.
145    #[inline]
146    pub fn tokio_blocking(&self) -> &TokioBlockingExecutorService {
147        &self.tokio_blocking
148    }
149
150    /// Returns the Tokio async IO execution domain.
151    ///
152    /// # Returns
153    ///
154    /// A shared reference to the Tokio IO executor service.
155    #[inline]
156    pub fn io(&self) -> &TokioIoExecutorService {
157        &self.io
158    }
159
160    /// Submits a blocking runnable task to the blocking domain.
161    ///
162    /// # Parameters
163    ///
164    /// * `task` - Runnable task that may block an OS thread.
165    ///
166    /// # Returns
167    ///
168    /// A [`TaskHandle`] for the accepted blocking task.
169    ///
170    /// # Errors
171    ///
172    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
173    #[inline]
174    pub fn submit_blocking<T, E>(&self, task: T) -> Result<TaskHandle<(), E>, RejectedExecution>
175    where
176        T: Runnable<E> + Send + 'static,
177        E: Send + 'static,
178    {
179        self.blocking.submit(task)
180    }
181
182    /// Submits a blocking callable task to the blocking domain.
183    ///
184    /// # Parameters
185    ///
186    /// * `task` - Callable task that may block an OS thread.
187    ///
188    /// # Returns
189    ///
190    /// A [`TaskHandle`] for the accepted blocking task.
191    ///
192    /// # Errors
193    ///
194    /// Returns [`RejectedExecution`] if the blocking domain refuses the task.
195    #[inline]
196    pub fn submit_blocking_callable<C, R, E>(
197        &self,
198        task: C,
199    ) -> Result<TaskHandle<R, E>, RejectedExecution>
200    where
201        C: Callable<R, E> + Send + 'static,
202        R: Send + 'static,
203        E: Send + 'static,
204    {
205        self.blocking.submit_callable(task)
206    }
207
208    /// Submits a CPU-bound runnable task to the Rayon domain.
209    ///
210    /// # Parameters
211    ///
212    /// * `task` - Runnable CPU task.
213    ///
214    /// # Returns
215    ///
216    /// A [`RayonTaskHandle`] for the accepted CPU task.
217    ///
218    /// # Errors
219    ///
220    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
221    #[inline]
222    pub fn submit_cpu<T, E>(&self, task: T) -> Result<RayonTaskHandle<(), E>, RejectedExecution>
223    where
224        T: Runnable<E> + Send + 'static,
225        E: Send + 'static,
226    {
227        self.cpu.submit(task)
228    }
229
230    /// Submits a CPU-bound callable task to the Rayon domain.
231    ///
232    /// # Parameters
233    ///
234    /// * `task` - Callable CPU task.
235    ///
236    /// # Returns
237    ///
238    /// A [`RayonTaskHandle`] for the accepted CPU task.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`RejectedExecution`] if the CPU domain refuses the task.
243    #[inline]
244    pub fn submit_cpu_callable<C, R, E>(
245        &self,
246        task: C,
247    ) -> Result<RayonTaskHandle<R, E>, RejectedExecution>
248    where
249        C: Callable<R, E> + Send + 'static,
250        R: Send + 'static,
251        E: Send + 'static,
252    {
253        self.cpu.submit_callable(task)
254    }
255
256    /// Submits a blocking runnable task to Tokio `spawn_blocking`.
257    ///
258    /// # Parameters
259    ///
260    /// * `task` - Runnable task to execute on Tokio's blocking pool.
261    ///
262    /// # Returns
263    ///
264    /// A [`TokioTaskHandle`] for the accepted blocking task.
265    ///
266    /// # Errors
267    ///
268    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
269    /// task.
270    #[inline]
271    pub fn submit_tokio_blocking<T, E>(
272        &self,
273        task: T,
274    ) -> Result<TokioTaskHandle<(), E>, RejectedExecution>
275    where
276        T: Runnable<E> + Send + 'static,
277        E: Send + 'static,
278    {
279        self.tokio_blocking.submit(task)
280    }
281
282    /// Submits a blocking callable task to Tokio `spawn_blocking`.
283    ///
284    /// # Parameters
285    ///
286    /// * `task` - Callable task to execute on Tokio's blocking pool.
287    ///
288    /// # Returns
289    ///
290    /// A [`TokioTaskHandle`] for the accepted blocking task.
291    ///
292    /// # Errors
293    ///
294    /// Returns [`RejectedExecution`] if the Tokio blocking domain refuses the
295    /// task.
296    #[inline]
297    pub fn submit_tokio_blocking_callable<C, R, E>(
298        &self,
299        task: C,
300    ) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
301    where
302        C: Callable<R, E> + Send + 'static,
303        R: Send + 'static,
304        E: Send + 'static,
305    {
306        self.tokio_blocking.submit_callable(task)
307    }
308
309    /// Spawns an async IO or Future-based task on Tokio's async runtime.
310    ///
311    /// # Parameters
312    ///
313    /// * `future` - Future to execute on Tokio's async scheduler.
314    ///
315    /// # Returns
316    ///
317    /// A [`TokioTaskHandle`] for the accepted async task.
318    ///
319    /// # Errors
320    ///
321    /// Returns [`RejectedExecution`] if the Tokio IO domain refuses the task.
322    #[inline]
323    pub fn spawn_io<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
324    where
325        F: Future<Output = Result<R, E>> + Send + 'static,
326        R: Send + 'static,
327        E: Send + 'static,
328    {
329        self.io.spawn(future)
330    }
331
332    /// Requests graceful shutdown for every execution domain.
333    pub fn shutdown(&self) {
334        self.blocking.shutdown();
335        self.cpu.shutdown();
336        self.tokio_blocking.shutdown();
337        self.io.shutdown();
338    }
339
340    /// Requests immediate shutdown for every execution domain.
341    ///
342    /// # Returns
343    ///
344    /// A per-domain aggregate report describing queued, running, and cancelled
345    /// work observed during shutdown.
346    pub fn shutdown_now(&self) -> ExecutionServicesShutdownReport {
347        ExecutionServicesShutdownReport {
348            blocking: self.blocking.shutdown_now(),
349            cpu: self.cpu.shutdown_now(),
350            tokio_blocking: self.tokio_blocking.shutdown_now(),
351            io: self.io.shutdown_now(),
352        }
353    }
354
355    /// Returns whether every execution domain has been shut down.
356    ///
357    /// # Returns
358    ///
359    /// `true` only if all execution domains no longer accept new tasks.
360    #[inline]
361    pub fn is_shutdown(&self) -> bool {
362        self.blocking.is_shutdown()
363            && self.cpu.is_shutdown()
364            && self.tokio_blocking.is_shutdown()
365            && self.io.is_shutdown()
366    }
367
368    /// Returns whether every execution domain has terminated.
369    ///
370    /// # Returns
371    ///
372    /// `true` only after all execution domains have terminated.
373    #[inline]
374    pub fn is_terminated(&self) -> bool {
375        self.blocking.is_terminated()
376            && self.cpu.is_terminated()
377            && self.tokio_blocking.is_terminated()
378            && self.io.is_terminated()
379    }
380
381    /// Waits until every execution domain has terminated.
382    ///
383    /// # Returns
384    ///
385    /// A future that resolves after all execution domains have terminated.
386    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
387        Box::pin(async move {
388            self.blocking.await_termination().await;
389            self.cpu.await_termination().await;
390            self.tokio_blocking.await_termination().await;
391            self.io.await_termination().await;
392        })
393    }
394}