qubit_batch/process/impls/sequential_batch_processor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::Arc,
12 time::Duration,
13};
14
15use qubit_function::{
16 BoxConsumer,
17 Consumer,
18};
19use qubit_progress::{
20 Progress,
21 reporter::ProgressReporter,
22};
23
24use crate::process::{
25 BatchProcessError,
26 BatchProcessResult,
27 BatchProcessState,
28 BatchProcessor,
29};
30
31use super::SequentialBatchProcessorBuilder;
32
33/// Processes batch items sequentially by invoking a [`Consumer`] per item.
34///
35/// The processor stores the supplied consumer as a [`BoxConsumer`] and invokes it
36/// on the caller thread in input order. Consumer panics are not caught; they
37/// propagate to the caller and no [`BatchProcessResult`] is produced. Progress
38/// updates are emitted only between items.
39///
40/// # Type Parameters
41///
42/// * `Item` - Item type consumed by the stored consumer.
43///
44/// ```rust
45/// use qubit_batch::{
46/// BatchProcessor,
47/// SequentialBatchProcessor,
48/// };
49///
50/// let mut processor = SequentialBatchProcessor::new(|item: &i32| {
51/// assert!(*item > 0);
52/// });
53///
54/// let result = processor
55/// .process([1, 2, 3])
56/// .expect("array length should be exact");
57///
58/// assert!(result.is_success());
59/// ```
60pub struct SequentialBatchProcessor<Item> {
61 /// Consumer called once for each accepted item.
62 pub(crate) consumer: BoxConsumer<Item>,
63 /// Interval between progress callbacks while the batch is running.
64 pub(crate) report_interval: Duration,
65 /// Reporter receiving batch lifecycle callbacks.
66 pub(crate) reporter: Arc<dyn ProgressReporter>,
67}
68
69impl<Item> SequentialBatchProcessor<Item> {
70 /// Default interval between progress callbacks.
71 pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
72
73 /// Creates a sequential consumer-backed batch processor.
74 ///
75 /// # Parameters
76 ///
77 /// * `consumer` - Consumer invoked once for each input item.
78 ///
79 /// # Returns
80 ///
81 /// A processor storing `consumer` as a [`BoxConsumer`].
82 #[inline]
83 pub fn new<C>(consumer: C) -> Self
84 where
85 C: Consumer<Item> + 'static,
86 {
87 Self::builder(consumer).build()
88 }
89
90 /// Creates a builder for configuring a sequential consumer-backed
91 /// processor.
92 ///
93 /// # Parameters
94 ///
95 /// * `consumer` - Consumer invoked once for each input item.
96 ///
97 /// # Returns
98 ///
99 /// A builder initialized with default settings.
100 #[inline]
101 pub fn builder<C>(consumer: C) -> SequentialBatchProcessorBuilder<Item>
102 where
103 C: Consumer<Item> + 'static,
104 {
105 SequentialBatchProcessorBuilder::new(consumer)
106 }
107
108 /// Returns the configured progress-report interval.
109 ///
110 /// # Returns
111 ///
112 /// The minimum time between due-based running progress callbacks.
113 #[inline]
114 pub const fn report_interval(&self) -> Duration {
115 self.report_interval
116 }
117
118 /// Returns the configured progress reporter.
119 ///
120 /// # Returns
121 ///
122 /// A shared reference to the configured progress reporter.
123 #[inline]
124 pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
125 &self.reporter
126 }
127
128 /// Returns the stored consumer.
129 ///
130 /// # Returns
131 ///
132 /// A shared reference to the boxed consumer.
133 #[inline]
134 pub const fn consumer(&self) -> &BoxConsumer<Item> {
135 &self.consumer
136 }
137
138 /// Consumes this processor and returns the stored consumer.
139 ///
140 /// # Returns
141 ///
142 /// The boxed consumer used by this processor.
143 #[inline]
144 pub fn into_consumer(self) -> BoxConsumer<Item> {
145 self.consumer
146 }
147}
148
149impl<Item> BatchProcessor<Item> for SequentialBatchProcessor<Item> {
150 type Error = BatchProcessError;
151
152 /// Processes items sequentially on the caller thread.
153 ///
154 /// # Parameters
155 ///
156 /// * `items` - Item source for the batch.
157 /// * `count` - Declared number of items expected from `items`.
158 ///
159 /// # Returns
160 ///
161 /// A result with completed and processed counts equal to the number of
162 /// consumer calls when the input source yields exactly `count` items.
163 ///
164 /// # Errors
165 ///
166 /// Returns [`BatchProcessError::CountShortfall`] when the source ends before
167 /// `count`, or [`BatchProcessError::CountExceeded`] when the source yields an
168 /// extra item. Extra items are observed but not passed to the consumer.
169 ///
170 /// # Panics
171 ///
172 /// Propagates any panic raised by the stored consumer or the configured
173 /// progress reporter.
174 fn process_with_count<I>(
175 &mut self,
176 items: I,
177 count: usize,
178 ) -> Result<BatchProcessResult, Self::Error>
179 where
180 I: IntoIterator<Item = Item>,
181 {
182 let state = BatchProcessState::new(count);
183 let mut progress = Progress::new(self.reporter.as_ref(), self.report_interval);
184 progress.report_started(state.progress_counters());
185
186 for item in items {
187 let observed_count = state.record_item_observed();
188 if observed_count > count {
189 let failed = progress.report_failed(state.progress_counters());
190 let result = state.to_direct_result(failed.elapsed());
191 return Err(BatchProcessError::CountExceeded {
192 expected: count,
193 observed_at_least: observed_count,
194 result,
195 });
196 }
197 state.record_item_started();
198 self.consumer.accept(&item);
199 state.record_item_processed();
200 let _ = progress.report_running_if_due(state.progress_counters());
201 }
202
203 if state.observed_count() < count {
204 let failed = progress.report_failed(state.progress_counters());
205 let result = state.to_direct_result(failed.elapsed());
206 Err(BatchProcessError::CountShortfall {
207 expected: count,
208 actual: state.observed_count(),
209 result,
210 })
211 } else {
212 let finished = progress.report_finished(state.progress_counters());
213 let result = state.to_direct_result(finished.elapsed());
214 Ok(result)
215 }
216 }
217}