Skip to main content

ChunkedBatchProcessor

Struct ChunkedBatchProcessor 

Source
pub struct ChunkedBatchProcessor<P> { /* private fields */ }
Expand description

Processes input items by submitting fixed-size chunks to a delegate.

ChunkedBatchProcessor is useful when the caller has a large logical batch but the real target must receive smaller batches, such as SQL batch insert operations with a maximum row count per statement.

The delegate must return a result whose item_count and completed_count match the submitted chunk length whenever it returns Ok. The delegate may still report a lower processed_count, such as when a database reports fewer affected rows than submitted rows. If the delegate cannot reach a terminal outcome for every item in the chunk, it should return Err; inconsistent Ok results are returned as ChunkedBatchProcessError::InvalidChunkResult.

§Type Parameters

  • P - Delegate processor receiving each chunk.
use std::{
    num::NonZeroUsize,
    time::Duration,
};

use qubit_batch::{
    BatchProcessResult,
    BatchProcessResultBuilder,
    BatchProcessor,
    ChunkedBatchProcessor,
};

struct InsertChunk;

impl BatchProcessor<i32> for InsertChunk {
    type Error = &'static str;

    fn process_with_count<I>(
        &mut self,
        rows: I,
        count: usize,
    ) -> Result<BatchProcessResult, Self::Error>
    where
        I: IntoIterator<Item = i32>,
    {
        let processed = rows.into_iter().count();
        BatchProcessResultBuilder::builder(count)
            .completed_count(processed)
            .processed_count(processed)
            .chunk_count(1)
            .elapsed(Duration::ZERO)
            .build()
            .map_err(|_| "invalid process result")
    }
}

let mut processor = ChunkedBatchProcessor::new(
    InsertChunk,
    NonZeroUsize::new(2).expect("chunk size should be non-zero"),
);

let result = processor
    .process([1, 2, 3, 4, 5])
    .expect("array length should be exact");

assert_eq!(result.completed_count(), 5);
assert_eq!(result.chunk_count(), 3);

Implementations§

Source§

impl<P> ChunkedBatchProcessor<P>

Source

pub const DEFAULT_REPORT_INTERVAL: Duration

Default interval between progress callbacks.

Source

pub fn new(delegate: P, chunk_size: NonZeroUsize) -> Self

Creates a chunked batch processor.

§Parameters
  • delegate - Processor receiving each chunk.
  • chunk_size - Maximum number of items submitted in one chunk.
§Returns

A chunked processor using no-op progress reporting.

§Type Constraints

This constructor only stores delegate; it intentionally does not require P: BatchProcessor<Item> because the item type is not part of construction. That bound is enforced when this wrapper is used as a BatchProcessor<Item>, such as when calling BatchProcessor::process. Therefore, a value can be constructed with any delegate type, but it can only process items for item types that the delegate actually supports.

Source

pub fn builder( delegate: P, chunk_size: NonZeroUsize, ) -> ChunkedBatchProcessorBuilder<P>

Creates a builder for configuring a chunked batch processor.

§Parameters
  • delegate - Processor receiving each chunk.
  • chunk_size - Maximum number of items submitted in one chunk.
§Returns

A builder initialized with default settings.

Source

pub const fn chunk_size(&self) -> NonZeroUsize

Returns the configured chunk size.

§Returns

The maximum number of items submitted to the delegate at once.

Source

pub const fn report_interval(&self) -> Duration

Returns the configured progress-report interval.

§Returns

The minimum time between due-based running progress callbacks.

Source

pub fn reporter(&self) -> &Arc<dyn ProgressReporter>

Returns the configured progress reporter.

§Returns

A shared reference to the configured progress reporter.

Source

pub const fn delegate(&self) -> &P

Returns a shared reference to the delegate processor.

§Returns

The wrapped delegate processor.

Source

pub fn delegate_mut(&mut self) -> &mut P

Returns a mutable reference to the delegate processor.

§Returns

The wrapped delegate processor.

Source

pub fn into_delegate(self) -> P

Consumes this wrapper and returns the delegate processor.

§Returns

The wrapped delegate processor.

Trait Implementations§

Source§

impl<Item, P> BatchProcessor<Item> for ChunkedBatchProcessor<P>
where P: BatchProcessor<Item>,

Source§

fn process_with_count<I>( &mut self, items: I, count: usize, ) -> Result<BatchProcessResult, Self::Error>
where I: IntoIterator<Item = Item>,

Processes items by delegating fixed-size chunks.

§Parameters
  • items - Item source for the logical batch.
  • count - Declared number of items expected from items.
§Returns

A result aggregating all successfully delegated chunks.

§Errors

Returns ChunkedBatchProcessError when the source count does not match count, when the delegate fails for one chunk, or when a delegate Ok result does not describe the submitted chunk.

Source§

type Error = ChunkedBatchProcessError<<P as BatchProcessor<Item>>::Error>

Error returned by this processor.
Source§

fn process<I>(&mut self, items: I) -> Result<BatchProcessResult, Self::Error>
where I: IntoIterator<Item = Item>, I::IntoIter: ExactSizeIterator,

Processes items as one batch using its exact iterator length. Read more

Auto Trait Implementations§

§

impl<P> Freeze for ChunkedBatchProcessor<P>
where P: Freeze,

§

impl<P> !RefUnwindSafe for ChunkedBatchProcessor<P>

§

impl<P> Send for ChunkedBatchProcessor<P>
where P: Send,

§

impl<P> Sync for ChunkedBatchProcessor<P>
where P: Sync,

§

impl<P> Unpin for ChunkedBatchProcessor<P>
where P: Unpin,

§

impl<P> UnsafeUnpin for ChunkedBatchProcessor<P>
where P: UnsafeUnpin,

§

impl<P> !UnwindSafe for ChunkedBatchProcessor<P>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.