use std::{
num::NonZeroUsize,
sync::Arc,
time::Duration,
};
use qubit_function::{
ArcConsumer,
Consumer,
};
use qubit_progress::reporter::{
NoOpProgressReporter,
ProgressReporter,
};
use super::ParallelBatchProcessor;
use super::ParallelBatchProcessorBuildError;
pub struct ParallelBatchProcessorBuilder<Item> {
consumer: ArcConsumer<Item>,
thread_count: usize,
sequential_threshold: usize,
report_interval: Duration,
reporter: Arc<dyn ProgressReporter>,
}
impl<Item> ParallelBatchProcessorBuilder<Item> {
#[inline]
pub fn new<C>(consumer: C) -> Self
where
C: Consumer<Item> + Send + Sync + 'static,
{
Self {
consumer: consumer.into_arc(),
thread_count: ParallelBatchProcessor::<Item>::default_thread_count(),
sequential_threshold: ParallelBatchProcessor::<Item>::DEFAULT_SEQUENTIAL_THRESHOLD,
report_interval: ParallelBatchProcessor::<Item>::DEFAULT_REPORT_INTERVAL,
reporter: Arc::new(NoOpProgressReporter),
}
}
#[inline]
pub const fn thread_count(mut self, thread_count: usize) -> Self {
self.thread_count = thread_count;
self
}
#[inline]
pub const fn sequential_threshold(mut self, sequential_threshold: usize) -> Self {
self.sequential_threshold = sequential_threshold;
self
}
#[inline]
pub const fn report_interval(mut self, report_interval: Duration) -> Self {
self.report_interval = report_interval;
self
}
#[inline]
pub fn reporter<R>(mut self, reporter: R) -> Self
where
R: ProgressReporter + 'static,
{
self.reporter = Arc::new(reporter);
self
}
#[inline]
pub fn reporter_arc(mut self, reporter: Arc<dyn ProgressReporter>) -> Self {
self.reporter = reporter;
self
}
#[inline]
pub fn no_reporter(mut self) -> Self {
self.reporter = Arc::new(NoOpProgressReporter);
self
}
#[inline]
pub fn build(self) -> Result<ParallelBatchProcessor<Item>, ParallelBatchProcessorBuildError> {
let thread_count = NonZeroUsize::new(self.thread_count)
.ok_or(ParallelBatchProcessorBuildError::ZeroThreadCount)?;
Ok(ParallelBatchProcessor {
consumer: self.consumer,
thread_count,
sequential_threshold: self.sequential_threshold,
report_interval: self.report_interval,
reporter: self.reporter,
})
}
}