// nodedb_bridge/telemetry.rs
use std::sync::atomic::{AtomicU64, Ordering};
17
18#[derive(Debug, Clone, Copy)]
20pub struct MetricSample {
21 pub metric_id: u32,
23
24 pub timestamp_ns: u64,
26
27 pub value: MetricValue,
29}
30
/// The value carried by a metric sample, tagged by metric kind.
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
    /// An unsigned integer reading of a counter metric.
    Counter(u64),
    /// A floating-point reading of a gauge metric.
    Gauge(f64),
    /// A floating-point value belonging to a histogram metric
    /// (presumably a single observation — confirm against the consumer).
    Histogram(f64),
}
41
42pub struct TelemetryRing {
48 slots: Box<[MetricSample]>,
50
51 write_pos: AtomicU64,
53
54 read_pos: AtomicU64,
56
57 capacity: usize,
59
60 mask: usize,
62
63 dropped: AtomicU64,
65}
66
67unsafe impl Send for TelemetryRing {}
70unsafe impl Sync for TelemetryRing {}
71
72impl TelemetryRing {
73 pub fn new(capacity: usize) -> Self {
75 let capacity = capacity.next_power_of_two();
76 let mask = capacity - 1;
77
78 let default_sample = MetricSample {
79 metric_id: 0,
80 timestamp_ns: 0,
81 value: MetricValue::Counter(0),
82 };
83
84 Self {
85 slots: vec![default_sample; capacity].into_boxed_slice(),
86 write_pos: AtomicU64::new(0),
87 read_pos: AtomicU64::new(0),
88 capacity,
89 mask,
90 dropped: AtomicU64::new(0),
91 }
92 }
93
94 pub fn record(&mut self, sample: MetricSample) {
99 let pos = self.write_pos.load(Ordering::Relaxed);
100 let read = self.read_pos.load(Ordering::Relaxed);
101
102 if pos.wrapping_sub(read) >= self.capacity as u64 {
104 self.read_pos.store(
105 pos.wrapping_sub(self.capacity as u64 - 1),
106 Ordering::Relaxed,
107 );
108 self.dropped.fetch_add(1, Ordering::Relaxed);
109 }
110
111 let idx = (pos as usize) & self.mask;
112 self.slots[idx] = sample;
113 self.write_pos.store(pos.wrapping_add(1), Ordering::Release);
114 }
115
116 pub fn drain_into(&self, buf: &mut Vec<MetricSample>) -> usize {
120 let write = self.write_pos.load(Ordering::Acquire);
121 let read = self.read_pos.load(Ordering::Relaxed);
122
123 let available = write.wrapping_sub(read) as usize;
124 if available == 0 {
125 return 0;
126 }
127
128 for i in 0..available {
129 let idx = ((read.wrapping_add(i as u64)) as usize) & self.mask;
130 buf.push(self.slots[idx]);
131 }
132
133 self.read_pos.store(write, Ordering::Release);
134 available
135 }
136
137 pub fn dropped_count(&self) -> u64 {
139 self.dropped.load(Ordering::Relaxed)
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a counter sample whose timestamp and value both equal `val`.
    fn sample(id: u32, val: u64) -> MetricSample {
        MetricSample {
            metric_id: id,
            timestamp_ns: val,
            value: MetricValue::Counter(val),
        }
    }

    /// Extracts the metric ids so whole drain results can be compared at once.
    fn ids(samples: &[MetricSample]) -> Vec<u32> {
        samples.iter().map(|s| s.metric_id).collect()
    }

    #[test]
    fn basic_record_and_drain() {
        let mut ring = TelemetryRing::new(8);

        ring.record(sample(1, 100));
        ring.record(sample(2, 200));
        ring.record(sample(3, 300));

        let mut buf = Vec::new();
        let count = ring.drain_into(&mut buf);
        assert_eq!(count, 3);
        assert_eq!(ids(&buf), vec![1, 2, 3]);
        assert_eq!(buf[0].timestamp_ns, 100);
        assert_eq!(ring.dropped_count(), 0);
    }

    #[test]
    fn overflow_drops_oldest() {
        let mut ring = TelemetryRing::new(4);

        // Six writes into four slots: the two oldest samples are overwritten.
        for id in 0..6u32 {
            ring.record(sample(id, u64::from(id)));
        }

        assert_eq!(ring.dropped_count(), 2);

        let mut buf = Vec::new();
        let count = ring.drain_into(&mut buf);

        assert_eq!(count, 4);
        assert_eq!(ids(&buf), vec![2, 3, 4, 5]);
    }

    #[test]
    fn drain_after_wraparound_preserves_order() {
        let mut ring = TelemetryRing::new(4);

        for id in 0..3u32 {
            ring.record(sample(id, u64::from(id)));
        }
        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 3);

        // These writes straddle the end of the backing storage.
        buf.clear();
        for id in 3..6u32 {
            ring.record(sample(id, u64::from(id)));
        }
        assert_eq!(ring.drain_into(&mut buf), 3);
        assert_eq!(ids(&buf), vec![3, 4, 5]);
        assert_eq!(ring.dropped_count(), 0);
    }

    #[test]
    fn empty_drain_returns_zero() {
        let ring = TelemetryRing::new(8);
        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert!(buf.is_empty());
    }
}