Skip to main content

qubit_executor/executor/
thread_per_task_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use crate::{
15    TrackedTask,
16    hook::{
17        TaskHook,
18        notify_rejected_optional,
19    },
20    service::SubmissionError,
21    task::{
22        spi::TaskEndpointPair,
23        task_admission_gate::TaskAdmissionGate,
24    },
25};
26
27use super::{
28    Executor,
29    ThreadPerTaskExecutorBuilder,
30    thread_spawn_config::ThreadSpawnConfig,
31};
32
33/// Executes each task on a dedicated OS thread.
34///
35/// This executor does not manage lifecycle or maintain a queue. Each accepted
36/// task receives a [`TrackedTask`] that can be used to wait for the result.
37///
38/// # Semantics
39///
40/// * **One task, one thread** — each [`Executor::call`] or [`Executor::execute`]
41///   spawns a new OS thread. There is no pool and no submission queue.
42/// * **Blocking or async wait** — [`TrackedTask::get`] blocks the calling thread,
43///   while awaiting the handle uses a waker and does not block the polling
44///   thread.
45/// * **Completion probe** — [`TrackedTask::is_done`] reads the terminal task
46///   state; result publication to the handle may still be racing with that
47///   observation (you still need [`TrackedTask::get`] for the value).
48///
49/// # Examples
50///
51/// ```rust
52/// use std::io;
53///
54/// use qubit_executor::executor::{
55///     Executor,
56///     ThreadPerTaskExecutor,
57/// };
58///
59/// let executor = ThreadPerTaskExecutor::new();
60/// let handle = executor
61///     .call(|| Ok::<i32, io::Error>(40 + 2))
62///     .expect("worker thread should spawn");
63///
64/// // Blocks the current thread until the spawned thread completes.
65/// let value = handle.get().expect("task should succeed");
66/// assert_eq!(value, 42);
67/// ```
68#[derive(Clone)]
69pub struct ThreadPerTaskExecutor {
70    /// Optional stack size for each spawned worker thread.
71    pub(crate) stack_size: Option<usize>,
72    /// Hook notified about accepted task lifecycle events.
73    pub(crate) hook: Option<Arc<dyn TaskHook>>,
74}
75
76impl ThreadPerTaskExecutor {
77    /// Creates an executor using the platform default worker stack size.
78    ///
79    /// # Returns
80    ///
81    /// A thread-per-task executor with default worker thread configuration.
82    #[inline]
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Creates a builder for configuring this executor.
88    ///
89    /// # Returns
90    ///
91    /// A builder initialized with default worker thread options.
92    #[inline]
93    pub fn builder() -> ThreadPerTaskExecutorBuilder {
94        ThreadPerTaskExecutorBuilder::new()
95    }
96
97    /// Returns a copy of this executor using the supplied task hook.
98    ///
99    /// # Parameters
100    ///
101    /// * `hook` - Hook notified about accepted task lifecycle events.
102    ///
103    /// # Returns
104    ///
105    /// This executor configured with `hook`.
106    #[inline]
107    pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
108        self.hook = Some(hook);
109        self
110    }
111
112    /// Spawns one worker thread.
113    ///
114    /// # Parameters
115    ///
116    /// * `worker` - Closure to run on the new OS thread.
117    ///
118    /// # Returns
119    ///
120    /// `Ok(())` if the worker was spawned.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
125    /// refuses to create the worker thread.
126    fn spawn_worker(&self, worker: impl FnOnce() + Send + 'static) -> Result<(), SubmissionError> {
127        ThreadSpawnConfig::new(self.stack_size).spawn(worker)
128    }
129}
130
131impl Default for ThreadPerTaskExecutor {
132    /// Creates an executor using the platform default worker stack size and no hook.
133    #[inline]
134    fn default() -> Self {
135        Self {
136            stack_size: None,
137            hook: None,
138        }
139    }
140}
141
142impl Executor for ThreadPerTaskExecutor {
143    /// Spawns one OS thread for the callable and returns a handle to its result.
144    ///
145    /// # Parameters
146    ///
147    /// * `task` - Callable to run on a dedicated OS thread.
148    ///
149    /// # Returns
150    ///
151    /// A [`TrackedTask`] that can block or await the spawned task's final
152    /// result.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the worker thread
157    /// cannot be created.
158    fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
159    where
160        C: Callable<R, E> + Send + 'static,
161        R: Send + 'static,
162        E: Send + 'static,
163    {
164        let (handle, slot) =
165            TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
166        let gate = TaskAdmissionGate::new(self.hook.is_some());
167        let worker_gate = gate.clone();
168        let hook = self.hook.clone();
169        self.spawn_worker(move || {
170            worker_gate.wait();
171            slot.run(task);
172        })
173        .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
174        handle.accept();
175        gate.open();
176        Ok(handle)
177    }
178}