Skip to main content

qubit_executor/service/
thread_per_task_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    sync::{
13        Arc,
14        Condvar,
15        Mutex,
16        MutexGuard,
17    },
18    thread,
19};
20
21use qubit_atomic::{
22    Atomic,
23    AtomicCount,
24};
25use qubit_function::Callable;
26
27use crate::{
28    TaskCompletionPair,
29    TaskHandle,
30    TaskRunner,
31};
32
33use super::{
34    ExecutorService,
35    RejectedExecution,
36    ShutdownReport,
37};
38
39/// Shared state for [`ThreadPerTaskExecutorService`].
40#[derive(Default)]
41struct ThreadPerTaskExecutorServiceState {
42    /// Whether shutdown has been requested.
43    shutdown: Atomic<bool>,
44    /// Number of accepted OS-thread tasks that have not completed.
45    active_tasks: AtomicCount,
46    /// Serializes task submission and shutdown transitions.
47    submission_lock: Mutex<()>,
48    /// Mutex paired with the termination condition variable.
49    termination_lock: Mutex<()>,
50    /// Condition variable used to wait for service termination.
51    termination: Condvar,
52}
53
54impl ThreadPerTaskExecutorServiceState {
55    /// Acquires the submission lock while tolerating poisoned locks.
56    ///
57    /// # Returns
58    ///
59    /// A guard for the submission lock.
60    #[inline]
61    fn lock_submission(&self) -> MutexGuard<'_, ()> {
62        self.submission_lock
63            .lock()
64            .unwrap_or_else(std::sync::PoisonError::into_inner)
65    }
66
67    /// Acquires the termination lock while tolerating poisoned locks.
68    ///
69    /// # Returns
70    ///
71    /// A guard for the mutex paired with the termination condition variable.
72    #[inline]
73    fn lock_termination(&self) -> MutexGuard<'_, ()> {
74        self.termination_lock
75            .lock()
76            .unwrap_or_else(std::sync::PoisonError::into_inner)
77    }
78
79    /// Wakes termination waiters when shutdown and task completion allow it.
80    #[inline]
81    fn notify_if_terminated(&self) {
82        if self.shutdown.load() && self.active_tasks.is_zero() {
83            self.termination.notify_all();
84        }
85    }
86
87    /// Blocks the current thread until the service is terminated.
88    fn wait_for_termination(&self) {
89        let mut guard = self.lock_termination();
90        while !(self.shutdown.load() && self.active_tasks.is_zero()) {
91            guard = self
92                .termination
93                .wait(guard)
94                .unwrap_or_else(std::sync::PoisonError::into_inner);
95        }
96    }
97}
98
99/// Managed service that runs every accepted task on a dedicated OS thread.
100///
101/// The service has no queue: accepted tasks start immediately on their own
102/// thread. Shutdown prevents later submissions but cannot forcefully stop
103/// running OS threads.
104#[derive(Default, Clone)]
105pub struct ThreadPerTaskExecutorService {
106    /// Shared service state used by all clones of this service.
107    state: Arc<ThreadPerTaskExecutorServiceState>,
108}
109
110impl ThreadPerTaskExecutorService {
111    /// Creates a new service instance.
112    ///
113    /// # Returns
114    ///
115    /// A service that accepts tasks until shutdown is requested.
116    #[inline]
117    pub fn new() -> Self {
118        Self::default()
119    }
120}
121
122impl ExecutorService for ThreadPerTaskExecutorService {
123    type Handle<R, E>
124        = TaskHandle<R, E>
125    where
126        R: Send + 'static,
127        E: Send + 'static;
128
129    type Termination<'a>
130        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
131    where
132        Self: 'a;
133
134    /// Accepts a callable and starts it on a dedicated OS thread.
135    ///
136    /// # Parameters
137    ///
138    /// * `task` - Callable to execute on a new OS thread.
139    ///
140    /// # Returns
141    ///
142    /// A [`TaskHandle`] for the accepted task.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`RejectedExecution::Shutdown`] if shutdown has already been
147    /// requested before the task is accepted.
148    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
149    where
150        C: Callable<R, E> + Send + 'static,
151        R: Send + 'static,
152        E: Send + 'static,
153    {
154        let submission_guard = self.state.lock_submission();
155        if self.state.shutdown.load() {
156            return Err(RejectedExecution::Shutdown);
157        }
158        self.state.active_tasks.inc();
159        drop(submission_guard);
160
161        let (handle, completion) = TaskCompletionPair::new().into_parts();
162        let state = Arc::clone(&self.state);
163        thread::spawn(move || {
164            TaskRunner::new(task).run(completion);
165            if state.active_tasks.dec() == 0 {
166                state.notify_if_terminated();
167            }
168        });
169        Ok(handle)
170    }
171
172    /// Stops accepting new tasks.
173    ///
174    /// Already accepted threads are allowed to finish.
175    fn shutdown(&self) {
176        let _guard = self.state.lock_submission();
177        self.state.shutdown.store(true);
178        self.state.notify_if_terminated();
179    }
180
181    /// Stops accepting new tasks and reports currently running work.
182    ///
183    /// Running OS threads cannot be forcefully stopped by this service.
184    ///
185    /// # Returns
186    ///
187    /// A report with zero queued tasks, the observed active thread count, and
188    /// zero cancelled tasks.
189    fn shutdown_now(&self) -> ShutdownReport {
190        let _guard = self.state.lock_submission();
191        self.state.shutdown.store(true);
192        let running = self.state.active_tasks.get();
193        self.state.notify_if_terminated();
194        ShutdownReport::new(0, running, 0)
195    }
196
197    /// Returns whether shutdown has been requested.
198    #[inline]
199    fn is_shutdown(&self) -> bool {
200        self.state.shutdown.load()
201    }
202
203    /// Returns whether shutdown was requested and all tasks are finished.
204    #[inline]
205    fn is_terminated(&self) -> bool {
206        self.is_shutdown() && self.state.active_tasks.is_zero()
207    }
208
209    /// Waits for all accepted tasks to complete after shutdown.
210    ///
211    /// This future blocks the polling thread while waiting on a condition
212    /// variable.
213    ///
214    /// # Returns
215    ///
216    /// A future that resolves after shutdown has been requested and all
217    /// accepted OS-thread tasks have completed.
218    #[inline]
219    fn await_termination(&self) -> Self::Termination<'_> {
220        Box::pin(async move {
221            self.state.wait_for_termination();
222        })
223    }
224}