qubit_executor/executor/thread_per_task_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::Arc;
11
12use qubit_function::Callable;
13
14use crate::{
15 TrackedTask,
16 hook::{
17 TaskHook,
18 notify_rejected_optional,
19 },
20 service::SubmissionError,
21 task::{
22 spi::TaskEndpointPair,
23 task_admission_gate::TaskAdmissionGate,
24 },
25};
26
27use super::{
28 Executor,
29 ThreadPerTaskExecutorBuilder,
30 thread_spawn_config::ThreadSpawnConfig,
31};
32
33/// Executes each task on a dedicated OS thread.
34///
35/// This executor does not manage lifecycle or maintain a queue. Each accepted
36/// task receives a [`TrackedTask`] that can be used to wait for the result.
37///
38/// # Semantics
39///
40/// * **One task, one thread** — each [`Executor::call`] or [`Executor::execute`]
41/// spawns a new OS thread. There is no pool and no submission queue.
42/// * **Blocking or async wait** — [`TrackedTask::get`] blocks the calling thread,
43/// while awaiting the handle uses a waker and does not block the polling
44/// thread.
45/// * **Completion probe** — [`TrackedTask::is_done`] reads the terminal task
46/// state; result publication to the handle may still be racing with that
47/// observation (you still need [`TrackedTask::get`] for the value).
48///
49/// # Examples
50///
51/// ```rust
52/// use std::io;
53///
54/// use qubit_executor::{Executor, ThreadPerTaskExecutor};
55///
56/// let executor = ThreadPerTaskExecutor::new();
57/// let handle = executor
58/// .call(|| Ok::<i32, io::Error>(40 + 2))
59/// .expect("worker thread should spawn");
60///
61/// // Blocks the current thread until the spawned thread completes.
62/// let value = handle.get().expect("task should succeed");
63/// assert_eq!(value, 42);
64/// ```
65#[derive(Clone)]
66pub struct ThreadPerTaskExecutor {
67 /// Optional stack size for each spawned worker thread.
68 pub(crate) stack_size: Option<usize>,
69 /// Hook notified about accepted task lifecycle events.
70 pub(crate) hook: Option<Arc<dyn TaskHook>>,
71}
72
73impl ThreadPerTaskExecutor {
74 /// Creates an executor using the platform default worker stack size.
75 ///
76 /// # Returns
77 ///
78 /// A thread-per-task executor with default worker thread configuration.
79 #[inline]
80 pub fn new() -> Self {
81 Self::default()
82 }
83
84 /// Creates a builder for configuring this executor.
85 ///
86 /// # Returns
87 ///
88 /// A builder initialized with default worker thread options.
89 #[inline]
90 pub fn builder() -> ThreadPerTaskExecutorBuilder {
91 ThreadPerTaskExecutorBuilder::new()
92 }
93
94 /// Returns a copy of this executor using the supplied task hook.
95 ///
96 /// # Parameters
97 ///
98 /// * `hook` - Hook notified about accepted task lifecycle events.
99 ///
100 /// # Returns
101 ///
102 /// This executor configured with `hook`.
103 #[inline]
104 pub fn with_hook(mut self, hook: Arc<dyn TaskHook>) -> Self {
105 self.hook = Some(hook);
106 self
107 }
108
109 /// Spawns one worker thread.
110 ///
111 /// # Parameters
112 ///
113 /// * `worker` - Closure to run on the new OS thread.
114 ///
115 /// # Returns
116 ///
117 /// `Ok(())` if the worker was spawned.
118 ///
119 /// # Errors
120 ///
121 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the operating system
122 /// refuses to create the worker thread.
123 fn spawn_worker(&self, worker: impl FnOnce() + Send + 'static) -> Result<(), SubmissionError> {
124 ThreadSpawnConfig::new(self.stack_size).spawn(worker)
125 }
126}
127
128impl Default for ThreadPerTaskExecutor {
129 /// Creates an executor using the platform default worker stack size and no hook.
130 #[inline]
131 fn default() -> Self {
132 Self {
133 stack_size: None,
134 hook: None,
135 }
136 }
137}
138
139impl Executor for ThreadPerTaskExecutor {
140 /// Spawns one OS thread for the callable and returns a handle to its result.
141 ///
142 /// # Parameters
143 ///
144 /// * `task` - Callable to run on a dedicated OS thread.
145 ///
146 /// # Returns
147 ///
148 /// A [`TrackedTask`] that can block or await the spawned task's final
149 /// result.
150 ///
151 /// # Errors
152 ///
153 /// Returns [`SubmissionError::WorkerSpawnFailed`] if the worker thread
154 /// cannot be created.
155 fn call<C, R, E>(&self, task: C) -> Result<TrackedTask<R, E>, SubmissionError>
156 where
157 C: Callable<R, E> + Send + 'static,
158 R: Send + 'static,
159 E: Send + 'static,
160 {
161 let (handle, slot) =
162 TaskEndpointPair::with_optional_hook(self.hook.clone()).into_tracked_parts();
163 let gate = TaskAdmissionGate::new(self.hook.is_some());
164 let worker_gate = gate.clone();
165 let hook = self.hook.clone();
166 self.spawn_worker(move || {
167 worker_gate.wait();
168 slot.run(task);
169 })
170 .inspect_err(|error| notify_rejected_optional(hook.as_ref(), error))?;
171 handle.accept();
172 gate.open();
173 Ok(handle)
174 }
175}