// qubit_thread_pool/dynamic/thread_pool.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
use std::{
    sync::Arc,
    time::Duration,
};

use qubit_function::{
    Callable,
    Runnable,
};

use qubit_executor::task::spi::TaskEndpointPair;
use qubit_executor::{
    TaskHandle,
    TrackedTask,
};

use super::thread_pool_builder::ThreadPoolBuilder;
use super::thread_pool_inner::ThreadPoolInner;
use crate::{
    ExecutorServiceBuilderError,
    PoolJob,
    ThreadPoolStats,
};
use qubit_executor::service::{
    ExecutorService,
    ExecutorServiceLifecycle,
    StopReport,
    SubmissionError,
};

40/// OS thread pool implementing [`ExecutorService`].
41///
42/// `ThreadPool` accepts fallible tasks, stores accepted waiting tasks in
43/// internal queues, and executes them on worker threads. The global waiting
44/// queue is FIFO, but task start and completion order are not a strict FIFO API
45/// guarantee. Workers are created lazily up to the configured core size, tasks
46/// are queued after that, and the pool may grow up to the maximum size when a
47/// bounded queue is full. Callable submissions return [`TaskHandle`], while
48/// tracked submissions return [`TrackedTask`].
49///
50/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
51/// `stop` is abrupt: queued tasks that have not started are completed
52/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
53///
54pub struct ThreadPool {
55    /// Shared pool state and worker coordination primitives.
56    inner: Arc<ThreadPoolInner>,
57}
59impl ThreadPool {
60    pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
61        Self { inner }
62    }
63
64    /// Creates a thread pool with equal core and maximum pool sizes.
65    ///
66    /// # Parameters
67    ///
68    /// * `pool_size` - Value applied as both core and maximum pool size.
69    ///
70    /// # Returns
71    ///
72    /// `Ok(ThreadPool)` if all workers are spawned successfully.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`ExecutorServiceBuilderError`] if the resulting maximum pool size is
77    /// zero or a worker thread cannot be spawned.
78    #[inline]
79    pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
80        Self::builder().pool_size(pool_size).build()
81    }
82
83    /// Creates a builder for configuring a thread pool.
84    ///
85    /// # Returns
86    ///
87    /// A builder with default core and maximum pool sizes and an unbounded queue.
88    #[inline]
89    pub fn builder() -> ThreadPoolBuilder {
90        ThreadPoolBuilder::default()
91    }
92
93    /// Returns the number of queued tasks waiting for a worker.
94    ///
95    /// # Returns
96    ///
97    /// The number of accepted tasks that have not started yet.
98    #[inline]
99    pub fn queued_count(&self) -> usize {
100        self.inner.queued_count()
101    }
102
103    /// Returns the number of tasks currently held by workers.
104    ///
105    /// # Returns
106    ///
107    /// The number of tasks that workers have taken from the queue and have not
108    /// yet finished processing.
109    #[inline]
110    pub fn running_count(&self) -> usize {
111        self.inner.running_count()
112    }
113
114    /// Returns how many worker threads are still running in this pool.
115    ///
116    /// # Returns
117    ///
118    /// The number of live worker loops still owned by this pool. This is a
119    /// runtime count and is not required to match configured
120    /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
121    #[inline]
122    pub fn live_worker_count(&self) -> usize {
123        self.inner.read_state(|state| state.live_workers)
124    }
125
126    /// Returns the configured core pool size.
127    ///
128    /// # Returns
129    ///
130    /// The number of workers kept for normal load before tasks are queued.
131    #[inline]
132    pub fn core_pool_size(&self) -> usize {
133        self.inner.read_state(|state| state.core_pool_size)
134    }
135
136    /// Returns the configured maximum pool size.
137    ///
138    /// # Returns
139    ///
140    /// The maximum number of worker threads this pool may create.
141    #[inline]
142    pub fn maximum_pool_size(&self) -> usize {
143        self.inner.read_state(|state| state.maximum_pool_size)
144    }
145
146    /// Returns a point-in-time snapshot of pool counters.
147    ///
148    /// # Returns
149    ///
150    /// A snapshot containing worker, queue, and task counters observed under
151    /// the pool state lock.
152    #[inline]
153    pub fn stats(&self) -> ThreadPoolStats {
154        self.inner.stats()
155    }
156
157    /// Submits a custom pool job.
158    ///
159    /// This low-level extension point is intended for higher-level services
160    /// that need to pair pool execution with their own task registry or
161    /// cancellation bookkeeping. Prefer the [`ExecutorService`] submission
162    /// methods for ordinary runnable and callable work. Custom job callbacks
163    /// run synchronously on the thread that reaches the corresponding lifecycle
164    /// event and should stay short and non-blocking. Callback panics are
165    /// contained; if an acceptance callback panics, the job is not queued,
166    /// run, or cancelled.
167    ///
168    /// # Parameters
169    ///
170    /// * `job` - Custom job to execute or cancel later.
171    ///
172    /// # Returns
173    ///
174    /// `Ok(())` when the pool accepts the job.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
179    /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
180    /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
181    /// required worker cannot be created.
182    #[inline]
183    pub fn submit_job(&self, job: PoolJob) -> Result<(), SubmissionError> {
184        self.inner.submit(job)
185    }
186
187    /// Blocks until all accepted work has completed.
188    ///
189    /// This is a join-style wait for quiescence: it does not request shutdown
190    /// and does not wait for worker threads to exit. Concurrent submissions may
191    /// extend the wait until those accepted jobs also drain.
192    #[inline]
193    pub fn join(&self) {
194        self.inner.wait_until_idle();
195    }
196
197    /// Starts one core worker if the pool has fewer live workers than its
198    /// configured core size.
199    ///
200    /// # Returns
201    ///
202    /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
203    /// was needed.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
208    /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
209    #[inline]
210    pub fn prestart_core_thread(&self) -> Result<bool, SubmissionError> {
211        self.inner.prestart_core_thread()
212    }
213
214    /// Starts all missing core workers.
215    ///
216    /// # Returns
217    ///
218    /// The number of workers started.
219    ///
220    /// # Errors
221    ///
222    /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
223    /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
224    #[inline]
225    pub fn prestart_all_core_threads(&self) -> Result<usize, SubmissionError> {
226        self.inner.prestart_all_core_threads()
227    }
228
229    /// Updates the core pool size.
230    ///
231    /// Increasing the core size changes future admission and prestart limits,
232    /// but it does not eagerly create workers or reschedule already queued work.
233    /// Call [`Self::prestart_all_core_threads`] when eager creation is desired.
234    /// Decreasing the core size lets excess idle workers retire according to
235    /// the keep-alive policy.
236    ///
237    /// # Parameters
238    ///
239    /// * `core_pool_size` - New core pool size.
240    ///
241    /// # Returns
242    ///
243    /// `Ok(())` if the size is accepted.
244    ///
245    /// # Errors
246    ///
247    /// Returns [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`] when the
248    /// new core size would exceed the current maximum size.
249    pub fn set_core_pool_size(
250        &self,
251        core_pool_size: usize,
252    ) -> Result<(), ExecutorServiceBuilderError> {
253        self.inner.set_core_pool_size(core_pool_size)
254    }
255
256    /// Updates the maximum pool size.
257    ///
258    /// Excess workers are not interrupted. They retire after finishing current
259    /// work or timing out while idle.
260    ///
261    /// # Parameters
262    ///
263    /// * `maximum_pool_size` - New maximum pool size.
264    ///
265    /// # Returns
266    ///
267    /// `Ok(())` if the size is accepted.
268    ///
269    /// # Errors
270    ///
271    /// Returns [`ExecutorServiceBuilderError::ZeroMaximumPoolSize`] when the maximum
272    /// size is zero, or [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`]
273    /// when it would be smaller than the current core size.
274    pub fn set_maximum_pool_size(
275        &self,
276        maximum_pool_size: usize,
277    ) -> Result<(), ExecutorServiceBuilderError> {
278        self.inner.set_maximum_pool_size(maximum_pool_size)
279    }
280
281    /// Updates how long excess idle workers may wait before exiting.
282    ///
283    /// # Parameters
284    ///
285    /// * `keep_alive` - New idle timeout for workers above the core size.
286    ///
287    /// # Returns
288    ///
289    /// `Ok(())` if the timeout is accepted.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`ExecutorServiceBuilderError::ZeroKeepAlive`] when `keep_alive` is
294    /// zero.
295    pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ExecutorServiceBuilderError> {
296        self.inner.set_keep_alive(keep_alive)
297    }
298
299    /// Updates whether core workers may also retire after keep-alive timeout.
300    ///
301    /// # Parameters
302    ///
303    /// * `allow` - Whether core workers are subject to idle timeout.
304    pub fn allow_core_thread_timeout(&self, allow: bool) {
305        self.inner.allow_core_thread_timeout(allow);
306    }
307}
309impl Drop for ThreadPool {
310    /// Requests graceful shutdown when the pool value is dropped.
311    fn drop(&mut self) {
312        self.inner.shutdown();
313    }
314}
316impl ExecutorService for ThreadPool {
317    type ResultHandle<R, E>
318        = TaskHandle<R, E>
319    where
320        R: Send + 'static,
321        E: Send + 'static;
322
323    type TrackedHandle<R, E>
324        = TrackedTask<R, E>
325    where
326        R: Send + 'static,
327        E: Send + 'static;
328
329    /// Accepts a runnable and queues it for pool workers.
330    fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
331    where
332        T: Runnable<E> + Send + 'static,
333        E: Send + 'static,
334    {
335        self.inner.submit(PoolJob::detached(task))
336    }
337
338    /// Accepts a callable and queues it for pool workers.
339    ///
340    /// # Parameters
341    ///
342    /// * `task` - Callable to execute on a pool worker.
343    ///
344    /// # Returns
345    ///
346    /// A [`TaskHandle`] for the accepted task.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
351    /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
352    /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
353    /// required worker cannot be created.
354    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
355    where
356        C: Callable<R, E> + Send + 'static,
357        R: Send + 'static,
358        E: Send + 'static,
359    {
360        let (handle, completion) = TaskEndpointPair::new().into_parts();
361        let job = PoolJob::from_task(task, completion);
362        self.inner.submit(job)?;
363        Ok(handle)
364    }
365
366    /// Accepts a callable and queues it with a tracked handle.
367    fn submit_tracked_callable<C, R, E>(
368        &self,
369        task: C,
370    ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
371    where
372        C: Callable<R, E> + Send + 'static,
373        R: Send + 'static,
374        E: Send + 'static,
375    {
376        let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
377        let job = PoolJob::from_task(task, completion);
378        self.inner.submit(job)?;
379        Ok(handle)
380    }
381
382    /// Stops accepting new tasks after already queued work is drained.
383    ///
384    /// Queued and running tasks remain eligible to complete normally.
385    #[inline]
386    fn shutdown(&self) {
387        self.inner.shutdown();
388    }
389
390    /// Stops accepting tasks and cancels queued tasks that have not started.
391    ///
392    /// # Returns
393    ///
394    /// A report containing the number of queued jobs cancelled and the number
395    /// of jobs running at the time of the request.
396    #[inline]
397    fn stop(&self) -> StopReport {
398        self.inner.stop()
399    }
400
401    /// Returns the current lifecycle state.
402    #[inline]
403    fn lifecycle(&self) -> ExecutorServiceLifecycle {
404        self.inner.lifecycle()
405    }
406
407    /// Returns whether shutdown has been requested.
408    #[inline]
409    fn is_not_running(&self) -> bool {
410        self.inner.is_not_running()
411    }
412
413    /// Returns whether shutdown was requested and all workers have exited.
414    #[inline]
415    fn is_terminated(&self) -> bool {
416        self.inner.is_terminated()
417    }
418
419    /// Blocks until the pool has terminated.
420    fn wait_termination(&self) {
421        self.inner.wait_for_termination();
422    }
423}