Skip to main content

qubit_executor/service/
thread_per_task_executor_service.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use parking_lot::{
13    Condvar,
14    Mutex,
15};
16use qubit_function::{
17    Callable,
18    Runnable,
19};
20
21use crate::executor::thread_spawn_config::ThreadSpawnConfig;
22use crate::{
23    TaskHandle,
24    TrackedTask,
25    hook::{
26        TaskHook,
27        notify_rejected,
28        notify_rejected_optional,
29    },
30    task::{
31        spi::{
32            TaskEndpointPair,
33            TaskSlot,
34        },
35        task_admission_gate::TaskAdmissionGate,
36    },
37};
38
39use super::{
40    ExecutorService,
41    ExecutorServiceLifecycle,
42    StopReport,
43    SubmissionError,
44    ThreadPerTaskExecutorServiceBuilder,
45};
/// Boxed, sendable, one-shot closure used as the body of a spawned worker thread.
type Worker = Box<dyn FnOnce() + Send + 'static>;
47
/// Handle variants that can cross the accepted task boundary.
///
/// Implemented in this file for [`TaskHandle`] and [`TrackedTask`] so that
/// `submit_with_slot` can mark either kind of handle as accepted through a
/// single generic bound.
trait TaskAdmissionHandle {
    /// Marks the underlying task as accepted and emits accepted hooks.
    ///
    /// `submit_with_slot` calls this after the worker thread has been spawned
    /// and before the admission gate is opened.
    fn mark_accepted(&self);
}
53
impl<R, E> TaskAdmissionHandle for TaskHandle<R, E> {
    /// Marks a task handle as accepted.
    ///
    /// Forwards directly to the handle's own `accept` method.
    #[inline]
    fn mark_accepted(&self) {
        self.accept();
    }
}
61
impl<R, E> TaskAdmissionHandle for TrackedTask<R, E> {
    /// Marks a tracked task handle as accepted.
    ///
    /// Forwards directly to the tracked handle's own `accept` method.
    #[inline]
    fn mark_accepted(&self) {
        self.accept();
    }
}
69
/// Mutable service state protected by the service mutex.
///
/// The lifecycle and the active-task count live in one value so that every
/// transition reads and writes both under the same lock.
#[derive(Debug, Clone, Copy)]
struct ServiceState {
    /// Current lifecycle state.
    lifecycle: ExecutorServiceLifecycle,
    /// Number of accepted OS-thread tasks that have not completed.
    ///
    /// Incremented by `accept_task` and decremented by `finish_task`.
    active_tasks: usize,
}
78
79impl Default for ServiceState {
80    /// Creates a running state with no active tasks.
81    #[inline]
82    fn default() -> Self {
83        Self {
84            lifecycle: ExecutorServiceLifecycle::Running,
85            active_tasks: 0,
86        }
87    }
88}
89
/// Shared state for [`ThreadPerTaskExecutorService`].
///
/// One instance is shared (through an `Arc`) by every clone of the service
/// and by the completion guard captured in each worker closure.
#[derive(Default)]
struct ThreadPerTaskExecutorServiceState {
    /// Lifecycle and active-task counters protected as one state machine.
    state: Mutex<ServiceState>,
    /// Condition variable used to wait for service termination.
    ///
    /// Always waited on together with the `state` mutex; notified when the
    /// lifecycle becomes `Terminated`.
    termination: Condvar,
}
98
/// Guard that records completion for one accepted task when dropped.
///
/// The guard is moved into the worker closure, so the completion is recorded
/// whenever that closure's captured state is dropped.
struct ActiveTaskGuard {
    /// Shared service state to update when the worker exits.
    state: Arc<ThreadPerTaskExecutorServiceState>,
}
104
105impl ActiveTaskGuard {
106    /// Creates a guard for one accepted active task.
107    ///
108    /// # Parameters
109    ///
110    /// * `state` - Shared service state whose active count should be decremented.
111    ///
112    /// # Returns
113    ///
114    /// A guard that finishes the accepted task on drop.
115    #[inline]
116    fn new(state: Arc<ThreadPerTaskExecutorServiceState>) -> Self {
117        Self { state }
118    }
119}
120
121impl Drop for ActiveTaskGuard {
122    /// Records task completion when the worker closure exits.
123    #[inline]
124    fn drop(&mut self) {
125        self.state.finish_task();
126    }
127}
128
129impl ThreadPerTaskExecutorServiceState {
130    /// Returns the currently stored lifecycle state.
131    ///
132    /// # Returns
133    ///
134    /// The lifecycle stored in the service state.
135    #[inline]
136    fn lifecycle(&self) -> ExecutorServiceLifecycle {
137        self.state.lock().lifecycle
138    }
139
140    /// Attempts to accept one task and increments the active task count.
141    ///
142    /// # Returns
143    ///
144    /// `Ok(())` if the service is running and accepted the task.
145    ///
146    /// # Errors
147    ///
148    /// Returns [`SubmissionError::Shutdown`] if the service is not running.
149    #[inline]
150    fn accept_task(&self) -> Result<(), SubmissionError> {
151        let mut state = self.state.lock();
152        if state.lifecycle != ExecutorServiceLifecycle::Running {
153            return Err(SubmissionError::Shutdown);
154        }
155        state.active_tasks += 1;
156        Ok(())
157    }
158
159    /// Records one task completion and wakes termination waiters if appropriate.
160    #[inline]
161    fn finish_task(&self) {
162        let mut state = self.state.lock();
163        state.active_tasks -= 1;
164        Self::terminate_if_ready(&mut state, &self.termination);
165    }
166
167    /// Blocks the current thread until the service is terminated.
168    fn wait_for_termination(&self) {
169        let mut state = self.state.lock();
170        while state.lifecycle != ExecutorServiceLifecycle::Terminated {
171            self.termination.wait(&mut state);
172        }
173    }
174
175    /// Requests graceful shutdown.
176    #[inline]
177    fn shutdown(&self) {
178        let mut state = self.state.lock();
179        if state.lifecycle == ExecutorServiceLifecycle::Running {
180            state.lifecycle = ExecutorServiceLifecycle::ShuttingDown;
181        }
182        Self::terminate_if_ready(&mut state, &self.termination);
183    }
184
185    /// Requests abrupt stop and returns the observed active work count.
186    ///
187    /// # Returns
188    ///
189    /// The number of active tasks observed while stopping.
190    #[inline]
191    fn stop(&self) -> usize {
192        let mut state = self.state.lock();
193        if state.lifecycle != ExecutorServiceLifecycle::Terminated {
194            state.lifecycle = ExecutorServiceLifecycle::Stopping;
195        }
196        let running = state.active_tasks;
197        Self::terminate_if_ready(&mut state, &self.termination);
198        running
199    }
200
201    /// Marks the service terminated when it is non-running and idle.
202    #[inline]
203    fn terminate_if_ready(state: &mut ServiceState, termination: &Condvar) {
204        if state.lifecycle != ExecutorServiceLifecycle::Running && state.active_tasks == 0 {
205            state.lifecycle = ExecutorServiceLifecycle::Terminated;
206            termination.notify_all();
207        }
208    }
209}
210
/// Managed service that runs every accepted task on a dedicated OS thread.
///
/// The service has no queue: accepted tasks start immediately on their own
/// thread. Shutdown prevents later submissions but cannot forcefully stop
/// running OS threads.
///
/// Cloning the service clones the `Arc` to the shared state, so all clones
/// observe and drive the same lifecycle.
#[derive(Clone)]
pub struct ThreadPerTaskExecutorService {
    /// Shared service state used by all clones of this service.
    state: Arc<ThreadPerTaskExecutorServiceState>,
    /// Optional stack size for each spawned worker thread.
    stack_size: Option<usize>,
    /// Hook notified about accepted task lifecycle events.
    ///
    /// Also notified about rejected submissions; `None` disables hook
    /// notifications issued by this service.
    pub(crate) hook: Option<Arc<dyn TaskHook>>,
}
225
226impl Default for ThreadPerTaskExecutorService {
227    /// Creates a service with default worker options and no hook.
228    #[inline]
229    fn default() -> Self {
230        Self {
231            state: Arc::default(),
232            stack_size: None,
233            hook: None,
234        }
235    }
236}
237
238impl ThreadPerTaskExecutorService {
239    /// Creates a new service instance.
240    ///
241    /// # Returns
242    ///
243    /// A service that accepts tasks until shutdown is requested.
244    #[inline]
245    pub fn new() -> Self {
246        Self::default()
247    }
248
249    /// Creates a service with the supplied worker stack size configuration.
250    ///
251    /// # Parameters
252    ///
253    /// * `stack_size` - Optional stack size in bytes for spawned workers.
254    ///
255    /// # Returns
256    ///
257    /// A service using the supplied worker stack size configuration.
258    #[inline]
259    pub(crate) fn from_stack_size(stack_size: Option<usize>) -> Self {
260        Self {
261            state: Arc::default(),
262            stack_size,
263            hook: None,
264        }
265    }
266
267    /// Creates a builder for configuring this service.
268    ///
269    /// # Returns
270    ///
271    /// A builder initialized with default worker thread options.
272    #[inline]
273    pub fn builder() -> ThreadPerTaskExecutorServiceBuilder {
274        ThreadPerTaskExecutorServiceBuilder::new()
275    }
276
277    /// Spawns one accepted worker thread.
278    ///
279    /// # Parameters
280    ///
281    /// * `worker` - Closure to run on the worker OS thread.
282    ///
283    /// # Returns
284    ///
285    /// `Ok(())` if the worker was spawned.
286    ///
287    /// # Errors
288    ///
289    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
290    /// refuses to create the worker thread. Accepted task accounting is handled
291    /// by the active-task guard captured by `worker`.
292    fn spawn_worker_after_accept(&self, worker: Worker) -> Result<(), SubmissionError> {
293        ThreadSpawnConfig::new(self.stack_size).spawn(worker)
294    }
295
296    /// Notifies the configured hook about a rejected submission.
297    ///
298    /// # Parameters
299    ///
300    /// * `error` - Submission failure reported to the caller.
301    #[inline]
302    fn notify_rejected(&self, error: &SubmissionError) {
303        if let Some(hook) = &self.hook {
304            notify_rejected(hook.as_ref(), error);
305        }
306    }
307
308    /// Accepts service work, starts a worker thread, and returns the chosen handle.
309    ///
310    /// # Parameters
311    ///
312    /// * `split_pair` - Splits the task endpoint pair into the desired handle and slot.
313    /// * `run_slot` - Worker body that consumes the runner-side task slot.
314    ///
315    /// # Returns
316    ///
317    /// The accepted task handle produced by `split_pair`.
318    ///
319    /// # Errors
320    ///
321    /// Returns [`SubmissionError::Shutdown`] if the service is not running, or
322    /// [`SubmissionError::WorkerSpawnFailed`] if the worker thread cannot be created.
323    fn submit_with_slot<R, E, H, S, F>(
324        &self,
325        split_pair: S,
326        run_slot: F,
327    ) -> Result<H, SubmissionError>
328    where
329        R: Send + 'static,
330        E: Send + 'static,
331        H: TaskAdmissionHandle,
332        S: FnOnce(TaskEndpointPair<R, E>) -> (H, TaskSlot<R, E>),
333        F: FnOnce(TaskSlot<R, E>) + Send + 'static,
334    {
335        if let Err(error) = self.state.accept_task() {
336            self.notify_rejected(&error);
337            return Err(error);
338        }
339
340        let pair = TaskEndpointPair::with_optional_hook(self.hook.clone());
341        let (handle, slot) = split_pair(pair);
342        let guard = ActiveTaskGuard::new(Arc::clone(&self.state));
343        let gate = TaskAdmissionGate::new(self.hook.is_some());
344        let worker_gate = gate.clone();
345        let hook = self.hook.clone();
346        if let Err(error) = self.spawn_worker_after_accept(Box::new(move || {
347            worker_gate.wait();
348            let _guard = guard;
349            run_slot(slot);
350        })) {
351            notify_rejected_optional(hook.as_ref(), &error);
352            return Err(error);
353        }
354        handle.mark_accepted();
355        gate.open();
356        Ok(handle)
357    }
358}
359
360impl ExecutorService for ThreadPerTaskExecutorService {
361    type ResultHandle<R, E>
362        = TaskHandle<R, E>
363    where
364        R: Send + 'static,
365        E: Send + 'static;
366
367    type TrackedHandle<R, E>
368        = TrackedTask<R, E>
369    where
370        R: Send + 'static,
371        E: Send + 'static;
372
373    /// Accepts a runnable and starts it on a dedicated OS thread.
374    ///
375    /// # Parameters
376    ///
377    /// * `task` - Runnable to execute on a new OS thread.
378    ///
379    /// # Returns
380    ///
381    /// `Ok(())` if the runnable was accepted.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
386    /// requested before the task is accepted.
387    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
388    where
389        T: Runnable<E> + Send + 'static,
390        E: Send + 'static,
391    {
392        let handle = self.submit_with_slot(
393            |pair| pair.into_parts(),
394            move |slot| {
395                let mut task = task;
396                slot.run(move || task.run());
397            },
398        )?;
399        drop(handle);
400        Ok(())
401    }
402
403    /// Accepts a callable and starts it on a dedicated OS thread.
404    ///
405    /// # Parameters
406    ///
407    /// * `task` - Callable to execute on a new OS thread.
408    ///
409    /// # Returns
410    ///
411    /// A [`TaskHandle`] for the accepted task.
412    ///
413    /// # Errors
414    ///
415    /// Returns [`SubmissionError::Shutdown`] if shutdown has already been
416    /// requested before the task is accepted.
417    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
418    where
419        C: Callable<R, E> + Send + 'static,
420        R: Send + 'static,
421        E: Send + 'static,
422    {
423        self.submit_with_slot(
424            |pair| pair.into_parts(),
425            move |slot| {
426                slot.run(task);
427            },
428        )
429    }
430
431    /// Accepts a callable and starts it with a tracked handle.
432    fn submit_tracked_callable<C, R, E>(
433        &self,
434        task: C,
435    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
436    where
437        C: Callable<R, E> + Send + 'static,
438        R: Send + 'static,
439        E: Send + 'static,
440    {
441        self.submit_with_slot(
442            |pair| pair.into_tracked_parts(),
443            move |slot| {
444                slot.run(task);
445            },
446        )
447    }
448
449    /// Stops accepting new tasks.
450    ///
451    /// Already accepted threads are allowed to finish.
452    fn shutdown(&self) {
453        self.state.shutdown();
454    }
455
456    /// Stops accepting new tasks and reports currently running work.
457    ///
458    /// Running OS threads cannot be forcefully stopped by this service.
459    ///
460    /// # Returns
461    ///
462    /// A report with zero queued tasks, the observed active thread count, and
463    /// zero cancelled tasks.
464    fn stop(&self) -> StopReport {
465        let running = self.state.stop();
466        StopReport::new(0, running, 0)
467    }
468
469    /// Returns the current lifecycle state.
470    #[inline]
471    fn lifecycle(&self) -> ExecutorServiceLifecycle {
472        self.state.lifecycle()
473    }
474
475    /// Blocks until all accepted tasks complete after shutdown or stop.
476    ///
477    /// This method blocks the current thread on a condition variable. Calling
478    /// it while the service is still running will wait until another thread
479    /// calls [`Self::shutdown`] or [`Self::stop`] and all accepted OS-thread
480    /// tasks have completed.
481    #[inline]
482    fn wait_termination(&self) {
483        self.state.wait_for_termination();
484    }
485}