Skip to main content

qubit_tokio_executor/
tokio_io_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14};
15
16use qubit_executor::TaskExecutionError;
17
18use crate::TokioTaskHandle;
19use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
20use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
21use qubit_executor::service::{
22    RejectedExecution,
23    ShutdownReport,
24};
25
26/// Tokio-backed executor service for async IO and Future-based tasks.
27///
28/// Accepted futures are spawned with [`tokio::spawn`], so waiting for external
29/// IO does not occupy a dedicated blocking thread.
30#[derive(Default, Clone)]
31pub struct TokioIoExecutorService {
32    /// Shared service state used by all clones of this service.
33    state: Arc<TokioIoExecutorServiceState>,
34}
35
36impl TokioIoExecutorService {
37    /// Creates a new service instance.
38    ///
39    /// # Returns
40    ///
41    /// A Tokio-backed executor service for Future-based tasks.
42    #[inline]
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Accepts an async task and spawns it on the current Tokio runtime.
48    ///
49    /// # Parameters
50    ///
51    /// * `future` - Future to execute on Tokio's async scheduler.
52    ///
53    /// # Returns
54    ///
55    /// A [`TokioTaskHandle`] for the accepted task.
56    ///
57    /// # Errors
58    ///
59    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
60    /// requested before the task is accepted.
61    pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
62    where
63        F: Future<Output = Result<R, E>> + Send + 'static,
64        R: Send + 'static,
65        E: Send + 'static,
66    {
67        let submission_guard = self.state.lock_submission();
68        if self.state.shutdown.load() {
69            return Err(RejectedExecution::Shutdown);
70        }
71        self.state.active_tasks.inc();
72
73        let marker = Arc::new(());
74        let guard = TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
75        let handle = tokio::spawn(async move {
76            let _guard = guard;
77            future.await.map_err(TaskExecutionError::Failed)
78        });
79        self.state
80            .register_abort_handle(marker, handle.abort_handle());
81        drop(submission_guard);
82        Ok(TokioTaskHandle::new(handle))
83    }
84
85    /// Stops accepting new async tasks.
86    ///
87    /// Already accepted tasks are allowed to finish unless aborted through
88    /// their handles or by [`Self::shutdown_now`].
89    pub fn shutdown(&self) {
90        let _guard = self.state.lock_submission();
91        self.state.shutdown.store(true);
92        self.state.notify_if_terminated();
93    }
94
95    /// Stops accepting new tasks and aborts tracked async tasks.
96    ///
97    /// # Returns
98    ///
99    /// A report with zero queued tasks, the observed active-task count, and
100    /// the number of Tokio abort handles signalled.
101    pub fn shutdown_now(&self) -> ShutdownReport {
102        let _guard = self.state.lock_submission();
103        self.state.shutdown.store(true);
104        let running = self.state.active_tasks.get();
105        let cancellation_count = self.state.abort_tracked_tasks();
106        self.state.notify_if_terminated();
107        ShutdownReport::new(0, running, cancellation_count)
108    }
109
110    /// Returns whether shutdown has been requested.
111    ///
112    /// # Returns
113    ///
114    /// `true` if this service no longer accepts new async tasks.
115    #[inline]
116    pub fn is_shutdown(&self) -> bool {
117        self.state.shutdown.load()
118    }
119
120    /// Returns whether shutdown was requested and all async tasks are finished.
121    ///
122    /// # Returns
123    ///
124    /// `true` only after shutdown has been requested and no accepted async
125    /// tasks remain active.
126    #[inline]
127    pub fn is_terminated(&self) -> bool {
128        self.is_shutdown() && self.state.active_tasks.is_zero()
129    }
130
131    /// Waits until the service has terminated.
132    ///
133    /// # Returns
134    ///
135    /// A future that resolves after shutdown has been requested and all
136    /// accepted async tasks have finished or been aborted.
137    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
138        Box::pin(async move {
139            let notified = self.state.terminated_notify.notified();
140            tokio::pin!(notified);
141            loop {
142                notified.as_mut().enable();
143                if self.is_terminated() {
144                    return;
145                }
146                notified.as_mut().await;
147                notified.set(self.state.terminated_notify.notified());
148            }
149        })
150    }
151}