Skip to main content

qubit_batch/process/impls/
sequential_batch_processor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::Arc,
12    time::Duration,
13};
14
15use qubit_function::{
16    BoxConsumer,
17    Consumer,
18};
19use qubit_progress::{
20    Progress,
21    reporter::ProgressReporter,
22};
23
24use crate::process::{
25    BatchProcessError,
26    BatchProcessResult,
27    BatchProcessState,
28    BatchProcessor,
29};
30
31use super::SequentialBatchProcessorBuilder;
32
33/// Processes batch items sequentially by invoking a [`Consumer`] per item.
34///
35/// The processor stores the supplied consumer as a [`BoxConsumer`] and invokes it
36/// on the caller thread in input order. Consumer panics are not caught; they
37/// propagate to the caller and no [`BatchProcessResult`] is produced. Progress
38/// updates are emitted only between items.
39///
40/// # Type Parameters
41///
42/// * `Item` - Item type consumed by the stored consumer.
43///
44/// ```rust
45/// use qubit_batch::{
46///     BatchProcessor,
47///     SequentialBatchProcessor,
48/// };
49///
50/// let mut processor = SequentialBatchProcessor::new(|item: &i32| {
51///     assert!(*item > 0);
52/// });
53///
54/// let result = processor
55///     .process([1, 2, 3])
56///     .expect("array length should be exact");
57///
58/// assert!(result.is_success());
59/// ```
60pub struct SequentialBatchProcessor<Item> {
61    /// Consumer called once for each accepted item.
62    pub(crate) consumer: BoxConsumer<Item>,
63    /// Interval between progress callbacks while the batch is running.
64    pub(crate) report_interval: Duration,
65    /// Reporter receiving batch lifecycle callbacks.
66    pub(crate) reporter: Arc<dyn ProgressReporter>,
67}
68
69impl<Item> SequentialBatchProcessor<Item> {
70    /// Default interval between progress callbacks.
71    pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
72
73    /// Creates a sequential consumer-backed batch processor.
74    ///
75    /// # Parameters
76    ///
77    /// * `consumer` - Consumer invoked once for each input item.
78    ///
79    /// # Returns
80    ///
81    /// A processor storing `consumer` as a [`BoxConsumer`].
82    #[inline]
83    pub fn new<C>(consumer: C) -> Self
84    where
85        C: Consumer<Item> + 'static,
86    {
87        Self::builder(consumer).build()
88    }
89
90    /// Creates a builder for configuring a sequential consumer-backed
91    /// processor.
92    ///
93    /// # Parameters
94    ///
95    /// * `consumer` - Consumer invoked once for each input item.
96    ///
97    /// # Returns
98    ///
99    /// A builder initialized with default settings.
100    #[inline]
101    pub fn builder<C>(consumer: C) -> SequentialBatchProcessorBuilder<Item>
102    where
103        C: Consumer<Item> + 'static,
104    {
105        SequentialBatchProcessorBuilder::new(consumer)
106    }
107
108    /// Returns the configured progress-report interval.
109    ///
110    /// # Returns
111    ///
112    /// The minimum time between due-based running progress callbacks.
113    #[inline]
114    pub const fn report_interval(&self) -> Duration {
115        self.report_interval
116    }
117
118    /// Returns the configured progress reporter.
119    ///
120    /// # Returns
121    ///
122    /// A shared reference to the configured progress reporter.
123    #[inline]
124    pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
125        &self.reporter
126    }
127
128    /// Returns the stored consumer.
129    ///
130    /// # Returns
131    ///
132    /// A shared reference to the boxed consumer.
133    #[inline]
134    pub const fn consumer(&self) -> &BoxConsumer<Item> {
135        &self.consumer
136    }
137
138    /// Consumes this processor and returns the stored consumer.
139    ///
140    /// # Returns
141    ///
142    /// The boxed consumer used by this processor.
143    #[inline]
144    pub fn into_consumer(self) -> BoxConsumer<Item> {
145        self.consumer
146    }
147}
148
149impl<Item> BatchProcessor<Item> for SequentialBatchProcessor<Item> {
150    type Error = BatchProcessError;
151
152    /// Processes items sequentially on the caller thread.
153    ///
154    /// # Parameters
155    ///
156    /// * `items` - Item source for the batch.
157    /// * `count` - Declared number of items expected from `items`.
158    ///
159    /// # Returns
160    ///
161    /// A result with completed and processed counts equal to the number of
162    /// consumer calls when the input source yields exactly `count` items.
163    ///
164    /// # Errors
165    ///
166    /// Returns [`BatchProcessError::CountShortfall`] when the source ends before
167    /// `count`, or [`BatchProcessError::CountExceeded`] when the source yields an
168    /// extra item. Extra items are observed but not passed to the consumer.
169    ///
170    /// # Panics
171    ///
172    /// Propagates any panic raised by the stored consumer or the configured
173    /// progress reporter.
174    fn process_with_count<I>(
175        &mut self,
176        items: I,
177        count: usize,
178    ) -> Result<BatchProcessResult, Self::Error>
179    where
180        I: IntoIterator<Item = Item>,
181    {
182        let state = BatchProcessState::new(count);
183        let mut progress = Progress::new(self.reporter.as_ref(), self.report_interval);
184        progress.report_started(state.progress_counters());
185
186        for item in items {
187            let observed_count = state.record_item_observed();
188            if observed_count > count {
189                let failed = progress.report_failed(state.progress_counters());
190                let result = state.to_direct_result(failed.elapsed());
191                return Err(BatchProcessError::CountExceeded {
192                    expected: count,
193                    observed_at_least: observed_count,
194                    result,
195                });
196            }
197            state.record_item_started();
198            self.consumer.accept(&item);
199            state.record_item_processed();
200            let _ = progress.report_running_if_due(state.progress_counters());
201        }
202
203        if state.observed_count() < count {
204            let failed = progress.report_failed(state.progress_counters());
205            let result = state.to_direct_result(failed.elapsed());
206            Err(BatchProcessError::CountShortfall {
207                expected: count,
208                actual: state.observed_count(),
209                result,
210            })
211        } else {
212            let finished = progress.report_finished(state.progress_counters());
213            let result = state.to_direct_result(finished.elapsed());
214            Ok(result)
215        }
216    }
217}