use std::sync::{
Mutex,
MutexGuard,
};
use std::time::Duration;
use qubit_atomic::AtomicCount;
use qubit_progress::model::ProgressCounters;
use crate::{
BatchOutcome,
BatchOutcomeBuilder,
BatchTaskError,
BatchTaskFailure,
};
pub(crate) struct BatchExecutionState<E> {
task_count: usize,
observed_count: AtomicCount,
active_count: AtomicCount,
completed_count: AtomicCount,
succeeded_count: AtomicCount,
failed_count: AtomicCount,
panicked_count: AtomicCount,
failures: Mutex<Vec<BatchTaskFailure<E>>>,
}
impl<E> BatchExecutionState<E> {
#[inline]
pub(crate) const fn new(task_count: usize) -> Self {
Self {
task_count,
observed_count: AtomicCount::zero(),
active_count: AtomicCount::zero(),
completed_count: AtomicCount::zero(),
succeeded_count: AtomicCount::zero(),
failed_count: AtomicCount::zero(),
panicked_count: AtomicCount::zero(),
failures: Mutex::new(Vec::new()),
}
}
#[inline]
pub(crate) fn record_task_observed(&self) -> usize {
self.observed_count.inc()
}
#[inline]
pub(crate) fn record_task_started(&self) {
self.active_count.inc();
}
#[inline]
pub(crate) fn record_task_succeeded(&self) {
self.active_count.dec();
self.completed_count.inc();
self.succeeded_count.inc();
}
#[inline]
pub(crate) fn record_task_failed(&self, index: usize, error: E) {
self.active_count.dec();
self.completed_count.inc();
self.failed_count.inc();
Self::lock_failures(&self.failures)
.push(BatchTaskFailure::new(index, BatchTaskError::Failed(error)));
}
#[inline]
pub(crate) fn record_task_panicked(&self, index: usize, error: BatchTaskError<E>) {
self.active_count.dec();
self.completed_count.inc();
self.panicked_count.inc();
Self::lock_failures(&self.failures).push(BatchTaskFailure::new(index, error));
}
#[inline]
pub(crate) fn progress_counters(&self) -> ProgressCounters {
ProgressCounters::new(Some(self.task_count))
.with_active_count(self.active_count.get())
.with_completed_count(self.completed_count.get())
.with_succeeded_count(self.succeeded_count.get())
.with_failed_count(
self.failed_count
.get()
.saturating_add(self.panicked_count.get()),
)
}
#[inline]
pub(crate) fn into_outcome(self, elapsed: Duration) -> BatchOutcome<E> {
let failures = self
.failures
.into_inner()
.unwrap_or_else(std::sync::PoisonError::into_inner);
BatchOutcomeBuilder::builder(self.task_count)
.completed_count(self.completed_count.get())
.succeeded_count(self.succeeded_count.get())
.failed_count(self.failed_count.get())
.panicked_count(self.panicked_count.get())
.elapsed(elapsed)
.failures(failures)
.build()
.expect("batch execution state should collect consistent counters")
}
fn lock_failures(
failures: &Mutex<Vec<BatchTaskFailure<E>>>,
) -> MutexGuard<'_, Vec<BatchTaskFailure<E>>> {
failures
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}