// qubit_thread_pool/thread_pool/thread_pool.rs
/*******************************************************************************
 *
 * Copyright (c) 2025 - 2026 Haixing Hu.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::Arc,
14 time::Duration,
15};
16
17use qubit_function::Callable;
18
19use qubit_executor::{
20 TaskCompletionPair,
21 TaskHandle,
22 TaskRunner,
23};
24
25use super::pool_job::PoolJob;
26use super::thread_pool_build_error::ThreadPoolBuildError;
27use super::thread_pool_builder::ThreadPoolBuilder;
28use super::thread_pool_inner::ThreadPoolInner;
29use super::thread_pool_stats::ThreadPoolStats;
30use qubit_executor::service::{
31 ExecutorService,
32 RejectedExecution,
33 ShutdownReport,
34};
35
/// OS thread pool implementing [`ExecutorService`].
///
/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
/// and executes them on worker threads. Workers are created lazily up to the
/// configured core size, queued after that, and may grow up to the maximum size
/// when a bounded queue is full. Submitted tasks return [`TaskHandle`], which
/// supports both blocking [`TaskHandle::get`] and async `.await` result
/// retrieval.
///
/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
/// `shutdown_now` is abrupt: queued tasks that have not started are completed
/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
///
/// Dropping the pool requests a graceful `shutdown` (see the `Drop` impl); it
/// does not wait for running workers to finish.
pub struct ThreadPool {
    /// Shared pool state and worker coordination primitives. Kept behind an
    /// [`Arc`] so ownership can be shared (presumably with worker threads —
    /// confirm in `ThreadPoolInner`).
    inner: Arc<ThreadPoolInner>,
}
53
54impl ThreadPool {
55 pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
56 Self { inner }
57 }
58
59 /// Creates a thread pool with equal core and maximum pool sizes.
60 ///
61 /// # Parameters
62 ///
63 /// * `pool_size` - Value applied as both core and maximum pool size.
64 ///
65 /// # Returns
66 ///
67 /// `Ok(ThreadPool)` if all workers are spawned successfully.
68 ///
69 /// # Errors
70 ///
71 /// Returns [`ThreadPoolBuildError`] if the resulting maximum pool size is
72 /// zero or a worker thread cannot be spawned.
73 #[inline]
74 pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
75 Self::builder().pool_size(pool_size).build()
76 }
77
78 /// Creates a builder for configuring a thread pool.
79 ///
80 /// # Returns
81 ///
82 /// A builder with default core and maximum pool sizes and an unbounded queue.
83 #[inline]
84 pub fn builder() -> ThreadPoolBuilder {
85 ThreadPoolBuilder::default()
86 }
87
88 /// Returns the number of queued tasks waiting for a worker.
89 ///
90 /// # Returns
91 ///
92 /// The number of accepted tasks that have not started yet.
93 #[inline]
94 pub fn queued_count(&self) -> usize {
95 self.inner.read_state(|state| state.queued_tasks)
96 }
97
98 /// Returns the number of tasks currently held by workers.
99 ///
100 /// # Returns
101 ///
102 /// The number of tasks that workers have taken from the queue and have not
103 /// yet finished processing.
104 #[inline]
105 pub fn running_count(&self) -> usize {
106 self.inner.read_state(|state| state.running_tasks)
107 }
108
109 /// Returns how many worker threads are still running in this pool.
110 ///
111 /// # Returns
112 ///
113 /// The number of live worker loops still owned by this pool. This is a
114 /// runtime count and is not required to match configured
115 /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
116 #[inline]
117 pub fn live_worker_count(&self) -> usize {
118 self.inner.read_state(|state| state.live_workers)
119 }
120
121 /// Returns the configured core pool size.
122 ///
123 /// # Returns
124 ///
125 /// The number of workers kept for normal load before tasks are queued.
126 #[inline]
127 pub fn core_pool_size(&self) -> usize {
128 self.inner.read_state(|state| state.core_pool_size)
129 }
130
131 /// Returns the configured maximum pool size.
132 ///
133 /// # Returns
134 ///
135 /// The maximum number of worker threads this pool may create.
136 #[inline]
137 pub fn maximum_pool_size(&self) -> usize {
138 self.inner.read_state(|state| state.maximum_pool_size)
139 }
140
141 /// Returns a point-in-time snapshot of pool counters.
142 ///
143 /// # Returns
144 ///
145 /// A snapshot containing worker, queue, and task counters observed under
146 /// the pool state lock.
147 #[inline]
148 pub fn stats(&self) -> ThreadPoolStats {
149 self.inner.stats()
150 }
151
152 /// Starts one core worker if the pool has fewer live workers than its
153 /// configured core size.
154 ///
155 /// # Returns
156 ///
157 /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
158 /// was needed.
159 ///
160 /// # Errors
161 ///
162 /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
163 /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
164 #[inline]
165 pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
166 self.inner.prestart_core_thread()
167 }
168
169 /// Starts all missing core workers.
170 ///
171 /// # Returns
172 ///
173 /// The number of workers started.
174 ///
175 /// # Errors
176 ///
177 /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
178 /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
179 #[inline]
180 pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
181 self.inner.prestart_all_core_threads()
182 }
183
184 /// Updates the core pool size.
185 ///
186 /// Increasing the core size does not eagerly create new workers unless
187 /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
188 /// eager creation is desired. Decreasing the core size lets excess idle
189 /// workers retire according to the keep-alive policy.
190 ///
191 /// # Parameters
192 ///
193 /// * `core_pool_size` - New core pool size.
194 ///
195 /// # Returns
196 ///
197 /// `Ok(())` if the size is accepted.
198 ///
199 /// # Errors
200 ///
201 /// Returns [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the
202 /// new core size would exceed the current maximum size.
203 pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
204 self.inner.set_core_pool_size(core_pool_size)
205 }
206
207 /// Updates the maximum pool size.
208 ///
209 /// Excess workers are not interrupted. They retire after finishing current
210 /// work or timing out while idle.
211 ///
212 /// # Parameters
213 ///
214 /// * `maximum_pool_size` - New maximum pool size.
215 ///
216 /// # Returns
217 ///
218 /// `Ok(())` if the size is accepted.
219 ///
220 /// # Errors
221 ///
222 /// Returns [`ThreadPoolBuildError::ZeroMaximumPoolSize`] when the maximum
223 /// size is zero, or [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`]
224 /// when it would be smaller than the current core size.
225 pub fn set_maximum_pool_size(
226 &self,
227 maximum_pool_size: usize,
228 ) -> Result<(), ThreadPoolBuildError> {
229 self.inner.set_maximum_pool_size(maximum_pool_size)
230 }
231
232 /// Updates how long excess idle workers may wait before exiting.
233 ///
234 /// # Parameters
235 ///
236 /// * `keep_alive` - New idle timeout for workers above the core size.
237 ///
238 /// # Returns
239 ///
240 /// `Ok(())` if the timeout is accepted.
241 ///
242 /// # Errors
243 ///
244 /// Returns [`ThreadPoolBuildError::ZeroKeepAlive`] when `keep_alive` is
245 /// zero.
246 pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
247 self.inner.set_keep_alive(keep_alive)
248 }
249
250 /// Updates whether core workers may also retire after keep-alive timeout.
251 ///
252 /// # Parameters
253 ///
254 /// * `allow` - Whether core workers are subject to idle timeout.
255 pub fn allow_core_thread_timeout(&self, allow: bool) {
256 self.inner.allow_core_thread_timeout(allow);
257 }
258
259 /// Submits an already type-erased pool job.
260 ///
261 /// This low-level hook is intended for higher-level service crates that
262 /// need to attach their own lifecycle callbacks while still using this
263 /// pool's queueing, cancellation, and shutdown behavior.
264 ///
265 /// # Parameters
266 ///
267 /// * `job` - Type-erased job containing run and cancellation callbacks.
268 ///
269 /// # Returns
270 ///
271 /// `Ok(())` when the job is accepted.
272 ///
273 /// # Errors
274 ///
275 /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
276 /// [`RejectedExecution::Saturated`] when a bounded pool cannot accept more
277 /// work, or returns [`RejectedExecution::WorkerSpawnFailed`] when the pool
278 /// fails to create a required worker.
279 pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
280 self.inner.submit(job)
281 }
282}
283
impl Drop for ThreadPool {
    /// Requests graceful shutdown when the pool value is dropped.
    ///
    /// Only `shutdown` is invoked here: new submissions are refused while
    /// already accepted work may still run to completion. Dropping does NOT
    /// wait for termination, so worker threads may still be running after
    /// this returns.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
290
291impl ExecutorService for ThreadPool {
292 type Handle<R, E>
293 = TaskHandle<R, E>
294 where
295 R: Send + 'static,
296 E: Send + 'static;
297
298 type Termination<'a>
299 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
300 where
301 Self: 'a;
302
303 /// Accepts a callable and queues it for pool workers.
304 ///
305 /// # Parameters
306 ///
307 /// * `task` - Callable to execute on a pool worker.
308 ///
309 /// # Returns
310 ///
311 /// A [`TaskHandle`] for the accepted task.
312 ///
313 /// # Errors
314 ///
315 /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
316 /// [`RejectedExecution::Saturated`] when the bounded pool cannot accept
317 /// more work, or returns [`RejectedExecution::WorkerSpawnFailed`] when a
318 /// required worker cannot be created.
319 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
320 where
321 C: Callable<R, E> + Send + 'static,
322 R: Send + 'static,
323 E: Send + 'static,
324 {
325 let (handle, completion) = TaskCompletionPair::new().into_parts();
326 let completion_for_run = completion.clone();
327 let job = PoolJob::new(
328 Box::new(move || {
329 TaskRunner::new(task).run(completion_for_run);
330 }),
331 Box::new(move || {
332 completion.cancel();
333 }),
334 );
335 self.inner.submit(job)?;
336 Ok(handle)
337 }
338
339 /// Stops accepting new tasks after already queued work is drained.
340 ///
341 /// Queued and running tasks remain eligible to complete normally.
342 #[inline]
343 fn shutdown(&self) {
344 self.inner.shutdown();
345 }
346
347 /// Stops accepting tasks and cancels queued tasks that have not started.
348 ///
349 /// # Returns
350 ///
351 /// A report containing the number of queued jobs cancelled and the number
352 /// of jobs running at the time of the request.
353 #[inline]
354 fn shutdown_now(&self) -> ShutdownReport {
355 self.inner.shutdown_now()
356 }
357
358 /// Returns whether shutdown has been requested.
359 #[inline]
360 fn is_shutdown(&self) -> bool {
361 self.inner.is_shutdown()
362 }
363
364 /// Returns whether shutdown was requested and all workers have exited.
365 #[inline]
366 fn is_terminated(&self) -> bool {
367 self.inner.is_terminated()
368 }
369
370 /// Waits until the pool has terminated.
371 ///
372 /// This future blocks the polling thread while waiting on a condition
373 /// variable.
374 ///
375 /// # Returns
376 ///
377 /// A future that resolves when shutdown has been requested, the queue is
378 /// empty, no task is running, and all worker loops have exited.
379 fn await_termination(&self) -> Self::Termination<'_> {
380 Box::pin(async move {
381 self.inner.wait_for_termination();
382 })
383 }
384}