use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchMetrics {
pub batches_dispatched: u64,
pub items_processed: u64,
pub items_submitted: u64,
pub items_rejected: u64,
pub items_timed_out: u64,
pub total_processing_ns: u64,
pub total_formation_ns: u64,
pub min_batch_size: u64,
pub max_batch_size_seen: u64,
pub configured_max_batch_size: u64,
pub consecutive_errors: u64,
}
impl BatchMetrics {
#[must_use]
pub fn pending_items(&self) -> u64 {
self.items_submitted
.saturating_sub(self.items_processed)
.saturating_sub(self.items_rejected)
}
#[must_use]
pub fn rejection_rate(&self) -> Option<f64> {
if self.items_submitted == 0 {
return None;
}
Some(self.items_rejected as f64 / self.items_submitted as f64)
}
#[must_use]
pub fn timeout_rate(&self) -> Option<f64> {
if self.items_submitted == 0 {
return None;
}
Some(self.items_timed_out as f64 / self.items_submitted as f64)
}
#[must_use]
pub fn avg_batch_size(&self) -> Option<f64> {
if self.batches_dispatched == 0 {
return None;
}
Some(self.items_processed as f64 / self.batches_dispatched as f64)
}
#[must_use]
pub fn avg_fill_ratio(&self) -> Option<f64> {
let avg = self.avg_batch_size()?;
if self.configured_max_batch_size == 0 {
return None;
}
Some(avg / self.configured_max_batch_size as f64)
}
#[must_use]
pub fn avg_processing_ns(&self) -> Option<f64> {
if self.batches_dispatched == 0 {
return None;
}
Some(self.total_processing_ns as f64 / self.batches_dispatched as f64)
}
#[must_use]
pub fn avg_formation_ns(&self) -> Option<f64> {
if self.batches_dispatched == 0 {
return None;
}
Some(self.total_formation_ns as f64 / self.batches_dispatched as f64)
}
}
impl std::fmt::Display for BatchMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"batches={} items={}/{} rejected={} timed_out={} fill={} avg_proc={} avg_form={} consec_err={}",
self.batches_dispatched,
self.items_processed,
self.items_submitted,
self.items_rejected,
self.items_timed_out,
self.avg_fill_ratio()
.map_or_else(|| "n/a".to_string(), |r| format!("{r:.2}")),
self.avg_processing_ns().map_or_else(
|| "n/a".to_string(),
|ns| format!("{:.1}ms", ns / 1_000_000.0)
),
self.avg_formation_ns().map_or_else(
|| "n/a".to_string(),
|ns| format!("{:.1}ms", ns / 1_000_000.0)
),
self.consecutive_errors,
)
}
}
pub(super) struct BatchMetricsInner {
batches_dispatched: AtomicU64,
items_processed: AtomicU64,
items_submitted: AtomicU64,
items_rejected: AtomicU64,
items_timed_out: AtomicU64,
total_processing_ns: AtomicU64,
total_formation_ns: AtomicU64,
min_batch_size: AtomicU64,
max_batch_size_seen: AtomicU64,
consecutive_errors: AtomicU64,
}
impl BatchMetricsInner {
pub(super) fn new() -> Self {
Self {
batches_dispatched: AtomicU64::new(0),
items_processed: AtomicU64::new(0),
items_submitted: AtomicU64::new(0),
items_rejected: AtomicU64::new(0),
items_timed_out: AtomicU64::new(0),
total_processing_ns: AtomicU64::new(0),
total_formation_ns: AtomicU64::new(0),
min_batch_size: AtomicU64::new(u64::MAX),
max_batch_size_seen: AtomicU64::new(0),
consecutive_errors: AtomicU64::new(0),
}
}
pub(super) fn snapshot(&self, configured_max_batch_size: u64) -> BatchMetrics {
let min = self.min_batch_size.load(Ordering::Relaxed);
BatchMetrics {
batches_dispatched: self.batches_dispatched.load(Ordering::Relaxed),
items_processed: self.items_processed.load(Ordering::Relaxed),
items_submitted: self.items_submitted.load(Ordering::Relaxed),
items_rejected: self.items_rejected.load(Ordering::Relaxed),
items_timed_out: self.items_timed_out.load(Ordering::Relaxed),
total_processing_ns: self.total_processing_ns.load(Ordering::Relaxed),
total_formation_ns: self.total_formation_ns.load(Ordering::Relaxed),
min_batch_size: if min == u64::MAX { 0 } else { min },
max_batch_size_seen: self.max_batch_size_seen.load(Ordering::Relaxed),
configured_max_batch_size,
consecutive_errors: self.consecutive_errors.load(Ordering::Relaxed),
}
}
pub(super) fn record_dispatch(&self, batch_size: usize, formation_ns: u64, processing_ns: u64) {
let bs = batch_size as u64;
self.batches_dispatched.fetch_add(1, Ordering::Relaxed);
self.items_processed.fetch_add(bs, Ordering::Relaxed);
self.total_processing_ns
.fetch_add(processing_ns, Ordering::Relaxed);
self.total_formation_ns
.fetch_add(formation_ns, Ordering::Relaxed);
self.min_batch_size.fetch_min(bs, Ordering::Relaxed);
self.max_batch_size_seen.fetch_max(bs, Ordering::Relaxed);
}
pub(super) fn record_submission(&self) {
self.items_submitted.fetch_add(1, Ordering::Relaxed);
}
pub(super) fn record_rejection(&self) {
self.items_rejected.fetch_add(1, Ordering::Relaxed);
}
pub(super) fn record_timeout(&self) {
self.items_timed_out.fetch_add(1, Ordering::Relaxed);
}
pub(super) fn record_batch_success(&self) {
self.consecutive_errors.store(0, Ordering::Relaxed);
}
pub(super) fn record_batch_error(&self) {
self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
}
}