Skip to main content

qubit_executor/executor/
thread_per_task_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use crate::{
15    TrackedTask,
16    hook::{
17        TaskHook,
18        notify_rejected_optional,
19    },
20    service::SubmissionError,
21    task::{
22        spi::TaskEndpointPair,
23        task_admission_gate::TaskAdmissionGate,
24    },
25};
26
27use super::{
28    Executor,
29    ThreadPerTaskExecutorBuilder,
30    thread_spawn_config::ThreadSpawnConfig,
31};
32
33/// Executes each task on a dedicated OS thread.
34///
35/// This executor does not manage lifecycle or maintain a queue. Each accepted
36/// task receives a [`TrackedTask`] that can be used to wait for the result.
37///
38/// # Semantics
39///
40/// * **One task, one thread** — each [`Executor::call`] or [`Executor::execute`]
41///   spawns a new OS thread. There is no pool and no submission queue.
42/// * **Blocking or async wait** — [`TrackedTask::get`] blocks the calling thread,
43///   while awaiting the handle uses a waker and does not block the polling
44///   thread.
45/// * **Completion probe** — [`TrackedTask::is_done`] reads the terminal task
46///   state; result publication to the handle may still be racing with that
47///   observation (you still need [`TrackedTask::get`] for the value).
48///
49/// # Examples
50///
51/// ```rust
52/// use std::io;
53///
54/// use qubit_executor::{Executor, ThreadPerTaskExecutor};
55///
56/// let executor = ThreadPerTaskExecutor::new();
57/// let handle = executor
58///     .call(|| Ok::<i32, io::Error>(40 + 2))
59///     .expect("worker thread should spawn");
60///
61/// // Blocks the current thread until the spawned thread completes.
62/// let value = handle.get().expect("task should succeed");
63/// assert_eq!(value, 42);
64/// ```
65#[derive(Clone)]
66pub struct ThreadPerTaskExecutor {
67    /// Optional stack size for each spawned worker thread.
68    pub(crate) stack_size: Option<usize>,
69    /// Hook notified about accepted task lifecycle events.
70    pub(crate) hook: Option<Arc<dyn TaskHook>>,
71}
72
73impl ThreadPerTaskExecutor {
74    /// Creates an executor using the platform default worker stack size.
75    ///
76    /// # Returns
77    ///
78    /// A thread-per-task executor with default worker thread configuration.
79    #[inline]
80    pub fn new() -> Self {
81        Self::default()
82    }
83
84    /// Creates a builder for configuring this executor.
85    ///
86    /// # Returns
87    ///
88    /// A builder initialized with default worker thread options.
89    #[inline]
90    pub fn builder() -> ThreadPerTaskExecutorBuilder {
91        ThreadPerTaskExecutorBuilder::new()
92    }
93
94    /// Returns a copy of this executor using the supplied task hook.
95    ///
96    /// # Parameters
97    ///
98    /// * `hook` - Hook notified about accepted task lifecycle events.
99    ///
100    /// # Returns
101    ///
102    /// This executor configured with `hook`.
103    #[inline]
104    pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
105        self.hook = Some(hook);
106        self
107    }
108
109    /// Spawns one worker thread.
110    ///
111    /// # Parameters
112    ///
113    /// * `worker` - Closure to run on the new OS thread.
114    ///
115    /// # Returns
116    ///
117    /// `Ok(())` if the worker was spawned.
118    ///
119    /// # Errors
120    ///
121    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
122    /// refuses to create the worker thread.
123    fn spawn_worker(&self, worker: impl FnOnce() + Send + 'static) -> Result<(), SubmissionError> {
124        ThreadSpawnConfig::new(self.stack_size).spawn(worker)
125    }
126}
127
128impl Default for ThreadPerTaskExecutor {
129    /// Creates an executor using the platform default worker stack size and no hook.
130    #[inline]
131    fn default() -> Self {
132        Self {
133            stack_size: None,
134            hook: None,
135        }
136    }
137}
138
139impl Executor for ThreadPerTaskExecutor {
140    /// Spawns one OS thread for the callable and returns a handle to its result.
141    ///
142    /// # Parameters
143    ///
144    /// * `task` - Callable to run on a dedicated OS thread.
145    ///
146    /// # Returns
147    ///
148    /// A [`TrackedTask`] that can block or await the spawned task's final
149    /// result.
150    ///
151    /// # Errors
152    ///
153    /// Returns [`SubmissionError::WorkerSpawnFailed`] if the worker thread
154    /// cannot be created.
155    fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
156    where
157        C: Callable<R, E> + Send + 'static,
158        R: Send + 'static,
159        E: Send + 'static,
160    {
161        let (handle, slot) =
162            TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
163        let gate = TaskAdmissionGate::new(self.hook.is_some());
164        let worker_gate = gate.clone();
165        let hook = self.hook.clone();
166        self.spawn_worker(move || {
167            worker_gate.wait();
168            slot.run(task);
169        })
170        .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
171        handle.accept();
172        gate.open();
173        Ok(handle)
174    }
175}