// qubit_thread_pool/thread_pool/thread_pool.rs
1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10    future::Future,
11    pin::Pin,
12    sync::Arc,
13    time::Duration,
14};
15
16use qubit_function::Callable;
17
18use qubit_executor::{
19    TaskCompletionPair,
20    TaskHandle,
21    TaskRunner,
22};
23
24use super::pool_job::PoolJob;
25use super::thread_pool_build_error::ThreadPoolBuildError;
26use super::thread_pool_builder::ThreadPoolBuilder;
27use super::thread_pool_inner::ThreadPoolInner;
28use super::thread_pool_stats::ThreadPoolStats;
29use qubit_executor::service::{
30    ExecutorService,
31    RejectedExecution,
32    ShutdownReport,
33};
34
/// OS thread pool implementing [`ExecutorService`].
///
/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
/// and executes them on worker threads. Workers are created lazily up to the
/// configured core size, queued after that, and may grow up to the maximum size
/// when a bounded queue is full. Submitted tasks return [`TaskHandle`], which
/// supports both blocking [`TaskHandle::get`] and async `.await` result
/// retrieval.
///
/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
/// `shutdown_now` is abrupt: queued tasks that have not started are completed
/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
///
/// Dropping a `ThreadPool` requests graceful shutdown (see the `Drop` impl).
///
/// # Author
///
/// Haixing Hu
pub struct ThreadPool {
    /// Shared pool state and worker coordination primitives.
    ///
    /// NOTE(review): shared via `Arc` — presumably worker threads hold clones
    /// so the state outlives this handle; confirm in `ThreadPoolInner`.
    inner: Arc<ThreadPoolInner>,
}
55
56impl ThreadPool {
57    pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
58        Self { inner }
59    }
60
61    /// Creates a thread pool with equal core and maximum pool sizes.
62    ///
63    /// # Parameters
64    ///
65    /// * `pool_size` - Value applied as both core and maximum pool size.
66    ///
67    /// # Returns
68    ///
69    /// `Ok(ThreadPool)` if all workers are spawned successfully.
70    ///
71    /// # Errors
72    ///
73    /// Returns [`ThreadPoolBuildError`] if the resulting maximum pool size is
74    /// zero or a worker thread cannot be spawned.
75    #[inline]
76    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
77        Self::builder().pool_size(pool_size).build()
78    }
79
80    /// Creates a builder for configuring a thread pool.
81    ///
82    /// # Returns
83    ///
84    /// A builder with default core and maximum pool sizes and an unbounded queue.
85    #[inline]
86    pub fn builder() -> ThreadPoolBuilder {
87        ThreadPoolBuilder::default()
88    }
89
90    /// Returns the number of queued tasks waiting for a worker.
91    ///
92    /// # Returns
93    ///
94    /// The number of accepted tasks that have not started yet.
95    #[inline]
96    pub fn queued_count(&self) -> usize {
97        self.inner.read_state(|state| state.queued_tasks)
98    }
99
100    /// Returns the number of tasks currently held by workers.
101    ///
102    /// # Returns
103    ///
104    /// The number of tasks that workers have taken from the queue and have not
105    /// yet finished processing.
106    #[inline]
107    pub fn running_count(&self) -> usize {
108        self.inner.read_state(|state| state.running_tasks)
109    }
110
111    /// Returns how many worker threads are still running in this pool.
112    ///
113    /// # Returns
114    ///
115    /// The number of live worker loops still owned by this pool. This is a
116    /// runtime count and is not required to match configured
117    /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
118    #[inline]
119    pub fn live_worker_count(&self) -> usize {
120        self.inner.read_state(|state| state.live_workers)
121    }
122
123    /// Returns the configured core pool size.
124    ///
125    /// # Returns
126    ///
127    /// The number of workers kept for normal load before tasks are queued.
128    #[inline]
129    pub fn core_pool_size(&self) -> usize {
130        self.inner.read_state(|state| state.core_pool_size)
131    }
132
133    /// Returns the configured maximum pool size.
134    ///
135    /// # Returns
136    ///
137    /// The maximum number of worker threads this pool may create.
138    #[inline]
139    pub fn maximum_pool_size(&self) -> usize {
140        self.inner.read_state(|state| state.maximum_pool_size)
141    }
142
143    /// Returns a point-in-time snapshot of pool counters.
144    ///
145    /// # Returns
146    ///
147    /// A snapshot containing worker, queue, and task counters observed under
148    /// the pool state lock.
149    #[inline]
150    pub fn stats(&self) -> ThreadPoolStats {
151        self.inner.stats()
152    }
153
154    /// Starts one core worker if the pool has fewer live workers than its
155    /// configured core size.
156    ///
157    /// # Returns
158    ///
159    /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
160    /// was needed.
161    ///
162    /// # Errors
163    ///
164    /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
165    /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
166    #[inline]
167    pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
168        self.inner.prestart_core_thread()
169    }
170
171    /// Starts all missing core workers.
172    ///
173    /// # Returns
174    ///
175    /// The number of workers started.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
180    /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
181    #[inline]
182    pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
183        self.inner.prestart_all_core_threads()
184    }
185
186    /// Updates the core pool size.
187    ///
188    /// Increasing the core size does not eagerly create new workers unless
189    /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
190    /// eager creation is desired. Decreasing the core size lets excess idle
191    /// workers retire according to the keep-alive policy.
192    ///
193    /// # Parameters
194    ///
195    /// * `core_pool_size` - New core pool size.
196    ///
197    /// # Returns
198    ///
199    /// `Ok(())` if the size is accepted.
200    ///
201    /// # Errors
202    ///
203    /// Returns [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the
204    /// new core size would exceed the current maximum size.
205    pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
206        self.inner.set_core_pool_size(core_pool_size)
207    }
208
209    /// Updates the maximum pool size.
210    ///
211    /// Excess workers are not interrupted. They retire after finishing current
212    /// work or timing out while idle.
213    ///
214    /// # Parameters
215    ///
216    /// * `maximum_pool_size` - New maximum pool size.
217    ///
218    /// # Returns
219    ///
220    /// `Ok(())` if the size is accepted.
221    ///
222    /// # Errors
223    ///
224    /// Returns [`ThreadPoolBuildError::ZeroMaximumPoolSize`] when the maximum
225    /// size is zero, or [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`]
226    /// when it would be smaller than the current core size.
227    pub fn set_maximum_pool_size(
228        &self,
229        maximum_pool_size: usize,
230    ) -> Result<(), ThreadPoolBuildError> {
231        self.inner.set_maximum_pool_size(maximum_pool_size)
232    }
233
234    /// Updates how long excess idle workers may wait before exiting.
235    ///
236    /// # Parameters
237    ///
238    /// * `keep_alive` - New idle timeout for workers above the core size.
239    ///
240    /// # Returns
241    ///
242    /// `Ok(())` if the timeout is accepted.
243    ///
244    /// # Errors
245    ///
246    /// Returns [`ThreadPoolBuildError::ZeroKeepAlive`] when `keep_alive` is
247    /// zero.
248    pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
249        self.inner.set_keep_alive(keep_alive)
250    }
251
252    /// Updates whether core workers may also retire after keep-alive timeout.
253    ///
254    /// # Parameters
255    ///
256    /// * `allow` - Whether core workers are subject to idle timeout.
257    pub fn allow_core_thread_timeout(&self, allow: bool) {
258        self.inner.allow_core_thread_timeout(allow);
259    }
260
261    /// Submits an already type-erased pool job.
262    ///
263    /// This low-level hook is intended for higher-level service crates that
264    /// need to attach their own lifecycle callbacks while still using this
265    /// pool's queueing, cancellation, and shutdown behavior.
266    ///
267    /// # Parameters
268    ///
269    /// * `job` - Type-erased job containing run and cancellation callbacks.
270    ///
271    /// # Returns
272    ///
273    /// `Ok(())` when the job is accepted.
274    ///
275    /// # Errors
276    ///
277    /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
278    /// [`RejectedExecution::Saturated`] when a bounded pool cannot accept more
279    /// work, or returns [`RejectedExecution::WorkerSpawnFailed`] when the pool
280    /// fails to create a required worker.
281    pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
282        self.inner.submit(job)
283    }
284}
285
impl Drop for ThreadPool {
    /// Requests graceful shutdown when the pool value is dropped.
    ///
    /// Delegates to the same `inner.shutdown()` used by
    /// `ExecutorService::shutdown`, so already accepted queued tasks remain
    /// eligible to run; only new submissions are rejected.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
292
293impl ExecutorService for ThreadPool {
294    type Handle<R, E>
295        = TaskHandle<R, E>
296    where
297        R: Send + 'static,
298        E: Send + 'static;
299
300    type Termination<'a>
301        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
302    where
303        Self: 'a;
304
305    /// Accepts a callable and queues it for pool workers.
306    ///
307    /// # Parameters
308    ///
309    /// * `task` - Callable to execute on a pool worker.
310    ///
311    /// # Returns
312    ///
313    /// A [`TaskHandle`] for the accepted task.
314    ///
315    /// # Errors
316    ///
317    /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
318    /// [`RejectedExecution::Saturated`] when the bounded pool cannot accept
319    /// more work, or returns [`RejectedExecution::WorkerSpawnFailed`] when a
320    /// required worker cannot be created.
321    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
322    where
323        C: Callable<R, E> + Send + 'static,
324        R: Send + 'static,
325        E: Send + 'static,
326    {
327        let (handle, completion) = TaskCompletionPair::new().into_parts();
328        let completion_for_run = completion.clone();
329        let job = PoolJob::new(
330            Box::new(move || {
331                TaskRunner::new(task).run(completion_for_run);
332            }),
333            Box::new(move || {
334                completion.cancel();
335            }),
336        );
337        self.inner.submit(job)?;
338        Ok(handle)
339    }
340
341    /// Stops accepting new tasks after already queued work is drained.
342    ///
343    /// Queued and running tasks remain eligible to complete normally.
344    #[inline]
345    fn shutdown(&self) {
346        self.inner.shutdown();
347    }
348
349    /// Stops accepting tasks and cancels queued tasks that have not started.
350    ///
351    /// # Returns
352    ///
353    /// A report containing the number of queued jobs cancelled and the number
354    /// of jobs running at the time of the request.
355    #[inline]
356    fn shutdown_now(&self) -> ShutdownReport {
357        self.inner.shutdown_now()
358    }
359
360    /// Returns whether shutdown has been requested.
361    #[inline]
362    fn is_shutdown(&self) -> bool {
363        self.inner.is_shutdown()
364    }
365
366    /// Returns whether shutdown was requested and all workers have exited.
367    #[inline]
368    fn is_terminated(&self) -> bool {
369        self.inner.is_terminated()
370    }
371
372    /// Waits until the pool has terminated.
373    ///
374    /// This future blocks the polling thread while waiting on a condition
375    /// variable.
376    ///
377    /// # Returns
378    ///
379    /// A future that resolves when shutdown has been requested, the queue is
380    /// empty, no task is running, and all worker loops have exited.
381    fn await_termination(&self) -> Self::Termination<'_> {
382        Box::pin(async move {
383            self.inner.wait_for_termination();
384        })
385    }
386}