nodedb_bridge/
telemetry.rs1use std::sync::atomic::{AtomicU64, Ordering};
19
/// A single telemetry observation: which metric, when, and what value.
///
/// The type is `Copy`, so samples are moved in and out of ring slots by plain
/// assignment. `PartialEq` is derived so whole samples can be compared (for
/// example with `assert_eq!`) instead of field by field.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MetricSample {
    /// Identifier of the metric this sample belongs to.
    pub metric_id: u32,

    /// Time of the observation, in nanoseconds.
    // NOTE(review): the clock source / epoch is not visible in this file — confirm with producers.
    pub timestamp_ns: u64,

    /// The observed value, tagged with its metric kind.
    pub value: MetricValue,
}

/// Payload of a [`MetricSample`], tagged by metric kind.
///
/// Only `PartialEq` (not `Eq`) is derived because two variants carry an `f64`,
/// for which `NaN != NaN`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MetricValue {
    /// Unsigned integer counter value.
    Counter(u64),
    /// Floating-point gauge reading.
    Gauge(f64),
    /// Floating-point value for a histogram metric.
    // presumably a single raw observation rather than a bucket boundary — verify with consumers
    Histogram(f64),
}
43
44pub struct TelemetryRing {
50 slots: Box<[MetricSample]>,
52
53 write_pos: AtomicU64,
55
56 read_pos: AtomicU64,
58
59 capacity: usize,
61
62 mask: usize,
64
65 dropped: AtomicU64,
67}
68
69unsafe impl Send for TelemetryRing {}
72unsafe impl Sync for TelemetryRing {}
73
74impl TelemetryRing {
75 pub fn new(capacity: usize) -> Self {
77 let capacity = capacity.next_power_of_two();
78 let mask = capacity - 1;
79
80 let default_sample = MetricSample {
81 metric_id: 0,
82 timestamp_ns: 0,
83 value: MetricValue::Counter(0),
84 };
85
86 Self {
87 slots: vec![default_sample; capacity].into_boxed_slice(),
88 write_pos: AtomicU64::new(0),
89 read_pos: AtomicU64::new(0),
90 capacity,
91 mask,
92 dropped: AtomicU64::new(0),
93 }
94 }
95
96 pub fn record(&mut self, sample: MetricSample) {
101 let pos = self.write_pos.load(Ordering::Relaxed);
102 let read = self.read_pos.load(Ordering::Relaxed);
103
104 if pos.wrapping_sub(read) >= self.capacity as u64 {
106 self.read_pos.store(
107 pos.wrapping_sub(self.capacity as u64 - 1),
108 Ordering::Relaxed,
109 );
110 self.dropped.fetch_add(1, Ordering::Relaxed);
111 }
112
113 let idx = (pos as usize) & self.mask;
114 self.slots[idx] = sample;
115 self.write_pos.store(pos.wrapping_add(1), Ordering::Release);
116 }
117
118 pub fn drain_into(&self, buf: &mut Vec<MetricSample>) -> usize {
122 let write = self.write_pos.load(Ordering::Acquire);
123 let read = self.read_pos.load(Ordering::Relaxed);
124
125 let available = write.wrapping_sub(read) as usize;
126 if available == 0 {
127 return 0;
128 }
129
130 for i in 0..available {
131 let idx = ((read.wrapping_add(i as u64)) as usize) & self.mask;
132 buf.push(self.slots[idx]);
133 }
134
135 self.read_pos.store(write, Ordering::Release);
136 available
137 }
138
139 pub fn dropped_count(&self) -> u64 {
141 self.dropped.load(Ordering::Relaxed)
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a counter sample whose timestamp and counter value are both `val`.
    fn sample(id: u32, val: u64) -> MetricSample {
        MetricSample {
            metric_id: id,
            timestamp_ns: val,
            value: MetricValue::Counter(val),
        }
    }

    /// Metric ids of `samples`, in order, for compact whole-sequence assertions.
    fn ids(samples: &[MetricSample]) -> Vec<u32> {
        samples.iter().map(|s| s.metric_id).collect()
    }

    #[test]
    fn basic_record_and_drain() {
        let mut ring = TelemetryRing::new(8);

        ring.record(sample(1, 100));
        ring.record(sample(2, 200));
        ring.record(sample(3, 300));

        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 3);
        assert_eq!(ids(&buf), vec![1, 2, 3]);
        assert_eq!(buf[1].timestamp_ns, 200);
        assert!(matches!(buf[2].value, MetricValue::Counter(300)));
        assert_eq!(ring.dropped_count(), 0);

        // Draining again must not hand out the same samples twice.
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert_eq!(buf.len(), 3);
    }

    #[test]
    fn overflow_drops_oldest() {
        let mut ring = TelemetryRing::new(4);

        // Six writes into four slots: the two oldest samples must be discarded.
        for i in 0..6u32 {
            ring.record(sample(i, u64::from(i)));
        }

        assert_eq!(ring.dropped_count(), 2);

        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 4);
        assert_eq!(ids(&buf), vec![2, 3, 4, 5]);
    }

    #[test]
    fn empty_drain_returns_zero() {
        let ring = TelemetryRing::new(8);
        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert!(buf.is_empty());
        assert_eq!(ring.dropped_count(), 0);
    }
}