qubit_thread_pool/dynamic/thread_pool.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::Arc,
12 time::Duration,
13};
14
15use qubit_function::{
16 Callable,
17 Runnable,
18};
19
20use qubit_executor::task::spi::TaskEndpointPair;
21use qubit_executor::{
22 TaskHandle,
23 TrackedTask,
24};
25
26use super::thread_pool_builder::ThreadPoolBuilder;
27use super::thread_pool_inner::ThreadPoolInner;
28use crate::{
29 ExecutorServiceBuilderError,
30 PoolJob,
31 ThreadPoolStats,
32};
33use qubit_executor::service::{
34 ExecutorService,
35 ExecutorServiceLifecycle,
36 StopReport,
37 SubmissionError,
38};
39
40/// OS thread pool implementing [`ExecutorService`].
41///
42/// `ThreadPool` accepts fallible tasks, stores them in an internal FIFO queue,
43/// and executes them on worker threads. Workers are created lazily up to the
44/// configured core size, queued after that, and may grow up to the maximum size
45/// when a bounded queue is full. Callable submissions return [`TaskHandle`],
46/// while tracked submissions return [`TrackedTask`].
47/// retrieval.
48///
49/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
50/// `stop` is abrupt: queued tasks that have not started are completed
51/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
52///
53pub struct ThreadPool {
54 /// Shared pool state and worker coordination primitives.
55 inner: Arc<ThreadPoolInner>,
56}
57
58impl ThreadPool {
59 pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
60 Self { inner }
61 }
62
63 /// Creates a thread pool with equal core and maximum pool sizes.
64 ///
65 /// # Parameters
66 ///
67 /// * `pool_size` - Value applied as both core and maximum pool size.
68 ///
69 /// # Returns
70 ///
71 /// `Ok(ThreadPool)` if all workers are spawned successfully.
72 ///
73 /// # Errors
74 ///
75 /// Returns [`ExecutorServiceBuilderError`] if the resulting maximum pool size is
76 /// zero or a worker thread cannot be spawned.
77 #[inline]
78 pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
79 Self::builder().pool_size(pool_size).build()
80 }
81
82 /// Creates a builder for configuring a thread pool.
83 ///
84 /// # Returns
85 ///
86 /// A builder with default core and maximum pool sizes and an unbounded queue.
87 #[inline]
88 pub fn builder() -> ThreadPoolBuilder {
89 ThreadPoolBuilder::default()
90 }
91
92 /// Returns the number of queued tasks waiting for a worker.
93 ///
94 /// # Returns
95 ///
96 /// The number of accepted tasks that have not started yet.
97 #[inline]
98 pub fn queued_count(&self) -> usize {
99 self.inner.read_state(|state| state.queued_tasks)
100 }
101
102 /// Returns the number of tasks currently held by workers.
103 ///
104 /// # Returns
105 ///
106 /// The number of tasks that workers have taken from the queue and have not
107 /// yet finished processing.
108 #[inline]
109 pub fn running_count(&self) -> usize {
110 self.inner.read_state(|state| state.running_tasks)
111 }
112
113 /// Returns how many worker threads are still running in this pool.
114 ///
115 /// # Returns
116 ///
117 /// The number of live worker loops still owned by this pool. This is a
118 /// runtime count and is not required to match configured
119 /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
120 #[inline]
121 pub fn live_worker_count(&self) -> usize {
122 self.inner.read_state(|state| state.live_workers)
123 }
124
125 /// Returns the configured core pool size.
126 ///
127 /// # Returns
128 ///
129 /// The number of workers kept for normal load before tasks are queued.
130 #[inline]
131 pub fn core_pool_size(&self) -> usize {
132 self.inner.read_state(|state| state.core_pool_size)
133 }
134
135 /// Returns the configured maximum pool size.
136 ///
137 /// # Returns
138 ///
139 /// The maximum number of worker threads this pool may create.
140 #[inline]
141 pub fn maximum_pool_size(&self) -> usize {
142 self.inner.read_state(|state| state.maximum_pool_size)
143 }
144
145 /// Returns a point-in-time snapshot of pool counters.
146 ///
147 /// # Returns
148 ///
149 /// A snapshot containing worker, queue, and task counters observed under
150 /// the pool state lock.
151 #[inline]
152 pub fn stats(&self) -> ThreadPoolStats {
153 self.inner.stats()
154 }
155
156 /// Submits a custom pool job.
157 ///
158 /// This low-level extension point is intended for higher-level services
159 /// that need to pair pool execution with their own task registry or
160 /// cancellation bookkeeping. Prefer the [`ExecutorService`] submission
161 /// methods for ordinary runnable and callable work.
162 ///
163 /// # Parameters
164 ///
165 /// * `job` - Custom job to execute or cancel later.
166 ///
167 /// # Returns
168 ///
169 /// `Ok(())` when the pool accepts the job.
170 ///
171 /// # Errors
172 ///
173 /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
174 /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
175 /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
176 /// required worker cannot be created.
177 #[inline]
178 pub fn submit_job(&self, job: PoolJob) -> Result<(), SubmissionError> {
179 self.inner.submit(job)
180 }
181
182 /// Blocks until all accepted work has completed.
183 ///
184 /// This is a join-style wait for quiescence: it does not request shutdown
185 /// and does not wait for worker threads to exit. Concurrent submissions may
186 /// extend the wait until those accepted jobs also drain.
187 #[inline]
188 pub fn join(&self) {
189 self.inner.wait_until_idle();
190 }
191
192 /// Starts one core worker if the pool has fewer live workers than its
193 /// configured core size.
194 ///
195 /// # Returns
196 ///
197 /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
198 /// was needed.
199 ///
200 /// # Errors
201 ///
202 /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
203 /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
204 #[inline]
205 pub fn prestart_core_thread(&self) -> Result<bool, SubmissionError> {
206 self.inner.prestart_core_thread()
207 }
208
209 /// Starts all missing core workers.
210 ///
211 /// # Returns
212 ///
213 /// The number of workers started.
214 ///
215 /// # Errors
216 ///
217 /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
218 /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
219 #[inline]
220 pub fn prestart_all_core_threads(&self) -> Result<usize, SubmissionError> {
221 self.inner.prestart_all_core_threads()
222 }
223
224 /// Updates the core pool size.
225 ///
226 /// Increasing the core size does not eagerly create new workers unless
227 /// queued work is waiting. Call [`Self::prestart_all_core_threads`] when
228 /// eager creation is desired. Decreasing the core size lets excess idle
229 /// workers retire according to the keep-alive policy.
230 ///
231 /// # Parameters
232 ///
233 /// * `core_pool_size` - New core pool size.
234 ///
235 /// # Returns
236 ///
237 /// `Ok(())` if the size is accepted.
238 ///
239 /// # Errors
240 ///
241 /// Returns [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`] when the
242 /// new core size would exceed the current maximum size.
243 pub fn set_core_pool_size(
244 &self,
245 core_pool_size: usize,
246 ) -> Result<(), ExecutorServiceBuilderError> {
247 self.inner.set_core_pool_size(core_pool_size)
248 }
249
250 /// Updates the maximum pool size.
251 ///
252 /// Excess workers are not interrupted. They retire after finishing current
253 /// work or timing out while idle.
254 ///
255 /// # Parameters
256 ///
257 /// * `maximum_pool_size` - New maximum pool size.
258 ///
259 /// # Returns
260 ///
261 /// `Ok(())` if the size is accepted.
262 ///
263 /// # Errors
264 ///
265 /// Returns [`ExecutorServiceBuilderError::ZeroMaximumPoolSize`] when the maximum
266 /// size is zero, or [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`]
267 /// when it would be smaller than the current core size.
268 pub fn set_maximum_pool_size(
269 &self,
270 maximum_pool_size: usize,
271 ) -> Result<(), ExecutorServiceBuilderError> {
272 self.inner.set_maximum_pool_size(maximum_pool_size)
273 }
274
275 /// Updates how long excess idle workers may wait before exiting.
276 ///
277 /// # Parameters
278 ///
279 /// * `keep_alive` - New idle timeout for workers above the core size.
280 ///
281 /// # Returns
282 ///
283 /// `Ok(())` if the timeout is accepted.
284 ///
285 /// # Errors
286 ///
287 /// Returns [`ExecutorServiceBuilderError::ZeroKeepAlive`] when `keep_alive` is
288 /// zero.
289 pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ExecutorServiceBuilderError> {
290 self.inner.set_keep_alive(keep_alive)
291 }
292
293 /// Updates whether core workers may also retire after keep-alive timeout.
294 ///
295 /// # Parameters
296 ///
297 /// * `allow` - Whether core workers are subject to idle timeout.
298 pub fn allow_core_thread_timeout(&self, allow: bool) {
299 self.inner.allow_core_thread_timeout(allow);
300 }
301}
302
303impl Drop for ThreadPool {
304 /// Requests graceful shutdown when the pool value is dropped.
305 fn drop(&mut self) {
306 self.inner.shutdown();
307 }
308}
309
310impl ExecutorService for ThreadPool {
311 type ResultHandle<R, E>
312 = TaskHandle<R, E>
313 where
314 R: Send + 'static,
315 E: Send + 'static;
316
317 type TrackedHandle<R, E>
318 = TrackedTask<R, E>
319 where
320 R: Send + 'static,
321 E: Send + 'static;
322
323 /// Accepts a runnable and queues it for pool workers.
324 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
325 where
326 T: Runnable<E> + Send + 'static,
327 E: Send + 'static,
328 {
329 self.inner.submit(PoolJob::detached(task))
330 }
331
332 /// Accepts a callable and queues it for pool workers.
333 ///
334 /// # Parameters
335 ///
336 /// * `task` - Callable to execute on a pool worker.
337 ///
338 /// # Returns
339 ///
340 /// A [`TaskHandle`] for the accepted task.
341 ///
342 /// # Errors
343 ///
344 /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
345 /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
346 /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
347 /// required worker cannot be created.
348 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
349 where
350 C: Callable<R, E> + Send + 'static,
351 R: Send + 'static,
352 E: Send + 'static,
353 {
354 let (handle, completion) = TaskEndpointPair::new().into_parts();
355 let job = PoolJob::from_task(task, completion);
356 self.inner.submit(job)?;
357 Ok(handle)
358 }
359
360 /// Accepts a callable and queues it with a tracked handle.
361 fn submit_tracked_callable<C, R, E>(
362 &self,
363 task: C,
364 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
365 where
366 C: Callable<R, E> + Send + 'static,
367 R: Send + 'static,
368 E: Send + 'static,
369 {
370 let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
371 let job = PoolJob::from_task(task, completion);
372 self.inner.submit(job)?;
373 Ok(handle)
374 }
375
376 /// Stops accepting new tasks after already queued work is drained.
377 ///
378 /// Queued and running tasks remain eligible to complete normally.
379 #[inline]
380 fn shutdown(&self) {
381 self.inner.shutdown();
382 }
383
384 /// Stops accepting tasks and cancels queued tasks that have not started.
385 ///
386 /// # Returns
387 ///
388 /// A report containing the number of queued jobs cancelled and the number
389 /// of jobs running at the time of the request.
390 #[inline]
391 fn stop(&self) -> StopReport {
392 self.inner.stop()
393 }
394
395 /// Returns the current lifecycle state.
396 #[inline]
397 fn lifecycle(&self) -> ExecutorServiceLifecycle {
398 self.inner.lifecycle()
399 }
400
401 /// Returns whether shutdown has been requested.
402 #[inline]
403 fn is_not_running(&self) -> bool {
404 self.inner.is_not_running()
405 }
406
407 /// Returns whether shutdown was requested and all workers have exited.
408 #[inline]
409 fn is_terminated(&self) -> bool {
410 self.inner.is_terminated()
411 }
412
413 /// Blocks until the pool has terminated.
414 fn wait_termination(&self) {
415 self.inner.wait_for_termination();
416 }
417}