Struct ParallelBatchProcessor 

pub struct ParallelBatchProcessor<Item> { /* private fields */ }

Processes batch items with sequential fallback and scoped standard threads.

The processor stores the supplied consumer as an ArcConsumer so every worker can share it safely. By default, small batches run sequentially to avoid thread setup overhead. Larger batches use scoped worker threads for each BatchProcessor::process call, so input items may borrow data from the caller as long as they are Send. Running progress is reported between items on the sequential path and from a scoped reporter thread on the parallel path.

§Type Parameters

  • Item - Item type consumed by the stored consumer.

§Examples

use std::{
    sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    },
};

use qubit_batch::{
    BatchProcessor,
    ParallelBatchProcessor,
};

// Shared accumulator; the consumer adds each item to it.
let total = Arc::new(AtomicUsize::new(0));
let total_for_consumer = Arc::clone(&total);
let mut processor = ParallelBatchProcessor::builder(move |item: &usize| {
    total_for_consumer.fetch_add(*item, Ordering::Relaxed);
})
.thread_count(2)
// A threshold of 0 keeps even this three-item batch off the sequential path.
.sequential_threshold(0)
.build()
.expect("parallel processor configuration should be valid");

let result = processor
    .process([1, 2, 3])
    .expect("array length should be exact");

assert!(result.is_success());
assert_eq!(total.load(Ordering::Relaxed), 6);
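
The borrowing guarantee in the description follows from scoped threads, which are joined before the call returns. The following is a minimal std-only sketch of that idea, not the crate's implementation: `process_scoped` and its chunking strategy are hypothetical and chosen only for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper (not part of the crate): splits `items` into at most
/// `thread_count` chunks and runs `consumer` on each chunk inside
/// `std::thread::scope`. Returns the number of consumer calls.
fn process_scoped<T, F>(items: Vec<T>, thread_count: usize, consumer: &F) -> usize
where
    T: Send,
    F: Fn(&T) + Sync,
{
    let processed = AtomicUsize::new(0);
    let workers = thread_count.max(1);
    // Ceiling division; `max(1)` keeps the chunk length non-zero for empty input.
    let chunk_len = ((items.len() + workers - 1) / workers).max(1);

    // Move the items into one owned chunk per worker.
    let mut chunks: Vec<Vec<T>> = Vec::new();
    let mut iter = items.into_iter();
    loop {
        let chunk: Vec<T> = iter.by_ref().take(chunk_len).collect();
        if chunk.is_empty() {
            break;
        }
        chunks.push(chunk);
    }

    // Every spawned thread is joined before `scope` returns, so the workers
    // may use `consumer`, `processed`, and any data borrowed by the items.
    thread::scope(|scope| {
        for chunk in chunks {
            let processed = &processed;
            scope.spawn(move || {
                for item in &chunk {
                    consumer(item);
                    processed.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    processed.load(Ordering::Relaxed)
}
```

With this sketch, a `Vec<&str>` whose slices borrow from a local `String` can be passed directly, because no worker outlives the call. The item type only needs to be `Send`, not `'static`.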

Implementations§

impl<Item> ParallelBatchProcessor<Item>

pub const DEFAULT_REPORT_INTERVAL: Duration

Default interval between progress callbacks.

pub const DEFAULT_SEQUENTIAL_THRESHOLD: usize = 100

Default maximum batch size that still uses sequential processing.

pub fn new<C>(consumer: C) -> Self
where C: Consumer<Item> + Send + Sync + 'static,

Creates a parallel consumer-backed batch processor.

§Parameters
  • consumer - Thread-safe consumer invoked once for each accepted item.
§Returns

A processor storing consumer as an ArcConsumer and using Self::default_thread_count workers.
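
This page does not define ArcConsumer. As a rough std-only analogy, a closure behind an `Arc` can be cloned cheaply and called from several threads. In the sketch below, `SharedConsumer`, `call_from_workers`, and `sum_of_worker_indexes` are hypothetical names, not items from the crate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for an arc-backed consumer; not the crate's `ArcConsumer`.
type SharedConsumer = Arc<dyn Fn(&usize) + Send + Sync>;

/// Clones the handle once per worker so every thread calls the same closure.
fn call_from_workers(consumer: &SharedConsumer, workers: usize) {
    let handles: Vec<_> = (0..workers)
        .map(|index| {
            let consumer = Arc::clone(consumer);
            thread::spawn(move || (*consumer)(&index))
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread should not panic");
    }
}

/// Builds a summing consumer, shares it with `workers` threads, and returns
/// the sum of the worker indexes it received.
fn sum_of_worker_indexes(workers: usize) -> usize {
    let total = Arc::new(AtomicUsize::new(0));
    let total_for_consumer = Arc::clone(&total);
    let consumer: SharedConsumer = Arc::new(move |item: &usize| {
        total_for_consumer.fetch_add(*item, Ordering::Relaxed);
    });
    call_from_workers(&consumer, workers);
    total.load(Ordering::Relaxed)
}
```

The `Send + Sync + 'static` bounds on `new` and `builder` match what such sharing requires: the closure must be callable through a shared reference from any thread.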

pub fn builder<C>(consumer: C) -> ParallelBatchProcessorBuilder<Item>
where C: Consumer<Item> + Send + Sync + 'static,

Creates a builder for configuring a parallel consumer-backed processor.

§Parameters
  • consumer - Thread-safe consumer invoked once for each accepted item.
§Returns

A builder initialized with default settings.

pub fn default_thread_count() -> usize

Returns the default worker-thread count.

§Returns

The available CPU parallelism, or 1 if it cannot be detected.
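
The documented fallback can be reproduced with the standard library alone. This is a sketch of equivalent behavior under that assumption, not the crate's source; `detected_thread_count` is a hypothetical name.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical equivalent of the documented behavior: the available
/// parallelism reported by the platform, or 1 when detection fails.
fn detected_thread_count() -> usize {
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}
```

Because `available_parallelism` returns a `NonZeroUsize`, the result is always at least 1, whether or not detection succeeds.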

pub const fn thread_count(&self) -> usize

Returns the configured worker-thread count.

§Returns

The maximum number of scoped worker threads used for one batch.

pub const fn sequential_threshold(&self) -> usize

Returns the configured sequential fallback threshold.

§Returns

The maximum item count that still runs sequentially.

pub const fn report_interval(&self) -> Duration

Returns the configured progress-report interval.

§Returns

The minimum time between due-based running progress callbacks.
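
One way to read "minimum time between callbacks" is as a throttle that fires only once the interval has elapsed since the previous report. The sketch below illustrates that reading. `ReportThrottle` is a hypothetical type, and the crate's actual reporting logic may differ.

```rust
use std::time::{Duration, Instant};

/// Hypothetical throttle (not a type from the crate) that allows a report
/// only when `interval` has elapsed since the previous one.
struct ReportThrottle {
    interval: Duration,
    last_report: Instant,
}

impl ReportThrottle {
    /// Starts the throttle; the first report becomes due `interval` after `started_at`.
    fn new(interval: Duration, started_at: Instant) -> Self {
        Self {
            interval,
            last_report: started_at,
        }
    }

    /// Returns `true` and records `now` when a report is due; otherwise `false`.
    fn report_if_due(&mut self, now: Instant) -> bool {
        if now.saturating_duration_since(self.last_report) >= self.interval {
            self.last_report = now;
            true
        } else {
            false
        }
    }
}
```

Passing `now` explicitly keeps the check deterministic, so it can be exercised with computed instants instead of real sleeps.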

pub fn reporter(&self) -> &Arc<dyn ProgressReporter>

Returns the configured progress reporter.

§Returns

A shared reference to the configured progress reporter.

pub const fn consumer(&self) -> &ArcConsumer<Item>

Returns the stored consumer.

§Returns

A shared reference to the arc-backed consumer.

pub fn into_consumer(self) -> ArcConsumer<Item>

Consumes this processor and returns the stored consumer.

§Returns

The arc-backed consumer used by this processor.

Trait Implementations§

impl<Item> BatchProcessor<Item> for ParallelBatchProcessor<Item>
where Item: Send,

fn process_with_count<I>( &mut self, items: I, count: usize, ) -> Result<BatchProcessResult, Self::Error>
where I: IntoIterator<Item = Item>,

Processes items sequentially for small batches or on scoped workers.

§Parameters
  • items - Item source for the batch.
  • count - Declared number of items expected from items.
§Returns

A result with completed and processed counts equal to the number of consumer calls when the input source yields exactly count items.

§Errors

Returns BatchProcessError::CountShortfall when the source ends before count, or BatchProcessError::CountExceeded when the source yields an extra item. Extra items are observed but not passed to the consumer.
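
The two error cases can be illustrated with a small sequential sketch. It is not the crate's implementation: `CountError` and `consume_exactly` are hypothetical, and the real BatchProcessError variants may carry different fields.

```rust
/// Hypothetical error type mirroring the two documented failure cases.
#[derive(Debug, PartialEq, Eq)]
enum CountError {
    /// The source ended after `actual` items, before reaching `expected`.
    Shortfall { expected: usize, actual: usize },
    /// The source yielded at least one item beyond `expected`.
    Exceeded { expected: usize },
}

/// Passes exactly `count` items to `consumer`, then checks for a surplus item.
fn consume_exactly<I, F>(items: I, count: usize, mut consumer: F) -> Result<usize, CountError>
where
    I: IntoIterator,
    F: FnMut(&I::Item),
{
    let mut iter = items.into_iter();
    let mut consumed = 0;
    while consumed < count {
        match iter.next() {
            Some(item) => {
                consumer(&item);
                consumed += 1;
            }
            None => {
                return Err(CountError::Shortfall {
                    expected: count,
                    actual: consumed,
                });
            }
        }
    }
    // One extra `next` call observes a surplus item without consuming it.
    if iter.next().is_some() {
        return Err(CountError::Exceeded { expected: count });
    }
    Ok(consumed)
}
```

In this sketch the surplus item is pulled from the iterator and dropped, which matches the documented "observed but not passed to the consumer" behavior.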

§Panics

Propagates any panic raised by the stored consumer, whether on the caller thread or on a worker thread, and any panic raised by the configured progress reporter.

type Error = BatchProcessError

Error returned by this processor.

fn process<I>(&mut self, items: I) -> Result<BatchProcessResult, Self::Error>
where I: IntoIterator<Item = Item>, I::IntoIter: ExactSizeIterator,

Processes items as one batch using its exact iterator length.
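
The `ExactSizeIterator` bound suggests that `process` reads the length up front and passes it on as the declared count. The sketch below shows that delegation pattern under that assumption. Both free functions are hypothetical stand-ins, not the trait's actual default method.

```rust
/// Hypothetical stand-in for a count-checked entry point: succeeds only when
/// the source yields exactly `count` items.
fn process_with_count<I>(items: I, count: usize) -> Result<usize, String>
where
    I: IntoIterator,
{
    let actual = items.into_iter().count();
    if actual == count {
        Ok(actual)
    } else {
        Err(format!("expected {} items, got {}", count, actual))
    }
}

/// Reads the exact length from the iterator and delegates with it.
fn process<I>(items: I) -> Result<usize, String>
where
    I: IntoIterator,
    I::IntoIter: ExactSizeIterator,
{
    let iter = items.into_iter();
    let count = iter.len();
    process_with_count(iter, count)
}
```

Arrays, `Vec`, and integer ranges such as `0..5` satisfy the bound. Adapters like `filter` do not implement `ExactSizeIterator`, so a filtered source would need the count-taking method with an explicit count.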

Auto Trait Implementations§

impl<Item> Freeze for ParallelBatchProcessor<Item>

impl<Item> !RefUnwindSafe for ParallelBatchProcessor<Item>

impl<Item> Send for ParallelBatchProcessor<Item>

impl<Item> Sync for ParallelBatchProcessor<Item>

impl<Item> Unpin for ParallelBatchProcessor<Item>

impl<Item> UnsafeUnpin for ParallelBatchProcessor<Item>

impl<Item> !UnwindSafe for ParallelBatchProcessor<Item>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.