pub struct ChunkedBatchProcessor<P> { /* private fields */ }Expand description
Processes input items by submitting fixed-size chunks to a delegate.
ChunkedBatchProcessor is useful when the caller has a large logical batch
but the real target must receive smaller batches, such as SQL batch insert
operations with a maximum row count per statement.
The delegate must return a result whose item_count and completed_count
match the submitted chunk length whenever it returns Ok. The delegate may
still report a lower processed_count, such as when a database reports
fewer affected rows than submitted rows. If the delegate cannot reach a
terminal outcome for every item in the chunk, it should return Err;
inconsistent Ok results are returned as
ChunkedBatchProcessError::InvalidChunkResult.
§Type Parameters
P- Delegate processor receiving each chunk.
use std::{
num::NonZeroUsize,
time::Duration,
};
use qubit_batch::{
BatchProcessResult,
BatchProcessResultBuilder,
BatchProcessor,
ChunkedBatchProcessor,
};
struct InsertChunk;
impl BatchProcessor<i32> for InsertChunk {
type Error = &'static str;
fn process_with_count<I>(
&mut self,
rows: I,
count: usize,
) -> Result<BatchProcessResult, Self::Error>
where
I: IntoIterator<Item = i32>,
{
let processed = rows.into_iter().count();
BatchProcessResultBuilder::builder(count)
.completed_count(processed)
.processed_count(processed)
.chunk_count(1)
.elapsed(Duration::ZERO)
.build()
.map_err(|_| "invalid process result")
}
}
let mut processor = ChunkedBatchProcessor::new(
InsertChunk,
NonZeroUsize::new(2).expect("chunk size should be non-zero"),
);
let result = processor
.process([1, 2, 3, 4, 5])
.expect("array length should be exact");
assert_eq!(result.completed_count(), 5);
assert_eq!(result.chunk_count(), 3);Implementations§
Source§impl<P> ChunkedBatchProcessor<P>
impl<P> ChunkedBatchProcessor<P>
Sourcepub const DEFAULT_REPORT_INTERVAL: Duration
pub const DEFAULT_REPORT_INTERVAL: Duration
Default interval between progress callbacks.
Sourcepub fn new(delegate: P, chunk_size: NonZeroUsize) -> Self
pub fn new(delegate: P, chunk_size: NonZeroUsize) -> Self
Creates a chunked batch processor.
§Parameters
delegate- Processor receiving each chunk.chunk_size- Maximum number of items submitted in one chunk.
§Returns
A chunked processor using no-op progress reporting.
§Type Constraints
This constructor only stores delegate; it intentionally does not
require P: BatchProcessor<Item> because the item type is not part of
construction. That bound is enforced when this wrapper is used as a
BatchProcessor<Item>, such as when calling BatchProcessor::process.
Therefore, a value can be constructed with any delegate type, but it can
only process items for item types that the delegate actually supports.
Sourcepub fn builder(
delegate: P,
chunk_size: NonZeroUsize,
) -> ChunkedBatchProcessorBuilder<P>
pub fn builder( delegate: P, chunk_size: NonZeroUsize, ) -> ChunkedBatchProcessorBuilder<P>
Sourcepub const fn chunk_size(&self) -> NonZeroUsize
pub const fn chunk_size(&self) -> NonZeroUsize
Returns the configured chunk size.
§Returns
The maximum number of items submitted to the delegate at once.
Sourcepub const fn report_interval(&self) -> Duration
pub const fn report_interval(&self) -> Duration
Returns the configured progress-report interval.
§Returns
The minimum time between due-based running progress callbacks.
Sourcepub fn reporter(&self) -> &Arc<dyn ProgressReporter>
pub fn reporter(&self) -> &Arc<dyn ProgressReporter>
Returns the configured progress reporter.
§Returns
A shared reference to the configured progress reporter.
Sourcepub fn delegate_mut(&mut self) -> &mut P
pub fn delegate_mut(&mut self) -> &mut P
Sourcepub fn into_delegate(self) -> P
pub fn into_delegate(self) -> P
Trait Implementations§
Source§impl<Item, P> BatchProcessor<Item> for ChunkedBatchProcessor<P>where
P: BatchProcessor<Item>,
impl<Item, P> BatchProcessor<Item> for ChunkedBatchProcessor<P>where
P: BatchProcessor<Item>,
Source§fn process_with_count<I>(
&mut self,
items: I,
count: usize,
) -> Result<BatchProcessResult, Self::Error>where
I: IntoIterator<Item = Item>,
fn process_with_count<I>(
&mut self,
items: I,
count: usize,
) -> Result<BatchProcessResult, Self::Error>where
I: IntoIterator<Item = Item>,
Processes items by delegating fixed-size chunks.
§Parameters
items- Item source for the logical batch.count- Declared number of items expected fromitems.
§Returns
A result aggregating all successfully delegated chunks.
§Errors
Returns ChunkedBatchProcessError when the source count does not
match count, when the delegate fails for one chunk, or when a delegate
Ok result does not describe the submitted chunk.