use std::sync::atomic::{AtomicU64, Ordering};
/// One recorded measurement of a single metric.
///
/// The type is `Copy`; `TelemetryRing` stores samples in its slots and hands them
/// back to callers by value.
#[derive(Debug, Clone, Copy)]
pub struct MetricSample {
/// Identifier of the metric this sample belongs to.
pub metric_id: u32,
/// Timestamp of the sample — presumably nanoseconds, going by the field name; the
/// clock/epoch is not established in this file (TODO confirm with the producer).
pub timestamp_ns: u64,
/// The measured value, tagged with its metric kind.
pub value: MetricValue,
}
/// The value carried by a `MetricSample`, tagged by metric kind.
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
/// Integer-valued counter reading.
Counter(u64),
/// Floating-point gauge reading.
Gauge(f64),
/// Floating-point histogram value — presumably a single observation rather than a
/// bucket summary, since only one `f64` is carried (TODO confirm).
Histogram(f64),
}
/// Fixed-capacity ring buffer of `MetricSample`s.
///
/// When the ring is full, `record` overwrites the oldest undrained sample and bumps
/// the `dropped` counter; `drain_into` copies out everything between the read and
/// write cursors and marks it consumed.
pub struct TelemetryRing {
// Backing storage; allocated in `new` with exactly `capacity` slots.
slots: Box<[MetricSample]>,
// Running (wrapping) count of samples ever written; the next slot is `write_pos & mask`.
write_pos: AtomicU64,
// Position of the oldest sample that has not been drained or overwritten yet.
read_pos: AtomicU64,
// Number of slots; `new` rounds the requested size up to a power of two.
capacity: usize,
// `capacity - 1`, used to turn a running position into a slot index.
mask: usize,
// Number of samples overwritten by `record` before they were drained.
dropped: AtomicU64,
}
unsafe impl Send for TelemetryRing {}
unsafe impl Sync for TelemetryRing {}
impl TelemetryRing {
    /// Creates an empty ring able to hold at least `capacity` samples.
    ///
    /// The requested capacity is rounded up to the next power of two (so `0` becomes
    /// `1`), which lets slot indices be computed with a bit mask.
    ///
    /// # Panics
    ///
    /// Panics if rounding `capacity` up to a power of two overflows `usize`.
    pub fn new(capacity: usize) -> Self {
        // `next_power_of_two` wraps to 0 on overflow in release builds, which would make
        // `capacity - 1` underflow and leave the ring with no slots; fail loudly instead.
        let capacity = capacity
            .checked_next_power_of_two()
            .expect("TelemetryRing capacity overflows usize when rounded up to a power of two");
        // Slots need some initial value; it is never observable because `drain_into`
        // only reads positions between the read and write cursors.
        let placeholder = MetricSample {
            metric_id: 0,
            timestamp_ns: 0,
            value: MetricValue::Counter(0),
        };
        Self {
            slots: vec![placeholder; capacity].into_boxed_slice(),
            write_pos: AtomicU64::new(0),
            read_pos: AtomicU64::new(0),
            capacity,
            mask: capacity - 1,
            dropped: AtomicU64::new(0),
        }
    }

    /// Appends `sample` to the ring.
    ///
    /// If the ring already holds `capacity` undrained samples, the oldest one is
    /// overwritten and the dropped counter is incremented by one.
    pub fn record(&mut self, sample: MetricSample) {
        let capacity = self.capacity as u64;
        let pos = self.write_pos.load(Ordering::Relaxed);
        let read = self.read_pos.load(Ordering::Relaxed);
        if pos.wrapping_sub(read) >= capacity {
            // Full: move the read cursor past the slot that is about to be overwritten
            // so that exactly `capacity` samples remain readable after this write.
            self.read_pos
                .store(pos.wrapping_sub(capacity - 1), Ordering::Relaxed);
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
        self.slots[(pos as usize) & self.mask] = sample;
        self.write_pos.store(pos.wrapping_add(1), Ordering::Release);
    }

    /// Appends every undrained sample to `buf`, oldest first, and marks them consumed.
    ///
    /// Returns the number of samples appended. Existing contents of `buf` are kept.
    /// If several threads drain concurrently, each sample is handed to exactly one of
    /// them.
    pub fn drain_into(&self, buf: &mut Vec<MetricSample>) -> usize {
        let write = self.write_pos.load(Ordering::Acquire);
        // Claim the whole readable range in one atomic step. `record` needs `&mut self`,
        // so `write_pos` cannot move while any drain is running; a racing drain will
        // therefore observe `read == write` and return 0 instead of duplicating samples.
        let read = self.read_pos.swap(write, Ordering::AcqRel);
        let available = write.wrapping_sub(read) as usize;
        if available == 0 {
            return 0;
        }
        debug_assert!(available <= self.capacity);

        // The readable range is at most two contiguous runs: from the read slot to the
        // end of the storage, then (if it wraps) from the start of the storage.
        let start = (read as usize) & self.mask;
        let head_len = available.min(self.capacity - start);
        buf.reserve(available);
        buf.extend_from_slice(&self.slots[start..start + head_len]);
        buf.extend_from_slice(&self.slots[..available - head_len]);
        available
    }

    /// Returns how many samples have been overwritten before they were drained.
    pub fn dropped_count(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a counter sample whose timestamp and value both equal `val`.
    fn sample(id: u32, val: u64) -> MetricSample {
        MetricSample {
            metric_id: id,
            timestamp_ns: val,
            value: MetricValue::Counter(val),
        }
    }

    /// Collects the `metric_id` of each sample, preserving order.
    fn ids(samples: &[MetricSample]) -> Vec<u32> {
        samples.iter().map(|s| s.metric_id).collect()
    }

    #[test]
    fn basic_record_and_drain() {
        let mut ring = TelemetryRing::new(8);
        ring.record(sample(1, 100));
        ring.record(sample(2, 200));
        ring.record(sample(3, 300));

        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 3);
        assert_eq!(ids(&buf), [1, 2, 3]);
        assert_eq!(buf[1].timestamp_ns, 200);
        assert!(matches!(buf[1].value, MetricValue::Counter(200)));
        assert_eq!(ring.dropped_count(), 0);

        // Draining consumes the samples: a second drain must add nothing.
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert_eq!(buf.len(), 3);
    }

    #[test]
    fn overflow_drops_oldest() {
        let mut ring = TelemetryRing::new(4);
        for i in 0..6 {
            ring.record(sample(i, u64::from(i)));
        }
        // Six writes into four slots overwrite exactly the two oldest samples.
        assert_eq!(ring.dropped_count(), 2);

        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 4);
        assert_eq!(ids(&buf), [2, 3, 4, 5]);
    }

    #[test]
    fn wraparound_preserves_order() {
        let mut ring = TelemetryRing::new(4);
        let mut buf = Vec::new();
        for i in 0..3 {
            ring.record(sample(i, u64::from(i)));
        }
        assert_eq!(ring.drain_into(&mut buf), 3);

        // These writes straddle the end of the backing storage.
        for i in 3..6 {
            ring.record(sample(i, u64::from(i)));
        }
        buf.clear();
        assert_eq!(ring.drain_into(&mut buf), 3);
        assert_eq!(ids(&buf), [3, 4, 5]);
        assert_eq!(ring.dropped_count(), 0);
    }

    #[test]
    fn empty_drain_returns_zero() {
        let ring = TelemetryRing::new(8);
        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert!(buf.is_empty());
        assert_eq!(ring.dropped_count(), 0);
    }
}