Skip to main content

saorsa_core/telemetry/
mod.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Telemetry module for metrics, tracing, and health signals.
15//!
16//! Provides observability into system performance and health.
17
18use serde::{Deserialize, Serialize};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::RwLock;
24
25/// Core metrics structure
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Metrics {
28    /// Lookup latency P95 in milliseconds
29    pub lookups_p95_ms: u64,
30    /// Hop count P95
31    pub hop_p95: u8,
32    /// Timeout rate (0.0 to 1.0)
33    pub timeout_rate: f32,
34}
35
/// Stream class for QoS tracking
///
/// Used as the key of the per-class bandwidth and RTT sample windows kept by
/// `TelemetryCollector`. Variant order is part of the serde encoding, so new
/// variants should be appended rather than inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamClass {
    /// Control stream class; logged as `"control"` when a stream is opened.
    Control,
    /// MLS stream class; logged as `"mls"`. Presumably MLS group messaging
    /// traffic — confirm against the transport layer.
    Mls,
    /// File stream class; logged as `"file"`.
    File,
    /// Media stream class; logged as `"media"`.
    Media,
}
44
45/// Telemetry collector for system metrics
46pub struct TelemetryCollector {
47    /// Lookup latencies in microseconds
48    lookup_latencies: Arc<RwLock<VecDeque<u64>>>,
49    /// Hop counts
50    hop_counts: Arc<RwLock<VecDeque<u8>>>,
51    /// Total operations count
52    total_ops: Arc<AtomicUsize>,
53    /// Timeout count
54    timeouts: Arc<AtomicUsize>,
55    /// DHT put counter
56    dht_puts: Arc<AtomicU64>,
57    /// DHT get counter
58    dht_gets: Arc<AtomicU64>,
59    /// Auth failure counter
60    auth_failures: Arc<AtomicU64>,
61    /// Stream bandwidth by class (bytes/sec)
62    stream_bandwidth: Arc<RwLock<HashMap<StreamClass, VecDeque<u64>>>>,
63    /// Stream RTT by class (microseconds)
64    stream_rtt: Arc<RwLock<HashMap<StreamClass, VecDeque<u64>>>>,
65}
66
67use std::collections::HashMap;
68
69impl TelemetryCollector {
70    /// Create a new telemetry collector
71    pub fn new() -> Self {
72        Self {
73            lookup_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
74            hop_counts: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
75            total_ops: Arc::new(AtomicUsize::new(0)),
76            timeouts: Arc::new(AtomicUsize::new(0)),
77            dht_puts: Arc::new(AtomicU64::new(0)),
78            dht_gets: Arc::new(AtomicU64::new(0)),
79            auth_failures: Arc::new(AtomicU64::new(0)),
80            stream_bandwidth: Arc::new(RwLock::new(HashMap::new())),
81            stream_rtt: Arc::new(RwLock::new(HashMap::new())),
82        }
83    }
84
85    /// Record a lookup operation
86    pub async fn record_lookup(&self, latency: Duration, hops: u8) {
87        self.total_ops.fetch_add(1, Ordering::Relaxed);
88
89        let micros = latency.as_micros() as u64;
90        let mut latencies = self.lookup_latencies.write().await;
91        if latencies.len() >= 1000 {
92            latencies.pop_front();
93        }
94        latencies.push_back(micros);
95
96        let mut hop_counts = self.hop_counts.write().await;
97        if hop_counts.len() >= 1000 {
98            hop_counts.pop_front();
99        }
100        hop_counts.push_back(hops);
101    }
102
103    /// Record a timeout
104    pub fn record_timeout(&self) {
105        self.timeouts.fetch_add(1, Ordering::Relaxed);
106        self.total_ops.fetch_add(1, Ordering::Relaxed);
107    }
108
109    /// Record a DHT put operation
110    pub fn record_dht_put(&self) {
111        self.dht_puts.fetch_add(1, Ordering::Relaxed);
112    }
113
114    /// Record a DHT get operation
115    pub fn record_dht_get(&self) {
116        self.dht_gets.fetch_add(1, Ordering::Relaxed);
117    }
118
119    /// Record an auth failure
120    pub fn record_auth_failure(&self) {
121        self.auth_failures.fetch_add(1, Ordering::Relaxed);
122    }
123
124    /// Record stream bandwidth
125    pub async fn record_stream_bandwidth(&self, class: StreamClass, bytes_per_sec: u64) {
126        let mut bandwidth = self.stream_bandwidth.write().await;
127        let samples = bandwidth
128            .entry(class)
129            .or_insert_with(|| VecDeque::with_capacity(100));
130
131        if samples.len() >= 100 {
132            samples.pop_front();
133        }
134        samples.push_back(bytes_per_sec);
135    }
136
137    /// Record stream RTT
138    pub async fn record_stream_rtt(&self, class: StreamClass, rtt: Duration) {
139        let micros = rtt.as_micros() as u64;
140        let mut rtts = self.stream_rtt.write().await;
141        let samples = rtts
142            .entry(class)
143            .or_insert_with(|| VecDeque::with_capacity(100));
144
145        if samples.len() >= 100 {
146            samples.pop_front();
147        }
148        samples.push_back(micros);
149    }
150
151    /// Get current metrics
152    pub async fn get_metrics(&self) -> Metrics {
153        let latencies = self.lookup_latencies.read().await;
154        let hops = self.hop_counts.read().await;
155
156        let lookups_p95_ms = calculate_percentile(&latencies, 95) / 1000;
157        let hop_p95 = calculate_percentile_u8(&hops, 95);
158
159        let total = self.total_ops.load(Ordering::Relaxed) as f32;
160        let timeouts = self.timeouts.load(Ordering::Relaxed) as f32;
161        let timeout_rate = if total > 0.0 { timeouts / total } else { 0.0 };
162
163        Metrics {
164            lookups_p95_ms,
165            hop_p95,
166            timeout_rate,
167        }
168    }
169
170    /// Get event counters
171    pub fn get_counters(&self) -> EventCounters {
172        EventCounters {
173            dht_puts: self.dht_puts.load(Ordering::Relaxed),
174            dht_gets: self.dht_gets.load(Ordering::Relaxed),
175            auth_failures: self.auth_failures.load(Ordering::Relaxed),
176        }
177    }
178
179    /// Get stream metrics for a class
180    pub async fn get_stream_metrics(&self, class: StreamClass) -> Option<StreamMetrics> {
181        let bandwidth = self.stream_bandwidth.read().await;
182        let rtts = self.stream_rtt.read().await;
183
184        let bw_samples = bandwidth.get(&class)?;
185        let rtt_samples = rtts.get(&class)?;
186
187        if bw_samples.is_empty() || rtt_samples.is_empty() {
188            return None;
189        }
190
191        Some(StreamMetrics {
192            bandwidth_p50: calculate_percentile(bw_samples, 50),
193            bandwidth_p95: calculate_percentile(bw_samples, 95),
194            rtt_p50_ms: calculate_percentile(rtt_samples, 50) / 1000,
195            rtt_p95_ms: calculate_percentile(rtt_samples, 95) / 1000,
196        })
197    }
198
199    /// Record stream class usage
200    pub async fn record_stream_class_usage(&self, class: StreamClass) {
201        // Track stream class usage for QoS monitoring
202        let class_id = match class {
203            StreamClass::Control => "control",
204            StreamClass::Mls => "mls",
205            StreamClass::File => "file",
206            StreamClass::Media => "media",
207        };
208
209        // For now, just log the usage - could be extended to track counts
210        tracing::debug!("Stream class {} opened", class_id);
211
212        // Record a bandwidth sample for the class (placeholder value)
213        self.record_stream_bandwidth(class, 1024).await;
214    }
215
216    /// Reset all metrics
217    pub async fn reset(&self) {
218        self.lookup_latencies.write().await.clear();
219        self.hop_counts.write().await.clear();
220        self.total_ops.store(0, Ordering::Relaxed);
221        self.timeouts.store(0, Ordering::Relaxed);
222        self.dht_puts.store(0, Ordering::Relaxed);
223        self.dht_gets.store(0, Ordering::Relaxed);
224        self.auth_failures.store(0, Ordering::Relaxed);
225        self.stream_bandwidth.write().await.clear();
226        self.stream_rtt.write().await.clear();
227    }
228}
229
230impl Default for TelemetryCollector {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236/// Event counters
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct EventCounters {
239    pub dht_puts: u64,
240    pub dht_gets: u64,
241    pub auth_failures: u64,
242}
243
244/// Stream metrics
245#[derive(Debug, Clone, Serialize, Deserialize)]
246pub struct StreamMetrics {
247    pub bandwidth_p50: u64,
248    pub bandwidth_p95: u64,
249    pub rtt_p50_ms: u64,
250    pub rtt_p95_ms: u64,
251}
252
/// Calculate percentile for u64 values.
///
/// Returns the element at 0-based rank `len * percentile / 100` (clamped to
/// the last element) of `samples` in ascending order, or `0` when `samples`
/// is empty. With this rank convention P50 of `1..=100` is `51`.
fn calculate_percentile(samples: &VecDeque<u64>, percentile: usize) -> u64 {
    let len = samples.len();
    if len == 0 {
        return 0;
    }

    // Clamp so that percentile >= 100 yields the maximum sample.
    let rank = (len * percentile / 100).min(len - 1);
    let mut scratch: Vec<u64> = samples.iter().copied().collect();
    // Partial selection puts the rank-th smallest value exactly where a full
    // sort would, without ordering the rest of the buffer.
    let (_, value, _) = scratch.select_nth_unstable(rank);
    *value
}
267
/// Calculate percentile for u8 values.
///
/// Same rank rule as the u64 variant: the element at 0-based index
/// `len * percentile / 100` (clamped to the last element) after sorting, or
/// `0` for an empty window.
fn calculate_percentile_u8(samples: &VecDeque<u8>, percentile: usize) -> u8 {
    match samples.len() {
        0 => 0,
        len => {
            let target = (len * percentile / 100).min(len - 1);
            let mut ordered = Vec::with_capacity(len);
            ordered.extend(samples.iter().copied());
            ordered.sort_unstable();
            ordered[target]
        }
    }
}
282
/// Health check status
///
/// Point-in-time report produced by `HealthMonitor::get_status`. Field order
/// is part of the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Verdict of the monitor's threshold check applied to `metrics`.
    pub healthy: bool,
    /// Time elapsed since the `HealthMonitor` was constructed.
    pub uptime: Duration,
    /// Metrics snapshot the verdict was computed from.
    pub metrics: Metrics,
    /// Event counters captured alongside `metrics`.
    pub counters: EventCounters,
}
291
292/// Health monitor
293pub struct HealthMonitor {
294    start_time: Instant,
295    collector: Arc<TelemetryCollector>,
296}
297
298impl HealthMonitor {
299    /// Create a new health monitor
300    pub fn new(collector: Arc<TelemetryCollector>) -> Self {
301        Self {
302            start_time: Instant::now(),
303            collector,
304        }
305    }
306
307    /// Get health status
308    pub async fn get_status(&self) -> HealthStatus {
309        let metrics = self.collector.get_metrics().await;
310        let counters = self.collector.get_counters();
311
312        // Simple health check: timeout rate < 10% and latency < 5 seconds
313        let healthy = metrics.timeout_rate < 0.1 && metrics.lookups_p95_ms < 5000;
314
315        HealthStatus {
316            healthy,
317            uptime: self.start_time.elapsed(),
318            metrics,
319            counters,
320        }
321    }
322}
323
324/// Global telemetry instance
325static GLOBAL_TELEMETRY: once_cell::sync::Lazy<Arc<TelemetryCollector>> =
326    once_cell::sync::Lazy::new(|| Arc::new(TelemetryCollector::new()));
327
328/// Get the global telemetry collector
329pub fn telemetry() -> Arc<TelemetryCollector> {
330    GLOBAL_TELEMETRY.clone()
331}
332
333/// Record a lookup operation globally
334pub async fn record_lookup(latency: Duration, hops: u8) {
335    telemetry().record_lookup(latency, hops).await;
336}
337
338/// Record a timeout globally
339pub fn record_timeout() {
340    telemetry().record_timeout();
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_telemetry_collector() {
        let collector = TelemetryCollector::new();

        // Record some lookups
        collector.record_lookup(Duration::from_millis(10), 3).await;
        collector.record_lookup(Duration::from_millis(20), 4).await;
        collector.record_lookup(Duration::from_millis(30), 5).await;

        // Record a timeout
        collector.record_timeout();

        // Get metrics
        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.lookups_p95_ms, 30);
        assert_eq!(metrics.hop_p95, 5);
        // 1 timeout out of 4 operations (3 lookups + 1 timeout).
        assert!((metrics.timeout_rate - 0.25).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_event_counters() {
        let collector = TelemetryCollector::new();

        collector.record_dht_put();
        collector.record_dht_put();
        collector.record_dht_get();
        collector.record_auth_failure();

        let counters = collector.get_counters();
        assert_eq!(counters.dht_puts, 2);
        assert_eq!(counters.dht_gets, 1);
        assert_eq!(counters.auth_failures, 1);
    }

    #[tokio::test]
    async fn test_stream_metrics() {
        let collector = TelemetryCollector::new();

        collector
            .record_stream_bandwidth(StreamClass::Media, 1000000)
            .await;
        collector
            .record_stream_bandwidth(StreamClass::Media, 2000000)
            .await;

        collector
            .record_stream_rtt(StreamClass::Media, Duration::from_millis(10))
            .await;
        collector
            .record_stream_rtt(StreamClass::Media, Duration::from_millis(20))
            .await;

        let metrics = collector
            .get_stream_metrics(StreamClass::Media)
            .await
            .expect("both bandwidth and RTT samples were recorded");
        assert!(metrics.bandwidth_p50 > 0);
        assert!(metrics.rtt_p50_ms > 0);
        assert_eq!(metrics.rtt_p95_ms, 20);
    }

    #[tokio::test]
    async fn test_stream_metrics_requires_both_sample_kinds() {
        let collector = TelemetryCollector::new();
        assert!(collector
            .get_stream_metrics(StreamClass::Control)
            .await
            .is_none());

        // Bandwidth alone is not enough: RTT samples are also required.
        collector
            .record_stream_bandwidth(StreamClass::Control, 100)
            .await;
        assert!(collector
            .get_stream_metrics(StreamClass::Control)
            .await
            .is_none());
    }

    #[tokio::test]
    async fn test_lookup_window_evicts_oldest() {
        let collector = TelemetryCollector::new();

        // One very slow lookup followed by a full window of fast ones: the slow
        // sample must be evicted and no longer influence the P95.
        collector.record_lookup(Duration::from_secs(10), 20).await;
        for _ in 0..1000 {
            collector.record_lookup(Duration::from_millis(1), 2).await;
        }

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.lookups_p95_ms, 1);
        assert_eq!(metrics.hop_p95, 2);
    }

    #[tokio::test]
    async fn test_reset_clears_everything() {
        let collector = TelemetryCollector::new();
        collector.record_lookup(Duration::from_millis(10), 3).await;
        collector.record_timeout();
        collector.record_dht_put();
        collector.record_dht_get();
        collector.record_auth_failure();
        collector.record_stream_bandwidth(StreamClass::File, 500).await;
        collector
            .record_stream_rtt(StreamClass::File, Duration::from_millis(5))
            .await;

        collector.reset().await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.lookups_p95_ms, 0);
        assert_eq!(metrics.hop_p95, 0);
        assert!(metrics.timeout_rate.abs() < f32::EPSILON);

        let counters = collector.get_counters();
        assert_eq!(counters.dht_puts, 0);
        assert_eq!(counters.dht_gets, 0);
        assert_eq!(counters.auth_failures, 0);

        assert!(collector
            .get_stream_metrics(StreamClass::File)
            .await
            .is_none());
    }

    #[tokio::test]
    async fn test_health_monitor() {
        let collector = Arc::new(TelemetryCollector::new());
        let monitor = HealthMonitor::new(collector.clone());

        // Record some healthy operations
        collector.record_lookup(Duration::from_millis(100), 3).await;

        let status = monitor.get_status().await;
        assert!(status.healthy);
        assert!(status.uptime.as_secs() < 10);
    }

    #[tokio::test]
    async fn test_health_monitor_flags_unhealthy() {
        // High timeout rate: 1 timeout out of 2 operations.
        let collector = Arc::new(TelemetryCollector::new());
        let monitor = HealthMonitor::new(collector.clone());
        collector.record_lookup(Duration::from_millis(100), 3).await;
        collector.record_timeout();
        assert!(!monitor.get_status().await.healthy);

        // High latency: P95 lookup of 6 s exceeds the 5 s threshold.
        let collector = Arc::new(TelemetryCollector::new());
        let monitor = HealthMonitor::new(collector.clone());
        collector.record_lookup(Duration::from_secs(6), 3).await;
        assert!(!monitor.get_status().await.healthy);
    }

    #[test]
    fn test_percentile_calculation() {
        let mut samples = VecDeque::new();
        for i in 1..=100 {
            samples.push_back(i as u64);
        }

        // Due to 0-based indexing, 50th percentile of 1-100 gives index 50 which is value 51
        assert_eq!(calculate_percentile(&samples, 50), 51);
        assert_eq!(calculate_percentile(&samples, 95), 96);
    }

    #[test]
    fn test_percentile_edge_cases() {
        let empty: VecDeque<u64> = VecDeque::new();
        assert_eq!(calculate_percentile(&empty, 95), 0);

        let single: VecDeque<u64> = VecDeque::from(vec![42]);
        assert_eq!(calculate_percentile(&single, 0), 42);
        assert_eq!(calculate_percentile(&single, 100), 42);

        let hops: VecDeque<u8> = (1..=10u8).collect();
        assert_eq!(calculate_percentile_u8(&hops, 100), 10);
        let no_hops: VecDeque<u8> = VecDeque::new();
        assert_eq!(calculate_percentile_u8(&no_hops, 50), 0);
    }
}