qubit_thread_pool/fixed/fixed_thread_pool.rs
/*******************************************************************************
 *
 * Copyright (c) 2025 - 2026 Haixing Hu.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11 sync::Arc,
12 thread::JoinHandle,
13};
14
15use qubit_executor::service::{
16 ExecutorService,
17 ExecutorServiceLifecycle,
18 StopReport,
19 SubmissionError,
20};
21use qubit_executor::task::spi::TaskEndpointPair;
22use qubit_executor::{
23 TaskHandle,
24 TrackedTask,
25};
26use qubit_function::{
27 Callable,
28 Runnable,
29};
30
31use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
32use super::fixed_thread_pool_inner::FixedThreadPoolInner;
33use super::fixed_worker::FixedWorker;
34use super::fixed_worker_runtime::FixedWorkerRuntime;
35use crate::{
36 ExecutorServiceBuilderError,
37 PoolJob,
38 ThreadPoolStats,
39};
40
/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`crate::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
///
/// Each worker thread holds a clone of the inner `Arc`, so the shared state
/// outlives this handle; dropping the handle requests a graceful shutdown via
/// the `Drop` impl rather than terminating workers immediately.
pub struct FixedThreadPool {
    /// Shared fixed pool state; also cloned into every worker thread.
    inner: Arc<FixedThreadPoolInner>,
}
50
51impl FixedThreadPool {
52 /// Builds a fixed pool from a validated [`FixedThreadPoolBuilder`].
53 ///
54 /// # Parameters
55 ///
56 /// * `builder` - Configuration produced by [`FixedThreadPoolBuilder`].
57 ///
58 /// # Returns
59 ///
60 /// A fixed thread-pool handle with workers already started.
61 ///
62 /// # Errors
63 ///
64 /// Returns [`ExecutorServiceBuilderError`] when a worker thread cannot be spawned.
65 pub(crate) fn new_with_builder(
66 builder: FixedThreadPoolBuilder,
67 ) -> Result<Self, ExecutorServiceBuilderError> {
68 let FixedThreadPoolBuilder {
69 pool_size,
70 queue_capacity,
71 thread_name_prefix,
72 stack_size,
73 hooks,
74 } = builder;
75 let mut worker_runtimes = Vec::with_capacity(pool_size);
76 for index in 0..pool_size {
77 let worker_runtime = FixedWorkerRuntime::new(index);
78 worker_runtimes.push(worker_runtime);
79 }
80 let inner = Arc::new(FixedThreadPoolInner::with_hooks(
81 pool_size,
82 queue_capacity,
83 hooks,
84 ));
85 let mut worker_handles = Vec::with_capacity(pool_size);
86 for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
87 inner.reserve_worker_slot();
88 let worker_inner = Arc::clone(&inner);
89 let thread_name = format!("{}-{}", thread_name_prefix, index);
90 let mut builder = std::thread::Builder::new().name(thread_name);
91 if let Some(stack_size) = stack_size {
92 builder = builder.stack_size(stack_size);
93 }
94 match builder.spawn(move || FixedWorker::run(worker_inner, worker_runtime)) {
95 Ok(handle) => worker_handles.push(handle),
96 Err(source) => {
97 inner.rollback_worker_slot();
98 inner.stop_after_failed_build();
99 join_started_workers(worker_handles);
100 return Err(ExecutorServiceBuilderError::SpawnWorker {
101 index: Some(index),
102 source,
103 });
104 }
105 }
106 }
107 Ok(Self { inner })
108 }
109
110 /// Creates a fixed thread pool with `pool_size` prestarted workers.
111 ///
112 /// # Parameters
113 ///
114 /// * `pool_size` - Number of worker threads.
115 ///
116 /// # Returns
117 ///
118 /// A fixed thread pool.
119 ///
120 /// # Errors
121 ///
122 /// Returns [`ExecutorServiceBuilderError`] if the worker count is zero or a worker
123 /// cannot be spawned.
124 pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
125 Self::builder().pool_size(pool_size).build()
126 }
127
128 /// Creates a fixed pool builder.
129 ///
130 /// # Returns
131 ///
132 /// Builder with CPU parallelism defaults.
133 pub fn builder() -> FixedThreadPoolBuilder {
134 FixedThreadPoolBuilder::new()
135 }
136
137 /// Returns the fixed worker count.
138 ///
139 /// # Returns
140 ///
141 /// Number of workers in this pool.
142 pub fn pool_size(&self) -> usize {
143 self.inner.pool_size()
144 }
145
146 /// Returns the queued task count.
147 ///
148 /// # Returns
149 ///
150 /// Number of accepted tasks waiting to run.
151 pub fn queued_count(&self) -> usize {
152 self.inner.queued_count()
153 }
154
155 /// Returns the running task count.
156 ///
157 /// # Returns
158 ///
159 /// Number of tasks currently held by workers.
160 pub fn running_count(&self) -> usize {
161 self.inner.running_count()
162 }
163
164 /// Returns the live worker count.
165 ///
166 /// # Returns
167 ///
168 /// Number of worker loops that have not exited.
169 pub fn live_worker_count(&self) -> usize {
170 self.inner.state.read(|state| state.live_workers)
171 }
172
173 /// Returns a point-in-time stats snapshot.
174 ///
175 /// # Returns
176 ///
177 /// Snapshot containing queue, worker, and lifecycle counters.
178 pub fn stats(&self) -> ThreadPoolStats {
179 self.inner.stats()
180 }
181
182 /// Blocks until all accepted work has completed.
183 ///
184 /// This is a join-style wait for quiescence: it does not request shutdown
185 /// and does not wait for worker threads to exit. Concurrent submissions may
186 /// extend the wait until those accepted jobs also drain.
187 #[inline]
188 pub fn join(&self) {
189 self.inner.wait_until_idle();
190 }
191}
192
193impl Default for FixedThreadPool {
194 /// Creates a fixed thread pool using [`FixedThreadPoolBuilder::default`].
195 ///
196 /// # Returns
197 ///
198 /// A fixed thread pool with CPU parallelism defaults and prestarted workers.
199 ///
200 /// # Panics
201 ///
202 /// Panics when the default builder fails to spawn a worker thread.
203 fn default() -> Self {
204 FixedThreadPoolBuilder::default()
205 .build()
206 .expect("failed to build default FixedThreadPool")
207 }
208}
209
impl Drop for FixedThreadPool {
    /// Requests graceful shutdown when the pool handle is dropped.
    ///
    /// Only `shutdown` is invoked here — this does not call
    /// `wait_for_termination`, so dropping the handle does not appear to block
    /// until worker threads exit. NOTE(review): confirm against
    /// `FixedThreadPoolInner::shutdown` whether it drains synchronously.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
216
217impl ExecutorService for FixedThreadPool {
218 type ResultHandle<R, E>
219 = TaskHandle<R, E>
220 where
221 R: Send + 'static,
222 E: Send + 'static;
223
224 type TrackedHandle<R, E>
225 = TrackedTask<R, E>
226 where
227 R: Send + 'static,
228 E: Send + 'static;
229
230 /// Accepts a runnable and queues it for fixed pool workers.
231 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
232 where
233 T: Runnable<E> + Send + 'static,
234 E: Send + 'static,
235 {
236 self.inner.submit(PoolJob::detached(task))
237 }
238
239 /// Accepts a callable and queues it for fixed pool workers.
240 ///
241 /// # Parameters
242 ///
243 /// * `task` - Callable to execute on a fixed pool worker.
244 ///
245 /// # Returns
246 ///
247 /// A [`TaskHandle`] for the accepted task.
248 ///
249 /// # Errors
250 ///
251 /// Returns [`SubmissionError::Shutdown`] after shutdown or
252 /// [`SubmissionError::Saturated`] when a bounded queue is full.
253 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
254 where
255 C: Callable<R, E> + Send + 'static,
256 R: Send + 'static,
257 E: Send + 'static,
258 {
259 let (handle, completion) = TaskEndpointPair::new().into_parts();
260 let job = PoolJob::from_task(task, completion);
261 self.inner.submit(job)?;
262 Ok(handle)
263 }
264
265 /// Accepts a callable and queues it with a tracked handle.
266 ///
267 /// # Parameters
268 ///
269 /// * `task` - Callable to execute on a fixed pool worker.
270 ///
271 /// # Returns
272 ///
273 /// A [`TrackedTask`] that reports task status and can observe completion,
274 /// failure, or queued cancellation.
275 ///
276 /// # Errors
277 ///
278 /// Returns [`SubmissionError::Shutdown`] after shutdown or
279 /// [`SubmissionError::Saturated`] when a bounded queue is full.
280 fn submit_tracked_callable<C, R, E>(
281 &self,
282 task: C,
283 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
284 where
285 C: Callable<R, E> + Send + 'static,
286 R: Send + 'static,
287 E: Send + 'static,
288 {
289 let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
290 let job = PoolJob::from_task(task, completion);
291 self.inner.submit(job)?;
292 Ok(handle)
293 }
294
295 /// Stops accepting new work and drains accepted queued tasks.
296 fn shutdown(&self) {
297 self.inner.shutdown();
298 }
299
300 /// Stops accepting work and cancels queued tasks.
301 ///
302 /// # Returns
303 ///
304 /// A count-based shutdown report.
305 fn stop(&self) -> StopReport {
306 self.inner.stop()
307 }
308
309 /// Returns the current lifecycle state.
310 fn lifecycle(&self) -> ExecutorServiceLifecycle {
311 self.inner.lifecycle()
312 }
313
314 /// Returns whether shutdown has been requested.
315 ///
316 /// # Returns
317 ///
318 /// `true` when this pool no longer accepts new work.
319 fn is_not_running(&self) -> bool {
320 self.inner.is_not_running()
321 }
322
323 /// Returns whether this pool is fully terminated.
324 ///
325 /// # Returns
326 ///
327 /// `true` after shutdown and after all workers have exited.
328 fn is_terminated(&self) -> bool {
329 self.inner.is_terminated()
330 }
331
332 /// Blocks until this fixed pool has terminated.
333 fn wait_termination(&self) {
334 self.inner.wait_for_termination();
335 }
336}
337
/// Joins workers that were already spawned during a failed build.
///
/// Join errors (e.g. from a panicked worker) are deliberately discarded:
/// the caller is already in the middle of propagating a construction error.
///
/// # Parameters
///
/// * `worker_handles` - Join handles for workers started before construction failed.
fn join_started_workers(worker_handles: Vec<JoinHandle<()>>) {
    worker_handles.into_iter().for_each(|handle| {
        // The Err carries a panic payload; intentionally ignored.
        let _ = handle.join();
    });
}