qubit_tokio_executor/tokio_io_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10 future::Future,
11 pin::Pin,
12 sync::Arc,
13};
14
15use qubit_executor::TaskExecutionError;
16
17use crate::TokioTaskHandle;
18use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
19use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
20use qubit_executor::service::{
21 RejectedExecution,
22 ShutdownReport,
23};
24
25/// Tokio-backed executor service for async IO and Future-based tasks.
26///
27/// Accepted futures are spawned with [`tokio::spawn`], so waiting for external
28/// IO does not occupy a dedicated blocking thread.
29#[derive(Default, Clone)]
30pub struct TokioIoExecutorService {
31 /// Shared service state used by all clones of this service.
32 state: Arc<TokioIoExecutorServiceState>,
33}
34
35impl TokioIoExecutorService {
36 /// Creates a new service instance.
37 ///
38 /// # Returns
39 ///
40 /// A Tokio-backed executor service for Future-based tasks.
41 #[inline]
42 pub fn new() -> Self {
43 Self::default()
44 }
45
46 /// Accepts an async task and spawns it on the current Tokio runtime.
47 ///
48 /// # Parameters
49 ///
50 /// * `future` - Future to execute on Tokio's async scheduler.
51 ///
52 /// # Returns
53 ///
54 /// A [`TokioTaskHandle`] for the accepted task.
55 ///
56 /// # Errors
57 ///
58 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
59 /// requested before the task is accepted.
60 pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
61 where
62 F: Future<Output = Result<R, E>> + Send + 'static,
63 R: Send + 'static,
64 E: Send + 'static,
65 {
66 let submission_guard = self.state.lock_submission();
67 if self.state.shutdown.load() {
68 return Err(RejectedExecution::Shutdown);
69 }
70 self.state.active_tasks.inc();
71
72 let marker = Arc::new(());
73 let guard = TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
74 let handle = tokio::spawn(async move {
75 let _guard = guard;
76 future.await.map_err(TaskExecutionError::Failed)
77 });
78 self.state
79 .register_abort_handle(marker, handle.abort_handle());
80 drop(submission_guard);
81 Ok(TokioTaskHandle::new(handle))
82 }
83
84 /// Stops accepting new async tasks.
85 ///
86 /// Already accepted tasks are allowed to finish unless aborted through
87 /// their handles or by [`Self::shutdown_now`].
88 pub fn shutdown(&self) {
89 let _guard = self.state.lock_submission();
90 self.state.shutdown.store(true);
91 self.state.notify_if_terminated();
92 }
93
94 /// Stops accepting new tasks and aborts tracked async tasks.
95 ///
96 /// # Returns
97 ///
98 /// A report with zero queued tasks, the observed active-task count, and
99 /// the number of Tokio abort handles signalled.
100 pub fn shutdown_now(&self) -> ShutdownReport {
101 let _guard = self.state.lock_submission();
102 self.state.shutdown.store(true);
103 let running = self.state.active_tasks.get();
104 let cancellation_count = self.state.abort_tracked_tasks();
105 self.state.notify_if_terminated();
106 ShutdownReport::new(0, running, cancellation_count)
107 }
108
109 /// Returns whether shutdown has been requested.
110 ///
111 /// # Returns
112 ///
113 /// `true` if this service no longer accepts new async tasks.
114 #[inline]
115 pub fn is_shutdown(&self) -> bool {
116 self.state.shutdown.load()
117 }
118
119 /// Returns whether shutdown was requested and all async tasks are finished.
120 ///
121 /// # Returns
122 ///
123 /// `true` only after shutdown has been requested and no accepted async
124 /// tasks remain active.
125 #[inline]
126 pub fn is_terminated(&self) -> bool {
127 self.is_shutdown() && self.state.active_tasks.is_zero()
128 }
129
130 /// Waits until the service has terminated.
131 ///
132 /// # Returns
133 ///
134 /// A future that resolves after shutdown has been requested and all
135 /// accepted async tasks have finished or been aborted.
136 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
137 Box::pin(async move {
138 let notified = self.state.terminated_notify.notified();
139 tokio::pin!(notified);
140 loop {
141 notified.as_mut().enable();
142 if self.is_terminated() {
143 return;
144 }
145 notified.as_mut().await;
146 notified.set(self.state.terminated_notify.notified());
147 }
148 })
149 }
150}