Skip to main content

qubit_thread_pool/fixed/
fixed_thread_pool.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    thread::JoinHandle,
13};
14
15use qubit_executor::service::{
16    ExecutorService,
17    ExecutorServiceLifecycle,
18    StopReport,
19    SubmissionError,
20};
21use qubit_executor::task::spi::TaskEndpointPair;
22use qubit_executor::{
23    TaskHandle,
24    TrackedTask,
25};
26use qubit_function::{
27    Callable,
28    Runnable,
29};
30
31use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
32use super::fixed_thread_pool_inner::FixedThreadPoolInner;
33use super::fixed_worker::FixedWorker;
34use super::fixed_worker_runtime::FixedWorkerRuntime;
35use crate::{
36    ExecutorServiceBuilderError,
37    PoolJob,
38    ThreadPoolStats,
39};
40
41/// Fixed-size thread pool implementing [`ExecutorService`].
42///
43/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
44/// support runtime pool-size changes. Use [`crate::ThreadPool`] when dynamic
45/// core/maximum sizes or keep-alive policies are required.
46pub struct FixedThreadPool {
47    /// Shared fixed pool state.
48    inner: Arc<FixedThreadPoolInner>,
49}
50
51impl FixedThreadPool {
52    /// Builds a fixed pool from a validated [`FixedThreadPoolBuilder`].
53    ///
54    /// # Parameters
55    ///
56    /// * `builder` - Configuration produced by [`FixedThreadPoolBuilder`].
57    ///
58    /// # Returns
59    ///
60    /// A fixed thread-pool handle with workers already started.
61    ///
62    /// # Errors
63    ///
64    /// Returns [`ExecutorServiceBuilderError`] when a worker thread cannot be spawned.
65    pub(crate) fn new_with_builder(
66        builder: FixedThreadPoolBuilder,
67    ) -> Result<Self, ExecutorServiceBuilderError> {
68        let FixedThreadPoolBuilder {
69            pool_size,
70            queue_capacity,
71            thread_name_prefix,
72            stack_size,
73            hooks,
74        } = builder;
75        let mut worker_runtimes = Vec::with_capacity(pool_size);
76        for index in 0..pool_size {
77            let worker_runtime = FixedWorkerRuntime::new(index);
78            worker_runtimes.push(worker_runtime);
79        }
80        let inner = Arc::new(FixedThreadPoolInner::with_hooks(
81            pool_size,
82            queue_capacity,
83            hooks,
84        ));
85        let mut worker_handles = Vec::with_capacity(pool_size);
86        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
87            inner.reserve_worker_slot();
88            let worker_inner = Arc::clone(&inner);
89            let thread_name = format!("{}-{}", thread_name_prefix, index);
90            let mut builder = std::thread::Builder::new().name(thread_name);
91            if let Some(stack_size) = stack_size {
92                builder = builder.stack_size(stack_size);
93            }
94            match builder.spawn(move || FixedWorker::run(worker_inner, worker_runtime)) {
95                Ok(handle) => worker_handles.push(handle),
96                Err(source) => {
97                    inner.rollback_worker_slot();
98                    inner.stop_after_failed_build();
99                    join_started_workers(worker_handles);
100                    return Err(ExecutorServiceBuilderError::SpawnWorker {
101                        index: Some(index),
102                        source,
103                    });
104                }
105            }
106        }
107        Ok(Self { inner })
108    }
109
110    /// Creates a fixed thread pool with `pool_size` prestarted workers.
111    ///
112    /// # Parameters
113    ///
114    /// * `pool_size` - Number of worker threads.
115    ///
116    /// # Returns
117    ///
118    /// A fixed thread pool.
119    ///
120    /// # Errors
121    ///
122    /// Returns [`ExecutorServiceBuilderError`] if the worker count is zero or a worker
123    /// cannot be spawned.
124    pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
125        Self::builder().pool_size(pool_size).build()
126    }
127
128    /// Creates a fixed pool builder.
129    ///
130    /// # Returns
131    ///
132    /// Builder with CPU parallelism defaults.
133    pub fn builder() -> FixedThreadPoolBuilder {
134        FixedThreadPoolBuilder::new()
135    }
136
137    /// Returns the fixed worker count.
138    ///
139    /// # Returns
140    ///
141    /// Number of workers in this pool.
142    pub fn pool_size(&self) -> usize {
143        self.inner.pool_size()
144    }
145
146    /// Returns the queued task count.
147    ///
148    /// # Returns
149    ///
150    /// Number of accepted tasks waiting to run.
151    pub fn queued_count(&self) -> usize {
152        self.inner.queued_count()
153    }
154
155    /// Returns the running task count.
156    ///
157    /// # Returns
158    ///
159    /// Number of tasks currently held by workers.
160    pub fn running_count(&self) -> usize {
161        self.inner.running_count()
162    }
163
164    /// Returns the live worker count.
165    ///
166    /// # Returns
167    ///
168    /// Number of worker loops that have not exited.
169    pub fn live_worker_count(&self) -> usize {
170        self.inner.state.read(|state| state.live_workers)
171    }
172
173    /// Returns a point-in-time stats snapshot.
174    ///
175    /// # Returns
176    ///
177    /// Snapshot containing queue, worker, and lifecycle counters.
178    pub fn stats(&self) -> ThreadPoolStats {
179        self.inner.stats()
180    }
181
182    /// Blocks until all accepted work has completed.
183    ///
184    /// This is a join-style wait for quiescence: it does not request shutdown
185    /// and does not wait for worker threads to exit. Concurrent submissions may
186    /// extend the wait until those accepted jobs also drain.
187    #[inline]
188    pub fn join(&self) {
189        self.inner.wait_until_idle();
190    }
191}
192
193impl Default for FixedThreadPool {
194    /// Creates a fixed thread pool using [`FixedThreadPoolBuilder::default`].
195    ///
196    /// # Returns
197    ///
198    /// A fixed thread pool with CPU parallelism defaults and prestarted workers.
199    ///
200    /// # Panics
201    ///
202    /// Panics when the default builder fails to spawn a worker thread.
203    fn default() -> Self {
204        FixedThreadPoolBuilder::default()
205            .build()
206            .expect("failed to build default FixedThreadPool")
207    }
208}
209
210impl Drop for FixedThreadPool {
211    /// Requests graceful shutdown when the pool handle is dropped.
212    fn drop(&mut self) {
213        self.inner.shutdown();
214    }
215}
216
217impl ExecutorService for FixedThreadPool {
218    type ResultHandle<R, E>
219        = TaskHandle<R, E>
220    where
221        R: Send + 'static,
222        E: Send + 'static;
223
224    type TrackedHandle<R, E>
225        = TrackedTask<R, E>
226    where
227        R: Send + 'static,
228        E: Send + 'static;
229
230    /// Accepts a runnable and queues it for fixed pool workers.
231    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
232    where
233        T: Runnable<E> + Send + 'static,
234        E: Send + 'static,
235    {
236        self.inner.submit(PoolJob::detached(task))
237    }
238
239    /// Accepts a callable and queues it for fixed pool workers.
240    ///
241    /// # Parameters
242    ///
243    /// * `task` - Callable to execute on a fixed pool worker.
244    ///
245    /// # Returns
246    ///
247    /// A [`TaskHandle`] for the accepted task.
248    ///
249    /// # Errors
250    ///
251    /// Returns [`SubmissionError::Shutdown`] after shutdown or
252    /// [`SubmissionError::Saturated`] when a bounded queue is full.
253    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
254    where
255        C: Callable<R, E> + Send + 'static,
256        R: Send + 'static,
257        E: Send + 'static,
258    {
259        let (handle, completion) = TaskEndpointPair::new().into_parts();
260        let job = PoolJob::from_task(task, completion);
261        self.inner.submit(job)?;
262        Ok(handle)
263    }
264
265    /// Accepts a callable and queues it with a tracked handle.
266    ///
267    /// # Parameters
268    ///
269    /// * `task` - Callable to execute on a fixed pool worker.
270    ///
271    /// # Returns
272    ///
273    /// A [`TrackedTask`] that reports task status and can observe completion,
274    /// failure, or queued cancellation.
275    ///
276    /// # Errors
277    ///
278    /// Returns [`SubmissionError::Shutdown`] after shutdown or
279    /// [`SubmissionError::Saturated`] when a bounded queue is full.
280    fn submit_tracked_callable<C, R, E>(
281        &self,
282        task: C,
283    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
284    where
285        C: Callable<R, E> + Send + 'static,
286        R: Send + 'static,
287        E: Send + 'static,
288    {
289        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
290        let job = PoolJob::from_task(task, completion);
291        self.inner.submit(job)?;
292        Ok(handle)
293    }
294
295    /// Stops accepting new work and drains accepted queued tasks.
296    fn shutdown(&self) {
297        self.inner.shutdown();
298    }
299
300    /// Stops accepting work and cancels queued tasks.
301    ///
302    /// # Returns
303    ///
304    /// A count-based shutdown report.
305    fn stop(&self) -> StopReport {
306        self.inner.stop()
307    }
308
309    /// Returns the current lifecycle state.
310    fn lifecycle(&self) -> ExecutorServiceLifecycle {
311        self.inner.lifecycle()
312    }
313
314    /// Returns whether shutdown has been requested.
315    ///
316    /// # Returns
317    ///
318    /// `true` when this pool no longer accepts new work.
319    fn is_not_running(&self) -> bool {
320        self.inner.is_not_running()
321    }
322
323    /// Returns whether this pool is fully terminated.
324    ///
325    /// # Returns
326    ///
327    /// `true` after shutdown and after all workers have exited.
328    fn is_terminated(&self) -> bool {
329        self.inner.is_terminated()
330    }
331
332    /// Blocks until this fixed pool has terminated.
333    fn wait_termination(&self) {
334        self.inner.wait_for_termination();
335    }
336}
337
/// Waits for every worker that had already been spawned when pool
/// construction failed.
///
/// The outcome of each join is deliberately discarded, so a worker that
/// panicked does not propagate its panic to the caller.
///
/// # Parameters
///
/// * `worker_handles` - Join handles of the workers started before the build
///   failed; they are consumed and joined in order.
fn join_started_workers(worker_handles: Vec<JoinHandle<()>>) {
    worker_handles
        .into_iter()
        .for_each(|worker| drop(worker.join()));
}