Skip to main content

qubit_tokio_executor/
tokio_io_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    sync::Arc,
13};
14
15use qubit_executor::TaskExecutionError;
16
17use crate::TokioTaskHandle;
18use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
19use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
20use qubit_executor::service::{
21    RejectedExecution,
22    ShutdownReport,
23};
24
25/// Tokio-backed executor service for async IO and Future-based tasks.
26///
27/// Accepted futures are spawned with [`tokio::spawn`], so waiting for external
28/// IO does not occupy a dedicated blocking thread.
29#[derive(Default, Clone)]
30pub struct TokioIoExecutorService {
31    /// Shared service state used by all clones of this service.
32    state: Arc<TokioIoExecutorServiceState>,
33}
34
35impl TokioIoExecutorService {
36    /// Creates a new service instance.
37    ///
38    /// # Returns
39    ///
40    /// A Tokio-backed executor service for Future-based tasks.
41    #[inline]
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Accepts an async task and spawns it on the current Tokio runtime.
47    ///
48    /// # Parameters
49    ///
50    /// * `future` - Future to execute on Tokio's async scheduler.
51    ///
52    /// # Returns
53    ///
54    /// A [`TokioTaskHandle`] for the accepted task.
55    ///
56    /// # Errors
57    ///
58    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
59    /// requested before the task is accepted.
60    pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
61    where
62        F: Future<Output = Result<R, E>> + Send + 'static,
63        R: Send + 'static,
64        E: Send + 'static,
65    {
66        let submission_guard = self.state.lock_submission();
67        if self.state.shutdown.load() {
68            return Err(RejectedExecution::Shutdown);
69        }
70        self.state.active_tasks.inc();
71
72        let marker = Arc::new(());
73        let guard = TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
74        let handle = tokio::spawn(async move {
75            let _guard = guard;
76            future.await.map_err(TaskExecutionError::Failed)
77        });
78        self.state
79            .register_abort_handle(marker, handle.abort_handle());
80        drop(submission_guard);
81        Ok(TokioTaskHandle::new(handle))
82    }
83
84    /// Stops accepting new async tasks.
85    ///
86    /// Already accepted tasks are allowed to finish unless aborted through
87    /// their handles or by [`Self::shutdown_now`].
88    pub fn shutdown(&self) {
89        let _guard = self.state.lock_submission();
90        self.state.shutdown.store(true);
91        self.state.notify_if_terminated();
92    }
93
94    /// Stops accepting new tasks and aborts tracked async tasks.
95    ///
96    /// # Returns
97    ///
98    /// A report with zero queued tasks, the observed active-task count, and
99    /// the number of Tokio abort handles signalled.
100    pub fn shutdown_now(&self) -> ShutdownReport {
101        let _guard = self.state.lock_submission();
102        self.state.shutdown.store(true);
103        let running = self.state.active_tasks.get();
104        let cancellation_count = self.state.abort_tracked_tasks();
105        self.state.notify_if_terminated();
106        ShutdownReport::new(0, running, cancellation_count)
107    }
108
109    /// Returns whether shutdown has been requested.
110    ///
111    /// # Returns
112    ///
113    /// `true` if this service no longer accepts new async tasks.
114    #[inline]
115    pub fn is_shutdown(&self) -> bool {
116        self.state.shutdown.load()
117    }
118
119    /// Returns whether shutdown was requested and all async tasks are finished.
120    ///
121    /// # Returns
122    ///
123    /// `true` only after shutdown has been requested and no accepted async
124    /// tasks remain active.
125    #[inline]
126    pub fn is_terminated(&self) -> bool {
127        self.is_shutdown() && self.state.active_tasks.is_zero()
128    }
129
130    /// Waits until the service has terminated.
131    ///
132    /// # Returns
133    ///
134    /// A future that resolves after shutdown has been requested and all
135    /// accepted async tasks have finished or been aborted.
136    pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
137        Box::pin(async move {
138            let notified = self.state.terminated_notify.notified();
139            tokio::pin!(notified);
140            loop {
141                notified.as_mut().enable();
142                if self.is_terminated() {
143                    return;
144                }
145                notified.as_mut().await;
146                notified.set(self.state.terminated_notify.notified());
147            }
148        })
149    }
150}