Skip to main content

qubit_executor/service/
thread_per_task_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use parking_lot::{
13    Condvar,
14    Mutex,
15};
16use qubit_function::{
17    Callable,
18    Runnable,
19};
20
21use crate::executor::thread_spawn_config::ThreadSpawnConfig;
22use crate::{
23    TaskHandle,
24    TrackedTask,
25    hook::{
26        TaskHook,
27        notify_rejected,
28        notify_rejected_optional,
29    },
30    task::{
31        spi::{
32            TaskEndpointPair,
33            TaskSlot,
34        },
35        task_admission_gate::TaskAdmissionGate,
36    },
37};
38
39use super::{
40    ExecutorService,
41    ExecutorServiceLifecycle,
42    StopReport,
43    SubmissionError,
44    ThreadPerTaskExecutorServiceBuilder,
45};
46type Worker = Box<dyn FnOnce() + Send + 'static>;
47
48/// Handle variants that can cross the accepted task boundary.
49trait TaskAdmissionHandle {
50    /// Marks the underlying task as accepted and emits accepted hooks.
51    fn mark_accepted(&self);
52}
53
54impl<R, E> TaskAdmissionHandle for TaskHandle<R, E> {
55    /// Marks a task handle as accepted.
56    #[inline]
57    fn mark_accepted(&self) {
58        self.accept();
59    }
60}
61
62impl<R, E> TaskAdmissionHandle for TrackedTask<R, E> {
63    /// Marks a tracked task handle as accepted.
64    #[inline]
65    fn mark_accepted(&self) {
66        self.accept();
67    }
68}
69
70/// Mutable service state protected by the service mutex.
71#[derive(Debug, Clone, Copy)]
72struct ServiceState {
73    /// Current lifecycle state.
74    lifecycle: ExecutorServiceLifecycle,
75    /// Number of accepted OS-thread tasks that have not completed.
76    active_tasks: usize,
77}
78
79impl Default for ServiceState {
80    /// Creates a running state with no active tasks.
81    #[inline]
82    fn default() -> Self {
83        Self {
84            lifecycle: ExecutorServiceLifecycle::Running,
85            active_tasks: 0,
86        }
87    }
88}
89
90/// Shared state for [`ThreadPerTaskExecutorService`].
91#[derive(Default)]
92struct ThreadPerTaskExecutorServiceState {
93    /// Lifecycle and active-task counters protected as one state machine.
94    state: Mutex<ServiceState>,
95    /// Condition variable used to wait for service termination.
96    termination: Condvar,
97}
98
99/// Guard that records completion for one accepted task when dropped.
100struct ActiveTaskGuard {
101    /// Shared service state to update when the worker exits.
102    state: Arc<ThreadPerTaskExecutorServiceState>,
103}
104
105impl ActiveTaskGuard {
106    /// Creates a guard for one accepted active task.
107    ///
108    /// # Parameters
109    ///
110    /// * `state` - Shared service state whose active count should be decremented.
111    ///
112    /// # Returns
113    ///
114    /// A guard that finishes the accepted task on drop.
115    #[inline]
116    fn new(state: Arc<ThreadPerTaskExecutorServiceState>) -> Self {
117        Self { state }
118    }
119}
120
121impl Drop for ActiveTaskGuard {
122    /// Records task completion when the worker closure exits.
123    #[inline]
124    fn drop(&mut self) {
125        self.state.finish_task();
126    }
127}
128
129impl ThreadPerTaskExecutorServiceState {
130    /// Returns the currently stored lifecycle state.
131    ///
132    /// # Returns
133    ///
134    /// The lifecycle stored in the service state.
135    #[inline]
136    fn lifecycle(&self) -> ExecutorServiceLifecycle {
137        self.state.lock().lifecycle
138    }
139
140    /// Attempts to accept one task and increments the active task count.
141    ///
142    /// # Returns
143    ///
144    /// `Ok(())` if the service is running and accepted the task.
145    ///
146    /// # Errors
147    ///
148    /// Returns [`SubmissionError::Shutdown`] if the service is not running.
149    #[inline]
150    fn accept_task(&self) -> Result<(), SubmissionError> {
151        let mut state = self.state.lock();
152        if state.lifecycle != ExecutorServiceLifecycle::Running {
153            return Err(SubmissionError::Shutdown);
154        }
155        state.active_tasks += 1;
156        Ok(())
157    }
158
159    /// Records one task completion and wakes termination waiters if appropriate.
160    #[inline]
161    fn finish_task(&self) {
162        let mut state = self.state.lock();
163        state.active_tasks -= 1;
164        Self::terminate_if_ready(&mut state, &self.termination);
165    }
166
167    /// Blocks the current thread until the service is terminated.
168    fn wait_for_termination(&self) {
169        let mut state = self.state.lock();
170        while state.lifecycle != ExecutorServiceLifecycle::Terminated {
171            self.termination.wait(&mut state);
172        }
173    }
174
175    /// Requests graceful shutdown.
176    #[inline]
177    fn shutdown(&self) {
178        let mut state = self.state.lock();
179        if state.lifecycle == ExecutorServiceLifecycle::Running {
180            state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
181        }
182        Self::terminate_if_ready(&mut state, &self.termination);
183    }
184
185    /// Requests abrupt stop and returns the observed active work count.
186    ///
187    /// # Returns
188    ///
189    /// The number of active tasks observed while stopping.
190    #[inline]
191    fn stop(&self) -> usize {
192        let mut state = self.state.lock();
193        if state.lifecycle != ExecutorServiceLifecycle::Terminated {
194            state.lifecycle = ExecutorServiceLifecycle::Stopping;
195        }
196        let running = state.active_tasks;
197        Self::terminate_if_ready(&mut state, &self.termination);
198        running
199    }
200
201    /// Marks the service terminated when it is non-running and idle.
202    #[inline]
203    fn terminate_if_ready(state: &mut ServiceState, termination: &Condvar) {
204        if state.lifecycle != ExecutorServiceLifecycle::Running && state.active_tasks == 0 {
205            state.lifecycle = ExecutorServiceLifecycle::Terminated;
206            termination.notify_all();
207        }
208    }
209}
210
211/// Managed service that runs every accepted task on a dedicated OS thread.
212///
213/// The service has no queue: accepted tasks start immediately on their own
214/// thread. Shutdown prevents later submissions but cannot forcefully stop
215/// running OS threads.
216#[derive(Clone)]
217pub struct ThreadPerTaskExecutorService {
218    /// Shared service state used by all clones of this service.
219    state: Arc<ThreadPerTaskExecutorServiceState>,
220    /// Optional stack size for each spawned worker thread.
221    stack_size: Option<usize>,
222    /// Hook notified about accepted task lifecycle events.
223    pub(crate) hook: Option<Arc<dyn TaskHook>>,
224}
225
226impl Default for ThreadPerTaskExecutorService {
227    /// Creates a service with default worker options and no hook.
228    #[inline]
229    fn default() -> Self {
230        Self {
231            state: Arc::default(),
232            stack_size: None,
233            hook: None,
234        }
235    }
236}
237
238impl ThreadPerTaskExecutorService {
239    /// Creates a new service instance.
240    ///
241    /// # Returns
242    ///
243    /// A service that accepts tasks until shutdown is requested.
244    #[inline]
245    pub fn new() -> Self {
246        Self::default()
247    }
248
249    /// Creates a service with the supplied worker stack size configuration.
250    ///
251    /// # Parameters
252    ///
253    /// * `stack_size` - Optional stack size in bytes for spawned workers.
254    ///
255    /// # Returns
256    ///
257    /// A service using the supplied worker stack size configuration.
258    #[inline]
259    pub(crate) fn from_stack_size(stack_size: Option<usize>) -> Self {
260        Self {
261            state: Arc::default(),
262            stack_size,
263            hook: None,
264        }
265    }
266
267    /// Creates a builder for configuring this service.
268    ///
269    /// # Returns
270    ///
271    /// A builder initialized with default worker thread options.
272    #[inline]
273    pub fn builder() -> ThreadPerTaskExecutorServiceBuilder {
274        ThreadPerTaskExecutorServiceBuilder::new()
275    }
276
277    /// Spawns one accepted worker thread.
278    ///
279    /// # Parameters
280    ///
281    /// * `worker` - Closure to run on the worker OS thread.
282    ///
283    /// # Returns
284    ///
285    /// `Ok(())` if the worker was spawned.
286    ///
287    /// # Errors
288    ///
289    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
290    /// refuses to create the worker thread. Accepted task accounting is handled
291    /// by the active-task guard captured by `worker`.
292    fn spawn_worker_after_accept(&self, worker: Worker) -> Result<(), SubmissionError> {
293        ThreadSpawnConfig::new(self.stack_size).spawn(worker)
294    }
295
296    /// Notifies the configured hook about a rejected submission.
297    ///
298    /// # Parameters
299    ///
300    /// * `error` - Submission failure reported to the caller.
301    #[inline]
302    fn notify_rejected(&self, error: &SubmissionError) {
303        if let Some(hook) = &self.hook {
304            notify_rejected(hook.as_ref(), error);
305        }
306    }
307
308    /// Accepts service work, starts a worker thread, and returns the chosen handle.
309    ///
310    /// # Parameters
311    ///
312    /// * `split_pair` - Splits the task endpoint pair into the desired handle and slot.
313    /// * `run_slot` - Worker body that consumes the runner-side task slot.
314    ///
315    /// # Returns
316    ///
317    /// The accepted task handle produced by `split_pair`.
318    ///
319    /// # Errors
320    ///
321    /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
322    /// [`SubmissionError::WorkerSpawnFailed`] if the worker thread cannot be created.
323    fn submit_with_slot<R, E, H, S, F>(&self, split_pair: S, run_slot: F) -> Result<H, SubmissionError>
324    where
325        R: Send + 'static,
326        E: Send + 'static,
327        H: TaskAdmissionHandle,
328        S: FnOnce(TaskEndpointPair<R, E>) -> (H, TaskSlot<R, E>),
329        F: FnOnce(TaskSlot<R, E>) + Send + 'static,
330    {
331        if let Err(error) = self.state.accept_task() {
332            self.notify_rejected(&error);
333            return Err(error);
334        }
335
336        let pair = TaskEndpointPair::with_optional_hook(self.hook.clone());
337        let (handle, slot) = split_pair(pair);
338        let guard = ActiveTaskGuard::new(Arc::clone(&self.state));
339        let gate = TaskAdmissionGate::new(self.hook.is_some());
340        let worker_gate = gate.clone();
341        let hook = self.hook.clone();
342        if let Err(error) = self.spawn_worker_after_accept(Box::new(move || {
343            worker_gate.wait();
344            let _guard = guard;
345            run_slot(slot);
346        })) {
347            notify_rejected_optional(hook.as_ref(), &error);
348            return Err(error);
349        }
350        handle.mark_accepted();
351        gate.open();
352        Ok(handle)
353    }
354}
355
356impl ExecutorService for ThreadPerTaskExecutorService {
357    type ResultHandle<R, E>
358        = TaskHandle<R, E>
359    where
360        R: Send + 'static,
361        E: Send + 'static;
362
363    type TrackedHandle<R, E>
364        = TrackedTask<R, E>
365    where
366        R: Send + 'static,
367        E: Send + 'static;
368
369    /// Accepts a runnable and starts it on a dedicated OS thread.
370    ///
371    /// # Parameters
372    ///
373    /// * `task` - Runnable to execute on a new OS thread.
374    ///
375    /// # Returns
376    ///
377    /// `Ok(())` if the runnable was accepted.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
382    /// requested before the task is accepted.
383    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
384    where
385        T: Runnable<E> + Send + 'static,
386        E: Send + 'static,
387    {
388        let handle = self.submit_with_slot(
389            |pair| pair.into_parts(),
390            move |slot| {
391                let mut task = task;
392                slot.run(move || task.run());
393            },
394        )?;
395        drop(handle);
396        Ok(())
397    }
398
399    /// Accepts a callable and starts it on a dedicated OS thread.
400    ///
401    /// # Parameters
402    ///
403    /// * `task` - Callable to execute on a new OS thread.
404    ///
405    /// # Returns
406    ///
407    /// A [`TaskHandle`] for the accepted task.
408    ///
409    /// # Errors
410    ///
411    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
412    /// requested before the task is accepted.
413    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
414    where
415        C: Callable<R, E> + Send + 'static,
416        R: Send + 'static,
417        E: Send + 'static,
418    {
419        self.submit_with_slot(
420            |pair| pair.into_parts(),
421            move |slot| {
422                slot.run(task);
423            },
424        )
425    }
426
427    /// Accepts a callable and starts it with a tracked handle.
428    fn submit_tracked_callable<C, R, E>(&self, task: C) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
429    where
430        C: Callable<R, E> + Send + 'static,
431        R: Send + 'static,
432        E: Send + 'static,
433    {
434        self.submit_with_slot(
435            |pair| pair.into_tracked_parts(),
436            move |slot| {
437                slot.run(task);
438            },
439        )
440    }
441
442    /// Stops accepting new tasks.
443    ///
444    /// Already accepted threads are allowed to finish.
445    fn shutdown(&self) {
446        self.state.shutdown();
447    }
448
449    /// Stops accepting new tasks and reports currently running work.
450    ///
451    /// Running OS threads cannot be forcefully stopped by this service.
452    ///
453    /// # Returns
454    ///
455    /// A report with zero queued tasks, the observed active thread count, and
456    /// zero cancelled tasks.
457    fn stop(&self) -> StopReport {
458        let running = self.state.stop();
459        StopReport::new(0, running, 0)
460    }
461
462    /// Returns the current lifecycle state.
463    #[inline]
464    fn lifecycle(&self) -> ExecutorServiceLifecycle {
465        self.state.lifecycle()
466    }
467
468    /// Blocks until all accepted tasks complete after shutdown or stop.
469    ///
470    /// This method blocks the current thread on a condition variable. Calling
471    /// it while the service is still running will wait until another thread
472    /// calls [`Self::shutdown`] or [`Self::stop`] and all accepted OS-thread
473    /// tasks have completed.
474    #[inline]
475    fn wait_termination(&self) {
476        self.state.wait_for_termination();
477    }
478}