qubit_batch/process/impls/parallel_batch_processor_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 num::NonZeroUsize,
12 sync::Arc,
13 time::Duration,
14};
15
16use qubit_function::{
17 ArcConsumer,
18 Consumer,
19};
20use qubit_progress::reporter::{
21 NoOpProgressReporter,
22 ProgressReporter,
23};
24
25use super::ParallelBatchProcessor;
26use super::ParallelBatchProcessorBuildError;
27
28/// Builder for [`ParallelBatchProcessor`].
29///
30/// Use the builder when the default worker count, sequential fallback
31/// threshold, progress interval, or reporter should be customized.
32///
33/// ```rust
34/// use qubit_batch::ParallelBatchProcessor;
35///
36/// let processor = ParallelBatchProcessor::builder(|_item: &i32| {})
37/// .thread_count(2)
38/// .sequential_threshold(0)
39/// .build()
40/// .expect("parallel processor configuration should be valid");
41///
42/// assert_eq!(processor.thread_count(), 2);
43/// assert_eq!(processor.sequential_threshold(), 0);
44/// ```
45pub struct ParallelBatchProcessorBuilder<Item> {
46 /// Consumer shared by all scoped workers.
47 consumer: ArcConsumer<Item>,
48 /// Fixed worker-thread count used by each processing call.
49 thread_count: usize,
50 /// Maximum batch size that still uses sequential processing.
51 sequential_threshold: usize,
52 /// Minimum interval between progress callbacks.
53 report_interval: Duration,
54 /// Reporter receiving batch lifecycle callbacks.
55 reporter: Arc<dyn ProgressReporter>,
56}
57
58impl<Item> ParallelBatchProcessorBuilder<Item> {
59 /// Creates a builder from a thread-safe consumer.
60 ///
61 /// # Parameters
62 ///
63 /// * `consumer` - Thread-safe consumer invoked once for each accepted item.
64 ///
65 /// # Returns
66 ///
67 /// A builder initialized with default parallel processor settings.
68 #[inline]
69 pub fn new<C>(consumer: C) -> Self
70 where
71 C: Consumer<Item> + Send + Sync + 'static,
72 {
73 Self {
74 consumer: consumer.into_arc(),
75 thread_count: ParallelBatchProcessor::<Item>::default_thread_count(),
76 sequential_threshold: ParallelBatchProcessor::<Item>::DEFAULT_SEQUENTIAL_THRESHOLD,
77 report_interval: ParallelBatchProcessor::<Item>::DEFAULT_REPORT_INTERVAL,
78 reporter: Arc::new(NoOpProgressReporter),
79 }
80 }
81
82 /// Sets the worker-thread count.
83 ///
84 /// # Parameters
85 ///
86 /// * `thread_count` - Number of scoped worker threads to use.
87 ///
88 /// # Returns
89 ///
90 /// This builder for fluent configuration.
91 #[inline]
92 pub const fn thread_count(mut self, thread_count: usize) -> Self {
93 self.thread_count = thread_count;
94 self
95 }
96
97 /// Sets the sequential fallback threshold.
98 ///
99 /// # Parameters
100 ///
101 /// * `sequential_threshold` - Maximum declared item count that still runs
102 /// on the caller thread. Use `0` when every non-empty batch should use
103 /// scoped workers.
104 ///
105 /// # Returns
106 ///
107 /// This builder for fluent configuration.
108 #[inline]
109 pub const fn sequential_threshold(mut self, sequential_threshold: usize) -> Self {
110 self.sequential_threshold = sequential_threshold;
111 self
112 }
113
114 /// Sets the progress-report interval.
115 ///
116 /// # Parameters
117 ///
118 /// * `report_interval` - Minimum time between due-based running progress
119 /// callbacks. [`Duration::ZERO`] reports at every sequential between-item
120 /// progress point or on parallel worker completion signals without
121 /// periodic polling.
122 ///
123 /// # Returns
124 ///
125 /// This builder for fluent configuration.
126 #[inline]
127 pub const fn report_interval(mut self, report_interval: Duration) -> Self {
128 self.report_interval = report_interval;
129 self
130 }
131
132 /// Sets the progress reporter used by built processors.
133 ///
134 /// # Parameters
135 ///
136 /// * `reporter` - Progress reporter used for later processing calls.
137 ///
138 /// # Returns
139 ///
140 /// This builder for fluent configuration.
141 #[inline]
142 pub fn reporter<R>(mut self, reporter: R) -> Self
143 where
144 R: ProgressReporter + 'static,
145 {
146 self.reporter = Arc::new(reporter);
147 self
148 }
149
150 /// Sets the shared progress reporter used by built processors.
151 ///
152 /// # Parameters
153 ///
154 /// * `reporter` - Shared progress reporter used for later processing calls.
155 ///
156 /// # Returns
157 ///
158 /// This builder for fluent configuration.
159 #[inline]
160 pub fn reporter_arc(mut self, reporter: Arc<dyn ProgressReporter>) -> Self {
161 self.reporter = reporter;
162 self
163 }
164
165 /// Disables progress callbacks by using [`NoOpProgressReporter`].
166 ///
167 /// # Returns
168 ///
169 /// This builder for fluent configuration.
170 #[inline]
171 pub fn no_reporter(mut self) -> Self {
172 self.reporter = Arc::new(NoOpProgressReporter);
173 self
174 }
175
176 /// Builds a validated [`ParallelBatchProcessor`].
177 ///
178 /// # Returns
179 ///
180 /// A parallel batch processor when the configuration is valid.
181 ///
182 /// # Errors
183 ///
184 /// Returns [`ParallelBatchProcessorBuildError`] when the worker count is
185 /// zero.
186 #[inline]
187 pub fn build(self) -> Result<ParallelBatchProcessor<Item>, ParallelBatchProcessorBuildError> {
188 let thread_count = NonZeroUsize::new(self.thread_count)
189 .ok_or(ParallelBatchProcessorBuildError::ZeroThreadCount)?;
190 Ok(ParallelBatchProcessor {
191 consumer: self.consumer,
192 thread_count,
193 sequential_threshold: self.sequential_threshold,
194 report_interval: self.report_interval,
195 reporter: self.reporter,
196 })
197 }
198}