Skip to main content

brainwires_network/remote/
telemetry.rs

1//! Protocol Telemetry and Metrics
2//!
3//! Provides observability for the remote control protocol,
4//! tracking message latency, throughput, and error rates.
5
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8use std::sync::RwLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, Instant};
11
/// Maximum number of samples retained per timing buffer.
///
/// Both the one-way latency buffer and the command roundtrip buffer are capped
/// at this size; once a buffer is full, the oldest sample is dropped before a
/// new one is appended.
const MAX_LATENCY_SAMPLES: usize = 1000;
14
/// Collector of protocol metrics for observability.
///
/// All state lives behind atomics or `RwLock`s, so every recording method
/// takes `&self`. Use `snapshot` to obtain a serializable point-in-time view
/// and `reset` to clear everything.
#[derive(Debug, Default)]
pub struct ProtocolMetrics {
    /// One-way message latency samples in milliseconds, oldest first.
    /// Holds at most `MAX_LATENCY_SAMPLES` entries.
    latency_samples: RwLock<VecDeque<u64>>,

    /// Command roundtrip samples in milliseconds, oldest first.
    /// Holds at most `MAX_LATENCY_SAMPLES` entries.
    roundtrip_samples: RwLock<VecDeque<u64>>,

    /// Total messages sent (incremented by `record_message_sent`)
    messages_sent: AtomicU64,

    /// Total messages failed (incremented by `record_message_failed`)
    messages_failed: AtomicU64,

    /// Total bytes sent
    bytes_sent: AtomicU64,

    /// Total bytes received
    bytes_received: AtomicU64,

    /// Total bytes before compression
    bytes_uncompressed: AtomicU64,

    /// Total bytes after compression
    bytes_compressed: AtomicU64,

    /// Connection start time; `None` until `record_connection_start` is called
    /// and again after `reset`
    connection_start: RwLock<Option<Instant>>,

    /// Time of the most recent connection start, send, or receive; `None` when
    /// nothing has been recorded since creation or the last `reset`
    last_activity: RwLock<Option<Instant>>,
}
48
49impl ProtocolMetrics {
50    /// Create new metrics instance
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    /// Record connection start
56    pub fn record_connection_start(&self) {
57        let mut start = self
58            .connection_start
59            .write()
60            .expect("metrics lock poisoned");
61        *start = Some(Instant::now());
62        let mut activity = self.last_activity.write().expect("metrics lock poisoned");
63        *activity = Some(Instant::now());
64    }
65
66    /// Record message sent
67    pub fn record_message_sent(&self, bytes: u64) {
68        self.messages_sent.fetch_add(1, Ordering::Relaxed);
69        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
70        let mut activity = self.last_activity.write().expect("metrics lock poisoned");
71        *activity = Some(Instant::now());
72    }
73
74    /// Record message failed
75    pub fn record_message_failed(&self) {
76        self.messages_failed.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Record bytes received
80    pub fn record_bytes_received(&self, bytes: u64) {
81        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
82        let mut activity = self.last_activity.write().expect("metrics lock poisoned");
83        *activity = Some(Instant::now());
84    }
85
86    /// Record compression ratio (uncompressed -> compressed)
87    pub fn record_compression(&self, uncompressed: u64, compressed: u64) {
88        self.bytes_uncompressed
89            .fetch_add(uncompressed, Ordering::Relaxed);
90        self.bytes_compressed
91            .fetch_add(compressed, Ordering::Relaxed);
92    }
93
94    /// Record message latency (one-way)
95    pub fn record_latency(&self, latency: Duration) {
96        let ms = latency.as_millis() as u64;
97        let mut samples = self.latency_samples.write().expect("metrics lock poisoned");
98        if samples.len() >= MAX_LATENCY_SAMPLES {
99            samples.pop_front();
100        }
101        samples.push_back(ms);
102    }
103
104    /// Record command roundtrip time
105    pub fn record_roundtrip(&self, roundtrip: Duration) {
106        let ms = roundtrip.as_millis() as u64;
107        let mut samples = self
108            .roundtrip_samples
109            .write()
110            .expect("metrics lock poisoned");
111        if samples.len() >= MAX_LATENCY_SAMPLES {
112            samples.pop_front();
113        }
114        samples.push_back(ms);
115    }
116
117    /// Get current metrics snapshot
118    pub fn snapshot(&self) -> MetricsSnapshot {
119        let latency_samples = self.latency_samples.read().expect("metrics lock poisoned");
120        let roundtrip_samples = self
121            .roundtrip_samples
122            .read()
123            .expect("metrics lock poisoned");
124        let connection_start = self.connection_start.read().expect("metrics lock poisoned");
125        let last_activity = self.last_activity.read().expect("metrics lock poisoned");
126
127        let uptime_secs = connection_start.map(|s| s.elapsed().as_secs()).unwrap_or(0);
128
129        let idle_secs = last_activity.map(|s| s.elapsed().as_secs()).unwrap_or(0);
130
131        let bytes_uncompressed = self.bytes_uncompressed.load(Ordering::Relaxed);
132        let bytes_compressed = self.bytes_compressed.load(Ordering::Relaxed);
133        let compression_ratio = if bytes_uncompressed > 0 {
134            bytes_compressed as f64 / bytes_uncompressed as f64
135        } else {
136            1.0
137        };
138
139        MetricsSnapshot {
140            messages_sent: self.messages_sent.load(Ordering::Relaxed),
141            messages_failed: self.messages_failed.load(Ordering::Relaxed),
142            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
143            bytes_received: self.bytes_received.load(Ordering::Relaxed),
144            compression_ratio,
145            latency_p50: percentile(&latency_samples, 50),
146            latency_p95: percentile(&latency_samples, 95),
147            latency_p99: percentile(&latency_samples, 99),
148            roundtrip_p50: percentile(&roundtrip_samples, 50),
149            roundtrip_p95: percentile(&roundtrip_samples, 95),
150            roundtrip_p99: percentile(&roundtrip_samples, 99),
151            uptime_secs,
152            idle_secs,
153        }
154    }
155
156    /// Reset all metrics
157    pub fn reset(&self) {
158        self.latency_samples
159            .write()
160            .expect("metrics lock poisoned")
161            .clear();
162        self.roundtrip_samples
163            .write()
164            .expect("metrics lock poisoned")
165            .clear();
166        self.messages_sent.store(0, Ordering::Relaxed);
167        self.messages_failed.store(0, Ordering::Relaxed);
168        self.bytes_sent.store(0, Ordering::Relaxed);
169        self.bytes_received.store(0, Ordering::Relaxed);
170        self.bytes_uncompressed.store(0, Ordering::Relaxed);
171        self.bytes_compressed.store(0, Ordering::Relaxed);
172        *self
173            .connection_start
174            .write()
175            .expect("metrics lock poisoned") = None;
176        *self.last_activity.write().expect("metrics lock poisoned") = None;
177    }
178}
179
/// Returns the `p`-th percentile of `samples`, or `None` when `samples` is empty.
///
/// The result is always one of the recorded samples: the element at rank
/// `round(p / 100 * (len - 1))` of the ascending order. `p` is a whole
/// percentage; values above 100 are clamped to 100 and therefore yield the
/// maximum sample.
fn percentile(samples: &VecDeque<u64>, p: u32) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }

    let mut values: Vec<u64> = samples.iter().copied().collect();

    // Clamp so an out-of-range percentile cannot index past the last element.
    let fraction = f64::from(p.min(100)) / 100.0;
    let index = (fraction * (values.len() - 1) as f64).round() as usize;

    // Only the element at `index` is needed, so a partial selection replaces
    // the full sort; it yields the same value a sorted lookup would.
    let (_, value, _) = values.select_nth_unstable(index);
    Some(*value)
}
192
/// Point-in-time, serializable view of the protocol metrics.
///
/// Percentile fields are `None` when no samples were available and are then
/// omitted from the serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Total messages sent
    pub messages_sent: u64,
    /// Total messages that failed
    pub messages_failed: u64,
    /// Total bytes sent
    pub bytes_sent: u64,
    /// Total bytes received
    pub bytes_received: u64,
    /// Compressed size divided by uncompressed size
    /// (1.0 = no compression or nothing recorded, <1.0 = payloads got smaller)
    pub compression_ratio: f64,
    /// Latency 50th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p50: Option<u64>,
    /// Latency 95th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p95: Option<u64>,
    /// Latency 99th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p99: Option<u64>,
    /// Command roundtrip 50th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roundtrip_p50: Option<u64>,
    /// Command roundtrip 95th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roundtrip_p95: Option<u64>,
    /// Command roundtrip 99th percentile (ms)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roundtrip_p99: Option<u64>,
    /// Whole seconds since the connection start was recorded (0 if it never was)
    pub uptime_secs: u64,
    /// Whole seconds since the last recorded activity (0 if there was none)
    pub idle_secs: u64,
}
229
230impl Default for MetricsSnapshot {
231    fn default() -> Self {
232        Self {
233            messages_sent: 0,
234            messages_failed: 0,
235            bytes_sent: 0,
236            bytes_received: 0,
237            compression_ratio: 1.0,
238            latency_p50: None,
239            latency_p95: None,
240            latency_p99: None,
241            roundtrip_p50: None,
242            roundtrip_p95: None,
243            roundtrip_p99: None,
244            uptime_secs: 0,
245            idle_secs: 0,
246        }
247    }
248}
249
/// Connection quality assessment.
///
/// Tiers are derived from the p95 latency and the failed/sent message ratio;
/// the worse of the two measurements decides the tier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConnectionQuality {
    /// Excellent (p95 latency <= 50ms and error rate <= 1%)
    Excellent,
    /// Good (p95 latency <= 100ms and error rate <= 5%, but not excellent)
    Good,
    /// Fair (p95 latency <= 250ms and error rate <= 10%, but not good)
    Fair,
    /// Poor (p95 latency > 250ms or error rate > 10%)
    Poor,
    /// Unknown (not enough data: fewer than 10 messages sent)
    Unknown,
}
265
266impl MetricsSnapshot {
267    /// Assess connection quality based on metrics
268    pub fn connection_quality(&self) -> ConnectionQuality {
269        // Need enough samples
270        if self.messages_sent < 10 {
271            return ConnectionQuality::Unknown;
272        }
273
274        // Calculate error rate
275        let error_rate = if self.messages_sent > 0 {
276            self.messages_failed as f64 / self.messages_sent as f64
277        } else {
278            0.0
279        };
280
281        // Use p95 latency for quality assessment
282        let latency = self.latency_p95.unwrap_or(0);
283
284        if error_rate > 0.10 || latency > 250 {
285            ConnectionQuality::Poor
286        } else if error_rate > 0.05 || latency > 100 {
287            ConnectionQuality::Fair
288        } else if error_rate > 0.01 || latency > 50 {
289            ConnectionQuality::Good
290        } else {
291            ConnectionQuality::Excellent
292        }
293    }
294
295    /// Calculate throughput in bytes per second
296    pub fn throughput_bps(&self) -> f64 {
297        if self.uptime_secs > 0 {
298            (self.bytes_sent + self.bytes_received) as f64 / self.uptime_secs as f64
299        } else {
300            0.0
301        }
302    }
303
304    /// Calculate message rate per second
305    pub fn messages_per_second(&self) -> f64 {
306        if self.uptime_secs > 0 {
307            self.messages_sent as f64 / self.uptime_secs as f64
308        } else {
309            0.0
310        }
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters and byte totals accumulate across recording calls.
    #[test]
    fn test_metrics_recording() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection_start();

        metrics.record_message_sent(100);
        metrics.record_message_sent(200);
        metrics.record_message_failed();
        metrics.record_bytes_received(150);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_sent, 2);
        assert_eq!(snapshot.messages_failed, 1);
        assert_eq!(snapshot.bytes_sent, 300);
        assert_eq!(snapshot.bytes_received, 150);
    }

    /// Percentiles of a uniform 1..=100 ms distribution land near their rank.
    #[test]
    fn test_latency_percentiles() {
        let metrics = ProtocolMetrics::new();

        // Add 100 samples from 1-100ms
        for i in 1..=100 {
            metrics.record_latency(Duration::from_millis(i));
        }

        let snapshot = metrics.snapshot();
        // Percentile calculation may vary slightly due to rounding
        let p50 = snapshot.latency_p50.unwrap();
        let p95 = snapshot.latency_p95.unwrap();
        let p99 = snapshot.latency_p99.unwrap();
        assert!(
            (49..=51).contains(&p50),
            "p50 should be around 50, got {}",
            p50
        );
        assert!(
            (94..=96).contains(&p95),
            "p95 should be around 95, got {}",
            p95
        );
        assert!(
            (98..=100).contains(&p99),
            "p99 should be around 99, got {}",
            p99
        );

        // No roundtrip samples were recorded, so those percentiles stay empty.
        assert_eq!(snapshot.roundtrip_p50, None);
    }

    /// The sample buffer never grows beyond its cap and keeps the newest samples.
    #[test]
    fn test_latency_buffer_is_bounded() {
        let metrics = ProtocolMetrics::new();
        let total = MAX_LATENCY_SAMPLES as u64 + 10;

        for i in 1..=total {
            metrics.record_latency(Duration::from_millis(i));
        }

        let samples = metrics.latency_samples.read().unwrap();
        assert_eq!(samples.len(), MAX_LATENCY_SAMPLES);
        // The ten oldest samples (1..=10) were evicted.
        assert_eq!(samples.front().copied(), Some(11));
        assert_eq!(samples.back().copied(), Some(total));
    }

    /// The ratio is compressed bytes divided by uncompressed bytes.
    #[test]
    fn test_compression_ratio() {
        let metrics = ProtocolMetrics::new();
        metrics.record_compression(1000, 400); // compressed to 40% of the original size

        let snapshot = metrics.snapshot();
        assert!((snapshot.compression_ratio - 0.4).abs() < 0.01);
    }

    /// `reset` returns the collector to its freshly created state.
    #[test]
    fn test_reset() {
        let metrics = ProtocolMetrics::new();
        metrics.record_connection_start();
        metrics.record_message_sent(64);
        metrics.record_message_failed();
        metrics.record_bytes_received(32);
        metrics.record_compression(100, 50);
        metrics.record_latency(Duration::from_millis(5));
        metrics.record_roundtrip(Duration::from_millis(9));

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_sent, 0);
        assert_eq!(snapshot.messages_failed, 0);
        assert_eq!(snapshot.bytes_sent, 0);
        assert_eq!(snapshot.bytes_received, 0);
        assert_eq!(snapshot.compression_ratio, 1.0);
        assert_eq!(snapshot.latency_p50, None);
        assert_eq!(snapshot.roundtrip_p50, None);
        assert_eq!(snapshot.uptime_secs, 0);
        assert_eq!(snapshot.idle_secs, 0);
    }

    /// Each quality tier is reachable through its latency or error threshold.
    #[test]
    fn test_connection_quality() {
        let mut snapshot = MetricsSnapshot::default();

        // Not enough data
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Unknown);

        // Excellent connection: no errors, p95 at or below 50ms
        snapshot.messages_sent = 100;
        snapshot.messages_failed = 0;
        snapshot.latency_p95 = Some(30);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Excellent);

        // Good connection: p95 above 50ms but not above 100ms
        snapshot.latency_p95 = Some(80);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Good);

        // Fair connection: p95 above 100ms but not above 250ms
        snapshot.latency_p95 = Some(120);
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Fair);

        // Poor connection
        snapshot.messages_failed = 15; // 15% error rate
        assert_eq!(snapshot.connection_quality(), ConnectionQuality::Poor);
    }
}
396}