use std::sync::atomic::{AtomicU64, Ordering};
/// Set of independent `u64` counters describing pipeline activity.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. The derived [`Default`] yields a value with every
/// counter at zero.
#[derive(Debug, Default)]
pub struct PipelineStats {
    /// Running `received` total; written by `incr_received` / `add_received`.
    pub received: AtomicU64,
    /// Running `processed` total; written by `incr_processed` / `add_processed`.
    pub processed: AtomicU64,
    /// Running error total; written by `incr_errors`.
    pub errors: AtomicU64,
    /// Running `dlq` total; written by `incr_dlq`.
    /// NOTE(review): presumably "dead-letter queue" — confirm against callers.
    pub dlq: AtomicU64,
    /// Running `filtered` total; written by `incr_filtered` / `add_filtered`.
    pub filtered: AtomicU64,
    /// Accumulated received-byte total; written by `add_bytes_received`.
    pub bytes_received: AtomicU64,
    /// Accumulated written-byte total; written by `add_bytes_written`.
    pub bytes_written: AtomicU64,
    /// Running flushed-batch total; written by `incr_batches_flushed`.
    pub batches_flushed: AtomicU64,
}
impl PipelineStats {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn incr_received(&self) {
self.received.fetch_add(1, Ordering::Relaxed);
}
pub fn incr_processed(&self) {
self.processed.fetch_add(1, Ordering::Relaxed);
}
pub fn incr_errors(&self) {
self.errors.fetch_add(1, Ordering::Relaxed);
}
pub fn incr_dlq(&self) {
self.dlq.fetch_add(1, Ordering::Relaxed);
}
pub fn incr_filtered(&self) {
self.filtered.fetch_add(1, Ordering::Relaxed);
}
pub fn incr_batches_flushed(&self) {
self.batches_flushed.fetch_add(1, Ordering::Relaxed);
}
pub fn add_received(&self, n: u64) {
self.received.fetch_add(n, Ordering::Relaxed);
}
pub fn add_processed(&self, n: u64) {
self.processed.fetch_add(n, Ordering::Relaxed);
}
pub fn add_filtered(&self, n: u64) {
self.filtered.fetch_add(n, Ordering::Relaxed);
}
pub fn add_bytes_received(&self, n: u64) {
self.bytes_received.fetch_add(n, Ordering::Relaxed);
}
pub fn add_bytes_written(&self, n: u64) {
self.bytes_written.fetch_add(n, Ordering::Relaxed);
}
#[must_use]
pub fn snapshot(&self) -> PipelineStatsSnapshot {
PipelineStatsSnapshot {
received: self.received.load(Ordering::Relaxed),
processed: self.processed.load(Ordering::Relaxed),
errors: self.errors.load(Ordering::Relaxed),
dlq: self.dlq.load(Ordering::Relaxed),
filtered: self.filtered.load(Ordering::Relaxed),
bytes_received: self.bytes_received.load(Ordering::Relaxed),
bytes_written: self.bytes_written.load(Ordering::Relaxed),
batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
}
}
}
/// Plain-value copy of the counters held by `PipelineStats`.
///
/// Built by `PipelineStats::snapshot`, which loads each atomic counter into
/// the field of the same name. The type is `Copy`, and its derived
/// [`Default`] has every field set to zero.
#[derive(Debug, Clone, Copy, Default)]
pub struct PipelineStatsSnapshot {
    /// Value of the `received` counter when the snapshot was taken.
    pub received: u64,
    /// Value of the `processed` counter when the snapshot was taken.
    pub processed: u64,
    /// Value of the `errors` counter when the snapshot was taken.
    pub errors: u64,
    /// Value of the `dlq` counter when the snapshot was taken.
    pub dlq: u64,
    /// Value of the `filtered` counter when the snapshot was taken.
    pub filtered: u64,
    /// Value of the `bytes_received` counter when the snapshot was taken.
    pub bytes_received: u64,
    /// Value of the `bytes_written` counter when the snapshot was taken.
    pub bytes_written: u64,
    /// Value of the `batches_flushed` counter when the snapshot was taken.
    pub batches_flushed: u64,
}
impl std::fmt::Display for PipelineStatsSnapshot {
    /// Writes the snapshot as space-separated `key=value` pairs on one line.
    ///
    /// The emitted keys are `received`, `processed`, `errors`, `dlq`,
    /// `filtered` and `batches` (the `batches_flushed` field). The
    /// `bytes_received` and `bytes_written` fields are not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull out exactly the fields that are printed; `..` skips the byte counters.
        let Self {
            received,
            processed,
            errors,
            dlq,
            filtered,
            batches_flushed,
            ..
        } = self;
        write!(
            f,
            "received={} processed={} errors={} dlq={} filtered={} batches={}",
            received, processed, errors, dlq, filtered, batches_flushed,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed `PipelineStats` reports zero for every counter,
    /// not just a subset of them.
    #[test]
    fn test_default_is_zero() {
        let snap = PipelineStats::new().snapshot();
        assert_eq!(snap.received, 0);
        assert_eq!(snap.processed, 0);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.dlq, 0);
        assert_eq!(snap.filtered, 0);
        assert_eq!(snap.bytes_received, 0);
        assert_eq!(snap.bytes_written, 0);
        assert_eq!(snap.batches_flushed, 0);
    }

    /// Single-step increments land on the intended counter and leave the
    /// others untouched.
    #[test]
    fn test_increments() {
        let stats = PipelineStats::new();
        stats.incr_received();
        stats.incr_received();
        stats.incr_processed();
        stats.incr_errors();
        stats.incr_dlq();
        stats.add_bytes_received(1024);

        let snap = stats.snapshot();
        assert_eq!(snap.received, 2);
        assert_eq!(snap.processed, 1);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.bytes_received, 1024);
        // Counters that were never touched must still be zero.
        assert_eq!(snap.filtered, 0);
        assert_eq!(snap.bytes_written, 0);
        assert_eq!(snap.batches_flushed, 0);
    }

    /// Bulk `add_*` calls add the given amount to the matching counter only.
    #[test]
    fn test_bulk_add() {
        let stats = PipelineStats::new();
        stats.add_received(100);
        stats.add_processed(95);
        stats.add_bytes_written(4096);
        stats.incr_batches_flushed();

        let snap = stats.snapshot();
        assert_eq!(snap.received, 100);
        assert_eq!(snap.processed, 95);
        assert_eq!(snap.bytes_written, 4096);
        assert_eq!(snap.batches_flushed, 1);
        // Counters that were never touched must still be zero.
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.dlq, 0);
        assert_eq!(snap.filtered, 0);
        assert_eq!(snap.bytes_received, 0);
    }

    /// The snapshot is `Copy`: the original stays usable after being assigned
    /// to another binding, and both carry the same value.
    #[test]
    fn test_snapshot_is_copy() {
        let stats = PipelineStats::new();
        stats.add_received(42);

        let snap = stats.snapshot();
        let copy = snap;
        assert_eq!(snap.received, 42);
        assert_eq!(copy.received, 42);
    }

    /// `incr_filtered` and `add_filtered` feed the same counter.
    #[test]
    fn test_filtered_counter() {
        let stats = PipelineStats::new();
        stats.incr_filtered();
        stats.add_filtered(9);

        let snap = stats.snapshot();
        assert_eq!(snap.filtered, 10);
    }

    /// The `Display` output contains every `key=value` pair it emits.
    #[test]
    fn test_display() {
        let stats = PipelineStats::new();
        stats.add_received(100);
        stats.add_processed(90);

        let display = format!("{}", stats.snapshot());
        assert!(display.contains("received=100"));
        assert!(display.contains("processed=90"));
        assert!(display.contains("errors=0"));
        assert!(display.contains("dlq=0"));
        assert!(display.contains("filtered=0"));
        assert!(display.contains("batches=0"));
    }
}