Skip to main content

qubit_batch/process/impls/
chunked_batch_processor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    cmp,
12    num::NonZeroUsize,
13    sync::Arc,
14    time::Duration,
15};
16
17use qubit_progress::{
18    Progress,
19    reporter::ProgressReporter,
20};
21
22use crate::process::{
23    BatchProcessResult,
24    BatchProcessState,
25    BatchProcessor,
26    ChunkedBatchProcessError,
27};
28
29use super::ChunkedBatchProcessorBuilder;
30
31/// Processes input items by submitting fixed-size chunks to a delegate.
32///
33/// `ChunkedBatchProcessor` is useful when the caller has a large logical batch
34/// but the real target must receive smaller batches, such as SQL batch insert
35/// operations with a maximum row count per statement.
36///
37/// The delegate must return a result whose `item_count` and `completed_count`
38/// match the submitted chunk length whenever it returns `Ok`. The delegate may
39/// still report a lower `processed_count`, such as when a database reports
40/// fewer affected rows than submitted rows. If the delegate cannot reach a
41/// terminal outcome for every item in the chunk, it should return `Err`;
42/// inconsistent `Ok` results are returned as
43/// [`ChunkedBatchProcessError::InvalidChunkResult`].
44///
45/// # Type Parameters
46///
47/// * `P` - Delegate processor receiving each chunk.
48///
49/// ```rust
50/// use std::{
51///     num::NonZeroUsize,
52///     time::Duration,
53/// };
54///
55/// use qubit_batch::{
56///     BatchProcessResult,
57///     BatchProcessResultBuilder,
58///     BatchProcessor,
59///     ChunkedBatchProcessor,
60/// };
61///
62/// struct InsertChunk;
63///
64/// impl BatchProcessor<i32> for InsertChunk {
65///     type Error = &'static str;
66///
67///     fn process_with_count<I>(
68///         &mut self,
69///         rows: I,
70///         count: usize,
71///     ) -> Result<BatchProcessResult, Self::Error>
72///     where
73///         I: IntoIterator<Item = i32>,
74///     {
75///         let processed = rows.into_iter().count();
76///         BatchProcessResultBuilder::builder(count)
77///             .completed_count(processed)
78///             .processed_count(processed)
79///             .chunk_count(1)
80///             .elapsed(Duration::ZERO)
81///             .build()
82///             .map_err(|_| "invalid process result")
83///     }
84/// }
85///
86/// let mut processor = ChunkedBatchProcessor::new(
87///     InsertChunk,
88///     NonZeroUsize::new(2).expect("chunk size should be non-zero"),
89/// );
90///
91/// let result = processor
92///     .process([1, 2, 3, 4, 5])
93///     .expect("array length should be exact");
94///
95/// assert_eq!(result.completed_count(), 5);
96/// assert_eq!(result.chunk_count(), 3);
97/// ```
98///
99pub struct ChunkedBatchProcessor<P> {
100    /// Delegate processor receiving each chunk.
101    pub(crate) delegate: P,
102    /// Maximum number of items submitted to the delegate at once.
103    pub(crate) chunk_size: NonZeroUsize,
104    /// Minimum interval between progress callbacks.
105    pub(crate) report_interval: Duration,
106    /// Reporter receiving batch lifecycle callbacks.
107    pub(crate) reporter: Arc<dyn ProgressReporter>,
108}
109
110impl<P> ChunkedBatchProcessor<P> {
111    /// Default interval between progress callbacks.
112    pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
113
114    /// Creates a chunked batch processor.
115    ///
116    /// # Parameters
117    ///
118    /// * `delegate` - Processor receiving each chunk.
119    /// * `chunk_size` - Maximum number of items submitted in one chunk.
120    ///
121    /// # Returns
122    ///
123    /// A chunked processor using no-op progress reporting.
124    ///
125    /// # Type Constraints
126    ///
127    /// This constructor only stores `delegate`; it intentionally does not
128    /// require `P: BatchProcessor<Item>` because the item type is not part of
129    /// construction. That bound is enforced when this wrapper is used as a
130    /// [`BatchProcessor<Item>`], such as when calling [`BatchProcessor::process`].
131    /// Therefore, a value can be constructed with any delegate type, but it can
132    /// only process items for item types that the delegate actually supports.
133    #[inline]
134    pub fn new(delegate: P, chunk_size: NonZeroUsize) -> Self {
135        Self::builder(delegate, chunk_size).build()
136    }
137
138    /// Creates a builder for configuring a chunked batch processor.
139    ///
140    /// # Parameters
141    ///
142    /// * `delegate` - Processor receiving each chunk.
143    /// * `chunk_size` - Maximum number of items submitted in one chunk.
144    ///
145    /// # Returns
146    ///
147    /// A builder initialized with default settings.
148    #[inline]
149    pub fn builder(delegate: P, chunk_size: NonZeroUsize) -> ChunkedBatchProcessorBuilder<P> {
150        ChunkedBatchProcessorBuilder::new(delegate, chunk_size)
151    }
152
153    /// Returns the configured chunk size.
154    ///
155    /// # Returns
156    ///
157    /// The maximum number of items submitted to the delegate at once.
158    #[inline]
159    pub const fn chunk_size(&self) -> NonZeroUsize {
160        self.chunk_size
161    }
162
163    /// Returns the configured progress-report interval.
164    ///
165    /// # Returns
166    ///
167    /// The minimum time between due-based running progress callbacks.
168    #[inline]
169    pub const fn report_interval(&self) -> Duration {
170        self.report_interval
171    }
172
173    /// Returns the configured progress reporter.
174    ///
175    /// # Returns
176    ///
177    /// A shared reference to the configured progress reporter.
178    #[inline]
179    pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
180        &self.reporter
181    }
182
183    /// Returns a shared reference to the delegate processor.
184    ///
185    /// # Returns
186    ///
187    /// The wrapped delegate processor.
188    #[inline]
189    pub const fn delegate(&self) -> &P {
190        &self.delegate
191    }
192
193    /// Returns a mutable reference to the delegate processor.
194    ///
195    /// # Returns
196    ///
197    /// The wrapped delegate processor.
198    #[inline]
199    pub fn delegate_mut(&mut self) -> &mut P {
200        &mut self.delegate
201    }
202
203    /// Consumes this wrapper and returns the delegate processor.
204    ///
205    /// # Returns
206    ///
207    /// The wrapped delegate processor.
208    #[inline]
209    pub fn into_delegate(self) -> P {
210        self.delegate
211    }
212}
213
214impl<Item, P> BatchProcessor<Item> for ChunkedBatchProcessor<P>
215where
216    P: BatchProcessor<Item>,
217{
218    type Error = ChunkedBatchProcessError<P::Error>;
219
220    /// Processes items by delegating fixed-size chunks.
221    ///
222    /// # Parameters
223    ///
224    /// * `items` - Item source for the logical batch.
225    /// * `count` - Declared number of items expected from `items`.
226    ///
227    /// # Returns
228    ///
229    /// A result aggregating all successfully delegated chunks.
230    ///
231    /// # Errors
232    ///
233    /// Returns [`ChunkedBatchProcessError`] when the source count does not
234    /// match `count`, when the delegate fails for one chunk, or when a delegate
235    /// `Ok` result does not describe the submitted chunk.
236    fn process_with_count<I>(
237        &mut self,
238        items: I,
239        count: usize,
240    ) -> Result<BatchProcessResult, Self::Error>
241    where
242        I: IntoIterator<Item = Item>,
243    {
244        let reporter = Arc::clone(&self.reporter);
245        let mut progress = Progress::new(reporter.as_ref(), self.report_interval);
246        let state = BatchProcessState::new(count);
247        progress.report_started(state.progress_counters());
248        let capacity = cmp::min(self.chunk_size.get(), count.max(1));
249        let mut chunk = Vec::with_capacity(capacity);
250
251        for item in items {
252            let observed_count = state.record_item_observed();
253            if observed_count > count {
254                if !chunk.is_empty() {
255                    self.process_chunk(&mut chunk, &state, &mut progress)?;
256                }
257                let failed = progress.report_failed(state.progress_counters());
258                let result = state.to_chunked_result(failed.elapsed());
259                return Err(ChunkedBatchProcessError::CountExceeded {
260                    expected: count,
261                    observed_at_least: observed_count,
262                    result,
263                });
264            }
265            chunk.push(item);
266            if chunk.len() == self.chunk_size.get() {
267                self.process_chunk(&mut chunk, &state, &mut progress)?;
268            }
269        }
270
271        if !chunk.is_empty() {
272            self.process_chunk(&mut chunk, &state, &mut progress)?;
273        }
274
275        if state.observed_count() < count {
276            let failed = progress.report_failed(state.progress_counters());
277            let result = state.to_chunked_result(failed.elapsed());
278            Err(ChunkedBatchProcessError::CountShortfall {
279                expected: count,
280                actual: state.observed_count(),
281                result,
282            })
283        } else {
284            let finished = progress.report_finished(state.progress_counters());
285            let result = state.to_chunked_result(finished.elapsed());
286            Ok(result)
287        }
288    }
289}
290
291impl<P> ChunkedBatchProcessor<P> {
292    /// Submits one collected chunk to the delegate and updates aggregate state.
293    ///
294    /// # Parameters
295    ///
296    /// * `chunk` - Buffered items waiting to be submitted.
297    /// * `state` - Aggregate counters updated after successful delegation.
298    /// * `progress` - Progress run used for lifecycle and periodic callbacks.
299    ///
300    /// # Returns
301    ///
302    /// Returns `Ok(())` after the delegate accepts the chunk.
303    ///
304    /// # Errors
305    ///
306    /// Returns [`ChunkedBatchProcessError::ChunkFailed`] when the delegate
307    /// returns an error.
308    fn process_chunk<Item>(
309        &mut self,
310        chunk: &mut Vec<Item>,
311        state: &BatchProcessState,
312        progress: &mut Progress<'_>,
313    ) -> Result<(), ChunkedBatchProcessError<P::Error>>
314    where
315        P: BatchProcessor<Item>,
316    {
317        let chunk_len = chunk.len();
318        let start_index = state.completed_count();
319        let chunk_index = state.chunk_count();
320        let current_chunk = std::mem::take(chunk);
321        match self.delegate.process_with_count(current_chunk, chunk_len) {
322            Ok(chunk_result) => {
323                if chunk_result.item_count() != chunk_len
324                    || chunk_result.completed_count() != chunk_len
325                {
326                    let failed = progress.report_failed(state.progress_counters());
327                    let result = state.to_chunked_result(failed.elapsed());
328                    return Err(ChunkedBatchProcessError::InvalidChunkResult {
329                        chunk_index,
330                        start_index,
331                        chunk_len,
332                        item_count: chunk_result.item_count(),
333                        completed_count: chunk_result.completed_count(),
334                        result,
335                    });
336                }
337                state.record_chunk_processed(chunk_len, chunk_result.processed_count());
338                let _ = progress.report_running_if_due(state.running_chunk_progress_counters());
339                Ok(())
340            }
341            Err(source) => {
342                let failed = progress.report_failed(state.progress_counters());
343                let result = state.to_chunked_result(failed.elapsed());
344                Err(ChunkedBatchProcessError::ChunkFailed {
345                    chunk_index,
346                    start_index,
347                    chunk_len,
348                    source,
349                    result,
350                })
351            }
352        }
353    }
354}