qubit_executor/executor/thread_per_task_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use crate::{
15 TrackedTask,
16 hook::{
17 TaskHook,
18 notify_rejected_optional,
19 },
20 service::SubmissionError,
21 task::{
22 spi::TaskEndpointPair,
23 task_admission_gate::TaskAdmissionGate,
24 },
25};
26
27use super::{
28 Executor,
29 ThreadPerTaskExecutorBuilder,
30 thread_spawn_config::ThreadSpawnConfig,
31};
32
33/// Executes each task on a dedicated OS thread.
34///
35/// This executor does not manage lifecycle or maintain a queue. Each accepted
36/// task receives a [`TrackedTask`] that can be used to wait for the result.
37///
38/// # Semantics
39///
40/// * **One task, one thread** — each [`Executor::call`] or [`Executor::execute`]
41/// spawns a new OS thread. There is no pool and no submission queue.
42/// * **Blocking or async wait** — [`TrackedTask::get`] blocks the calling thread,
43/// while awaiting the handle uses a waker and does not block the polling
44/// thread.
45/// * **Completion probe** — [`TrackedTask::is_done`] reads the terminal task
46/// state; result publication to the handle may still be racing with that
47/// observation (you still need [`TrackedTask::get`] for the value).
48///
49/// # Examples
50///
51/// ```rust
52/// use std::io;
53///
54/// use qubit_executor::executor::{
55/// Executor,
56/// ThreadPerTaskExecutor,
57/// };
58///
59/// let executor = ThreadPerTaskExecutor::new();
60/// let handle = executor
61/// .call(|| Ok::<i32, io::Error>(40 + 2))
62/// .expect("worker thread should spawn");
63///
64/// // Blocks the current thread until the spawned thread completes.
65/// let value = handle.get().expect("task should succeed");
66/// assert_eq!(value, 42);
67/// ```
68#[derive(Clone)]
69pub struct ThreadPerTaskExecutor {
70 /// Optional stack size for each spawned worker thread.
71 pub(crate) stack_size: Option<usize>,
72 /// Hook notified about accepted task lifecycle events.
73 pub(crate) hook: Option<Arc<dyn TaskHook>>,
74}
75
76impl ThreadPerTaskExecutor {
77 /// Creates an executor using the platform default worker stack size.
78 ///
79 /// # Returns
80 ///
81 /// A thread-per-task executor with default worker thread configuration.
82 #[inline]
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 /// Creates a builder for configuring this executor.
88 ///
89 /// # Returns
90 ///
91 /// A builder initialized with default worker thread options.
92 #[inline]
93 pub fn builder() -> ThreadPerTaskExecutorBuilder {
94 ThreadPerTaskExecutorBuilder::new()
95 }
96
97 /// Returns a copy of this executor using the supplied task hook.
98 ///
99 /// # Parameters
100 ///
101 /// * `hook` - Hook notified about accepted task lifecycle events.
102 ///
103 /// # Returns
104 ///
105 /// This executor configured with `hook`.
106 #[inline]
107 pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
108 self.hook = Some(hook);
109 self
110 }
111
112 /// Spawns one worker thread.
113 ///
114 /// # Parameters
115 ///
116 /// * `worker` - Closure to run on the new OS thread.
117 ///
118 /// # Returns
119 ///
120 /// `Ok(())` if the worker was spawned.
121 ///
122 /// # Errors
123 ///
124 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
125 /// refuses to create the worker thread.
126 fn spawn_worker(&self, worker: impl FnOnce() + Send + 'static) -> Result<(), SubmissionError> {
127 ThreadSpawnConfig::new(self.stack_size).spawn(worker)
128 }
129}
130
131impl Default for ThreadPerTaskExecutor {
132 /// Creates an executor using the platform default worker stack size and no hook.
133 #[inline]
134 fn default() -> Self {
135 Self {
136 stack_size: None,
137 hook: None,
138 }
139 }
140}
141
142impl Executor for ThreadPerTaskExecutor {
143 /// Spawns one OS thread for the callable and returns a handle to its result.
144 ///
145 /// # Parameters
146 ///
147 /// * `task` - Callable to run on a dedicated OS thread.
148 ///
149 /// # Returns
150 ///
151 /// A [`TrackedTask`] that can block or await the spawned task's final
152 /// result.
153 ///
154 /// # Errors
155 ///
156 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the worker thread
157 /// cannot be created.
158 fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
159 where
160 C: Callable<R, E> + Send + 'static,
161 R: Send + 'static,
162 E: Send + 'static,
163 {
164 let (handle, slot) =
165 TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
166 let gate = TaskAdmissionGate::new(self.hook.is_some());
167 let worker_gate = gate.clone();
168 let hook = self.hook.clone();
169 self.spawn_worker(move || {
170 worker_gate.wait();
171 slot.run(task);
172 })
173 .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
174 handle.accept();
175 gate.open();
176 Ok(handle)
177 }
178}