qubit_thread_pool/dynamic/thread_pool_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::Arc,
12 thread,
13 time::Duration,
14};
15
16use super::thread_pool::ThreadPool;
17use super::thread_pool_config::ThreadPoolConfig;
18use super::thread_pool_inner::ThreadPoolInner;
19use crate::{
20 ExecutorServiceBuilderError,
21 ThreadPoolHooks,
22};
23
24/// Default thread name prefix used by [`ThreadPoolBuilder`].
25const DEFAULT_THREAD_NAME_PREFIX: &str = "qubit-thread-pool";
26
27/// Default idle lifetime for workers above the core pool size.
28const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(60);
29
30/// Builder for [`ThreadPool`].
31///
32/// The default builder uses the available CPU parallelism as both core and
33/// maximum pool size, with an unbounded FIFO queue.
34///
35#[derive(Debug, Clone)]
36pub struct ThreadPoolBuilder {
37 /// Configured core pool size.
38 core_pool_size: usize,
39 /// Configured maximum pool size.
40 maximum_pool_size: usize,
41 /// Optional maximum number of jobs that may wait in the queue.
42 queue_capacity: Option<usize>,
43 /// Prefix used when naming worker threads.
44 thread_name_prefix: String,
45 /// Optional stack size in bytes for worker threads.
46 stack_size: Option<usize>,
47 /// Idle timeout for workers allowed to retire.
48 keep_alive: Duration,
49 /// Whether core workers may retire after the keep-alive timeout.
50 allow_core_thread_timeout: bool,
51 /// Whether [`Self::build`] should start all core workers eagerly.
52 prestart_core_threads: bool,
53 /// Optional worker and task lifecycle hooks.
54 hooks: ThreadPoolHooks,
55}
56
57impl ThreadPoolBuilder {
58 /// Sets both the core and maximum pool size to the same value.
59 ///
60 /// # Parameters
61 ///
62 /// * `pool_size` - Pool size applied as both core and maximum limits.
63 ///
64 /// # Returns
65 ///
66 /// This builder for fluent configuration.
67 #[inline]
68 pub fn pool_size(mut self, pool_size: usize) -> Self {
69 self.core_pool_size = pool_size;
70 self.maximum_pool_size = pool_size;
71 self
72 }
73
74 /// Sets the core pool size.
75 ///
76 /// A submitted task creates a new worker while the live worker count is
77 /// below this value. Once the core size is reached, tasks are queued before
78 /// the pool considers growing toward the maximum size.
79 ///
80 /// # Parameters
81 ///
82 /// * `core_pool_size` - Core pool size.
83 ///
84 /// # Returns
85 ///
86 /// This builder for fluent configuration.
87 #[inline]
88 pub fn core_pool_size(mut self, core_pool_size: usize) -> Self {
89 self.core_pool_size = core_pool_size;
90 self
91 }
92
93 /// Sets the maximum pool size.
94 ///
95 /// The pool grows above the core size only when the queue cannot accept a
96 /// submitted task.
97 ///
98 /// # Parameters
99 ///
100 /// * `maximum_pool_size` - Maximum pool size.
101 ///
102 /// # Returns
103 ///
104 /// This builder for fluent configuration.
105 #[inline]
106 pub fn maximum_pool_size(mut self, maximum_pool_size: usize) -> Self {
107 self.maximum_pool_size = maximum_pool_size;
108 self
109 }
110
111 /// Sets a bounded queue capacity.
112 ///
113 /// The capacity counts only tasks waiting in the queue. Tasks already held
114 /// by worker threads are not included.
115 ///
116 /// # Parameters
117 ///
118 /// * `capacity` - Maximum number of queued tasks.
119 ///
120 /// # Returns
121 ///
122 /// This builder for fluent configuration.
123 #[inline]
124 pub fn queue_capacity(mut self, capacity: usize) -> Self {
125 self.queue_capacity = Some(capacity);
126 self
127 }
128
129 /// Uses an unbounded queue.
130 ///
131 /// # Returns
132 ///
133 /// This builder for fluent configuration.
134 #[inline]
135 pub fn unbounded_queue(mut self) -> Self {
136 self.queue_capacity = None;
137 self
138 }
139
140 /// Sets the worker thread name prefix.
141 ///
142 /// Worker names are created by appending the worker index to this prefix.
143 ///
144 /// # Parameters
145 ///
146 /// * `prefix` - Prefix for worker thread names.
147 ///
148 /// # Returns
149 ///
150 /// This builder for fluent configuration.
151 #[inline]
152 pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
153 self.thread_name_prefix = prefix.to_owned();
154 self
155 }
156
157 /// Sets the worker thread stack size.
158 ///
159 /// # Parameters
160 ///
161 /// * `stack_size` - Stack size in bytes for each worker thread.
162 ///
163 /// # Returns
164 ///
165 /// This builder for fluent configuration.
166 #[inline]
167 pub fn stack_size(mut self, stack_size: usize) -> Self {
168 self.stack_size = Some(stack_size);
169 self
170 }
171
172 /// Sets the idle timeout for workers above the core pool size.
173 ///
174 /// # Parameters
175 ///
176 /// * `keep_alive` - Duration an excess worker may stay idle.
177 ///
178 /// # Returns
179 ///
180 /// This builder for fluent configuration.
181 #[inline]
182 pub fn keep_alive(mut self, keep_alive: Duration) -> Self {
183 self.keep_alive = keep_alive;
184 self
185 }
186
187 /// Allows core workers to retire after the keep-alive timeout.
188 ///
189 /// # Parameters
190 ///
191 /// * `allow` - Whether idle core workers may time out.
192 ///
193 /// # Returns
194 ///
195 /// This builder for fluent configuration.
196 #[inline]
197 pub fn allow_core_thread_timeout(mut self, allow: bool) -> Self {
198 self.allow_core_thread_timeout = allow;
199 self
200 }
201
202 /// Starts all core workers during [`Self::build`].
203 ///
204 /// Without this option, workers are created lazily as tasks are submitted,
205 /// matching the default JDK `ThreadPoolExecutor` behavior.
206 ///
207 /// # Returns
208 ///
209 /// This builder for fluent configuration.
210 #[inline]
211 pub fn prestart_core_threads(mut self) -> Self {
212 self.prestart_core_threads = true;
213 self
214 }
215
216 /// Installs a callback invoked when a worker thread starts.
217 ///
218 /// # Parameters
219 ///
220 /// * `hook` - Callback receiving the stable worker index.
221 ///
222 /// # Returns
223 ///
224 /// This builder for fluent configuration.
225 #[inline]
226 pub fn before_worker_start<F>(mut self, hook: F) -> Self
227 where
228 F: Fn(usize) + Send + Sync + 'static,
229 {
230 self.hooks = self.hooks.before_worker_start(hook);
231 self
232 }
233
234 /// Installs a callback invoked before a worker thread exits.
235 ///
236 /// # Parameters
237 ///
238 /// * `hook` - Callback receiving the stable worker index.
239 ///
240 /// # Returns
241 ///
242 /// This builder for fluent configuration.
243 #[inline]
244 pub fn after_worker_stop<F>(mut self, hook: F) -> Self
245 where
246 F: Fn(usize) + Send + Sync + 'static,
247 {
248 self.hooks = self.hooks.after_worker_stop(hook);
249 self
250 }
251
252 /// Installs a callback invoked before each job is run.
253 ///
254 /// # Parameters
255 ///
256 /// * `hook` - Callback receiving the stable worker index.
257 ///
258 /// # Returns
259 ///
260 /// This builder for fluent configuration.
261 #[inline]
262 pub fn before_task<F>(mut self, hook: F) -> Self
263 where
264 F: Fn(usize) + Send + Sync + 'static,
265 {
266 self.hooks = self.hooks.before_task(hook);
267 self
268 }
269
270 /// Installs a callback invoked after each job is run.
271 ///
272 /// # Parameters
273 ///
274 /// * `hook` - Callback receiving the stable worker index.
275 ///
276 /// # Returns
277 ///
278 /// This builder for fluent configuration.
279 #[inline]
280 pub fn after_task<F>(mut self, hook: F) -> Self
281 where
282 F: Fn(usize) + Send + Sync + 'static,
283 {
284 self.hooks = self.hooks.after_task(hook);
285 self
286 }
287
288 /// Builds the configured thread pool.
289 ///
290 /// # Returns
291 ///
292 /// `Ok(ThreadPool)` if the configuration is valid and all requested
293 /// prestarted workers are spawned successfully.
294 ///
295 /// # Errors
296 ///
297 /// Returns [`ExecutorServiceBuilderError`] if the configuration is invalid or a
298 /// prestarted worker thread cannot be spawned.
299 pub fn build(self) -> Result<ThreadPool, ExecutorServiceBuilderError> {
300 self.validate()?;
301 let prestart_core_threads = self.prestart_core_threads;
302 let inner = Arc::new(ThreadPoolInner::new(
303 ThreadPoolConfig {
304 core_pool_size: self.core_pool_size,
305 maximum_pool_size: self.maximum_pool_size,
306 queue_capacity: self.queue_capacity,
307 thread_name_prefix: self.thread_name_prefix,
308 stack_size: self.stack_size,
309 keep_alive: self.keep_alive,
310 allow_core_thread_timeout: self.allow_core_thread_timeout,
311 },
312 self.hooks,
313 ));
314 if prestart_core_threads && let Err(error) = inner.prestart_all_core_threads() {
315 inner.stop();
316 inner.wait_for_termination();
317 return Err(ExecutorServiceBuilderError::from_submission_error(error));
318 }
319 Ok(ThreadPool::from_inner(inner))
320 }
321
322 /// Validates this builder configuration.
323 ///
324 /// # Returns
325 ///
326 /// `Ok(())` when all configured values are internally consistent.
327 ///
328 /// # Errors
329 ///
330 /// Returns [`ExecutorServiceBuilderError`] for zero maximum size, core size larger
331 /// than maximum size, zero bounded queue capacity, zero stack size, or zero
332 /// keep-alive timeout.
333 fn validate(&self) -> Result<(), ExecutorServiceBuilderError> {
334 if self.maximum_pool_size == 0 {
335 return Err(ExecutorServiceBuilderError::ZeroMaximumPoolSize);
336 }
337 if self.core_pool_size > self.maximum_pool_size {
338 return Err(ExecutorServiceBuilderError::CorePoolSizeExceedsMaximum {
339 core_pool_size: self.core_pool_size,
340 maximum_pool_size: self.maximum_pool_size,
341 });
342 }
343 if self.queue_capacity == Some(0) {
344 return Err(ExecutorServiceBuilderError::ZeroQueueCapacity);
345 }
346 if self.stack_size == Some(0) {
347 return Err(ExecutorServiceBuilderError::ZeroStackSize);
348 }
349 if self.keep_alive.is_zero() {
350 return Err(ExecutorServiceBuilderError::ZeroKeepAlive);
351 }
352 Ok(())
353 }
354}
355
356impl Default for ThreadPoolBuilder {
357 /// Creates a builder with CPU parallelism defaults.
358 ///
359 /// # Returns
360 ///
361 /// A builder configured with CPU parallelism for both core and maximum
362 /// sizes, an unbounded queue, and the default keep-alive timeout.
363 fn default() -> Self {
364 let pool_size = default_pool_size();
365 Self {
366 core_pool_size: pool_size,
367 maximum_pool_size: pool_size,
368 queue_capacity: None,
369 thread_name_prefix: DEFAULT_THREAD_NAME_PREFIX.to_owned(),
370 stack_size: None,
371 keep_alive: DEFAULT_KEEP_ALIVE,
372 allow_core_thread_timeout: false,
373 prestart_core_threads: false,
374 hooks: ThreadPoolHooks::default(),
375 }
376 }
377}
378
379/// Returns the default core and maximum pool size for new builders.
380///
381/// # Returns
382///
383/// The available CPU parallelism, or `1` if it cannot be detected.
384fn default_pool_size() -> usize {
385 thread::available_parallelism()
386 .map(usize::from)
387 .unwrap_or(1)
388}