Skip to main content

qubit_thread_pool/thread_pool/
thread_pool_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    thread,
13    time::Duration,
14};
15
16use super::thread_pool::ThreadPool;
17use super::thread_pool_build_error::ThreadPoolBuildError;
18use super::thread_pool_config::ThreadPoolConfig;
19use super::thread_pool_inner::ThreadPoolInner;
20
/// Thread name prefix a [`ThreadPoolBuilder`] starts with when the caller
/// does not supply one.
const DEFAULT_THREAD_NAME_PREFIX: &str = "qubit-thread-pool";

/// Keep-alive timeout a [`ThreadPoolBuilder`] starts with: how long a worker
/// above the core pool size may stay idle (60 seconds).
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(60);
26
/// Builder for [`ThreadPool`].
///
/// Every setter consumes the builder and returns the updated value, so calls
/// are meant to be chained and finished with [`Self::build`], which validates
/// the configuration before the pool is created.
///
/// The default builder uses the available CPU parallelism as both core and
/// maximum pool size, with an unbounded FIFO queue.
#[derive(Debug, Clone)]
#[must_use = "builder methods return the updated builder; call `build` to create the pool"]
pub struct ThreadPoolBuilder {
    /// Configured core pool size.
    core_pool_size: usize,
    /// Configured maximum pool size.
    maximum_pool_size: usize,
    /// Maximum number of jobs that may wait in the queue; `None` selects an
    /// unbounded queue.
    queue_capacity: Option<usize>,
    /// Prefix used when naming worker threads.
    thread_name_prefix: String,
    /// Stack size in bytes for worker threads; `None` means no explicit size
    /// was configured.
    stack_size: Option<usize>,
    /// Idle timeout for workers allowed to retire.
    keep_alive: Duration,
    /// Whether core workers may retire after the keep-alive timeout.
    allow_core_thread_timeout: bool,
    /// Whether [`Self::build`] should start all core workers eagerly.
    prestart_core_threads: bool,
}
51
52impl ThreadPoolBuilder {
53    /// Sets both the core and maximum pool size to the same value.
54    ///
55    /// # Parameters
56    ///
57    /// * `pool_size` - Pool size applied as both core and maximum limits.
58    ///
59    /// # Returns
60    ///
61    /// This builder for fluent configuration.
62    #[inline]
63    pub fn pool_size(mut self, pool_size: usize) -> Self {
64        self.core_pool_size = pool_size;
65        self.maximum_pool_size = pool_size;
66        self
67    }
68
69    /// Sets the core pool size.
70    ///
71    /// A submitted task creates a new worker while the live worker count is
72    /// below this value. Once the core size is reached, tasks are queued before
73    /// the pool considers growing toward the maximum size.
74    ///
75    /// # Parameters
76    ///
77    /// * `core_pool_size` - Core pool size.
78    ///
79    /// # Returns
80    ///
81    /// This builder for fluent configuration.
82    #[inline]
83    pub fn core_pool_size(mut self, core_pool_size: usize) -> Self {
84        self.core_pool_size = core_pool_size;
85        self
86    }
87
88    /// Sets the maximum pool size.
89    ///
90    /// The pool grows above the core size only when the queue cannot accept a
91    /// submitted task.
92    ///
93    /// # Parameters
94    ///
95    /// * `maximum_pool_size` - Maximum pool size.
96    ///
97    /// # Returns
98    ///
99    /// This builder for fluent configuration.
100    #[inline]
101    pub fn maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
102        self.maximum_pool_size = maximum_pool_size;
103        self
104    }
105
106    /// Sets a bounded queue capacity.
107    ///
108    /// The capacity counts only tasks waiting in the queue. Tasks already held
109    /// by worker threads are not included.
110    ///
111    /// # Parameters
112    ///
113    /// * `capacity` - Maximum number of queued tasks.
114    ///
115    /// # Returns
116    ///
117    /// This builder for fluent configuration.
118    #[inline]
119    pub fn queue_capacity(mut self, capacity: usize) -> Self {
120        self.queue_capacity = Some(capacity);
121        self
122    }
123
124    /// Uses an unbounded queue.
125    ///
126    /// # Returns
127    ///
128    /// This builder for fluent configuration.
129    #[inline]
130    pub fn unbounded_queue(mut self) -> Self {
131        self.queue_capacity = None;
132        self
133    }
134
135    /// Sets the worker thread name prefix.
136    ///
137    /// Worker names are created by appending the worker index to this prefix.
138    ///
139    /// # Parameters
140    ///
141    /// * `prefix` - Prefix for worker thread names.
142    ///
143    /// # Returns
144    ///
145    /// This builder for fluent configuration.
146    #[inline]
147    pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
148        self.thread_name_prefix = prefix.to_owned();
149        self
150    }
151
152    /// Sets the worker thread stack size.
153    ///
154    /// # Parameters
155    ///
156    /// * `stack_size` - Stack size in bytes for each worker thread.
157    ///
158    /// # Returns
159    ///
160    /// This builder for fluent configuration.
161    #[inline]
162    pub fn stack_size(mut self, stack_size: usize) -> Self {
163        self.stack_size = Some(stack_size);
164        self
165    }
166
167    /// Sets the idle timeout for workers above the core pool size.
168    ///
169    /// # Parameters
170    ///
171    /// * `keep_alive` - Duration an excess worker may stay idle.
172    ///
173    /// # Returns
174    ///
175    /// This builder for fluent configuration.
176    #[inline]
177    pub fn keep_alive(mut self, keep_alive: Duration) -> Self {
178        self.keep_alive = keep_alive;
179        self
180    }
181
182    /// Allows core workers to retire after the keep-alive timeout.
183    ///
184    /// # Parameters
185    ///
186    /// * `allow` - Whether idle core workers may time out.
187    ///
188    /// # Returns
189    ///
190    /// This builder for fluent configuration.
191    #[inline]
192    pub fn allow_core_thread_timeout(mut self, allow: bool) -> Self {
193        self.allow_core_thread_timeout = allow;
194        self
195    }
196
197    /// Starts all core workers during [`Self::build`].
198    ///
199    /// Without this option, workers are created lazily as tasks are submitted,
200    /// matching the default JDK `ThreadPoolExecutor` behavior.
201    ///
202    /// # Returns
203    ///
204    /// This builder for fluent configuration.
205    #[inline]
206    pub fn prestart_core_threads(mut self) -> Self {
207        self.prestart_core_threads = true;
208        self
209    }
210
211    /// Builds the configured thread pool.
212    ///
213    /// # Returns
214    ///
215    /// `Ok(ThreadPool)` if the configuration is valid and all requested
216    /// prestarted workers are spawned successfully.
217    ///
218    /// # Errors
219    ///
220    /// Returns [`ThreadPoolBuildError`] if the configuration is invalid or a
221    /// prestarted worker thread cannot be spawned.
222    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
223        self.validate()?;
224        let prestart_core_threads = self.prestart_core_threads;
225        let inner = Arc::new(ThreadPoolInner::new(ThreadPoolConfig {
226            core_pool_size: self.core_pool_size,
227            maximum_pool_size: self.maximum_pool_size,
228            queue_capacity: self.queue_capacity,
229            thread_name_prefix: self.thread_name_prefix,
230            stack_size: self.stack_size,
231            keep_alive: self.keep_alive,
232            allow_core_thread_timeout: self.allow_core_thread_timeout,
233        }));
234        if prestart_core_threads {
235            inner
236                .prestart_all_core_threads()
237                .map_err(ThreadPoolBuildError::from_rejected_execution)?;
238        }
239        Ok(ThreadPool::from_inner(inner))
240    }
241
242    /// Validates this builder configuration.
243    ///
244    /// # Returns
245    ///
246    /// `Ok(())` when all configured values are internally consistent.
247    ///
248    /// # Errors
249    ///
250    /// Returns [`ThreadPoolBuildError`] for zero maximum size, core size larger
251    /// than maximum size, zero bounded queue capacity, zero stack size, or zero
252    /// keep-alive timeout.
253    fn validate(&self) -> Result<(), ThreadPoolBuildError> {
254        if self.maximum_pool_size == 0 {
255            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
256        }
257        if self.core_pool_size > self.maximum_pool_size {
258            return Err(ThreadPoolBuildError::CorePoolSizeExceedsMaximum {
259                core_pool_size: self.core_pool_size,
260                maximum_pool_size: self.maximum_pool_size,
261            });
262        }
263        if self.queue_capacity == Some(0) {
264            return Err(ThreadPoolBuildError::ZeroQueueCapacity);
265        }
266        if self.stack_size == Some(0) {
267            return Err(ThreadPoolBuildError::ZeroStackSize);
268        }
269        if self.keep_alive.is_zero() {
270            return Err(ThreadPoolBuildError::ZeroKeepAlive);
271        }
272        Ok(())
273    }
274}
275
276impl Default for ThreadPoolBuilder {
277    /// Creates a builder with CPU parallelism defaults.
278    ///
279    /// # Returns
280    ///
281    /// A builder configured with CPU parallelism for both core and maximum
282    /// sizes, an unbounded queue, and the default keep-alive timeout.
283    fn default() -> Self {
284        let pool_size = default_pool_size();
285        Self {
286            core_pool_size: pool_size,
287            maximum_pool_size: pool_size,
288            queue_capacity: None,
289            thread_name_prefix: DEFAULT_THREAD_NAME_PREFIX.to_owned(),
290            stack_size: None,
291            keep_alive: DEFAULT_KEEP_ALIVE,
292            allow_core_thread_timeout: false,
293            prestart_core_threads: false,
294        }
295    }
296}
297
/// Computes the pool size used for both limits of a default builder.
///
/// # Returns
///
/// The parallelism reported by [`thread::available_parallelism`], or `1`
/// when that query returns an error.
fn default_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        // Detection failed: fall back to a single worker.
        Err(_) => 1,
    }
}