use std::time::Duration;
use qubit_atomic::AtomicCount;
use qubit_progress::model::ProgressCounters;
use crate::BatchProcessResult;
pub(crate) struct BatchProcessState {
item_count: usize,
observed_count: AtomicCount,
active_count: AtomicCount,
completed_count: AtomicCount,
processed_count: AtomicCount,
chunk_count: AtomicCount,
}
impl BatchProcessState {
#[inline]
pub(crate) const fn new(item_count: usize) -> Self {
Self {
item_count,
observed_count: AtomicCount::zero(),
active_count: AtomicCount::zero(),
completed_count: AtomicCount::zero(),
processed_count: AtomicCount::zero(),
chunk_count: AtomicCount::zero(),
}
}
#[inline]
pub(crate) fn record_item_observed(&self) -> usize {
self.observed_count.inc()
}
#[inline]
pub(crate) fn record_item_started(&self) {
self.active_count.inc();
}
#[inline]
pub(crate) fn record_item_processed(&self) {
self.active_count.dec();
self.completed_count.inc();
self.processed_count.inc();
}
#[inline]
pub(crate) fn record_chunk_processed(&self, completed_count: usize, processed_count: usize) {
self.completed_count.add(completed_count);
self.processed_count.add(processed_count);
self.chunk_count.inc();
}
#[inline]
pub(crate) fn observed_count(&self) -> usize {
self.observed_count.get()
}
#[inline]
pub(crate) fn completed_count(&self) -> usize {
self.completed_count.get()
}
#[inline]
pub(crate) fn chunk_count(&self) -> usize {
self.chunk_count.get()
}
#[inline]
pub(crate) fn to_direct_result(&self, elapsed: Duration) -> BatchProcessResult {
let processed_count = self.processed_count.get();
BatchProcessResult::builder(self.item_count)
.completed_count(self.completed_count.get())
.processed_count(processed_count)
.chunk_count(logical_chunk_count(processed_count))
.elapsed(elapsed)
.build()
.expect("direct batch process state should collect consistent counters")
}
#[inline]
pub(crate) fn to_chunked_result(&self, elapsed: Duration) -> BatchProcessResult {
BatchProcessResult::builder(self.item_count)
.completed_count(self.completed_count.get())
.processed_count(self.processed_count.get())
.chunk_count(self.chunk_count.get())
.elapsed(elapsed)
.build()
.expect("chunked batch process state should collect consistent counters")
}
#[inline]
pub(crate) fn progress_counters(&self) -> ProgressCounters {
ProgressCounters::new(Some(self.item_count))
.with_active_count(self.active_count.get())
.with_completed_count(self.completed_count.get())
.with_succeeded_count(self.processed_count.get())
}
#[inline]
pub(crate) fn running_chunk_progress_counters(&self) -> ProgressCounters {
ProgressCounters::new(Some(self.item_count))
.with_completed_count(self.completed_count.get())
.with_succeeded_count(self.completed_count.get())
}
}
#[inline]
const fn logical_chunk_count(processed_count: usize) -> usize {
if processed_count == 0 { 0 } else { 1 }
}