1use serde::{Deserialize, Serialize};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use tokio::sync::RwLock;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Metrics {
28 pub lookups_p95_ms: u64,
30 pub hop_p95: u8,
32 pub timeout_rate: f32,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub enum StreamClass {
39 Control,
40 Mls,
41 File,
42 Media,
43}
44
45pub struct TelemetryCollector {
47 lookup_latencies: Arc<RwLock<VecDeque<u64>>>,
49 hop_counts: Arc<RwLock<VecDeque<u8>>>,
51 total_ops: Arc<AtomicUsize>,
53 timeouts: Arc<AtomicUsize>,
55 dht_puts: Arc<AtomicU64>,
57 dht_gets: Arc<AtomicU64>,
59 auth_failures: Arc<AtomicU64>,
61 stream_bandwidth: Arc<RwLock<HashMap<StreamClass, VecDeque<u64>>>>,
63 stream_rtt: Arc<RwLock<HashMap<StreamClass, VecDeque<u64>>>>,
65}
66
67use std::collections::HashMap;
68
69impl TelemetryCollector {
70 pub fn new() -> Self {
72 Self {
73 lookup_latencies: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
74 hop_counts: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
75 total_ops: Arc::new(AtomicUsize::new(0)),
76 timeouts: Arc::new(AtomicUsize::new(0)),
77 dht_puts: Arc::new(AtomicU64::new(0)),
78 dht_gets: Arc::new(AtomicU64::new(0)),
79 auth_failures: Arc::new(AtomicU64::new(0)),
80 stream_bandwidth: Arc::new(RwLock::new(HashMap::new())),
81 stream_rtt: Arc::new(RwLock::new(HashMap::new())),
82 }
83 }
84
85 pub async fn record_lookup(&self, latency: Duration, hops: u8) {
87 self.total_ops.fetch_add(1, Ordering::Relaxed);
88
89 let micros = latency.as_micros() as u64;
90 let mut latencies = self.lookup_latencies.write().await;
91 if latencies.len() >= 1000 {
92 latencies.pop_front();
93 }
94 latencies.push_back(micros);
95
96 let mut hop_counts = self.hop_counts.write().await;
97 if hop_counts.len() >= 1000 {
98 hop_counts.pop_front();
99 }
100 hop_counts.push_back(hops);
101 }
102
103 pub fn record_timeout(&self) {
105 self.timeouts.fetch_add(1, Ordering::Relaxed);
106 self.total_ops.fetch_add(1, Ordering::Relaxed);
107 }
108
109 pub fn record_dht_put(&self) {
111 self.dht_puts.fetch_add(1, Ordering::Relaxed);
112 }
113
114 pub fn record_dht_get(&self) {
116 self.dht_gets.fetch_add(1, Ordering::Relaxed);
117 }
118
119 pub fn record_auth_failure(&self) {
121 self.auth_failures.fetch_add(1, Ordering::Relaxed);
122 }
123
124 pub async fn record_stream_bandwidth(&self, class: StreamClass, bytes_per_sec: u64) {
126 let mut bandwidth = self.stream_bandwidth.write().await;
127 let samples = bandwidth
128 .entry(class)
129 .or_insert_with(|| VecDeque::with_capacity(100));
130
131 if samples.len() >= 100 {
132 samples.pop_front();
133 }
134 samples.push_back(bytes_per_sec);
135 }
136
137 pub async fn record_stream_rtt(&self, class: StreamClass, rtt: Duration) {
139 let micros = rtt.as_micros() as u64;
140 let mut rtts = self.stream_rtt.write().await;
141 let samples = rtts
142 .entry(class)
143 .or_insert_with(|| VecDeque::with_capacity(100));
144
145 if samples.len() >= 100 {
146 samples.pop_front();
147 }
148 samples.push_back(micros);
149 }
150
151 pub async fn get_metrics(&self) -> Metrics {
153 let latencies = self.lookup_latencies.read().await;
154 let hops = self.hop_counts.read().await;
155
156 let lookups_p95_ms = calculate_percentile(&latencies, 95) / 1000;
157 let hop_p95 = calculate_percentile_u8(&hops, 95);
158
159 let total = self.total_ops.load(Ordering::Relaxed) as f32;
160 let timeouts = self.timeouts.load(Ordering::Relaxed) as f32;
161 let timeout_rate = if total > 0.0 { timeouts / total } else { 0.0 };
162
163 Metrics {
164 lookups_p95_ms,
165 hop_p95,
166 timeout_rate,
167 }
168 }
169
170 pub fn get_counters(&self) -> EventCounters {
172 EventCounters {
173 dht_puts: self.dht_puts.load(Ordering::Relaxed),
174 dht_gets: self.dht_gets.load(Ordering::Relaxed),
175 auth_failures: self.auth_failures.load(Ordering::Relaxed),
176 }
177 }
178
179 pub async fn get_stream_metrics(&self, class: StreamClass) -> Option<StreamMetrics> {
181 let bandwidth = self.stream_bandwidth.read().await;
182 let rtts = self.stream_rtt.read().await;
183
184 let bw_samples = bandwidth.get(&class)?;
185 let rtt_samples = rtts.get(&class)?;
186
187 if bw_samples.is_empty() || rtt_samples.is_empty() {
188 return None;
189 }
190
191 Some(StreamMetrics {
192 bandwidth_p50: calculate_percentile(bw_samples, 50),
193 bandwidth_p95: calculate_percentile(bw_samples, 95),
194 rtt_p50_ms: calculate_percentile(rtt_samples, 50) / 1000,
195 rtt_p95_ms: calculate_percentile(rtt_samples, 95) / 1000,
196 })
197 }
198
199 pub async fn record_stream_class_usage(&self, class: StreamClass) {
201 let class_id = match class {
203 StreamClass::Control => "control",
204 StreamClass::Mls => "mls",
205 StreamClass::File => "file",
206 StreamClass::Media => "media",
207 };
208
209 tracing::debug!("Stream class {} opened", class_id);
211
212 self.record_stream_bandwidth(class, 1024).await;
214 }
215
216 pub async fn reset(&self) {
218 self.lookup_latencies.write().await.clear();
219 self.hop_counts.write().await.clear();
220 self.total_ops.store(0, Ordering::Relaxed);
221 self.timeouts.store(0, Ordering::Relaxed);
222 self.dht_puts.store(0, Ordering::Relaxed);
223 self.dht_gets.store(0, Ordering::Relaxed);
224 self.auth_failures.store(0, Ordering::Relaxed);
225 self.stream_bandwidth.write().await.clear();
226 self.stream_rtt.write().await.clear();
227 }
228}
229
230impl Default for TelemetryCollector {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct EventCounters {
239 pub dht_puts: u64,
240 pub dht_gets: u64,
241 pub auth_failures: u64,
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize)]
246pub struct StreamMetrics {
247 pub bandwidth_p50: u64,
248 pub bandwidth_p95: u64,
249 pub rtt_p50_ms: u64,
250 pub rtt_p95_ms: u64,
251}
252
/// Returns the value found at `percentile` among `samples`, or `0` when there
/// are no samples.
///
/// The element is picked at ascending rank `len * percentile / 100` (integer
/// division), clamped to the last element. For the values `1..=100` this gives
/// `51` for the 50th and `96` for the 95th percentile. A `percentile` above 100
/// behaves like 100 and yields the maximum.
fn calculate_percentile(samples: &VecDeque<u64>, percentile: usize) -> u64 {
    if samples.is_empty() {
        return 0;
    }

    let mut values: Vec<u64> = samples.iter().copied().collect();

    // Capping at 100 leaves every result unchanged (larger values were already
    // clamped to the last rank) and keeps the multiplication from overflowing
    // when a caller passes a very large percentile.
    let rank = values.len() * percentile.min(100) / 100;
    let rank = rank.min(values.len() - 1);

    // Only a single order statistic is needed, so a selection is enough; it
    // places the same element at `rank` that a full sort would.
    *values.select_nth_unstable(rank).1
}
267
/// Returns the value found at `percentile` among the `u8` `samples`, or `0`
/// when there are no samples.
///
/// The element is picked at ascending rank `len * percentile / 100` (integer
/// division), clamped to the last element. A `percentile` above 100 behaves
/// like 100 and yields the maximum.
fn calculate_percentile_u8(samples: &VecDeque<u8>, percentile: usize) -> u8 {
    if samples.is_empty() {
        return 0;
    }

    let mut values: Vec<u8> = samples.iter().copied().collect();

    // Capping at 100 leaves every result unchanged (larger values were already
    // clamped to the last rank) and keeps the multiplication from overflowing
    // when a caller passes a very large percentile.
    let rank = values.len() * percentile.min(100) / 100;
    let rank = rank.min(values.len() - 1);

    // Only a single order statistic is needed, so a selection is enough; it
    // places the same element at `rank` that a full sort would.
    *values.select_nth_unstable(rank).1
}
282
283#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct HealthStatus {
286 pub healthy: bool,
287 pub uptime: Duration,
288 pub metrics: Metrics,
289 pub counters: EventCounters,
290}
291
292pub struct HealthMonitor {
294 start_time: Instant,
295 collector: Arc<TelemetryCollector>,
296}
297
298impl HealthMonitor {
299 pub fn new(collector: Arc<TelemetryCollector>) -> Self {
301 Self {
302 start_time: Instant::now(),
303 collector,
304 }
305 }
306
307 pub async fn get_status(&self) -> HealthStatus {
309 let metrics = self.collector.get_metrics().await;
310 let counters = self.collector.get_counters();
311
312 let healthy = metrics.timeout_rate < 0.1 && metrics.lookups_p95_ms < 5000;
314
315 HealthStatus {
316 healthy,
317 uptime: self.start_time.elapsed(),
318 metrics,
319 counters,
320 }
321 }
322}
323
324static GLOBAL_TELEMETRY: once_cell::sync::Lazy<Arc<TelemetryCollector>> =
326 once_cell::sync::Lazy::new(|| Arc::new(TelemetryCollector::new()));
327
328pub fn telemetry() -> Arc<TelemetryCollector> {
330 GLOBAL_TELEMETRY.clone()
331}
332
333pub async fn record_lookup(latency: Duration, hops: u8) {
335 telemetry().record_lookup(latency, hops).await;
336}
337
338pub fn record_timeout() {
340 telemetry().record_timeout();
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Lookups plus one timeout must yield non-zero percentiles and timeout rate.
    #[tokio::test]
    async fn test_telemetry_collector() {
        let collector = TelemetryCollector::new();

        // (latency in ms, hops) for three successful lookups.
        for &(millis, hops) in &[(10u64, 3u8), (20, 4), (30, 5)] {
            collector
                .record_lookup(Duration::from_millis(millis), hops)
                .await;
        }

        collector.record_timeout();

        let metrics = collector.get_metrics().await;
        assert!(metrics.lookups_p95_ms > 0);
        assert!(metrics.hop_p95 > 0);
        assert!(metrics.timeout_rate > 0.0);
    }

    /// Each `record_*` call must be reflected exactly once in the counters.
    #[tokio::test]
    async fn test_event_counters() {
        let collector = TelemetryCollector::new();

        collector.record_dht_put();
        collector.record_dht_put();
        collector.record_dht_get();
        collector.record_auth_failure();

        let counters = collector.get_counters();
        assert_eq!(counters.dht_puts, 2);
        assert_eq!(counters.dht_gets, 1);
        assert_eq!(counters.auth_failures, 1);
    }

    /// Stream metrics become available once both bandwidth and RTT were recorded.
    #[tokio::test]
    async fn test_stream_metrics() {
        let collector = TelemetryCollector::new();
        let class = StreamClass::Media;

        for &bytes_per_sec in &[1_000_000u64, 2_000_000] {
            collector.record_stream_bandwidth(class, bytes_per_sec).await;
        }

        for &millis in &[10u64, 20] {
            collector
                .record_stream_rtt(class, Duration::from_millis(millis))
                .await;
        }

        let metrics = collector.get_stream_metrics(class).await.unwrap();
        assert!(metrics.bandwidth_p50 > 0);
        assert!(metrics.rtt_p50_ms > 0);
    }

    /// A single fast lookup and no timeouts must be reported as healthy.
    #[tokio::test]
    async fn test_health_monitor() {
        let collector = Arc::new(TelemetryCollector::new());
        let monitor = HealthMonitor::new(collector.clone());

        collector.record_lookup(Duration::from_millis(100), 3).await;

        let status = monitor.get_status().await;
        assert!(status.healthy);
        assert!(status.uptime.as_secs() < 10);
    }

    /// Pins the rank convention: index `len * p / 100` into the sorted samples.
    #[test]
    fn test_percentile_calculation() {
        let samples: VecDeque<u64> = (1..=100).collect();

        assert_eq!(calculate_percentile(&samples, 50), 51);
        assert_eq!(calculate_percentile(&samples, 95), 96);
    }
}