brainwires_network/remote/
telemetry.rs1use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8use std::sync::RwLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, Instant};
11
/// Capacity of each latency / round-trip sample window kept by
/// `ProtocolMetrics`; once a window is full, the oldest sample is evicted
/// for every new one that is recorded.
const MAX_LATENCY_SAMPLES: usize = 1_000;
14
/// Telemetry collector for protocol traffic.
///
/// Every field is either an atomic counter or guarded by an `RwLock`, which is
/// why all recording methods only need `&self`.
#[derive(Default, Debug)]
pub struct ProtocolMetrics {
    /// Most recent latency samples in whole milliseconds, oldest first.
    latency_samples: RwLock<VecDeque<u64>>,
    /// Most recent round-trip samples in whole milliseconds, oldest first.
    roundtrip_samples: RwLock<VecDeque<u64>>,
    /// Number of messages recorded as sent.
    messages_sent: AtomicU64,
    /// Number of messages recorded as failed.
    messages_failed: AtomicU64,
    /// Total bytes recorded as sent.
    bytes_sent: AtomicU64,
    /// Total bytes recorded as received.
    bytes_received: AtomicU64,
    /// Running total of payload bytes before compression.
    bytes_uncompressed: AtomicU64,
    /// Running total of payload bytes after compression.
    bytes_compressed: AtomicU64,
    /// When the connection start was last recorded, if ever.
    connection_start: RwLock<Option<Instant>>,
    /// When connect / send / receive activity was last recorded, if ever.
    last_activity: RwLock<Option<Instant>>,
}
48
49impl ProtocolMetrics {
50 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn record_connection_start(&self) {
57 let mut start = self
58 .connection_start
59 .write()
60 .expect("metrics lock poisoned");
61 *start = Some(Instant::now());
62 let mut activity = self.last_activity.write().expect("metrics lock poisoned");
63 *activity = Some(Instant::now());
64 }
65
66 pub fn record_message_sent(&self, bytes: u64) {
68 self.messages_sent.fetch_add(1, Ordering::Relaxed);
69 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
70 let mut activity = self.last_activity.write().expect("metrics lock poisoned");
71 *activity = Some(Instant::now());
72 }
73
74 pub fn record_message_failed(&self) {
76 self.messages_failed.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn record_bytes_received(&self, bytes: u64) {
81 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
82 let mut activity = self.last_activity.write().expect("metrics lock poisoned");
83 *activity = Some(Instant::now());
84 }
85
86 pub fn record_compression(&self, uncompressed: u64, compressed: u64) {
88 self.bytes_uncompressed
89 .fetch_add(uncompressed, Ordering::Relaxed);
90 self.bytes_compressed
91 .fetch_add(compressed, Ordering::Relaxed);
92 }
93
94 pub fn record_latency(&self, latency: Duration) {
96 let ms = latency.as_millis() as u64;
97 let mut samples = self.latency_samples.write().expect("metrics lock poisoned");
98 if samples.len() >= MAX_LATENCY_SAMPLES {
99 samples.pop_front();
100 }
101 samples.push_back(ms);
102 }
103
104 pub fn record_roundtrip(&self, roundtrip: Duration) {
106 let ms = roundtrip.as_millis() as u64;
107 let mut samples = self
108 .roundtrip_samples
109 .write()
110 .expect("metrics lock poisoned");
111 if samples.len() >= MAX_LATENCY_SAMPLES {
112 samples.pop_front();
113 }
114 samples.push_back(ms);
115 }
116
117 pub fn snapshot(&self) -> MetricsSnapshot {
119 let latency_samples = self.latency_samples.read().expect("metrics lock poisoned");
120 let roundtrip_samples = self
121 .roundtrip_samples
122 .read()
123 .expect("metrics lock poisoned");
124 let connection_start = self.connection_start.read().expect("metrics lock poisoned");
125 let last_activity = self.last_activity.read().expect("metrics lock poisoned");
126
127 let uptime_secs = connection_start.map(|s| s.elapsed().as_secs()).unwrap_or(0);
128
129 let idle_secs = last_activity.map(|s| s.elapsed().as_secs()).unwrap_or(0);
130
131 let bytes_uncompressed = self.bytes_uncompressed.load(Ordering::Relaxed);
132 let bytes_compressed = self.bytes_compressed.load(Ordering::Relaxed);
133 let compression_ratio = if bytes_uncompressed > 0 {
134 bytes_compressed as f64 / bytes_uncompressed as f64
135 } else {
136 1.0
137 };
138
139 MetricsSnapshot {
140 messages_sent: self.messages_sent.load(Ordering::Relaxed),
141 messages_failed: self.messages_failed.load(Ordering::Relaxed),
142 bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
143 bytes_received: self.bytes_received.load(Ordering::Relaxed),
144 compression_ratio,
145 latency_p50: percentile(&latency_samples, 50),
146 latency_p95: percentile(&latency_samples, 95),
147 latency_p99: percentile(&latency_samples, 99),
148 roundtrip_p50: percentile(&roundtrip_samples, 50),
149 roundtrip_p95: percentile(&roundtrip_samples, 95),
150 roundtrip_p99: percentile(&roundtrip_samples, 99),
151 uptime_secs,
152 idle_secs,
153 }
154 }
155
156 pub fn reset(&self) {
158 self.latency_samples
159 .write()
160 .expect("metrics lock poisoned")
161 .clear();
162 self.roundtrip_samples
163 .write()
164 .expect("metrics lock poisoned")
165 .clear();
166 self.messages_sent.store(0, Ordering::Relaxed);
167 self.messages_failed.store(0, Ordering::Relaxed);
168 self.bytes_sent.store(0, Ordering::Relaxed);
169 self.bytes_received.store(0, Ordering::Relaxed);
170 self.bytes_uncompressed.store(0, Ordering::Relaxed);
171 self.bytes_compressed.store(0, Ordering::Relaxed);
172 *self
173 .connection_start
174 .write()
175 .expect("metrics lock poisoned") = None;
176 *self.last_activity.write().expect("metrics lock poisoned") = None;
177 }
178}
179
/// Returns the `p`-th percentile of `samples`.
///
/// The result is the element that would sit at index
/// `round(p / 100 * (len - 1))` if the samples were sorted ascending. Values
/// of `p` above 100 are clamped to 100 rather than indexing out of bounds.
/// Returns `None` when `samples` is empty.
fn percentile(samples: &VecDeque<u64>, p: u32) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }

    let mut values: Vec<u64> = samples.iter().copied().collect();
    let fraction = f64::from(p.min(100)) / 100.0;
    let index = (fraction * (values.len() - 1) as f64).round() as usize;
    // Only the element at `index` is needed, so a selection (O(n) on average)
    // replaces a full sort.
    let (_, value, _) = values.select_nth_unstable(index);
    Some(*value)
}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct MetricsSnapshot {
196 pub messages_sent: u64,
198 pub messages_failed: u64,
200 pub bytes_sent: u64,
202 pub bytes_received: u64,
204 pub compression_ratio: f64,
206 #[serde(skip_serializing_if = "Option::is_none")]
208 pub latency_p50: Option<u64>,
209 #[serde(skip_serializing_if = "Option::is_none")]
211 pub latency_p95: Option<u64>,
212 #[serde(skip_serializing_if = "Option::is_none")]
214 pub latency_p99: Option<u64>,
215 #[serde(skip_serializing_if = "Option::is_none")]
217 pub roundtrip_p50: Option<u64>,
218 #[serde(skip_serializing_if = "Option::is_none")]
220 pub roundtrip_p95: Option<u64>,
221 #[serde(skip_serializing_if = "Option::is_none")]
223 pub roundtrip_p99: Option<u64>,
224 pub uptime_secs: u64,
226 pub idle_secs: u64,
228}
229
230impl Default for MetricsSnapshot {
231 fn default() -> Self {
232 Self {
233 messages_sent: 0,
234 messages_failed: 0,
235 bytes_sent: 0,
236 bytes_received: 0,
237 compression_ratio: 1.0,
238 latency_p50: None,
239 latency_p95: None,
240 latency_p99: None,
241 roundtrip_p50: None,
242 roundtrip_p95: None,
243 roundtrip_p99: None,
244 uptime_secs: 0,
245 idle_secs: 0,
246 }
247 }
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
252#[serde(rename_all = "snake_case")]
253pub enum ConnectionQuality {
254 Excellent,
256 Good,
258 Fair,
260 Poor,
262 Unknown,
264}
265
266impl MetricsSnapshot {
267 pub fn connection_quality(&self) -> ConnectionQuality {
269 if self.messages_sent < 10 {
271 return ConnectionQuality::Unknown;
272 }
273
274 let error_rate = if self.messages_sent > 0 {
276 self.messages_failed as f64 / self.messages_sent as f64
277 } else {
278 0.0
279 };
280
281 let latency = self.latency_p95.unwrap_or(0);
283
284 if error_rate > 0.10 || latency > 250 {
285 ConnectionQuality::Poor
286 } else if error_rate > 0.05 || latency > 100 {
287 ConnectionQuality::Fair
288 } else if error_rate > 0.01 || latency > 50 {
289 ConnectionQuality::Good
290 } else {
291 ConnectionQuality::Excellent
292 }
293 }
294
295 pub fn throughput_bps(&self) -> f64 {
297 if self.uptime_secs > 0 {
298 (self.bytes_sent + self.bytes_received) as f64 / self.uptime_secs as f64
299 } else {
300 0.0
301 }
302 }
303
304 pub fn messages_per_second(&self) -> f64 {
306 if self.uptime_secs > 0 {
307 self.messages_sent as f64 / self.uptime_secs as f64
308 } else {
309 0.0
310 }
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_recording() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection_start();

        metrics.record_message_sent(100);
        metrics.record_message_sent(200);
        metrics.record_message_failed();
        metrics.record_bytes_received(150);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_sent, 2);
        assert_eq!(snapshot.messages_failed, 1);
        assert_eq!(snapshot.bytes_sent, 300);
        assert_eq!(snapshot.bytes_received, 150);
    }

    #[test]
    fn test_latency_percentiles() {
        let metrics = ProtocolMetrics::new();

        // 1 ms ..= 100 ms, so each percentile is close to its own rank.
        for ms in 1..=100 {
            metrics.record_latency(Duration::from_millis(ms));
        }

        let snapshot = metrics.snapshot();
        let p50 = snapshot.latency_p50.unwrap();
        let p95 = snapshot.latency_p95.unwrap();
        let p99 = snapshot.latency_p99.unwrap();
        assert!((49..=51).contains(&p50), "p50 should be around 50, got {}", p50);
        assert!((94..=96).contains(&p95), "p95 should be around 95, got {}", p95);
        assert!((98..=100).contains(&p99), "p99 should be around 99, got {}", p99);
    }

    #[test]
    fn test_compression_ratio() {
        let metrics = ProtocolMetrics::new();
        metrics.record_compression(1000, 400);
        let snapshot = metrics.snapshot();
        assert!((snapshot.compression_ratio - 0.4).abs() < 0.01);
    }

    #[test]
    fn test_compression_ratio_defaults_to_one() {
        let snapshot = ProtocolMetrics::new().snapshot();
        assert!((snapshot.compression_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_connection_quality() {
        let mut snapshot = MetricsSnapshot::default();

        // Too little traffic to grade.
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Unknown);
        snapshot.messages_sent = 9;
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Unknown);

        snapshot.messages_sent = 100;
        snapshot.messages_failed = 0;
        snapshot.latency_p95 = Some(30);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Excellent);

        snapshot.latency_p95 = Some(60);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Good);

        snapshot.latency_p95 = Some(120);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Fair);

        // 15% error rate.
        snapshot.messages_failed = 15;
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Poor);
    }

    #[test]
    fn test_sample_window_is_bounded() {
        let metrics = ProtocolMetrics::new();
        let extra = 10;
        for ms in 0..(MAX_LATENCY_SAMPLES + extra) as u64 {
            metrics.record_latency(Duration::from_millis(ms));
            metrics.record_roundtrip(Duration::from_millis(ms));
        }

        for window in [&metrics.latency_samples, &metrics.roundtrip_samples] {
            let samples = window.read().unwrap();
            assert_eq!(samples.len(), MAX_LATENCY_SAMPLES);
            // The oldest `extra` samples were evicted first.
            assert_eq!(samples.front().copied(), Some(extra as u64));
            assert_eq!(
                samples.back().copied(),
                Some((MAX_LATENCY_SAMPLES + extra - 1) as u64)
            );
        }
    }

    #[test]
    fn test_reset_clears_everything() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection_start();
        metrics.record_message_sent(10);
        metrics.record_message_failed();
        metrics.record_bytes_received(20);
        metrics.record_compression(100, 50);
        metrics.record_latency(Duration::from_millis(5));
        metrics.record_roundtrip(Duration::from_millis(9));

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_sent, 0);
        assert_eq!(snapshot.messages_failed, 0);
        assert_eq!(snapshot.bytes_sent, 0);
        assert_eq!(snapshot.bytes_received, 0);
        assert!((snapshot.compression_ratio - 1.0).abs() < f64::EPSILON);
        assert_eq!(snapshot.latency_p50, None);
        assert_eq!(snapshot.roundtrip_p99, None);
        assert_eq!(snapshot.uptime_secs, 0);
        assert_eq!(snapshot.idle_secs, 0);
    }

    #[test]
    fn test_rates_require_uptime() {
        let mut snapshot = MetricsSnapshot {
            messages_sent: 50,
            bytes_sent: 600,
            bytes_received: 400,
            ..MetricsSnapshot::default()
        };
        assert!(snapshot.throughput_bps().abs() < f64::EPSILON);
        assert!(snapshot.messages_per_second().abs() < f64::EPSILON);

        snapshot.uptime_secs = 10;
        assert!((snapshot.throughput_bps() - 100.0).abs() < f64::EPSILON);
        assert!((snapshot.messages_per_second() - 5.0).abs() < f64::EPSILON);
    }
}