Skip to main content

qubit_batch/process/impls/
parallel_batch_processor_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    num::NonZeroUsize,
12    sync::Arc,
13    time::Duration,
14};
15
16use qubit_function::{
17    ArcConsumer,
18    Consumer,
19};
20use qubit_progress::reporter::{
21    NoOpProgressReporter,
22    ProgressReporter,
23};
24
25use super::ParallelBatchProcessor;
26use super::ParallelBatchProcessorBuildError;
27
28/// Builder for [`ParallelBatchProcessor`].
29///
30/// Use the builder when the default worker count, sequential fallback
31/// threshold, progress interval, or reporter should be customized.
32///
33/// ```rust
34/// use qubit_batch::ParallelBatchProcessor;
35///
36/// let processor = ParallelBatchProcessor::builder(|_item: &i32| {})
37///     .thread_count(2)
38///     .sequential_threshold(0)
39///     .build()
40///     .expect("parallel processor configuration should be valid");
41///
42/// assert_eq!(processor.thread_count(), 2);
43/// assert_eq!(processor.sequential_threshold(), 0);
44/// ```
45pub struct ParallelBatchProcessorBuilder<Item> {
46    /// Consumer shared by all scoped workers.
47    consumer: ArcConsumer<Item>,
48    /// Fixed worker-thread count used by each processing call.
49    thread_count: usize,
50    /// Maximum batch size that still uses sequential processing.
51    sequential_threshold: usize,
52    /// Minimum interval between progress callbacks.
53    report_interval: Duration,
54    /// Reporter receiving batch lifecycle callbacks.
55    reporter: Arc<dyn ProgressReporter>,
56}
57
58impl<Item> ParallelBatchProcessorBuilder<Item> {
59    /// Creates a builder from a thread-safe consumer.
60    ///
61    /// # Parameters
62    ///
63    /// * `consumer` - Thread-safe consumer invoked once for each accepted item.
64    ///
65    /// # Returns
66    ///
67    /// A builder initialized with default parallel processor settings.
68    #[inline]
69    pub fn new<C>(consumer: C) -> Self
70    where
71        C: Consumer<Item> + Send + Sync + 'static,
72    {
73        Self {
74            consumer: consumer.into_arc(),
75            thread_count: ParallelBatchProcessor::<Item>::default_thread_count(),
76            sequential_threshold: ParallelBatchProcessor::<Item>::DEFAULT_SEQUENTIAL_THRESHOLD,
77            report_interval: ParallelBatchProcessor::<Item>::DEFAULT_REPORT_INTERVAL,
78            reporter: Arc::new(NoOpProgressReporter),
79        }
80    }
81
82    /// Sets the worker-thread count.
83    ///
84    /// # Parameters
85    ///
86    /// * `thread_count` - Number of scoped worker threads to use.
87    ///
88    /// # Returns
89    ///
90    /// This builder for fluent configuration.
91    #[inline]
92    pub const fn thread_count(mut self, thread_count: usize) -> Self {
93        self.thread_count = thread_count;
94        self
95    }
96
97    /// Sets the sequential fallback threshold.
98    ///
99    /// # Parameters
100    ///
101    /// * `sequential_threshold` - Maximum declared item count that still runs
102    ///   on the caller thread. Use `0` when every non-empty batch should use
103    ///   scoped workers.
104    ///
105    /// # Returns
106    ///
107    /// This builder for fluent configuration.
108    #[inline]
109    pub const fn sequential_threshold(mut self, sequential_threshold: usize) -> Self {
110        self.sequential_threshold = sequential_threshold;
111        self
112    }
113
114    /// Sets the progress-report interval.
115    ///
116    /// # Parameters
117    ///
118    /// * `report_interval` - Minimum time between due-based running progress
119    ///   callbacks. [`Duration::ZERO`] reports at every sequential between-item
120    ///   progress point or on parallel worker completion signals without
121    ///   periodic polling.
122    ///
123    /// # Returns
124    ///
125    /// This builder for fluent configuration.
126    #[inline]
127    pub const fn report_interval(mut self, report_interval: Duration) -> Self {
128        self.report_interval = report_interval;
129        self
130    }
131
132    /// Sets the progress reporter used by built processors.
133    ///
134    /// # Parameters
135    ///
136    /// * `reporter` - Progress reporter used for later processing calls.
137    ///
138    /// # Returns
139    ///
140    /// This builder for fluent configuration.
141    #[inline]
142    pub fn reporter<R>(mut self, reporter: R) -> Self
143    where
144        R: ProgressReporter + 'static,
145    {
146        self.reporter = Arc::new(reporter);
147        self
148    }
149
150    /// Sets the shared progress reporter used by built processors.
151    ///
152    /// # Parameters
153    ///
154    /// * `reporter` - Shared progress reporter used for later processing calls.
155    ///
156    /// # Returns
157    ///
158    /// This builder for fluent configuration.
159    #[inline]
160    pub fn reporter_arc(mut self, reporter: Arc<dyn ProgressReporter>) -> Self {
161        self.reporter = reporter;
162        self
163    }
164
165    /// Disables progress callbacks by using [`NoOpProgressReporter`].
166    ///
167    /// # Returns
168    ///
169    /// This builder for fluent configuration.
170    #[inline]
171    pub fn no_reporter(mut self) -> Self {
172        self.reporter = Arc::new(NoOpProgressReporter);
173        self
174    }
175
176    /// Builds a validated [`ParallelBatchProcessor`].
177    ///
178    /// # Returns
179    ///
180    /// A parallel batch processor when the configuration is valid.
181    ///
182    /// # Errors
183    ///
184    /// Returns [`ParallelBatchProcessorBuildError`] when the worker count is
185    /// zero.
186    #[inline]
187    pub fn build(self) -> Result<ParallelBatchProcessor<Item>, ParallelBatchProcessorBuildError> {
188        let thread_count = NonZeroUsize::new(self.thread_count)
189            .ok_or(ParallelBatchProcessorBuildError::ZeroThreadCount)?;
190        Ok(ParallelBatchProcessor {
191            consumer: self.consumer,
192            thread_count,
193            sequential_threshold: self.sequential_threshold,
194            report_interval: self.report_interval,
195            reporter: self.reporter,
196        })
197    }
198}