qubit_executor/service/thread_per_task_executor_service.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use parking_lot::{
13 Condvar,
14 Mutex,
15};
16use qubit_function::{
17 Callable,
18 Runnable,
19};
20
21use crate::executor::thread_spawn_config::ThreadSpawnConfig;
22use crate::{
23 TaskHandle,
24 TrackedTask,
25 hook::{
26 TaskHook,
27 notify_rejected,
28 notify_rejected_optional,
29 },
30 task::{
31 spi::{
32 TaskEndpointPair,
33 TaskSlot,
34 },
35 task_admission_gate::TaskAdmissionGate,
36 },
37};
38
39use super::{
40 ExecutorService,
41 ExecutorServiceLifecycle,
42 StopReport,
43 SubmissionError,
44 ThreadPerTaskExecutorServiceBuilder,
45};
46type Worker = Box<dyn FnOnce() + Send + 'static>;
47
48/// Handle variants that can cross the accepted task boundary.
49trait TaskAdmissionHandle {
50 /// Marks the underlying task as accepted and emits accepted hooks.
51 fn mark_accepted(&self);
52}
53
54impl<R, E> TaskAdmissionHandle for TaskHandle<R, E> {
55 /// Marks a task handle as accepted.
56 #[inline]
57 fn mark_accepted(&self) {
58 self.accept();
59 }
60}
61
62impl<R, E> TaskAdmissionHandle for TrackedTask<R, E> {
63 /// Marks a tracked task handle as accepted.
64 #[inline]
65 fn mark_accepted(&self) {
66 self.accept();
67 }
68}
69
70/// Mutable service state protected by the service mutex.
71#[derive(Debug, Clone, Copy)]
72struct ServiceState {
73 /// Current lifecycle state.
74 lifecycle: ExecutorServiceLifecycle,
75 /// Number of accepted OS-thread tasks that have not completed.
76 active_tasks: usize,
77}
78
79impl Default for ServiceState {
80 /// Creates a running state with no active tasks.
81 #[inline]
82 fn default() -> Self {
83 Self {
84 lifecycle: ExecutorServiceLifecycle::Running,
85 active_tasks: 0,
86 }
87 }
88}
89
90/// Shared state for [`ThreadPerTaskExecutorService`].
91#[derive(Default)]
92struct ThreadPerTaskExecutorServiceState {
93 /// Lifecycle and active-task counters protected as one state machine.
94 state: Mutex<ServiceState>,
95 /// Condition variable used to wait for service termination.
96 termination: Condvar,
97}
98
99/// Guard that records completion for one accepted task when dropped.
100struct ActiveTaskGuard {
101 /// Shared service state to update when the worker exits.
102 state: Arc<ThreadPerTaskExecutorServiceState>,
103}
104
105impl ActiveTaskGuard {
106 /// Creates a guard for one accepted active task.
107 ///
108 /// # Parameters
109 ///
110 /// * `state` - Shared service state whose active count should be decremented.
111 ///
112 /// # Returns
113 ///
114 /// A guard that finishes the accepted task on drop.
115 #[inline]
116 fn new(state: Arc<ThreadPerTaskExecutorServiceState>) -> Self {
117 Self { state }
118 }
119}
120
121impl Drop for ActiveTaskGuard {
122 /// Records task completion when the worker closure exits.
123 #[inline]
124 fn drop(&mut self) {
125 self.state.finish_task();
126 }
127}
128
129impl ThreadPerTaskExecutorServiceState {
130 /// Returns the currently stored lifecycle state.
131 ///
132 /// # Returns
133 ///
134 /// The lifecycle stored in the service state.
135 #[inline]
136 fn lifecycle(&self) -> ExecutorServiceLifecycle {
137 self.state.lock().lifecycle
138 }
139
140 /// Attempts to accept one task and increments the active task count.
141 ///
142 /// # Returns
143 ///
144 /// `Ok(())` if the service is running and accepted the task.
145 ///
146 /// # Errors
147 ///
148 /// Returns [`SubmissionError::Shutdown`] if the service is not running.
149 #[inline]
150 fn accept_task(&self) -> Result<(), SubmissionError> {
151 let mut state = self.state.lock();
152 if state.lifecycle != ExecutorServiceLifecycle::Running {
153 return Err(SubmissionError::Shutdown);
154 }
155 state.active_tasks += 1;
156 Ok(())
157 }
158
159 /// Records one task completion and wakes termination waiters if appropriate.
160 #[inline]
161 fn finish_task(&self) {
162 let mut state = self.state.lock();
163 state.active_tasks -= 1;
164 Self::terminate_if_ready(&mut state, &self.termination);
165 }
166
167 /// Blocks the current thread until the service is terminated.
168 fn wait_for_termination(&self) {
169 let mut state = self.state.lock();
170 while state.lifecycle != ExecutorServiceLifecycle::Terminated {
171 self.termination.wait(&mut state);
172 }
173 }
174
175 /// Requests graceful shutdown.
176 #[inline]
177 fn shutdown(&self) {
178 let mut state = self.state.lock();
179 if state.lifecycle == ExecutorServiceLifecycle::Running {
180 state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
181 }
182 Self::terminate_if_ready(&mut state, &self.termination);
183 }
184
185 /// Requests abrupt stop and returns the observed active work count.
186 ///
187 /// # Returns
188 ///
189 /// The number of active tasks observed while stopping.
190 #[inline]
191 fn stop(&self) -> usize {
192 let mut state = self.state.lock();
193 if state.lifecycle != ExecutorServiceLifecycle::Terminated {
194 state.lifecycle = ExecutorServiceLifecycle::Stopping;
195 }
196 let running = state.active_tasks;
197 Self::terminate_if_ready(&mut state, &self.termination);
198 running
199 }
200
201 /// Marks the service terminated when it is non-running and idle.
202 #[inline]
203 fn terminate_if_ready(state: &mut ServiceState, termination: &Condvar) {
204 if state.lifecycle != ExecutorServiceLifecycle::Running && state.active_tasks == 0 {
205 state.lifecycle = ExecutorServiceLifecycle::Terminated;
206 termination.notify_all();
207 }
208 }
209}
210
211/// Managed service that runs every accepted task on a dedicated OS thread.
212///
213/// The service has no queue: accepted tasks start immediately on their own
214/// thread. Shutdown prevents later submissions but cannot forcefully stop
215/// running OS threads.
216#[derive(Clone)]
217pub struct ThreadPerTaskExecutorService {
218 /// Shared service state used by all clones of this service.
219 state: Arc<ThreadPerTaskExecutorServiceState>,
220 /// Optional stack size for each spawned worker thread.
221 stack_size: Option<usize>,
222 /// Hook notified about accepted task lifecycle events.
223 pub(crate) hook: Option<Arc<dyn TaskHook>>,
224}
225
226impl Default for ThreadPerTaskExecutorService {
227 /// Creates a service with default worker options and no hook.
228 #[inline]
229 fn default() -> Self {
230 Self {
231 state: Arc::default(),
232 stack_size: None,
233 hook: None,
234 }
235 }
236}
237
238impl ThreadPerTaskExecutorService {
239 /// Creates a new service instance.
240 ///
241 /// # Returns
242 ///
243 /// A service that accepts tasks until shutdown is requested.
244 #[inline]
245 pub fn new() -> Self {
246 Self::default()
247 }
248
249 /// Creates a service with the supplied worker stack size configuration.
250 ///
251 /// # Parameters
252 ///
253 /// * `stack_size` - Optional stack size in bytes for spawned workers.
254 ///
255 /// # Returns
256 ///
257 /// A service using the supplied worker stack size configuration.
258 #[inline]
259 pub(crate) fn from_stack_size(stack_size: Option<usize>) -> Self {
260 Self {
261 state: Arc::default(),
262 stack_size,
263 hook: None,
264 }
265 }
266
267 /// Creates a builder for configuring this service.
268 ///
269 /// # Returns
270 ///
271 /// A builder initialized with default worker thread options.
272 #[inline]
273 pub fn builder() -> ThreadPerTaskExecutorServiceBuilder {
274 ThreadPerTaskExecutorServiceBuilder::new()
275 }
276
277 /// Spawns one accepted worker thread.
278 ///
279 /// # Parameters
280 ///
281 /// * `worker` - Closure to run on the worker OS thread.
282 ///
283 /// # Returns
284 ///
285 /// `Ok(())` if the worker was spawned.
286 ///
287 /// # Errors
288 ///
289 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
290 /// refuses to create the worker thread. Accepted task accounting is handled
291 /// by the active-task guard captured by `worker`.
292 fn spawn_worker_after_accept(&self, worker: Worker) -> Result<(), SubmissionError> {
293 ThreadSpawnConfig::new(self.stack_size).spawn(worker)
294 }
295
296 /// Notifies the configured hook about a rejected submission.
297 ///
298 /// # Parameters
299 ///
300 /// * `error` - Submission failure reported to the caller.
301 #[inline]
302 fn notify_rejected(&self, error: &SubmissionError) {
303 if let Some(hook) = &self.hook {
304 notify_rejected(hook.as_ref(), error);
305 }
306 }
307
308 /// Accepts service work, starts a worker thread, and returns the chosen handle.
309 ///
310 /// # Parameters
311 ///
312 /// * `split_pair` - Splits the task endpoint pair into the desired handle and slot.
313 /// * `run_slot` - Worker body that consumes the runner-side task slot.
314 ///
315 /// # Returns
316 ///
317 /// The accepted task handle produced by `split_pair`.
318 ///
319 /// # Errors
320 ///
321 /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
322 /// [`SubmissionError::WorkerSpawnFailed`] if the worker thread cannot be created.
323 fn submit_with_slot<R, E, H, S, F>(
324 &self,
325 split_pair: S,
326 run_slot: F,
327 ) -> Result<H, SubmissionError>
328 where
329 R: Send + 'static,
330 E: Send + 'static,
331 H: TaskAdmissionHandle,
332 S: FnOnce(TaskEndpointPair<R, E>) -> (H, TaskSlot<R, E>),
333 F: FnOnce(TaskSlot<R, E>) + Send + 'static,
334 {
335 if let Err(error) = self.state.accept_task() {
336 self.notify_rejected(&error);
337 return Err(error);
338 }
339
340 let pair = TaskEndpointPair::with_optional_hook(self.hook.clone());
341 let (handle, slot) = split_pair(pair);
342 let guard = ActiveTaskGuard::new(Arc::clone(&self.state));
343 let gate = TaskAdmissionGate::new(self.hook.is_some());
344 let worker_gate = gate.clone();
345 let hook = self.hook.clone();
346 if let Err(error) = self.spawn_worker_after_accept(Box::new(move || {
347 worker_gate.wait();
348 let _guard = guard;
349 run_slot(slot);
350 })) {
351 notify_rejected_optional(hook.as_ref(), &error);
352 return Err(error);
353 }
354 handle.mark_accepted();
355 gate.open();
356 Ok(handle)
357 }
358}
359
360impl ExecutorService for ThreadPerTaskExecutorService {
361 type ResultHandle<R, E>
362 = TaskHandle<R, E>
363 where
364 R: Send + 'static,
365 E: Send + 'static;
366
367 type TrackedHandle<R, E>
368 = TrackedTask<R, E>
369 where
370 R: Send + 'static,
371 E: Send + 'static;
372
373 /// Accepts a runnable and starts it on a dedicated OS thread.
374 ///
375 /// # Parameters
376 ///
377 /// * `task` - Runnable to execute on a new OS thread.
378 ///
379 /// # Returns
380 ///
381 /// `Ok(())` if the runnable was accepted.
382 ///
383 /// # Errors
384 ///
385 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
386 /// requested before the task is accepted.
387 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
388 where
389 T: Runnable<E> + Send + 'static,
390 E: Send + 'static,
391 {
392 let handle = self.submit_with_slot(
393 |pair| pair.into_parts(),
394 move |slot| {
395 let mut task = task;
396 slot.run(move || task.run());
397 },
398 )?;
399 drop(handle);
400 Ok(())
401 }
402
403 /// Accepts a callable and starts it on a dedicated OS thread.
404 ///
405 /// # Parameters
406 ///
407 /// * `task` - Callable to execute on a new OS thread.
408 ///
409 /// # Returns
410 ///
411 /// A [`TaskHandle`] for the accepted task.
412 ///
413 /// # Errors
414 ///
415 /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
416 /// requested before the task is accepted.
417 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
418 where
419 C: Callable<R, E> + Send + 'static,
420 R: Send + 'static,
421 E: Send + 'static,
422 {
423 self.submit_with_slot(
424 |pair| pair.into_parts(),
425 move |slot| {
426 slot.run(task);
427 },
428 )
429 }
430
431 /// Accepts a callable and starts it with a tracked handle.
432 fn submit_tracked_callable<C, R, E>(
433 &self,
434 task: C,
435 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
436 where
437 C: Callable<R, E> + Send + 'static,
438 R: Send + 'static,
439 E: Send + 'static,
440 {
441 self.submit_with_slot(
442 |pair| pair.into_tracked_parts(),
443 move |slot| {
444 slot.run(task);
445 },
446 )
447 }
448
449 /// Stops accepting new tasks.
450 ///
451 /// Already accepted threads are allowed to finish.
452 fn shutdown(&self) {
453 self.state.shutdown();
454 }
455
456 /// Stops accepting new tasks and reports currently running work.
457 ///
458 /// Running OS threads cannot be forcefully stopped by this service.
459 ///
460 /// # Returns
461 ///
462 /// A report with zero queued tasks, the observed active thread count, and
463 /// zero cancelled tasks.
464 fn stop(&self) -> StopReport {
465 let running = self.state.stop();
466 StopReport::new(0, running, 0)
467 }
468
469 /// Returns the current lifecycle state.
470 #[inline]
471 fn lifecycle(&self) -> ExecutorServiceLifecycle {
472 self.state.lifecycle()
473 }
474
475 /// Blocks until all accepted tasks complete after shutdown or stop.
476 ///
477 /// This method blocks the current thread on a condition variable. Calling
478 /// it while the service is still running will wait until another thread
479 /// calls [`Self::shutdown`] or [`Self::stop`] and all accepted OS-thread
480 /// tasks have completed.
481 #[inline]
482 fn wait_termination(&self) {
483 self.state.wait_for_termination();
484 }
485}