use core::sync::atomic::Ordering;
use portable_atomic::AtomicU64;
use super::BufferMetricsSnapshot;
#[derive(Debug)]
pub struct BufferCounters {
produced: AtomicU64,
consumed: AtomicU64,
dropped: AtomicU64,
capacity: usize,
}
impl BufferCounters {
pub fn new(capacity: usize) -> Self {
Self {
produced: AtomicU64::new(0),
consumed: AtomicU64::new(0),
dropped: AtomicU64::new(0),
capacity,
}
}
pub fn increment_produced(&self) {
self.produced.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_consumed(&self) {
self.consumed.fetch_add(1, Ordering::Relaxed);
}
pub fn add_dropped(&self, count: u64) {
self.dropped.fetch_add(count, Ordering::Relaxed);
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn snapshot(&self, occupancy: (usize, usize)) -> BufferMetricsSnapshot {
BufferMetricsSnapshot {
produced_count: self.produced.load(Ordering::Relaxed),
consumed_count: self.consumed.load(Ordering::Relaxed),
dropped_count: self.dropped.load(Ordering::Relaxed),
occupancy,
}
}
pub fn reset(&self) {
self.produced.store(0, Ordering::Relaxed);
self.consumed.store(0, Ordering::Relaxed);
self.dropped.store(0, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn zero_state() {
let c = BufferCounters::new(16);
let snap = c.snapshot((0, 16));
assert_eq!(snap.produced_count, 0);
assert_eq!(snap.consumed_count, 0);
assert_eq!(snap.dropped_count, 0);
assert_eq!(snap.occupancy, (0, 16));
assert_eq!(c.capacity(), 16);
}
#[test]
fn record_one_of_each() {
let c = BufferCounters::new(4);
c.increment_produced();
c.increment_consumed();
c.add_dropped(3);
let snap = c.snapshot((1, 4));
assert_eq!(snap.produced_count, 1);
assert_eq!(snap.consumed_count, 1);
assert_eq!(snap.dropped_count, 3);
assert_eq!(snap.occupancy, (1, 4));
}
#[test]
fn record_many() {
let c = BufferCounters::new(8);
for _ in 0..5 {
c.increment_produced();
}
for _ in 0..3 {
c.increment_consumed();
}
c.add_dropped(2);
c.add_dropped(5);
let snap = c.snapshot((2, 8));
assert_eq!(snap.produced_count, 5);
assert_eq!(snap.consumed_count, 3);
assert_eq!(snap.dropped_count, 7);
}
#[test]
fn reset_clears_counts_but_not_capacity() {
let c = BufferCounters::new(32);
c.increment_produced();
c.increment_consumed();
c.add_dropped(9);
c.reset();
let snap = c.snapshot((0, 32));
assert_eq!(snap.produced_count, 0);
assert_eq!(snap.consumed_count, 0);
assert_eq!(snap.dropped_count, 0);
assert_eq!(c.capacity(), 32);
c.increment_produced();
assert_eq!(c.snapshot((1, 32)).produced_count, 1);
}
#[cfg(feature = "std")]
#[test]
fn concurrent_increments() {
use std::sync::Arc;
use std::thread;
let c = Arc::new(BufferCounters::new(64));
let threads = 8;
let per_thread = 1000u64;
let handles: Vec<_> = (0..threads)
.map(|_| {
let c = Arc::clone(&c);
thread::spawn(move || {
for _ in 0..per_thread {
c.increment_produced();
c.increment_consumed();
c.add_dropped(1);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let snap = c.snapshot((0, 64));
assert_eq!(snap.produced_count, threads as u64 * per_thread);
assert_eq!(snap.consumed_count, threads as u64 * per_thread);
assert_eq!(snap.dropped_count, threads as u64 * per_thread);
}
}