Skip to main content

saorsa_core/dht/
telemetry.rs

//! DHT telemetry and performance monitoring
//!
//! Tracks P50/P95/P99 lookup latency, hops, success rates, and churn metrics
//! for DHT performance analysis and optimization.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9use tokio::sync::RwLock;
10
11/// Telemetry data point
12#[derive(Debug, Clone)]
13pub struct TelemetryPoint {
14    pub timestamp: SystemTime,
15    pub operation: OperationType,
16    pub duration: Duration,
17    pub hops: usize,
18    pub success: bool,
19    pub error_type: Option<String>,
20}
21
/// The kinds of DHT operation that telemetry distinguishes between.
///
/// The type is `Copy` and hashable so it can be used directly as a map key
/// when grouping statistics per operation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// A PUT operation.
    Put,
    /// A GET operation.
    Get,
    /// A FIND_NODE operation.
    FindNode,
    /// A PROVIDE operation.
    Provide,
}
30
31/// DHT telemetry collector
32pub struct DhtTelemetry {
33    /// Recent telemetry points (rolling window)
34    points: Arc<RwLock<VecDeque<TelemetryPoint>>>,
35    /// Maximum number of points to keep
36    max_points: usize,
37    /// Start time for uptime calculation
38    start_time: Instant,
39}
40
41impl DhtTelemetry {
42    /// Create new telemetry collector
43    pub fn new(max_points: usize) -> Self {
44        Self {
45            points: Arc::new(RwLock::new(VecDeque::new())),
46            max_points,
47            start_time: Instant::now(),
48        }
49    }
50
51    /// Record a telemetry point
52    pub async fn record(&self, point: TelemetryPoint) {
53        let mut points = self.points.write().await;
54        points.push_back(point);
55
56        // Maintain rolling window
57        while points.len() > self.max_points {
58            points.pop_front();
59        }
60    }
61
62    /// Record a PUT operation
63    pub async fn record_put(
64        &self,
65        duration: Duration,
66        hops: usize,
67        success: bool,
68        error: Option<String>,
69    ) {
70        self.record(TelemetryPoint {
71            timestamp: SystemTime::now(),
72            operation: OperationType::Put,
73            duration,
74            hops,
75            success,
76            error_type: error,
77        })
78        .await;
79    }
80
81    /// Record a GET operation
82    pub async fn record_get(
83        &self,
84        duration: Duration,
85        hops: usize,
86        success: bool,
87        error: Option<String>,
88    ) {
89        self.record(TelemetryPoint {
90            timestamp: SystemTime::now(),
91            operation: OperationType::Get,
92            duration,
93            hops,
94            success,
95            error_type: error,
96        })
97        .await;
98    }
99
100    /// Record a FIND_NODE operation
101    pub async fn record_find_node(
102        &self,
103        duration: Duration,
104        hops: usize,
105        success: bool,
106        error: Option<String>,
107    ) {
108        self.record(TelemetryPoint {
109            timestamp: SystemTime::now(),
110            operation: OperationType::FindNode,
111            duration,
112            hops,
113            success,
114            error_type: error,
115        })
116        .await;
117    }
118
119    /// Get current statistics
120    pub async fn get_stats(&self) -> TelemetryStats {
121        let points = self.points.read().await;
122
123        if points.is_empty() {
124            return TelemetryStats::default();
125        }
126
127        // Calculate latency percentiles
128        let mut latencies: Vec<_> = points
129            .iter()
130            .map(|p| p.duration.as_millis() as u64)
131            .collect();
132        latencies.sort();
133
134        let p50 = percentile(&latencies, 50);
135        let p95 = percentile(&latencies, 95);
136        let p99 = percentile(&latencies, 99);
137
138        // Calculate success rates by operation type
139        let mut operation_stats = HashMap::new();
140        for op_type in &[
141            OperationType::Put,
142            OperationType::Get,
143            OperationType::FindNode,
144            OperationType::Provide,
145        ] {
146            let op_points: Vec<_> = points.iter().filter(|p| p.operation == *op_type).collect();
147
148            if !op_points.is_empty() {
149                let total = op_points.len();
150                let successful = op_points.iter().filter(|p| p.success).count();
151                let success_rate = successful as f64 / total as f64;
152
153                let avg_hops =
154                    op_points.iter().map(|p| p.hops).sum::<usize>() as f64 / total as f64;
155                let avg_latency = op_points
156                    .iter()
157                    .map(|p| p.duration.as_millis())
158                    .sum::<u128>() as f64
159                    / total as f64;
160
161                operation_stats.insert(
162                    *op_type,
163                    OperationStats {
164                        total_operations: total,
165                        success_rate,
166                        avg_hops,
167                        avg_latency_ms: avg_latency,
168                    },
169                );
170            }
171        }
172
173        // Calculate churn (simplified: operations per minute)
174        let uptime_minutes = self.start_time.elapsed().as_secs() / 60;
175        let churn_rate = if uptime_minutes > 0 {
176            points.len() as f64 / uptime_minutes as f64
177        } else {
178            0.0
179        };
180
181        TelemetryStats {
182            total_operations: points.len(),
183            p50_latency_ms: p50,
184            p95_latency_ms: p95,
185            p99_latency_ms: p99,
186            operation_stats,
187            churn_rate_per_minute: churn_rate,
188            uptime_seconds: self.start_time.elapsed().as_secs(),
189        }
190    }
191
192    /// Get recent error summary
193    pub async fn get_error_summary(&self) -> HashMap<String, usize> {
194        let points = self.points.read().await;
195        let mut errors = HashMap::new();
196
197        for point in points.iter() {
198            if !point.success {
199                if let Some(error_type) = &point.error_type {
200                    *errors.entry(error_type.clone()).or_insert(0) += 1;
201                } else {
202                    *errors.entry("unknown".to_string()).or_insert(0) += 1;
203                }
204            }
205        }
206
207        errors
208    }
209}
210
211/// Telemetry statistics
212#[derive(Debug, Clone)]
213pub struct TelemetryStats {
214    pub total_operations: usize,
215    pub p50_latency_ms: u64,
216    pub p95_latency_ms: u64,
217    pub p99_latency_ms: u64,
218    pub operation_stats: HashMap<OperationType, OperationStats>,
219    pub churn_rate_per_minute: f64,
220    pub uptime_seconds: u64,
221}
222
223impl Default for TelemetryStats {
224    fn default() -> Self {
225        Self {
226            total_operations: 0,
227            p50_latency_ms: 0,
228            p95_latency_ms: 0,
229            p99_latency_ms: 0,
230            operation_stats: HashMap::new(),
231            churn_rate_per_minute: 0.0,
232            uptime_seconds: 0,
233        }
234    }
235}
236
/// Aggregates for the samples of a single operation type.
#[derive(Clone, Debug)]
pub struct OperationStats {
    /// Number of samples of this operation type.
    pub total_operations: usize,
    /// Fraction of those samples that succeeded, from `0.0` to `1.0`.
    pub success_rate: f64,
    /// Mean hop count across those samples.
    pub avg_hops: f64,
    /// Mean latency across those samples, in milliseconds.
    pub avg_latency_ms: f64,
}
245
/// Picks the value at the requested percentile of an ascending-sorted slice.
///
/// The position is `percentile / 100 * (len - 1)`, rounded up to the next
/// index and clamped to the last element. An empty slice yields `0`.
fn percentile(sorted_data: &[u64], percentile: u8) -> u64 {
    // `checked_sub` doubles as the emptiness check: there is no last index
    // when the slice has no elements.
    match sorted_data.len().checked_sub(1) {
        None => 0,
        Some(last) => {
            let rank = (percentile as f64 / 100.0 * last as f64).ceil() as usize;
            sorted_data[rank.min(last)]
        }
    }
}
256
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Records a mix of operations and checks the aggregate and PUT-specific stats.
    #[tokio::test]
    async fn test_telemetry_recording() {
        let collector = DhtTelemetry::new(1000);
        let ms = Duration::from_millis;

        // Three successes followed by one failed PUT.
        collector.record_put(ms(50), 3, true, None).await;
        collector.record_get(ms(100), 4, true, None).await;
        collector.record_find_node(ms(25), 2, true, None).await;
        collector
            .record_put(ms(200), 5, false, Some("timeout".to_string()))
            .await;

        let snapshot = collector.get_stats().await;

        assert_eq!(snapshot.total_operations, 4);
        assert!(snapshot.p50_latency_ms > 0);
        assert!(snapshot.p95_latency_ms > 0);

        // Both recorded operation kinds must have their own entry.
        let per_op = &snapshot.operation_stats;
        assert!(per_op.contains_key(&OperationType::Put));
        assert!(per_op.contains_key(&OperationType::Get));

        let put = &per_op[&OperationType::Put];
        assert_eq!(put.total_operations, 2);
        assert_eq!(put.success_rate, 0.5); // 1 success out of 2
    }

    /// Checks the median and a high percentile on a small sorted sample.
    #[test]
    fn test_percentile_calculation() {
        let samples: [u64; 5] = [10, 20, 30, 40, 50];
        assert_eq!(percentile(&samples, 50), 30); // Median
        assert_eq!(percentile(&samples, 95), 50); // 95th percentile
    }

    /// Failed operations are grouped by their error label.
    #[tokio::test]
    async fn test_error_summary() {
        let collector = DhtTelemetry::new(1000);
        let latency = Duration::from_millis(100);
        let failure = |label: &str| Some(label.to_string());

        collector
            .record_put(latency, 3, false, failure("timeout"))
            .await;
        collector
            .record_get(latency, 3, false, failure("timeout"))
            .await;
        collector
            .record_put(latency, 3, false, failure("network_error"))
            .await;

        let summary = collector.get_error_summary().await;
        let count_of = |label: &str| summary.get(label).copied().unwrap_or(0);

        assert_eq!(count_of("timeout"), 2);
        assert_eq!(count_of("network_error"), 1);
    }
}