qubit_executor/service/thread_per_task_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10 future::Future,
11 pin::Pin,
12 sync::{
13 Arc,
14 Condvar,
15 Mutex,
16 MutexGuard,
17 },
18 thread,
19};
20
21use qubit_atomic::{
22 Atomic,
23 AtomicCount,
24};
25use qubit_function::Callable;
26
27use crate::{
28 TaskCompletionPair,
29 TaskHandle,
30 TaskRunner,
31};
32
33use super::{
34 ExecutorService,
35 RejectedExecution,
36 ShutdownReport,
37};
38
39/// Shared state for [`ThreadPerTaskExecutorService`].
40#[derive(Default)]
41struct ThreadPerTaskExecutorServiceState {
42 /// Whether shutdown has been requested.
43 shutdown: Atomic<bool>,
44 /// Number of accepted OS-thread tasks that have not completed.
45 active_tasks: AtomicCount,
46 /// Serializes task submission and shutdown transitions.
47 submission_lock: Mutex<()>,
48 /// Mutex paired with the termination condition variable.
49 termination_lock: Mutex<()>,
50 /// Condition variable used to wait for service termination.
51 termination: Condvar,
52}
53
54impl ThreadPerTaskExecutorServiceState {
55 /// Acquires the submission lock while tolerating poisoned locks.
56 ///
57 /// # Returns
58 ///
59 /// A guard for the submission lock.
60 #[inline]
61 fn lock_submission(&self) -> MutexGuard<'_, ()> {
62 self.submission_lock
63 .lock()
64 .unwrap_or_else(std::sync::PoisonError::into_inner)
65 }
66
67 /// Acquires the termination lock while tolerating poisoned locks.
68 ///
69 /// # Returns
70 ///
71 /// A guard for the mutex paired with the termination condition variable.
72 #[inline]
73 fn lock_termination(&self) -> MutexGuard<'_, ()> {
74 self.termination_lock
75 .lock()
76 .unwrap_or_else(std::sync::PoisonError::into_inner)
77 }
78
79 /// Wakes termination waiters when shutdown and task completion allow it.
80 #[inline]
81 fn notify_if_terminated(&self) {
82 if self.shutdown.load() && self.active_tasks.is_zero() {
83 self.termination.notify_all();
84 }
85 }
86
87 /// Blocks the current thread until the service is terminated.
88 fn wait_for_termination(&self) {
89 let mut guard = self.lock_termination();
90 while !(self.shutdown.load() && self.active_tasks.is_zero()) {
91 guard = self
92 .termination
93 .wait(guard)
94 .unwrap_or_else(std::sync::PoisonError::into_inner);
95 }
96 }
97}
98
99/// Managed service that runs every accepted task on a dedicated OS thread.
100///
101/// The service has no queue: accepted tasks start immediately on their own
102/// thread. Shutdown prevents later submissions but cannot forcefully stop
103/// running OS threads.
104#[derive(Default, Clone)]
105pub struct ThreadPerTaskExecutorService {
106 /// Shared service state used by all clones of this service.
107 state: Arc<ThreadPerTaskExecutorServiceState>,
108}
109
110impl ThreadPerTaskExecutorService {
111 /// Creates a new service instance.
112 ///
113 /// # Returns
114 ///
115 /// A service that accepts tasks until shutdown is requested.
116 #[inline]
117 pub fn new() -> Self {
118 Self::default()
119 }
120}
121
122impl ExecutorService for ThreadPerTaskExecutorService {
123 type Handle<R, E>
124 = TaskHandle<R, E>
125 where
126 R: Send + 'static,
127 E: Send + 'static;
128
129 type Termination<'a>
130 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
131 where
132 Self: 'a;
133
134 /// Accepts a callable and starts it on a dedicated OS thread.
135 ///
136 /// # Parameters
137 ///
138 /// * `task` - Callable to execute on a new OS thread.
139 ///
140 /// # Returns
141 ///
142 /// A [`TaskHandle`] for the accepted task.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
147 /// requested before the task is accepted.
148 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
149 where
150 C: Callable<R, E> + Send + 'static,
151 R: Send + 'static,
152 E: Send + 'static,
153 {
154 let submission_guard = self.state.lock_submission();
155 if self.state.shutdown.load() {
156 return Err(RejectedExecution::Shutdown);
157 }
158 self.state.active_tasks.inc();
159 drop(submission_guard);
160
161 let (handle, completion) = TaskCompletionPair::new().into_parts();
162 let state = Arc::clone(&self.state);
163 thread::spawn(move || {
164 TaskRunner::new(task).run(completion);
165 if state.active_tasks.dec() == 0 {
166 state.notify_if_terminated();
167 }
168 });
169 Ok(handle)
170 }
171
172 /// Stops accepting new tasks.
173 ///
174 /// Already accepted threads are allowed to finish.
175 fn shutdown(&self) {
176 let _guard = self.state.lock_submission();
177 self.state.shutdown.store(true);
178 self.state.notify_if_terminated();
179 }
180
181 /// Stops accepting new tasks and reports currently running work.
182 ///
183 /// Running OS threads cannot be forcefully stopped by this service.
184 ///
185 /// # Returns
186 ///
187 /// A report with zero queued tasks, the observed active thread count, and
188 /// zero cancelled tasks.
189 fn shutdown_now(&self) -> ShutdownReport {
190 let _guard = self.state.lock_submission();
191 self.state.shutdown.store(true);
192 let running = self.state.active_tasks.get();
193 self.state.notify_if_terminated();
194 ShutdownReport::new(0, running, 0)
195 }
196
197 /// Returns whether shutdown has been requested.
198 #[inline]
199 fn is_shutdown(&self) -> bool {
200 self.state.shutdown.load()
201 }
202
203 /// Returns whether shutdown was requested and all tasks are finished.
204 #[inline]
205 fn is_terminated(&self) -> bool {
206 self.is_shutdown() && self.state.active_tasks.is_zero()
207 }
208
209 /// Waits for all accepted tasks to complete after shutdown.
210 ///
211 /// This future blocks the polling thread while waiting on a condition
212 /// variable.
213 ///
214 /// # Returns
215 ///
216 /// A future that resolves after shutdown has been requested and all
217 /// accepted OS-thread tasks have completed.
218 #[inline]
219 fn await_termination(&self) -> Self::Termination<'_> {
220 Box::pin(async move {
221 self.state.wait_for_termination();
222 })
223 }
224}