Skip to main content

qubit_executor/service/
thread_per_task_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::{
14        Arc,
15        Condvar,
16        Mutex,
17        MutexGuard,
18    },
19    thread,
20};
21
22use qubit_atomic::{
23    Atomic,
24    AtomicCount,
25};
26use qubit_function::Callable;
27
28use crate::{
29    TaskCompletionPair,
30    TaskHandle,
31    TaskRunner,
32};
33
34use super::{
35    ExecutorService,
36    RejectedExecution,
37    ShutdownReport,
38};
39
40/// Shared state for [`ThreadPerTaskExecutorService`].
41#[derive(Default)]
42struct ThreadPerTaskExecutorServiceState {
43    /// Whether shutdown has been requested.
44    shutdown: Atomic<bool>,
45    /// Number of accepted OS-thread tasks that have not completed.
46    active_tasks: AtomicCount,
47    /// Serializes task submission and shutdown transitions.
48    submission_lock: Mutex<()>,
49    /// Mutex paired with the termination condition variable.
50    termination_lock: Mutex<()>,
51    /// Condition variable used to wait for service termination.
52    termination: Condvar,
53}
54
55impl ThreadPerTaskExecutorServiceState {
56    /// Acquires the submission lock while tolerating poisoned locks.
57    ///
58    /// # Returns
59    ///
60    /// A guard for the submission lock.
61    #[inline]
62    fn lock_submission(&self) -> MutexGuard<'_, ()> {
63        self.submission_lock
64            .lock()
65            .unwrap_or_else(std::sync::PoisonError::into_inner)
66    }
67
68    /// Acquires the termination lock while tolerating poisoned locks.
69    ///
70    /// # Returns
71    ///
72    /// A guard for the mutex paired with the termination condition variable.
73    #[inline]
74    fn lock_termination(&self) -> MutexGuard<'_, ()> {
75        self.termination_lock
76            .lock()
77            .unwrap_or_else(std::sync::PoisonError::into_inner)
78    }
79
80    /// Wakes termination waiters when shutdown and task completion allow it.
81    #[inline]
82    fn notify_if_terminated(&self) {
83        if self.shutdown.load() && self.active_tasks.is_zero() {
84            self.termination.notify_all();
85        }
86    }
87
88    /// Blocks the current thread until the service is terminated.
89    fn wait_for_termination(&self) {
90        let mut guard = self.lock_termination();
91        while !(self.shutdown.load() && self.active_tasks.is_zero()) {
92            guard = self
93                .termination
94                .wait(guard)
95                .unwrap_or_else(std::sync::PoisonError::into_inner);
96        }
97    }
98}
99
100/// Managed service that runs every accepted task on a dedicated OS thread.
101///
102/// The service has no queue: accepted tasks start immediately on their own
103/// thread. Shutdown prevents later submissions but cannot forcefully stop
104/// running OS threads.
105#[derive(Default, Clone)]
106pub struct ThreadPerTaskExecutorService {
107    /// Shared service state used by all clones of this service.
108    state: Arc<ThreadPerTaskExecutorServiceState>,
109}
110
111impl ThreadPerTaskExecutorService {
112    /// Creates a new service instance.
113    ///
114    /// # Returns
115    ///
116    /// A service that accepts tasks until shutdown is requested.
117    #[inline]
118    pub fn new() -> Self {
119        Self::default()
120    }
121}
122
123impl ExecutorService for ThreadPerTaskExecutorService {
124    type Handle<R, E>
125        = TaskHandle<R, E>
126    where
127        R: Send + 'static,
128        E: Send + 'static;
129
130    type Termination<'a>
131        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
132    where
133        Self: 'a;
134
135    /// Accepts a callable and starts it on a dedicated OS thread.
136    ///
137    /// # Parameters
138    ///
139    /// * `task` - Callable to execute on a new OS thread.
140    ///
141    /// # Returns
142    ///
143    /// A [`TaskHandle`] for the accepted task.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
148    /// requested before the task is accepted.
149    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
150    where
151        C: Callable<R, E> + Send + 'static,
152        R: Send + 'static,
153        E: Send + 'static,
154    {
155        let submission_guard = self.state.lock_submission();
156        if self.state.shutdown.load() {
157            return Err(RejectedExecution::Shutdown);
158        }
159        self.state.active_tasks.inc();
160        drop(submission_guard);
161
162        let (handle, completion) = TaskCompletionPair::new().into_parts();
163        let state = Arc::clone(&self.state);
164        thread::spawn(move || {
165            TaskRunner::new(task).run(completion);
166            if state.active_tasks.dec() == 0 {
167                state.notify_if_terminated();
168            }
169        });
170        Ok(handle)
171    }
172
173    /// Stops accepting new tasks.
174    ///
175    /// Already accepted threads are allowed to finish.
176    fn shutdown(&self) {
177        let _guard = self.state.lock_submission();
178        self.state.shutdown.store(true);
179        self.state.notify_if_terminated();
180    }
181
182    /// Stops accepting new tasks and reports currently running work.
183    ///
184    /// Running OS threads cannot be forcefully stopped by this service.
185    ///
186    /// # Returns
187    ///
188    /// A report with zero queued tasks, the observed active thread count, and
189    /// zero cancelled tasks.
190    fn shutdown_now(&self) -> ShutdownReport {
191        let _guard = self.state.lock_submission();
192        self.state.shutdown.store(true);
193        let running = self.state.active_tasks.get();
194        self.state.notify_if_terminated();
195        ShutdownReport::new(0, running, 0)
196    }
197
198    /// Returns whether shutdown has been requested.
199    #[inline]
200    fn is_shutdown(&self) -> bool {
201        self.state.shutdown.load()
202    }
203
204    /// Returns whether shutdown was requested and all tasks are finished.
205    #[inline]
206    fn is_terminated(&self) -> bool {
207        self.is_shutdown() && self.state.active_tasks.is_zero()
208    }
209
210    /// Waits for all accepted tasks to complete after shutdown.
211    ///
212    /// This future blocks the polling thread while waiting on a condition
213    /// variable.
214    ///
215    /// # Returns
216    ///
217    /// A future that resolves after shutdown has been requested and all
218    /// accepted OS-thread tasks have completed.
219    #[inline]
220    fn await_termination(&self) -> Self::Termination<'_> {
221        Box::pin(async move {
222            self.state.wait_for_termination();
223        })
224    }
225}