qubit_executor/service/thread_per_task_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::{
14 Arc,
15 Condvar,
16 Mutex,
17 MutexGuard,
18 },
19 thread,
20};
21
22use qubit_atomic::{
23 Atomic,
24 AtomicCount,
25};
26use qubit_function::Callable;
27
28use crate::{
29 TaskCompletionPair,
30 TaskHandle,
31 TaskRunner,
32};
33
34use super::{
35 ExecutorService,
36 RejectedExecution,
37 ShutdownReport,
38};
39
40/// Shared state for [`ThreadPerTaskExecutorService`].
41#[derive(Default)]
42struct ThreadPerTaskExecutorServiceState {
43 /// Whether shutdown has been requested.
44 shutdown: Atomic<bool>,
45 /// Number of accepted OS-thread tasks that have not completed.
46 active_tasks: AtomicCount,
47 /// Serializes task submission and shutdown transitions.
48 submission_lock: Mutex<()>,
49 /// Mutex paired with the termination condition variable.
50 termination_lock: Mutex<()>,
51 /// Condition variable used to wait for service termination.
52 termination: Condvar,
53}
54
55impl ThreadPerTaskExecutorServiceState {
56 /// Acquires the submission lock while tolerating poisoned locks.
57 ///
58 /// # Returns
59 ///
60 /// A guard for the submission lock.
61 #[inline]
62 fn lock_submission(&self) -> MutexGuard<'_, ()> {
63 self.submission_lock
64 .lock()
65 .unwrap_or_else(std::sync::PoisonError::into_inner)
66 }
67
68 /// Acquires the termination lock while tolerating poisoned locks.
69 ///
70 /// # Returns
71 ///
72 /// A guard for the mutex paired with the termination condition variable.
73 #[inline]
74 fn lock_termination(&self) -> MutexGuard<'_, ()> {
75 self.termination_lock
76 .lock()
77 .unwrap_or_else(std::sync::PoisonError::into_inner)
78 }
79
80 /// Wakes termination waiters when shutdown and task completion allow it.
81 #[inline]
82 fn notify_if_terminated(&self) {
83 if self.shutdown.load() && self.active_tasks.is_zero() {
84 self.termination.notify_all();
85 }
86 }
87
88 /// Blocks the current thread until the service is terminated.
89 fn wait_for_termination(&self) {
90 let mut guard = self.lock_termination();
91 while !(self.shutdown.load() && self.active_tasks.is_zero()) {
92 guard = self
93 .termination
94 .wait(guard)
95 .unwrap_or_else(std::sync::PoisonError::into_inner);
96 }
97 }
98}
99
100/// Managed service that runs every accepted task on a dedicated OS thread.
101///
102/// The service has no queue: accepted tasks start immediately on their own
103/// thread. Shutdown prevents later submissions but cannot forcefully stop
104/// running OS threads.
105#[derive(Default, Clone)]
106pub struct ThreadPerTaskExecutorService {
107 /// Shared service state used by all clones of this service.
108 state: Arc<ThreadPerTaskExecutorServiceState>,
109}
110
111impl ThreadPerTaskExecutorService {
112 /// Creates a new service instance.
113 ///
114 /// # Returns
115 ///
116 /// A service that accepts tasks until shutdown is requested.
117 #[inline]
118 pub fn new() -> Self {
119 Self::default()
120 }
121}
122
123impl ExecutorService for ThreadPerTaskExecutorService {
124 type Handle<R, E>
125 = TaskHandle<R, E>
126 where
127 R: Send + 'static,
128 E: Send + 'static;
129
130 type Termination<'a>
131 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
132 where
133 Self: 'a;
134
135 /// Accepts a callable and starts it on a dedicated OS thread.
136 ///
137 /// # Parameters
138 ///
139 /// * `task` - Callable to execute on a new OS thread.
140 ///
141 /// # Returns
142 ///
143 /// A [`TaskHandle`] for the accepted task.
144 ///
145 /// # Errors
146 ///
147 /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
148 /// requested before the task is accepted.
149 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
150 where
151 C: Callable<R, E> + Send + 'static,
152 R: Send + 'static,
153 E: Send + 'static,
154 {
155 let submission_guard = self.state.lock_submission();
156 if self.state.shutdown.load() {
157 return Err(RejectedExecution::Shutdown);
158 }
159 self.state.active_tasks.inc();
160 drop(submission_guard);
161
162 let (handle, completion) = TaskCompletionPair::new().into_parts();
163 let state = Arc::clone(&self.state);
164 thread::spawn(move || {
165 TaskRunner::new(task).run(completion);
166 if state.active_tasks.dec() == 0 {
167 state.notify_if_terminated();
168 }
169 });
170 Ok(handle)
171 }
172
173 /// Stops accepting new tasks.
174 ///
175 /// Already accepted threads are allowed to finish.
176 fn shutdown(&self) {
177 let _guard = self.state.lock_submission();
178 self.state.shutdown.store(true);
179 self.state.notify_if_terminated();
180 }
181
182 /// Stops accepting new tasks and reports currently running work.
183 ///
184 /// Running OS threads cannot be forcefully stopped by this service.
185 ///
186 /// # Returns
187 ///
188 /// A report with zero queued tasks, the observed active thread count, and
189 /// zero cancelled tasks.
190 fn shutdown_now(&self) -> ShutdownReport {
191 let _guard = self.state.lock_submission();
192 self.state.shutdown.store(true);
193 let running = self.state.active_tasks.get();
194 self.state.notify_if_terminated();
195 ShutdownReport::new(0, running, 0)
196 }
197
198 /// Returns whether shutdown has been requested.
199 #[inline]
200 fn is_shutdown(&self) -> bool {
201 self.state.shutdown.load()
202 }
203
204 /// Returns whether shutdown was requested and all tasks are finished.
205 #[inline]
206 fn is_terminated(&self) -> bool {
207 self.is_shutdown() && self.state.active_tasks.is_zero()
208 }
209
210 /// Waits for all accepted tasks to complete after shutdown.
211 ///
212 /// This future blocks the polling thread while waiting on a condition
213 /// variable.
214 ///
215 /// # Returns
216 ///
217 /// A future that resolves after shutdown has been requested and all
218 /// accepted OS-thread tasks have completed.
219 #[inline]
220 fn await_termination(&self) -> Self::Termination<'_> {
221 Box::pin(async move {
222 self.state.wait_for_termination();
223 })
224 }
225}