qubit_thread_pool/dynamic/thread_pool.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::Arc,
12 time::Duration,
13};
14
15use qubit_function::{
16 Callable,
17 Runnable,
18};
19
20use qubit_executor::task::spi::TaskEndpointPair;
21use qubit_executor::{
22 TaskHandle,
23 TrackedTask,
24};
25
26use super::thread_pool_builder::ThreadPoolBuilder;
27use super::thread_pool_inner::ThreadPoolInner;
28use crate::{
29 ExecutorServiceBuilderError,
30 PoolJob,
31 ThreadPoolStats,
32};
33use qubit_executor::service::{
34 ExecutorService,
35 ExecutorServiceLifecycle,
36 StopReport,
37 SubmissionError,
38};
39
40/// OS thread pool implementing [`ExecutorService`].
41///
42/// `ThreadPool` accepts fallible tasks, stores accepted waiting tasks in
43/// internal queues, and executes them on worker threads. The global waiting
44/// queue is FIFO, but task start and completion order are not a strict FIFO API
45/// guarantee. Workers are created lazily up to the configured core size, tasks
46/// are queued after that, and the pool may grow up to the maximum size when a
47/// bounded queue is full. Callable submissions return [`TaskHandle`], while
48/// tracked submissions return [`TrackedTask`].
49///
50/// `shutdown` is graceful: already accepted queued tasks are allowed to run.
51/// `stop` is abrupt: queued tasks that have not started are completed
52/// with [`TaskExecutionError::Cancelled`](qubit_executor::TaskExecutionError::Cancelled).
53///
54pub struct ThreadPool {
55 /// Shared pool state and worker coordination primitives.
56 inner: Arc<ThreadPoolInner>,
57}
58
59impl ThreadPool {
60 pub(super) fn from_inner(inner: Arc<ThreadPoolInner>) -> Self {
61 Self { inner }
62 }
63
64 /// Creates a thread pool with equal core and maximum pool sizes.
65 ///
66 /// # Parameters
67 ///
68 /// * `pool_size` - Value applied as both core and maximum pool size.
69 ///
70 /// # Returns
71 ///
72 /// `Ok(ThreadPool)` if all workers are spawned successfully.
73 ///
74 /// # Errors
75 ///
76 /// Returns [`ExecutorServiceBuilderError`] if the resulting maximum pool size is
77 /// zero or a worker thread cannot be spawned.
78 #[inline]
79 pub fn new(pool_size: usize) -> Result<Self, ExecutorServiceBuilderError> {
80 Self::builder().pool_size(pool_size).build()
81 }
82
83 /// Creates a builder for configuring a thread pool.
84 ///
85 /// # Returns
86 ///
87 /// A builder with default core and maximum pool sizes and an unbounded queue.
88 #[inline]
89 pub fn builder() -> ThreadPoolBuilder {
90 ThreadPoolBuilder::default()
91 }
92
93 /// Returns the number of queued tasks waiting for a worker.
94 ///
95 /// # Returns
96 ///
97 /// The number of accepted tasks that have not started yet.
98 #[inline]
99 pub fn queued_count(&self) -> usize {
100 self.inner.queued_count()
101 }
102
103 /// Returns the number of tasks currently held by workers.
104 ///
105 /// # Returns
106 ///
107 /// The number of tasks that workers have taken from the queue and have not
108 /// yet finished processing.
109 #[inline]
110 pub fn running_count(&self) -> usize {
111 self.inner.running_count()
112 }
113
114 /// Returns how many worker threads are still running in this pool.
115 ///
116 /// # Returns
117 ///
118 /// The number of live worker loops still owned by this pool. This is a
119 /// runtime count and is not required to match configured
120 /// [`Self::core_pool_size`] or [`Self::maximum_pool_size`].
121 #[inline]
122 pub fn live_worker_count(&self) -> usize {
123 self.inner.read_state(|state| state.live_workers)
124 }
125
126 /// Returns the configured core pool size.
127 ///
128 /// # Returns
129 ///
130 /// The number of workers kept for normal load before tasks are queued.
131 #[inline]
132 pub fn core_pool_size(&self) -> usize {
133 self.inner.read_state(|state| state.core_pool_size)
134 }
135
136 /// Returns the configured maximum pool size.
137 ///
138 /// # Returns
139 ///
140 /// The maximum number of worker threads this pool may create.
141 #[inline]
142 pub fn maximum_pool_size(&self) -> usize {
143 self.inner.read_state(|state| state.maximum_pool_size)
144 }
145
146 /// Returns a point-in-time snapshot of pool counters.
147 ///
148 /// # Returns
149 ///
150 /// A snapshot containing worker, queue, and task counters observed under
151 /// the pool state lock.
152 #[inline]
153 pub fn stats(&self) -> ThreadPoolStats {
154 self.inner.stats()
155 }
156
157 /// Submits a custom pool job.
158 ///
159 /// This low-level extension point is intended for higher-level services
160 /// that need to pair pool execution with their own task registry or
161 /// cancellation bookkeeping. Prefer the [`ExecutorService`] submission
162 /// methods for ordinary runnable and callable work. Custom job callbacks
163 /// run synchronously on the thread that reaches the corresponding lifecycle
164 /// event and should stay short and non-blocking. Callback panics are
165 /// contained; if an acceptance callback panics, the job is not queued,
166 /// run, or cancelled.
167 ///
168 /// # Parameters
169 ///
170 /// * `job` - Custom job to execute or cancel later.
171 ///
172 /// # Returns
173 ///
174 /// `Ok(())` when the pool accepts the job.
175 ///
176 /// # Errors
177 ///
178 /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
179 /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
180 /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
181 /// required worker cannot be created.
182 #[inline]
183 pub fn submit_job(&self, job: PoolJob) -> Result<(), SubmissionError> {
184 self.inner.submit(job)
185 }
186
187 /// Blocks until all accepted work has completed.
188 ///
189 /// This is a join-style wait for quiescence: it does not request shutdown
190 /// and does not wait for worker threads to exit. Concurrent submissions may
191 /// extend the wait until those accepted jobs also drain.
192 #[inline]
193 pub fn join(&self) {
194 self.inner.wait_until_idle();
195 }
196
197 /// Starts one core worker if the pool has fewer live workers than its
198 /// configured core size.
199 ///
200 /// # Returns
201 ///
202 /// `Ok(true)` if a worker was started, or `Ok(false)` if no core worker
203 /// was needed.
204 ///
205 /// # Errors
206 ///
207 /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
208 /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
209 #[inline]
210 pub fn prestart_core_thread(&self) -> Result<bool, SubmissionError> {
211 self.inner.prestart_core_thread()
212 }
213
214 /// Starts all missing core workers.
215 ///
216 /// # Returns
217 ///
218 /// The number of workers started.
219 ///
220 /// # Errors
221 ///
222 /// Returns [`SubmissionError::Shutdown`] if the pool is shut down, or
223 /// [`SubmissionError::WorkerSpawnFailed`] if worker creation fails.
224 #[inline]
225 pub fn prestart_all_core_threads(&self) -> Result<usize, SubmissionError> {
226 self.inner.prestart_all_core_threads()
227 }
228
229 /// Updates the core pool size.
230 ///
231 /// Increasing the core size changes future admission and prestart limits,
232 /// but it does not eagerly create workers or reschedule already queued work.
233 /// Call [`Self::prestart_all_core_threads`] when eager creation is desired.
234 /// Decreasing the core size lets excess idle workers retire according to
235 /// the keep-alive policy.
236 ///
237 /// # Parameters
238 ///
239 /// * `core_pool_size` - New core pool size.
240 ///
241 /// # Returns
242 ///
243 /// `Ok(())` if the size is accepted.
244 ///
245 /// # Errors
246 ///
247 /// Returns [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`] when the
248 /// new core size would exceed the current maximum size.
249 pub fn set_core_pool_size(
250 &self,
251 core_pool_size: usize,
252 ) -> Result<(), ExecutorServiceBuilderError> {
253 self.inner.set_core_pool_size(core_pool_size)
254 }
255
256 /// Updates the maximum pool size.
257 ///
258 /// Excess workers are not interrupted. They retire after finishing current
259 /// work or timing out while idle.
260 ///
261 /// # Parameters
262 ///
263 /// * `maximum_pool_size` - New maximum pool size.
264 ///
265 /// # Returns
266 ///
267 /// `Ok(())` if the size is accepted.
268 ///
269 /// # Errors
270 ///
271 /// Returns [`ExecutorServiceBuilderError::ZeroMaximumPoolSize`] when the maximum
272 /// size is zero, or [`ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum`]
273 /// when it would be smaller than the current core size.
274 pub fn set_maximum_pool_size(
275 &self,
276 maximum_pool_size: usize,
277 ) -> Result<(), ExecutorServiceBuilderError> {
278 self.inner.set_maximum_pool_size(maximum_pool_size)
279 }
280
281 /// Updates how long excess idle workers may wait before exiting.
282 ///
283 /// # Parameters
284 ///
285 /// * `keep_alive` - New idle timeout for workers above the core size.
286 ///
287 /// # Returns
288 ///
289 /// `Ok(())` if the timeout is accepted.
290 ///
291 /// # Errors
292 ///
293 /// Returns [`ExecutorServiceBuilderError::ZeroKeepAlive`] when `keep_alive` is
294 /// zero.
295 pub fn set_keep_alive(&self, keep_alive: Duration) -> Result<(), ExecutorServiceBuilderError> {
296 self.inner.set_keep_alive(keep_alive)
297 }
298
299 /// Updates whether core workers may also retire after keep-alive timeout.
300 ///
301 /// # Parameters
302 ///
303 /// * `allow` - Whether core workers are subject to idle timeout.
304 pub fn allow_core_thread_timeout(&self, allow: bool) {
305 self.inner.allow_core_thread_timeout(allow);
306 }
307}
308
309impl Drop for ThreadPool {
310 /// Requests graceful shutdown when the pool value is dropped.
311 fn drop(&mut self) {
312 self.inner.shutdown();
313 }
314}
315
316impl ExecutorService for ThreadPool {
317 type ResultHandle<R, E>
318 = TaskHandle<R, E>
319 where
320 R: Send + 'static,
321 E: Send + 'static;
322
323 type TrackedHandle<R, E>
324 = TrackedTask<R, E>
325 where
326 R: Send + 'static,
327 E: Send + 'static;
328
329 /// Accepts a runnable and queues it for pool workers.
330 fn submit<T, E>(&self, task: T) -> Result<(), SubmissionError>
331 where
332 T: Runnable<E> + Send + 'static,
333 E: Send + 'static,
334 {
335 self.inner.submit(PoolJob::detached(task))
336 }
337
338 /// Accepts a callable and queues it for pool workers.
339 ///
340 /// # Parameters
341 ///
342 /// * `task` - Callable to execute on a pool worker.
343 ///
344 /// # Returns
345 ///
346 /// A [`TaskHandle`] for the accepted task.
347 ///
348 /// # Errors
349 ///
350 /// Returns [`SubmissionError::Shutdown`] after shutdown, returns
351 /// [`SubmissionError::Saturated`] when the bounded pool cannot accept
352 /// more work, or returns [`SubmissionError::WorkerSpawnFailed`] when a
353 /// required worker cannot be created.
354 fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::ResultHandle<R, E>, SubmissionError>
355 where
356 C: Callable<R, E> + Send + 'static,
357 R: Send + 'static,
358 E: Send + 'static,
359 {
360 let (handle, completion) = TaskEndpointPair::new().into_parts();
361 let job = PoolJob::from_task(task, completion);
362 self.inner.submit(job)?;
363 Ok(handle)
364 }
365
366 /// Accepts a callable and queues it with a tracked handle.
367 fn submit_tracked_callable<C, R, E>(
368 &self,
369 task: C,
370 ) -> Result<Self::TrackedHandle<R, E>, SubmissionError>
371 where
372 C: Callable<R, E> + Send + 'static,
373 R: Send + 'static,
374 E: Send + 'static,
375 {
376 let (handle, completion) = TaskEndpointPair::new().into_tracked_parts();
377 let job = PoolJob::from_task(task, completion);
378 self.inner.submit(job)?;
379 Ok(handle)
380 }
381
382 /// Stops accepting new tasks after already queued work is drained.
383 ///
384 /// Queued and running tasks remain eligible to complete normally.
385 #[inline]
386 fn shutdown(&self) {
387 self.inner.shutdown();
388 }
389
390 /// Stops accepting tasks and cancels queued tasks that have not started.
391 ///
392 /// # Returns
393 ///
394 /// A report containing the number of queued jobs cancelled and the number
395 /// of jobs running at the time of the request.
396 #[inline]
397 fn stop(&self) -> StopReport {
398 self.inner.stop()
399 }
400
401 /// Returns the current lifecycle state.
402 #[inline]
403 fn lifecycle(&self) -> ExecutorServiceLifecycle {
404 self.inner.lifecycle()
405 }
406
407 /// Returns whether shutdown has been requested.
408 #[inline]
409 fn is_not_running(&self) -> bool {
410 self.inner.is_not_running()
411 }
412
413 /// Returns whether shutdown was requested and all workers have exited.
414 #[inline]
415 fn is_terminated(&self) -> bool {
416 self.inner.is_terminated()
417 }
418
419 /// Blocks until the pool has terminated.
420 fn wait_termination(&self) {
421 self.inner.wait_for_termination();
422 }
423}