qubit_batch/process/impls/chunked_batch_processor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 cmp,
12 num::NonZeroUsize,
13 sync::Arc,
14 time::Duration,
15};
16
17use qubit_progress::{
18 Progress,
19 reporter::ProgressReporter,
20};
21
22use crate::process::{
23 BatchProcessResult,
24 BatchProcessState,
25 BatchProcessor,
26 ChunkedBatchProcessError,
27};
28
29use super::ChunkedBatchProcessorBuilder;
30
/// Processes input items by submitting fixed-size chunks to a delegate.
///
/// `ChunkedBatchProcessor` is useful when the caller has a large logical batch
/// but the real target must receive smaller batches, such as SQL batch insert
/// operations with a maximum row count per statement.
///
/// Items are buffered in source order until `chunk_size` of them have been
/// collected. That buffered chunk is then submitted to the delegate. After the
/// source is exhausted, any remaining items are submitted as a final, shorter
/// chunk. Processing stops at the first chunk that fails: later items are not
/// pulled from the source.
///
/// # Delegate Contract
///
/// The delegate must return a result whose `item_count` and `completed_count`
/// match the submitted chunk length whenever it returns `Ok`. The delegate may
/// still report a lower `processed_count`, such as when a database reports
/// fewer affected rows than submitted rows. If the delegate cannot reach a
/// terminal outcome for every item in the chunk, it should return `Err`;
/// inconsistent `Ok` results are returned as
/// [`ChunkedBatchProcessError::InvalidChunkResult`].
///
/// # Type Parameters
///
/// * `P` - Delegate processor receiving each chunk.
///
/// # Examples
///
/// ```rust
/// use std::{
///     num::NonZeroUsize,
///     time::Duration,
/// };
///
/// use qubit_batch::{
///     BatchProcessResult,
///     BatchProcessResultBuilder,
///     BatchProcessor,
///     ChunkedBatchProcessor,
/// };
///
/// struct InsertChunk;
///
/// impl BatchProcessor<i32> for InsertChunk {
///     type Error = &'static str;
///
///     fn process_with_count<I>(
///         &mut self,
///         rows: I,
///         count: usize,
///     ) -> Result<BatchProcessResult, Self::Error>
///     where
///         I: IntoIterator<Item = i32>,
///     {
///         let processed = rows.into_iter().count();
///         BatchProcessResultBuilder::builder(count)
///             .completed_count(processed)
///             .processed_count(processed)
///             .chunk_count(1)
///             .elapsed(Duration::ZERO)
///             .build()
///             .map_err(|_| "invalid process result")
///     }
/// }
///
/// let mut processor = ChunkedBatchProcessor::new(
///     InsertChunk,
///     NonZeroUsize::new(2).expect("chunk size should be non-zero"),
/// );
///
/// let result = processor
///     .process([1, 2, 3, 4, 5])
///     .expect("array length should be exact");
///
/// // Five items with a chunk size of two are submitted as 2 + 2 + 1.
/// assert_eq!(result.completed_count(), 5);
/// assert_eq!(result.chunk_count(), 3);
/// ```
pub struct ChunkedBatchProcessor<P> {
    /// Delegate processor receiving each chunk.
    pub(crate) delegate: P,
    /// Maximum number of items submitted to the delegate at once.
    pub(crate) chunk_size: NonZeroUsize,
    /// Minimum time between due-based running progress callbacks.
    pub(crate) report_interval: Duration,
    /// Reporter receiving batch lifecycle and progress callbacks.
    pub(crate) reporter: Arc<dyn ProgressReporter>,
}
109
110impl<P> ChunkedBatchProcessor<P> {
111 /// Default interval between progress callbacks.
112 pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
113
114 /// Creates a chunked batch processor.
115 ///
116 /// # Parameters
117 ///
118 /// * `delegate` - Processor receiving each chunk.
119 /// * `chunk_size` - Maximum number of items submitted in one chunk.
120 ///
121 /// # Returns
122 ///
123 /// A chunked processor using no-op progress reporting.
124 ///
125 /// # Type Constraints
126 ///
127 /// This constructor only stores `delegate`; it intentionally does not
128 /// require `P: BatchProcessor<Item>` because the item type is not part of
129 /// construction. That bound is enforced when this wrapper is used as a
130 /// [`BatchProcessor<Item>`], such as when calling [`BatchProcessor::process`].
131 /// Therefore, a value can be constructed with any delegate type, but it can
132 /// only process items for item types that the delegate actually supports.
133 #[inline]
134 pub fn new(delegate: P, chunk_size: NonZeroUsize) -> Self {
135 Self::builder(delegate, chunk_size).build()
136 }
137
138 /// Creates a builder for configuring a chunked batch processor.
139 ///
140 /// # Parameters
141 ///
142 /// * `delegate` - Processor receiving each chunk.
143 /// * `chunk_size` - Maximum number of items submitted in one chunk.
144 ///
145 /// # Returns
146 ///
147 /// A builder initialized with default settings.
148 #[inline]
149 pub fn builder(delegate: P, chunk_size: NonZeroUsize) -> ChunkedBatchProcessorBuilder<P> {
150 ChunkedBatchProcessorBuilder::new(delegate, chunk_size)
151 }
152
153 /// Returns the configured chunk size.
154 ///
155 /// # Returns
156 ///
157 /// The maximum number of items submitted to the delegate at once.
158 #[inline]
159 pub const fn chunk_size(&self) -> NonZeroUsize {
160 self.chunk_size
161 }
162
163 /// Returns the configured progress-report interval.
164 ///
165 /// # Returns
166 ///
167 /// The minimum time between due-based running progress callbacks.
168 #[inline]
169 pub const fn report_interval(&self) -> Duration {
170 self.report_interval
171 }
172
173 /// Returns the configured progress reporter.
174 ///
175 /// # Returns
176 ///
177 /// A shared reference to the configured progress reporter.
178 #[inline]
179 pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
180 &self.reporter
181 }
182
183 /// Returns a shared reference to the delegate processor.
184 ///
185 /// # Returns
186 ///
187 /// The wrapped delegate processor.
188 #[inline]
189 pub const fn delegate(&self) -> &P {
190 &self.delegate
191 }
192
193 /// Returns a mutable reference to the delegate processor.
194 ///
195 /// # Returns
196 ///
197 /// The wrapped delegate processor.
198 #[inline]
199 pub fn delegate_mut(&mut self) -> &mut P {
200 &mut self.delegate
201 }
202
203 /// Consumes this wrapper and returns the delegate processor.
204 ///
205 /// # Returns
206 ///
207 /// The wrapped delegate processor.
208 #[inline]
209 pub fn into_delegate(self) -> P {
210 self.delegate
211 }
212}
213
214impl<Item, P> BatchProcessor<Item> for ChunkedBatchProcessor<P>
215where
216 P: BatchProcessor<Item>,
217{
218 type Error = ChunkedBatchProcessError<P::Error>;
219
220 /// Processes items by delegating fixed-size chunks.
221 ///
222 /// # Parameters
223 ///
224 /// * `items` - Item source for the logical batch.
225 /// * `count` - Declared number of items expected from `items`.
226 ///
227 /// # Returns
228 ///
229 /// A result aggregating all successfully delegated chunks.
230 ///
231 /// # Errors
232 ///
233 /// Returns [`ChunkedBatchProcessError`] when the source count does not
234 /// match `count`, when the delegate fails for one chunk, or when a delegate
235 /// `Ok` result does not describe the submitted chunk.
236 fn process_with_count<I>(
237 &mut self,
238 items: I,
239 count: usize,
240 ) -> Result<BatchProcessResult, Self::Error>
241 where
242 I: IntoIterator<Item = Item>,
243 {
244 let reporter = Arc::clone(&self.reporter);
245 let mut progress = Progress::new(reporter.as_ref(), self.report_interval);
246 let state = BatchProcessState::new(count);
247 progress.report_started(state.progress_counters());
248 let capacity = cmp::min(self.chunk_size.get(), count.max(1));
249 let mut chunk = Vec::with_capacity(capacity);
250
251 for item in items {
252 let observed_count = state.record_item_observed();
253 if observed_count > count {
254 if !chunk.is_empty() {
255 self.process_chunk(&mut chunk, &state, &mut progress)?;
256 }
257 let failed = progress.report_failed(state.progress_counters());
258 let result = state.to_chunked_result(failed.elapsed());
259 return Err(ChunkedBatchProcessError::CountExceeded {
260 expected: count,
261 observed_at_least: observed_count,
262 result,
263 });
264 }
265 chunk.push(item);
266 if chunk.len() == self.chunk_size.get() {
267 self.process_chunk(&mut chunk, &state, &mut progress)?;
268 }
269 }
270
271 if !chunk.is_empty() {
272 self.process_chunk(&mut chunk, &state, &mut progress)?;
273 }
274
275 if state.observed_count() < count {
276 let failed = progress.report_failed(state.progress_counters());
277 let result = state.to_chunked_result(failed.elapsed());
278 Err(ChunkedBatchProcessError::CountShortfall {
279 expected: count,
280 actual: state.observed_count(),
281 result,
282 })
283 } else {
284 let finished = progress.report_finished(state.progress_counters());
285 let result = state.to_chunked_result(finished.elapsed());
286 Ok(result)
287 }
288 }
289}
290
291impl<P> ChunkedBatchProcessor<P> {
292 /// Submits one collected chunk to the delegate and updates aggregate state.
293 ///
294 /// # Parameters
295 ///
296 /// * `chunk` - Buffered items waiting to be submitted.
297 /// * `state` - Aggregate counters updated after successful delegation.
298 /// * `progress` - Progress run used for lifecycle and periodic callbacks.
299 ///
300 /// # Returns
301 ///
302 /// Returns `Ok(())` after the delegate accepts the chunk.
303 ///
304 /// # Errors
305 ///
306 /// Returns [`ChunkedBatchProcessError::ChunkFailed`] when the delegate
307 /// returns an error.
308 fn process_chunk<Item>(
309 &mut self,
310 chunk: &mut Vec<Item>,
311 state: &BatchProcessState,
312 progress: &mut Progress<'_>,
313 ) -> Result<(), ChunkedBatchProcessError<P::Error>>
314 where
315 P: BatchProcessor<Item>,
316 {
317 let chunk_len = chunk.len();
318 let start_index = state.completed_count();
319 let chunk_index = state.chunk_count();
320 let current_chunk = std::mem::take(chunk);
321 match self.delegate.process_with_count(current_chunk, chunk_len) {
322 Ok(chunk_result) => {
323 if chunk_result.item_count() != chunk_len
324 || chunk_result.completed_count() != chunk_len
325 {
326 let failed = progress.report_failed(state.progress_counters());
327 let result = state.to_chunked_result(failed.elapsed());
328 return Err(ChunkedBatchProcessError::InvalidChunkResult {
329 chunk_index,
330 start_index,
331 chunk_len,
332 item_count: chunk_result.item_count(),
333 completed_count: chunk_result.completed_count(),
334 result,
335 });
336 }
337 state.record_chunk_processed(chunk_len, chunk_result.processed_count());
338 let _ = progress.report_running_if_due(state.running_chunk_progress_counters());
339 Ok(())
340 }
341 Err(source) => {
342 let failed = progress.report_failed(state.progress_counters());
343 let result = state.to_chunked_result(failed.elapsed());
344 Err(ChunkedBatchProcessError::ChunkFailed {
345 chunk_index,
346 start_index,
347 chunk_len,
348 source,
349 result,
350 })
351 }
352 }
353 }
354}