use std::sync::atomic::{AtomicU64, Ordering};
/// Atomic counters describing the activity of an event pipeline.
///
/// Every field is an [`AtomicU64`], so all methods take `&self` and the
/// metrics can be updated through a shared reference. Each access uses
/// [`Ordering::Relaxed`]: an individual counter is updated atomically, but no
/// ordering is established between different counters or with other memory.
///
/// The derived [`Default`] starts every metric at zero.
#[derive(Debug, Default)]
pub struct PipelineMetrics {
    /// Running sum of the amounts passed to [`PipelineMetrics::record_processed`].
    events_processed_total: AtomicU64,
    /// Running sum of the amounts passed to [`PipelineMetrics::record_dropped`].
    events_dropped_total: AtomicU64,
    /// Most recent value passed to [`PipelineMetrics::record_batch_size`].
    last_batch_size: AtomicU64,
}

impl PipelineMetrics {
    /// Adds `n` to the processed-events total.
    ///
    /// The addition wraps around on overflow, as [`AtomicU64::fetch_add`] does.
    pub fn record_processed(&self, n: u64) {
        let Self {
            events_processed_total: total,
            ..
        } = self;
        // `fetch_add` hands back the previous value, which is not needed here.
        let _ = total.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` to the dropped-events total.
    ///
    /// The addition wraps around on overflow, as [`AtomicU64::fetch_add`] does.
    pub fn record_dropped(&self, n: u64) {
        let Self {
            events_dropped_total: total,
            ..
        } = self;
        // `fetch_add` hands back the previous value, which is not needed here.
        let _ = total.fetch_add(n, Ordering::Relaxed);
    }

    /// Overwrites the stored batch size with `n`; earlier values are discarded.
    pub fn record_batch_size(&self, n: u64) {
        let Self {
            last_batch_size: gauge,
            ..
        } = self;
        gauge.store(n, Ordering::Relaxed);
    }

    /// Returns the current processed-events total.
    pub fn processed(&self) -> u64 {
        let total = &self.events_processed_total;
        total.load(Ordering::Relaxed)
    }

    /// Returns the current dropped-events total.
    pub fn dropped(&self) -> u64 {
        let total = &self.events_dropped_total;
        total.load(Ordering::Relaxed)
    }

    /// Returns the value most recently stored by `record_batch_size`
    /// (zero if nothing has been stored on a defaulted instance).
    pub fn last_batch_size(&self) -> u64 {
        AtomicU64::load(&self.last_batch_size, Ordering::Relaxed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A defaulted instance reports zero for every metric.
    #[test]
    fn default_state_is_all_zeros() {
        let fresh = PipelineMetrics::default();
        let observed = (fresh.processed(), fresh.dropped(), fresh.last_batch_size());
        assert_eq!(observed, (0, 0, 0));
    }

    /// Successive `record_processed` calls add up.
    #[test]
    fn record_processed_accumulates() {
        let sut = PipelineMetrics::default();
        for amount in [5, 3] {
            sut.record_processed(amount);
        }
        assert_eq!(sut.processed(), 8);
    }

    /// Successive `record_dropped` calls add up.
    #[test]
    fn record_dropped_accumulates() {
        let sut = PipelineMetrics::default();
        for amount in [10, 2] {
            sut.record_dropped(amount);
        }
        assert_eq!(sut.dropped(), 12);
    }

    /// `record_batch_size` keeps only the latest value rather than summing.
    #[test]
    fn record_batch_size_is_last_value() {
        let sut = PipelineMetrics::default();
        for size in [100, 42] {
            sut.record_batch_size(size);
        }
        assert_eq!(sut.last_batch_size(), 42);
    }

    /// Updating one total leaves the other untouched.
    #[test]
    fn metrics_are_independent() {
        let sut = PipelineMetrics::default();
        sut.record_processed(7);
        sut.record_dropped(3);
        let observed = (sut.processed(), sut.dropped());
        assert_eq!(observed, (7, 3));
    }
}