Skip to main content

qubit_thread_pool/dynamic/
thread_pool_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    thread,
13    time::Duration,
14};
15
16use super::thread_pool::ThreadPool;
17use super::thread_pool_config::ThreadPoolConfig;
18use super::thread_pool_inner::ThreadPoolInner;
19use crate::{
20    ExecutorServiceBuilderError,
21    ThreadPoolHooks,
22};
23
/// Default thread name prefix used by [`ThreadPoolBuilder`].
///
/// The builder's `Default` implementation stores this value as the worker
/// thread name prefix until a caller overrides it.
const DEFAULT_THREAD_NAME_PREFIX: &str = "qubit-thread-pool";
26
/// Default idle lifetime (60 seconds) for workers above the core pool size.
///
/// The builder's `Default` implementation stores this value as the keep-alive
/// timeout until a caller overrides it.
const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(60);
29
/// Builder for [`ThreadPool`].
///
/// The default builder uses the available CPU parallelism as both core and
/// maximum pool size, with an unbounded FIFO queue.
///
/// Setters only record values; the configuration is checked as a whole when
/// [`Self::build`] is called.
#[derive(Debug, Clone)]
pub struct ThreadPoolBuilder {
    /// Configured core pool size. Must not exceed `maximum_pool_size`.
    core_pool_size: usize,
    /// Configured maximum pool size. Must be non-zero.
    maximum_pool_size: usize,
    /// Optional maximum number of jobs that may wait in the queue.
    ///
    /// `None` selects an unbounded queue; `Some(0)` is rejected at build time.
    queue_capacity: Option<usize>,
    /// Prefix used when naming worker threads.
    thread_name_prefix: String,
    /// Optional stack size in bytes for worker threads.
    ///
    /// `None` means no explicit size was requested; `Some(0)` is rejected at
    /// build time.
    stack_size: Option<usize>,
    /// Idle timeout for workers allowed to retire. Must be non-zero.
    keep_alive: Duration,
    /// Whether core workers may retire after the keep-alive timeout.
    allow_core_thread_timeout: bool,
    /// Whether [`Self::build`] should start all core workers eagerly.
    prestart_core_threads: bool,
    /// Optional worker and task lifecycle hooks.
    hooks: ThreadPoolHooks,
}
56
57impl ThreadPoolBuilder {
58    /// Sets both the core and maximum pool size to the same value.
59    ///
60    /// # Parameters
61    ///
62    /// * `pool_size` - Pool size applied as both core and maximum limits.
63    ///
64    /// # Returns
65    ///
66    /// This builder for fluent configuration.
67    #[inline]
68    pub fn pool_size(mut self, pool_size: usize) -> Self {
69        self.core_pool_size = pool_size;
70        self.maximum_pool_size = pool_size;
71        self
72    }
73
74    /// Sets the core pool size.
75    ///
76    /// A submitted task creates a new worker while the live worker count is
77    /// below this value. Once the core size is reached, tasks are queued before
78    /// the pool considers growing toward the maximum size.
79    ///
80    /// # Parameters
81    ///
82    /// * `core_pool_size` - Core pool size.
83    ///
84    /// # Returns
85    ///
86    /// This builder for fluent configuration.
87    #[inline]
88    pub fn core_pool_size(mut self, core_pool_size: usize) -> Self {
89        self.core_pool_size = core_pool_size;
90        self
91    }
92
93    /// Sets the maximum pool size.
94    ///
95    /// The pool grows above the core size only when the queue cannot accept a
96    /// submitted task.
97    ///
98    /// # Parameters
99    ///
100    /// * `maximum_pool_size` - Maximum pool size.
101    ///
102    /// # Returns
103    ///
104    /// This builder for fluent configuration.
105    #[inline]
106    pub fn maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
107        self.maximum_pool_size = maximum_pool_size;
108        self
109    }
110
111    /// Sets a bounded queue capacity.
112    ///
113    /// The capacity counts only tasks waiting in the queue. Tasks already held
114    /// by worker threads are not included.
115    ///
116    /// # Parameters
117    ///
118    /// * `capacity` - Maximum number of queued tasks.
119    ///
120    /// # Returns
121    ///
122    /// This builder for fluent configuration.
123    #[inline]
124    pub fn queue_capacity(mut self, capacity: usize) -> Self {
125        self.queue_capacity = Some(capacity);
126        self
127    }
128
129    /// Uses an unbounded queue.
130    ///
131    /// # Returns
132    ///
133    /// This builder for fluent configuration.
134    #[inline]
135    pub fn unbounded_queue(mut self) -> Self {
136        self.queue_capacity = None;
137        self
138    }
139
140    /// Sets the worker thread name prefix.
141    ///
142    /// Worker names are created by appending the worker index to this prefix.
143    ///
144    /// # Parameters
145    ///
146    /// * `prefix` - Prefix for worker thread names.
147    ///
148    /// # Returns
149    ///
150    /// This builder for fluent configuration.
151    #[inline]
152    pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
153        self.thread_name_prefix = prefix.to_owned();
154        self
155    }
156
157    /// Sets the worker thread stack size.
158    ///
159    /// # Parameters
160    ///
161    /// * `stack_size` - Stack size in bytes for each worker thread.
162    ///
163    /// # Returns
164    ///
165    /// This builder for fluent configuration.
166    #[inline]
167    pub fn stack_size(mut self, stack_size: usize) -> Self {
168        self.stack_size = Some(stack_size);
169        self
170    }
171
172    /// Sets the idle timeout for workers above the core pool size.
173    ///
174    /// # Parameters
175    ///
176    /// * `keep_alive` - Duration an excess worker may stay idle.
177    ///
178    /// # Returns
179    ///
180    /// This builder for fluent configuration.
181    #[inline]
182    pub fn keep_alive(mut self, keep_alive: Duration) -> Self {
183        self.keep_alive = keep_alive;
184        self
185    }
186
187    /// Allows core workers to retire after the keep-alive timeout.
188    ///
189    /// # Parameters
190    ///
191    /// * `allow` - Whether idle core workers may time out.
192    ///
193    /// # Returns
194    ///
195    /// This builder for fluent configuration.
196    #[inline]
197    pub fn allow_core_thread_timeout(mut self, allow: bool) -> Self {
198        self.allow_core_thread_timeout = allow;
199        self
200    }
201
202    /// Starts all core workers during [`Self::build`].
203    ///
204    /// Without this option, workers are created lazily as tasks are submitted,
205    /// matching the default JDK `ThreadPoolExecutor` behavior.
206    ///
207    /// # Returns
208    ///
209    /// This builder for fluent configuration.
210    #[inline]
211    pub fn prestart_core_threads(mut self) -> Self {
212        self.prestart_core_threads = true;
213        self
214    }
215
216    /// Installs a callback invoked when a worker thread starts.
217    ///
218    /// # Parameters
219    ///
220    /// * `hook` - Callback receiving the stable worker index.
221    ///
222    /// # Returns
223    ///
224    /// This builder for fluent configuration.
225    #[inline]
226    pub fn before_worker_start<F>(mut self, hook: F) -> Self
227    where
228        F: Fn(usize) + Send + Sync + 'static,
229    {
230        self.hooks = self.hooks.before_worker_start(hook);
231        self
232    }
233
234    /// Installs a callback invoked before a worker thread exits.
235    ///
236    /// # Parameters
237    ///
238    /// * `hook` - Callback receiving the stable worker index.
239    ///
240    /// # Returns
241    ///
242    /// This builder for fluent configuration.
243    #[inline]
244    pub fn after_worker_stop<F>(mut self, hook: F) -> Self
245    where
246        F: Fn(usize) + Send + Sync + 'static,
247    {
248        self.hooks = self.hooks.after_worker_stop(hook);
249        self
250    }
251
252    /// Installs a callback invoked before each job is run.
253    ///
254    /// # Parameters
255    ///
256    /// * `hook` - Callback receiving the stable worker index.
257    ///
258    /// # Returns
259    ///
260    /// This builder for fluent configuration.
261    #[inline]
262    pub fn before_task<F>(mut self, hook: F) -> Self
263    where
264        F: Fn(usize) + Send + Sync + 'static,
265    {
266        self.hooks = self.hooks.before_task(hook);
267        self
268    }
269
270    /// Installs a callback invoked after each job is run.
271    ///
272    /// # Parameters
273    ///
274    /// * `hook` - Callback receiving the stable worker index.
275    ///
276    /// # Returns
277    ///
278    /// This builder for fluent configuration.
279    #[inline]
280    pub fn after_task<F>(mut self, hook: F) -> Self
281    where
282        F: Fn(usize) + Send + Sync + 'static,
283    {
284        self.hooks = self.hooks.after_task(hook);
285        self
286    }
287
288    /// Builds the configured thread pool.
289    ///
290    /// # Returns
291    ///
292    /// `Ok(ThreadPool)` if the configuration is valid and all requested
293    /// prestarted workers are spawned successfully.
294    ///
295    /// # Errors
296    ///
297    /// Returns [`ExecutorServiceBuilderError`] if the configuration is invalid or a
298    /// prestarted worker thread cannot be spawned.
299    pub fn build(self) -> Result<ThreadPool, ExecutorServiceBuilderError> {
300        self.validate()?;
301        let prestart_core_threads = self.prestart_core_threads;
302        let inner = Arc::new(ThreadPoolInner::new(
303            ThreadPoolConfig {
304                core_pool_size: self.core_pool_size,
305                maximum_pool_size: self.maximum_pool_size,
306                queue_capacity: self.queue_capacity,
307                thread_name_prefix: self.thread_name_prefix,
308                stack_size: self.stack_size,
309                keep_alive: self.keep_alive,
310                allow_core_thread_timeout: self.allow_core_thread_timeout,
311            },
312            self.hooks,
313        ));
314        if prestart_core_threads && let Err(error) = inner.prestart_all_core_threads() {
315            inner.stop();
316            inner.wait_for_termination();
317            return Err(ExecutorServiceBuilderError::from_submission_error(error));
318        }
319        Ok(ThreadPool::from_inner(inner))
320    }
321
322    /// Validates this builder configuration.
323    ///
324    /// # Returns
325    ///
326    /// `Ok(())` when all configured values are internally consistent.
327    ///
328    /// # Errors
329    ///
330    /// Returns [`ExecutorServiceBuilderError`] for zero maximum size, core size larger
331    /// than maximum size, zero bounded queue capacity, zero stack size, or zero
332    /// keep-alive timeout.
333    fn validate(&self) -> Result<(), ExecutorServiceBuilderError> {
334        if self.maximum_pool_size == 0 {
335            return Err(ExecutorServiceBuilderError::ZeroMaximumPoolSize);
336        }
337        if self.core_pool_size > self.maximum_pool_size {
338            return Err(ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum {
339                core_pool_size: self.core_pool_size,
340                maximum_pool_size: self.maximum_pool_size,
341            });
342        }
343        if self.queue_capacity == Some(0) {
344            return Err(ExecutorServiceBuilderError::ZeroQueueCapacity);
345        }
346        if self.stack_size == Some(0) {
347            return Err(ExecutorServiceBuilderError::ZeroStackSize);
348        }
349        if self.keep_alive.is_zero() {
350            return Err(ExecutorServiceBuilderError::ZeroKeepAlive);
351        }
352        Ok(())
353    }
354}
355
356impl Default for ThreadPoolBuilder {
357    /// Creates a builder with CPU parallelism defaults.
358    ///
359    /// # Returns
360    ///
361    /// A builder configured with CPU parallelism for both core and maximum
362    /// sizes, an unbounded queue, and the default keep-alive timeout.
363    fn default() -> Self {
364        let pool_size = default_pool_size();
365        Self {
366            core_pool_size: pool_size,
367            maximum_pool_size: pool_size,
368            queue_capacity: None,
369            thread_name_prefix: DEFAULT_THREAD_NAME_PREFIX.to_owned(),
370            stack_size: None,
371            keep_alive: DEFAULT_KEEP_ALIVE,
372            allow_core_thread_timeout: false,
373            prestart_core_threads: false,
374            hooks: ThreadPoolHooks::default(),
375        }
376    }
377}
378
/// Computes the pool size that new builders use for both limits.
///
/// # Returns
///
/// The parallelism reported by [`thread::available_parallelism`], or `1`
/// when that query fails.
fn default_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}