qubit_tokio_executor/tokio_io_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14};
15
16use qubit_executor::TaskExecutionError;
17
18use crate::TokioTaskHandle;
19use crate::tokio_io_executor_service_state::TokioIoExecutorServiceState;
20use crate::tokio_io_service_task_guard::TokioIoServiceTaskGuard;
21use qubit_executor::service::{
22 RejectedExecution,
23 ShutdownReport,
24};
25
26/// Tokio-backed executor service for async IO and Future-based tasks.
27///
28/// Accepted futures are spawned with [`tokio::spawn`], so waiting for external
29/// IO does not occupy a dedicated blocking thread.
30#[derive(Default, Clone)]
31pub struct TokioIoExecutorService {
32 /// Shared service state used by all clones of this service.
33 state: Arc<TokioIoExecutorServiceState>,
34}
35
36impl TokioIoExecutorService {
37 /// Creates a new service instance.
38 ///
39 /// # Returns
40 ///
41 /// A Tokio-backed executor service for Future-based tasks.
42 #[inline]
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 /// Accepts an async task and spawns it on the current Tokio runtime.
48 ///
49 /// # Parameters
50 ///
51 /// * `future` - Future to execute on Tokio's async scheduler.
52 ///
53 /// # Returns
54 ///
55 /// A [`TokioTaskHandle`] for the accepted task.
56 ///
57 /// # Errors
58 ///
59 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
60 /// requested before the task is accepted.
61 pub fn spawn<F, R, E>(&self, future: F) -> Result<TokioTaskHandle<R, E>, RejectedExecution>
62 where
63 F: Future<Output = Result<R, E>> + Send + 'static,
64 R: Send + 'static,
65 E: Send + 'static,
66 {
67 let submission_guard = self.state.lock_submission();
68 if self.state.shutdown.load() {
69 return Err(RejectedExecution::Shutdown);
70 }
71 self.state.active_tasks.inc();
72
73 let marker = Arc::new(());
74 let guard = TokioIoServiceTaskGuard::new(Arc::clone(&self.state), Arc::clone(&marker));
75 let handle = tokio::spawn(async move {
76 let _guard = guard;
77 future.await.map_err(TaskExecutionError::Failed)
78 });
79 self.state
80 .register_abort_handle(marker, handle.abort_handle());
81 drop(submission_guard);
82 Ok(TokioTaskHandle::new(handle))
83 }
84
85 /// Stops accepting new async tasks.
86 ///
87 /// Already accepted tasks are allowed to finish unless aborted through
88 /// their handles or by [`Self::shutdown_now`].
89 pub fn shutdown(&self) {
90 let _guard = self.state.lock_submission();
91 self.state.shutdown.store(true);
92 self.state.notify_if_terminated();
93 }
94
95 /// Stops accepting new tasks and aborts tracked async tasks.
96 ///
97 /// # Returns
98 ///
99 /// A report with zero queued tasks, the observed active-task count, and
100 /// the number of Tokio abort handles signalled.
101 pub fn shutdown_now(&self) -> ShutdownReport {
102 let _guard = self.state.lock_submission();
103 self.state.shutdown.store(true);
104 let running = self.state.active_tasks.get();
105 let cancellation_count = self.state.abort_tracked_tasks();
106 self.state.notify_if_terminated();
107 ShutdownReport::new(0, running, cancellation_count)
108 }
109
110 /// Returns whether shutdown has been requested.
111 ///
112 /// # Returns
113 ///
114 /// `true` if this service no longer accepts new async tasks.
115 #[inline]
116 pub fn is_shutdown(&self) -> bool {
117 self.state.shutdown.load()
118 }
119
120 /// Returns whether shutdown was requested and all async tasks are finished.
121 ///
122 /// # Returns
123 ///
124 /// `true` only after shutdown has been requested and no accepted async
125 /// tasks remain active.
126 #[inline]
127 pub fn is_terminated(&self) -> bool {
128 self.is_shutdown() && self.state.active_tasks.is_zero()
129 }
130
131 /// Waits until the service has terminated.
132 ///
133 /// # Returns
134 ///
135 /// A future that resolves after shutdown has been requested and all
136 /// accepted async tasks have finished or been aborted.
137 pub fn await_termination(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
138 Box::pin(async move {
139 let notified = self.state.terminated_notify.notified();
140 tokio::pin!(notified);
141 loop {
142 notified.as_mut().enable();
143 if self.is_terminated() {
144 return;
145 }
146 notified.as_mut().await;
147 notified.set(self.state.terminated_notify.notified());
148 }
149 })
150 }
151}