Skip to main content

qubit_thread_pool/thread_pool/
thread_pool.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    future::Future,
12    pin::Pin,
13    sync::Arc,
14    time::Duration,
15};
16
17use qubit_function::Callable;
18
19use qubit_executor::{
20    TaskCompletionPair,
21    TaskHandle,
22    TaskRunner,
23};
24
25use super::pool_job::PoolJob;
26use super::thread_pool_build_error::ThreadPoolBuildError;
27use super::thread_pool_builder::ThreadPoolBuilder;
28use super::thread_pool_inner::ThreadPoolInner;
29use super::thread_pool_stats::ThreadPoolStats;
30use qubit_executor::service::{
31    ExecutorService,
32    RejectedExecution,
33    ShutdownReport,
34};
35
36/// OS thread pool implementing [`ExecutorService`].
37///
38/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
39/// and executes them on worker threads. Workers are created lazily up to the
40/// configured core size, queued after that, and may grow up to the maximum size
41/// when a bounded queue is full. Submitted tasks return [`TaskHandle`], which
42/// supports both blocking [`TaskHandle::get`] and async `.await` result
43/// retrieval.
44///
45/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
46/// `shutdown_now` is abrupt: queued tasks that have not started are completed
47/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
48///
49pub struct ThreadPool {
50    /// Shared pool state and worker coordination primitives.
51    inner: Arc<ThreadPoolInner>,
52}
53
54impl ThreadPool {
55    pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
56        Self { inner }
57    }
58
59    /// Creates a thread pool with equal core and maximum pool sizes.
60    ///
61    /// # Parameters
62    ///
63    /// * `pool_size` - Value applied as both core and maximum pool size.
64    ///
65    /// # Returns
66    ///
67    /// `Ok(ThreadPool)` if all workers are spawned successfully.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`ThreadPoolBuildError`] if the resulting maximum pool size is
72    /// zero or a worker thread cannot be spawned.
73    #[inline]
74    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
75        Self::builder().pool_size(pool_size).build()
76    }
77
78    /// Creates a builder for configuring a thread pool.
79    ///
80    /// # Returns
81    ///
82    /// A builder with default core and maximum pool sizes and an unbounded queue.
83    #[inline]
84    pub fn builder() -> ThreadPoolBuilder {
85        ThreadPoolBuilder::default()
86    }
87
88    /// Returns the number of queued tasks waiting for a worker.
89    ///
90    /// # Returns
91    ///
92    /// The number of accepted tasks that have not started yet.
93    #[inline]
94    pub fn queued_count(&self) -> usize {
95        self.inner.read_state(|state| state.queued_tasks)
96    }
97
98    /// Returns the number of tasks currently held by workers.
99    ///
100    /// # Returns
101    ///
102    /// The number of tasks that workers have taken from the queue and have not
103    /// yet finished processing.
104    #[inline]
105    pub fn running_count(&self) -> usize {
106        self.inner.read_state(|state| state.running_tasks)
107    }
108
109    /// Returns how many worker threads are still running in this pool.
110    ///
111    /// # Returns
112    ///
113    /// The number of live worker loops still owned by this pool. This is a
114    /// runtime count and is not required to match configured
115    /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
116    #[inline]
117    pub fn live_worker_count(&self) -> usize {
118        self.inner.read_state(|state| state.live_workers)
119    }
120
121    /// Returns the configured core pool size.
122    ///
123    /// # Returns
124    ///
125    /// The number of workers kept for normal load before tasks are queued.
126    #[inline]
127    pub fn core_pool_size(&self) -> usize {
128        self.inner.read_state(|state| state.core_pool_size)
129    }
130
131    /// Returns the configured maximum pool size.
132    ///
133    /// # Returns
134    ///
135    /// The maximum number of worker threads this pool may create.
136    #[inline]
137    pub fn maximum_pool_size(&self) -> usize {
138        self.inner.read_state(|state| state.maximum_pool_size)
139    }
140
141    /// Returns a point-in-time snapshot of pool counters.
142    ///
143    /// # Returns
144    ///
145    /// A snapshot containing worker, queue, and task counters observed under
146    /// the pool state lock.
147    #[inline]
148    pub fn stats(&self) -> ThreadPoolStats {
149        self.inner.stats()
150    }
151
152    /// Starts one core worker if the pool has fewer live workers than its
153    /// configured core size.
154    ///
155    /// # Returns
156    ///
157    /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
158    /// was needed.
159    ///
160    /// # Errors
161    ///
162    /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
163    /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
164    #[inline]
165    pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
166        self.inner.prestart_core_thread()
167    }
168
169    /// Starts all missing core workers.
170    ///
171    /// # Returns
172    ///
173    /// The number of workers started.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
178    /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
179    #[inline]
180    pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
181        self.inner.prestart_all_core_threads()
182    }
183
184    /// Updates the core pool size.
185    ///
186    /// Increasing the core size does not eagerly create new workers unless
187    /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
188    /// eager creation is desired. Decreasing the core size lets excess idle
189    /// workers retire according to the keep-alive policy.
190    ///
191    /// # Parameters
192    ///
193    /// * `core_pool_size` - New core pool size.
194    ///
195    /// # Returns
196    ///
197    /// `Ok(())` if the size is accepted.
198    ///
199    /// # Errors
200    ///
201    /// Returns [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the
202    /// new core size would exceed the current maximum size.
203    pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
204        self.inner.set_core_pool_size(core_pool_size)
205    }
206
207    /// Updates the maximum pool size.
208    ///
209    /// Excess workers are not interrupted. They retire after finishing current
210    /// work or timing out while idle.
211    ///
212    /// # Parameters
213    ///
214    /// * `maximum_pool_size` - New maximum pool size.
215    ///
216    /// # Returns
217    ///
218    /// `Ok(())` if the size is accepted.
219    ///
220    /// # Errors
221    ///
222    /// Returns [`ThreadPoolBuildError::ZeroMaximumPoolSize`] when the maximum
223    /// size is zero, or [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`]
224    /// when it would be smaller than the current core size.
225    pub fn set_maximum_pool_size(
226        &self,
227        maximum_pool_size: usize,
228    ) -> Result<(), ThreadPoolBuildError> {
229        self.inner.set_maximum_pool_size(maximum_pool_size)
230    }
231
232    /// Updates how long excess idle workers may wait before exiting.
233    ///
234    /// # Parameters
235    ///
236    /// * `keep_alive` - New idle timeout for workers above the core size.
237    ///
238    /// # Returns
239    ///
240    /// `Ok(())` if the timeout is accepted.
241    ///
242    /// # Errors
243    ///
244    /// Returns [`ThreadPoolBuildError::ZeroKeepAlive`] when `keep_alive` is
245    /// zero.
246    pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
247        self.inner.set_keep_alive(keep_alive)
248    }
249
250    /// Updates whether core workers may also retire after keep-alive timeout.
251    ///
252    /// # Parameters
253    ///
254    /// * `allow` - Whether core workers are subject to idle timeout.
255    pub fn allow_core_thread_timeout(&self, allow: bool) {
256        self.inner.allow_core_thread_timeout(allow);
257    }
258
259    /// Submits an already type-erased pool job.
260    ///
261    /// This low-level hook is intended for higher-level service crates that
262    /// need to attach their own lifecycle callbacks while still using this
263    /// pool's queueing, cancellation, and shutdown behavior.
264    ///
265    /// # Parameters
266    ///
267    /// * `job` - Type-erased job containing run and cancellation callbacks.
268    ///
269    /// # Returns
270    ///
271    /// `Ok(())` when the job is accepted.
272    ///
273    /// # Errors
274    ///
275    /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
276    /// [`RejectedExecution::Saturated`] when a bounded pool cannot accept more
277    /// work, or returns [`RejectedExecution::WorkerSpawnFailed`] when the pool
278    /// fails to create a required worker.
279    pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
280        self.inner.submit(job)
281    }
282}
283
284impl Drop for ThreadPool {
285    /// Requests graceful shutdown when the pool value is dropped.
286    fn drop(&mut self) {
287        self.inner.shutdown();
288    }
289}
290
291impl ExecutorService for ThreadPool {
292    type Handle<R, E>
293        = TaskHandle<R, E>
294    where
295        R: Send + 'static,
296        E: Send + 'static;
297
298    type Termination<'a>
299        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
300    where
301        Self: 'a;
302
303    /// Accepts a callable and queues it for pool workers.
304    ///
305    /// # Parameters
306    ///
307    /// * `task` - Callable to execute on a pool worker.
308    ///
309    /// # Returns
310    ///
311    /// A [`TaskHandle`] for the accepted task.
312    ///
313    /// # Errors
314    ///
315    /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
316    /// [`RejectedExecution::Saturated`] when the bounded pool cannot accept
317    /// more work, or returns [`RejectedExecution::WorkerSpawnFailed`] when a
318    /// required worker cannot be created.
319    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
320    where
321        C: Callable<R, E> + Send + 'static,
322        R: Send + 'static,
323        E: Send + 'static,
324    {
325        let (handle, completion) = TaskCompletionPair::new().into_parts();
326        let completion_for_run = completion.clone();
327        let job = PoolJob::new(
328            Box::new(move || {
329                TaskRunner::new(task).run(completion_for_run);
330            }),
331            Box::new(move || {
332                completion.cancel();
333            }),
334        );
335        self.inner.submit(job)?;
336        Ok(handle)
337    }
338
339    /// Stops accepting new tasks after already queued work is drained.
340    ///
341    /// Queued and running tasks remain eligible to complete normally.
342    #[inline]
343    fn shutdown(&self) {
344        self.inner.shutdown();
345    }
346
347    /// Stops accepting tasks and cancels queued tasks that have not started.
348    ///
349    /// # Returns
350    ///
351    /// A report containing the number of queued jobs cancelled and the number
352    /// of jobs running at the time of the request.
353    #[inline]
354    fn shutdown_now(&self) -> ShutdownReport {
355        self.inner.shutdown_now()
356    }
357
358    /// Returns whether shutdown has been requested.
359    #[inline]
360    fn is_shutdown(&self) -> bool {
361        self.inner.is_shutdown()
362    }
363
364    /// Returns whether shutdown was requested and all workers have exited.
365    #[inline]
366    fn is_terminated(&self) -> bool {
367        self.inner.is_terminated()
368    }
369
370    /// Waits until the pool has terminated.
371    ///
372    /// This future blocks the polling thread while waiting on a condition
373    /// variable.
374    ///
375    /// # Returns
376    ///
377    /// A future that resolves when shutdown has been requested, the queue is
378    /// empty, no task is running, and all worker loops have exited.
379    fn await_termination(&self) -> Self::Termination<'_> {
380        Box::pin(async move {
381            self.inner.wait_for_termination();
382        })
383    }
384}