qubit_thread_pool/thread_pool/thread_pool.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9use std::{
10 future::Future,
11 pin::Pin,
12 sync::Arc,
13 time::Duration,
14};
15
16use qubit_function::Callable;
17
18use qubit_executor::{
19 TaskCompletionPair,
20 TaskHandle,
21 TaskRunner,
22};
23
24use super::pool_job::PoolJob;
25use super::thread_pool_build_error::ThreadPoolBuildError;
26use super::thread_pool_builder::ThreadPoolBuilder;
27use super::thread_pool_inner::ThreadPoolInner;
28use super::thread_pool_stats::ThreadPoolStats;
29use qubit_executor::service::{
30 ExecutorService,
31 RejectedExecution,
32 ShutdownReport,
33};
34
35/// OS thread pool implementing [`ExecutorService`].
36///
37/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
38/// and executes them on worker threads. Workers are created lazily up to the
39/// configured core size, queued after that, and may grow up to the maximum size
40/// when a bounded queue is full. Submitted tasks return [`TaskHandle`], which
41/// supports both blocking [`TaskHandle::get`] and async `.await` result
42/// retrieval.
43///
44/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
45/// `shutdown_now` is abrupt: queued tasks that have not started are completed
46/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
47///
48/// # Author
49///
50/// Haixing Hu
51pub struct ThreadPool {
52 /// Shared pool state and worker coordination primitives.
53 inner: Arc<ThreadPoolInner>,
54}
55
56impl ThreadPool {
57 pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
58 Self { inner }
59 }
60
61 /// Creates a thread pool with equal core and maximum pool sizes.
62 ///
63 /// # Parameters
64 ///
65 /// * `pool_size` - Value applied as both core and maximum pool size.
66 ///
67 /// # Returns
68 ///
69 /// `Ok(ThreadPool)` if all workers are spawned successfully.
70 ///
71 /// # Errors
72 ///
73 /// Returns [`ThreadPoolBuildError`] if the resulting maximum pool size is
74 /// zero or a worker thread cannot be spawned.
75 #[inline]
76 pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
77 Self::builder().pool_size(pool_size).build()
78 }
79
80 /// Creates a builder for configuring a thread pool.
81 ///
82 /// # Returns
83 ///
84 /// A builder with default core and maximum pool sizes and an unbounded queue.
85 #[inline]
86 pub fn builder() -> ThreadPoolBuilder {
87 ThreadPoolBuilder::default()
88 }
89
90 /// Returns the number of queued tasks waiting for a worker.
91 ///
92 /// # Returns
93 ///
94 /// The number of accepted tasks that have not started yet.
95 #[inline]
96 pub fn queued_count(&self) -> usize {
97 self.inner.read_state(|state| state.queued_tasks)
98 }
99
100 /// Returns the number of tasks currently held by workers.
101 ///
102 /// # Returns
103 ///
104 /// The number of tasks that workers have taken from the queue and have not
105 /// yet finished processing.
106 #[inline]
107 pub fn running_count(&self) -> usize {
108 self.inner.read_state(|state| state.running_tasks)
109 }
110
111 /// Returns how many worker threads are still running in this pool.
112 ///
113 /// # Returns
114 ///
115 /// The number of live worker loops still owned by this pool. This is a
116 /// runtime count and is not required to match configured
117 /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
118 #[inline]
119 pub fn live_worker_count(&self) -> usize {
120 self.inner.read_state(|state| state.live_workers)
121 }
122
123 /// Returns the configured core pool size.
124 ///
125 /// # Returns
126 ///
127 /// The number of workers kept for normal load before tasks are queued.
128 #[inline]
129 pub fn core_pool_size(&self) -> usize {
130 self.inner.read_state(|state| state.core_pool_size)
131 }
132
133 /// Returns the configured maximum pool size.
134 ///
135 /// # Returns
136 ///
137 /// The maximum number of worker threads this pool may create.
138 #[inline]
139 pub fn maximum_pool_size(&self) -> usize {
140 self.inner.read_state(|state| state.maximum_pool_size)
141 }
142
143 /// Returns a point-in-time snapshot of pool counters.
144 ///
145 /// # Returns
146 ///
147 /// A snapshot containing worker, queue, and task counters observed under
148 /// the pool state lock.
149 #[inline]
150 pub fn stats(&self) -> ThreadPoolStats {
151 self.inner.stats()
152 }
153
154 /// Starts one core worker if the pool has fewer live workers than its
155 /// configured core size.
156 ///
157 /// # Returns
158 ///
159 /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
160 /// was needed.
161 ///
162 /// # Errors
163 ///
164 /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
165 /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
166 #[inline]
167 pub fn prestart_core_thread(&self) -> Result<bool, RejectedExecution> {
168 self.inner.prestart_core_thread()
169 }
170
171 /// Starts all missing core workers.
172 ///
173 /// # Returns
174 ///
175 /// The number of workers started.
176 ///
177 /// # Errors
178 ///
179 /// Returns [`RejectedExecution::Shutdown`] if the pool is shut down, or
180 /// [`RejectedExecution::WorkerSpawnFailed`] if worker creation fails.
181 #[inline]
182 pub fn prestart_all_core_threads(&self) -> Result<usize, RejectedExecution> {
183 self.inner.prestart_all_core_threads()
184 }
185
186 /// Updates the core pool size.
187 ///
188 /// Increasing the core size does not eagerly create new workers unless
189 /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
190 /// eager creation is desired. Decreasing the core size lets excess idle
191 /// workers retire according to the keep-alive policy.
192 ///
193 /// # Parameters
194 ///
195 /// * `core_pool_size` - New core pool size.
196 ///
197 /// # Returns
198 ///
199 /// `Ok(())` if the size is accepted.
200 ///
201 /// # Errors
202 ///
203 /// Returns [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`] when the
204 /// new core size would exceed the current maximum size.
205 pub fn set_core_pool_size(&self, core_pool_size: usize) -> Result<(), ThreadPoolBuildError> {
206 self.inner.set_core_pool_size(core_pool_size)
207 }
208
209 /// Updates the maximum pool size.
210 ///
211 /// Excess workers are not interrupted. They retire after finishing current
212 /// work or timing out while idle.
213 ///
214 /// # Parameters
215 ///
216 /// * `maximum_pool_size` - New maximum pool size.
217 ///
218 /// # Returns
219 ///
220 /// `Ok(())` if the size is accepted.
221 ///
222 /// # Errors
223 ///
224 /// Returns [`ThreadPoolBuildError::ZeroMaximumPoolSize`] when the maximum
225 /// size is zero, or [`ThreadPoolBuildError::CorePoolSizeExceedsMaximum`]
226 /// when it would be smaller than the current core size.
227 pub fn set_maximum_pool_size(
228 &self,
229 maximum_pool_size: usize,
230 ) -> Result<(), ThreadPoolBuildError> {
231 self.inner.set_maximum_pool_size(maximum_pool_size)
232 }
233
234 /// Updates how long excess idle workers may wait before exiting.
235 ///
236 /// # Parameters
237 ///
238 /// * `keep_alive` - New idle timeout for workers above the core size.
239 ///
240 /// # Returns
241 ///
242 /// `Ok(())` if the timeout is accepted.
243 ///
244 /// # Errors
245 ///
246 /// Returns [`ThreadPoolBuildError::ZeroKeepAlive`] when `keep_alive` is
247 /// zero.
248 pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ThreadPoolBuildError> {
249 self.inner.set_keep_alive(keep_alive)
250 }
251
252 /// Updates whether core workers may also retire after keep-alive timeout.
253 ///
254 /// # Parameters
255 ///
256 /// * `allow` - Whether core workers are subject to idle timeout.
257 pub fn allow_core_thread_timeout(&self, allow: bool) {
258 self.inner.allow_core_thread_timeout(allow);
259 }
260
261 /// Submits an already type-erased pool job.
262 ///
263 /// This low-level hook is intended for higher-level service crates that
264 /// need to attach their own lifecycle callbacks while still using this
265 /// pool's queueing, cancellation, and shutdown behavior.
266 ///
267 /// # Parameters
268 ///
269 /// * `job` - Type-erased job containing run and cancellation callbacks.
270 ///
271 /// # Returns
272 ///
273 /// `Ok(())` when the job is accepted.
274 ///
275 /// # Errors
276 ///
277 /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
278 /// [`RejectedExecution::Saturated`] when a bounded pool cannot accept more
279 /// work, or returns [`RejectedExecution::WorkerSpawnFailed`] when the pool
280 /// fails to create a required worker.
281 pub fn submit_job(&self, job: PoolJob) -> Result<(), RejectedExecution> {
282 self.inner.submit(job)
283 }
284}
285
286impl Drop for ThreadPool {
287 /// Requests graceful shutdown when the pool value is dropped.
288 fn drop(&mut self) {
289 self.inner.shutdown();
290 }
291}
292
293impl ExecutorService for ThreadPool {
294 type Handle<R, E>
295 = TaskHandle<R, E>
296 where
297 R: Send + 'static,
298 E: Send + 'static;
299
300 type Termination<'a>
301 = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
302 where
303 Self: 'a;
304
305 /// Accepts a callable and queues it for pool workers.
306 ///
307 /// # Parameters
308 ///
309 /// * `task` - Callable to execute on a pool worker.
310 ///
311 /// # Returns
312 ///
313 /// A [`TaskHandle`] for the accepted task.
314 ///
315 /// # Errors
316 ///
317 /// Returns [`RejectedExecution::Shutdown`] after shutdown, returns
318 /// [`RejectedExecution::Saturated`] when the bounded pool cannot accept
319 /// more work, or returns [`RejectedExecution::WorkerSpawnFailed`] when a
320 /// required worker cannot be created.
321 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
322 where
323 C: Callable<R, E> + Send + 'static,
324 R: Send + 'static,
325 E: Send + 'static,
326 {
327 let (handle, completion) = TaskCompletionPair::new().into_parts();
328 let completion_for_run = completion.clone();
329 let job = PoolJob::new(
330 Box::new(move || {
331 TaskRunner::new(task).run(completion_for_run);
332 }),
333 Box::new(move || {
334 completion.cancel();
335 }),
336 );
337 self.inner.submit(job)?;
338 Ok(handle)
339 }
340
341 /// Stops accepting new tasks after already queued work is drained.
342 ///
343 /// Queued and running tasks remain eligible to complete normally.
344 #[inline]
345 fn shutdown(&self) {
346 self.inner.shutdown();
347 }
348
349 /// Stops accepting tasks and cancels queued tasks that have not started.
350 ///
351 /// # Returns
352 ///
353 /// A report containing the number of queued jobs cancelled and the number
354 /// of jobs running at the time of the request.
355 #[inline]
356 fn shutdown_now(&self) -> ShutdownReport {
357 self.inner.shutdown_now()
358 }
359
360 /// Returns whether shutdown has been requested.
361 #[inline]
362 fn is_shutdown(&self) -> bool {
363 self.inner.is_shutdown()
364 }
365
366 /// Returns whether shutdown was requested and all workers have exited.
367 #[inline]
368 fn is_terminated(&self) -> bool {
369 self.inner.is_terminated()
370 }
371
372 /// Waits until the pool has terminated.
373 ///
374 /// This future blocks the polling thread while waiting on a condition
375 /// variable.
376 ///
377 /// # Returns
378 ///
379 /// A future that resolves when shutdown has been requested, the queue is
380 /// empty, no task is running, and all worker loops have exited.
381 fn await_termination(&self) -> Self::Termination<'_> {
382 Box::pin(async move {
383 self.inner.wait_for_termination();
384 })
385 }
386}