// qubit_thread_pool/dynamic/thread_pool.rs
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
use std::{
    sync::Arc,
    time::Duration,
};

use qubit_executor::service::{
    ExecutorService,
    ExecutorServiceLifecycle,
    StopReport,
    SubmissionError,
};
use qubit_executor::task::spi::TaskEndpointPair;
use qubit_executor::{
    TaskHandle,
    TrackedTask,
};
use qubit_function::{
    Callable,
    Runnable,
};

use super::thread_pool_builder::ThreadPoolBuilder;
use super::thread_pool_inner::ThreadPoolInner;
use crate::{
    ExecutorServiceBuilderError,
    PoolJob,
    ThreadPoolStats,
};

40/// OS thread pool implementing [`ExecutorService`].
41///
42/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
43/// and executes them on worker threads. Workers are created lazily up to the
44/// configured core size, queued after that, and may grow up to the maximum size
45/// when a bounded queue is full. Callable submissions return [`TaskHandle`],
46/// while tracked submissions return [`TrackedTask`].
47/// retrieval.
48///
49/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
50/// `stop` is abrupt: queued tasks that have not started are completed
51/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
52///
53pub struct ThreadPool {
54    /// Shared pool state and worker coordination primitives.
55    inner: Arc<ThreadPoolInner>,
56}
57
58impl ThreadPool {
59    pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
60        Self { inner }
61    }
62
63    /// Creates a thread pool with equal core and maximum pool sizes.
64    ///
65    /// # Parameters
66    ///
67    /// * `pool_size` - Value applied as both core and maximum pool size.
68    ///
69    /// # Returns
70    ///
71    /// `Ok(ThreadPool)` if all workers are spawned successfully.
72    ///
73    /// # Errors
74    ///
75    /// Returns [`ExecutorServiceBuilderError`] if the resulting maximum pool size is
76    /// zero or a worker thread cannot be spawned.
77    #[inline]
78    pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
79        Self::builder().pool_size(pool_size).build()
80    }
81
82    /// Creates a builder for configuring a thread pool.
83    ///
84    /// # Returns
85    ///
86    /// A builder with default core and maximum pool sizes and an unbounded queue.
87    #[inline]
88    pub fn builder() -> ThreadPoolBuilder {
89        ThreadPoolBuilder::default()
90    }
91
92    /// Returns the number of queued tasks waiting for a worker.
93    ///
94    /// # Returns
95    ///
96    /// The number of accepted tasks that have not started yet.
97    #[inline]
98    pub fn queued_count(&self) -> usize {
99        self.inner.read_state(|state| state.queued_tasks)
100    }
101
102    /// Returns the number of tasks currently held by workers.
103    ///
104    /// # Returns
105    ///
106    /// The number of tasks that workers have taken from the queue and have not
107    /// yet finished processing.
108    #[inline]
109    pub fn running_count(&self) -> usize {
110        self.inner.read_state(|state| state.running_tasks)
111    }
112
113    /// Returns how many worker threads are still running in this pool.
114    ///
115    /// # Returns
116    ///
117    /// The number of live worker loops still owned by this pool. This is a
118    /// runtime count and is not required to match configured
119    /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
120    #[inline]
121    pub fn live_worker_count(&self) -> usize {
122        self.inner.read_state(|state| state.live_workers)
123    }
124
125    /// Returns the configured core pool size.
126    ///
127    /// # Returns
128    ///
129    /// The number of workers kept for normal load before tasks are queued.
130    #[inline]
131    pub fn core_pool_size(&self) -> usize {
132        self.inner.read_state(|state| state.core_pool_size)
133    }
134
135    /// Returns the configured maximum pool size.
136    ///
137    /// # Returns
138    ///
139    /// The maximum number of worker threads this pool may create.
140    #[inline]
141    pub fn maximum_pool_size(&self) -> usize {
142        self.inner.read_state(|state| state.maximum_pool_size)
143    }
144
145    /// Returns a point-in-time snapshot of pool counters.
146    ///
147    /// # Returns
148    ///
149    /// A snapshot containing worker, queue, and task counters observed under
150    /// the pool state lock.
151    #[inline]
152    pub fn stats(&self) -> ThreadPoolStats {
153        self.inner.stats()
154    }
155
156    /// Submits a custom pool job.
157    ///
158    /// This low-level extension point is intended for higher-level services
159    /// that need to pair pool execution with their own task registry or
160    /// cancellation bookkeeping. Prefer the [`ExecutorService`] submission
161    /// methods for ordinary runnable and callable work.
162    ///
163    /// # Parameters
164    ///
165    /// * `job` - Custom job to execute or cancel later.
166    ///
167    /// # Returns
168    ///
169    /// `Ok(())` when the pool accepts the job.
170    ///
171    /// # Errors
172    ///
173    /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
174    /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
175    /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
176    /// required worker cannot be created.
177    #[inline]
178    pub fn submit_job(&self, job: PoolJob) -> Result<(), SubmissionError> {
179        self.inner.submit(job)
180    }
181
182    /// Blocks until all accepted work has completed.
183    ///
184    /// This is a join-style wait for quiescence: it does not request shutdown
185    /// and does not wait for worker threads to exit. Concurrent submissions may
186    /// extend the wait until those accepted jobs also drain.
187    #[inline]
188    pub fn join(&self) {
189        self.inner.wait_until_idle();
190    }
191
192    /// Starts one core worker if the pool has fewer live workers than its
193    /// configured core size.
194    ///
195    /// # Returns
196    ///
197    /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
198    /// was needed.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
203    /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
204    #[inline]
205    pub fn prestart_core_thread(&self) -> Result<bool, SubmissionError> {
206        self.inner.prestart_core_thread()
207    }
208
209    /// Starts all missing core workers.
210    ///
211    /// # Returns
212    ///
213    /// The number of workers started.
214    ///
215    /// # Errors
216    ///
217    /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
218    /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
219    #[inline]
220    pub fn prestart_all_core_threads(&self) -> Result<usize, SubmissionError> {
221        self.inner.prestart_all_core_threads()
222    }
223
224    /// Updates the core pool size.
225    ///
226    /// Increasing the core size does not eagerly create new workers unless
227    /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
228    /// eager creation is desired. Decreasing the core size lets excess idle
229    /// workers retire according to the keep-alive policy.
230    ///
231    /// # Parameters
232    ///
233    /// * `core_pool_size` - New core pool size.
234    ///
235    /// # Returns
236    ///
237    /// `Ok(())` if the size is accepted.
238    ///
239    /// # Errors
240    ///
241    /// Returns [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`] when the
242    /// new core size would exceed the current maximum size.
243    pub fn set_core_pool_size(
244        &self,
245        core_pool_size: usize,
246    ) -> Result<(), ExecutorServiceBuilderError> {
247        self.inner.set_core_pool_size(core_pool_size)
248    }
249
250    /// Updates the maximum pool size.
251    ///
252    /// Excess workers are not interrupted. They retire after finishing current
253    /// work or timing out while idle.
254    ///
255    /// # Parameters
256    ///
257    /// * `maximum_pool_size` - New maximum pool size.
258    ///
259    /// # Returns
260    ///
261    /// `Ok(())` if the size is accepted.
262    ///
263    /// # Errors
264    ///
265    /// Returns [`ExecutorServiceBuilderError::ZeroMaximumPoolSize`] when the maximum
266    /// size is zero, or [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`]
267    /// when it would be smaller than the current core size.
268    pub fn set_maximum_pool_size(
269        &self,
270        maximum_pool_size: usize,
271    ) -> Result<(), ExecutorServiceBuilderError> {
272        self.inner.set_maximum_pool_size(maximum_pool_size)
273    }
274
275    /// Updates how long excess idle workers may wait before exiting.
276    ///
277    /// # Parameters
278    ///
279    /// * `keep_alive` - New idle timeout for workers above the core size.
280    ///
281    /// # Returns
282    ///
283    /// `Ok(())` if the timeout is accepted.
284    ///
285    /// # Errors
286    ///
287    /// Returns [`ExecutorServiceBuilderError::ZeroKeepAlive`] when `keep_alive` is
288    /// zero.
289    pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ExecutorServiceBuilderError> {
290        self.inner.set_keep_alive(keep_alive)
291    }
292
293    /// Updates whether core workers may also retire after keep-alive timeout.
294    ///
295    /// # Parameters
296    ///
297    /// * `allow` - Whether core workers are subject to idle timeout.
298    pub fn allow_core_thread_timeout(&self, allow: bool) {
299        self.inner.allow_core_thread_timeout(allow);
300    }
301}
302
303impl Drop for ThreadPool {
304    /// Requests graceful shutdown when the pool value is dropped.
305    fn drop(&mut self) {
306        self.inner.shutdown();
307    }
308}
309
310impl ExecutorService for ThreadPool {
311    type ResultHandle<R, E>
312        = TaskHandle<R, E>
313    where
314        R: Send + 'static,
315        E: Send + 'static;
316
317    type TrackedHandle<R, E>
318        = TrackedTask<R, E>
319    where
320        R: Send + 'static,
321        E: Send + 'static;
322
323    /// Accepts a runnable and queues it for pool workers.
324    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
325    where
326        T: Runnable<E> + Send + 'static,
327        E: Send + 'static,
328    {
329        self.inner.submit(PoolJob::detached(task))
330    }
331
332    /// Accepts a callable and queues it for pool workers.
333    ///
334    /// # Parameters
335    ///
336    /// * `task` - Callable to execute on a pool worker.
337    ///
338    /// # Returns
339    ///
340    /// A [`TaskHandle`] for the accepted task.
341    ///
342    /// # Errors
343    ///
344    /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
345    /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
346    /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
347    /// required worker cannot be created.
348    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
349    where
350        C: Callable<R, E> + Send + 'static,
351        R: Send + 'static,
352        E: Send + 'static,
353    {
354        let (handle, completion) = TaskEndpointPair::new().into_parts();
355        let job = PoolJob::from_task(task, completion);
356        self.inner.submit(job)?;
357        Ok(handle)
358    }
359
360    /// Accepts a callable and queues it with a tracked handle.
361    fn submit_tracked_callable<C, R, E>(
362        &self,
363        task: C,
364    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
365    where
366        C: Callable<R, E> + Send + 'static,
367        R: Send + 'static,
368        E: Send + 'static,
369    {
370        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
371        let job = PoolJob::from_task(task, completion);
372        self.inner.submit(job)?;
373        Ok(handle)
374    }
375
376    /// Stops accepting new tasks after already queued work is drained.
377    ///
378    /// Queued and running tasks remain eligible to complete normally.
379    #[inline]
380    fn shutdown(&self) {
381        self.inner.shutdown();
382    }
383
384    /// Stops accepting tasks and cancels queued tasks that have not started.
385    ///
386    /// # Returns
387    ///
388    /// A report containing the number of queued jobs cancelled and the number
389    /// of jobs running at the time of the request.
390    #[inline]
391    fn stop(&self) -> StopReport {
392        self.inner.stop()
393    }
394
395    /// Returns the current lifecycle state.
396    #[inline]
397    fn lifecycle(&self) -> ExecutorServiceLifecycle {
398        self.inner.lifecycle()
399    }
400
401    /// Returns whether shutdown has been requested.
402    #[inline]
403    fn is_not_running(&self) -> bool {
404        self.inner.is_not_running()
405    }
406
407    /// Returns whether shutdown was requested and all workers have exited.
408    #[inline]
409    fn is_terminated(&self) -> bool {
410        self.inner.is_terminated()
411    }
412
413    /// Blocks until the pool has terminated.
414    fn wait_termination(&self) {
415        self.inner.wait_for_termination();
416    }
417}