1use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9use tokio::sync::RwLock;
10
/// One recorded DHT operation sample held in a `DhtTelemetry` buffer.
#[derive(Debug, Clone)]
pub struct TelemetryPoint {
    /// Wall-clock time of the sample; the `record_*` helpers fill it with `SystemTime::now()`.
    pub timestamp: SystemTime,
    /// Which kind of DHT operation this sample describes.
    pub operation: OperationType,
    /// How long the operation took, as supplied by the caller.
    pub duration: Duration,
    /// Hop count supplied by the caller (presumably routing hops — confirm with callers).
    pub hops: usize,
    /// Whether the operation succeeded; failures feed the error summary.
    pub success: bool,
    /// Error label for failed operations (e.g. `"timeout"`); failures with `None` are
    /// summarized under `"unknown"`.
    pub error_type: Option<String>,
}
21
/// Kind of DHT operation recorded in a [`TelemetryPoint`].
///
/// Used as the key of the per-operation statistics map, hence the `Eq + Hash` derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// Put operation; recorded by `DhtTelemetry::record_put`.
    Put,
    /// Get operation; recorded by `DhtTelemetry::record_get`.
    Get,
    /// Find-node lookup; recorded by `DhtTelemetry::record_find_node`.
    FindNode,
    /// Provide operation (presumably a provider announcement — confirm with callers).
    Provide,
}
30
/// Bounded in-memory buffer of recent DHT operation samples plus summary helpers.
pub struct DhtTelemetry {
    /// Recorded samples, oldest at the front; guarded by an async (tokio) `RwLock`.
    points: Arc<RwLock<VecDeque<TelemetryPoint>>>,
    /// Maximum number of samples kept; the oldest are dropped once this is exceeded.
    max_points: usize,
    /// Creation time of this instance; basis for reported uptime.
    start_time: Instant,
}
40
41impl DhtTelemetry {
42 pub fn new(max_points: usize) -> Self {
44 Self {
45 points: Arc::new(RwLock::new(VecDeque::new())),
46 max_points,
47 start_time: Instant::now(),
48 }
49 }
50
51 pub async fn record(&self, point: TelemetryPoint) {
53 let mut points = self.points.write().await;
54 points.push_back(point);
55
56 while points.len() > self.max_points {
58 points.pop_front();
59 }
60 }
61
62 pub async fn record_put(
64 &self,
65 duration: Duration,
66 hops: usize,
67 success: bool,
68 error: Option<String>,
69 ) {
70 self.record(TelemetryPoint {
71 timestamp: SystemTime::now(),
72 operation: OperationType::Put,
73 duration,
74 hops,
75 success,
76 error_type: error,
77 })
78 .await;
79 }
80
81 pub async fn record_get(
83 &self,
84 duration: Duration,
85 hops: usize,
86 success: bool,
87 error: Option<String>,
88 ) {
89 self.record(TelemetryPoint {
90 timestamp: SystemTime::now(),
91 operation: OperationType::Get,
92 duration,
93 hops,
94 success,
95 error_type: error,
96 })
97 .await;
98 }
99
100 pub async fn record_find_node(
102 &self,
103 duration: Duration,
104 hops: usize,
105 success: bool,
106 error: Option<String>,
107 ) {
108 self.record(TelemetryPoint {
109 timestamp: SystemTime::now(),
110 operation: OperationType::FindNode,
111 duration,
112 hops,
113 success,
114 error_type: error,
115 })
116 .await;
117 }
118
119 pub async fn get_stats(&self) -> TelemetryStats {
121 let points = self.points.read().await;
122
123 if points.is_empty() {
124 return TelemetryStats::default();
125 }
126
127 let mut latencies: Vec<_> = points
129 .iter()
130 .map(|p| p.duration.as_millis() as u64)
131 .collect();
132 latencies.sort();
133
134 let p50 = percentile(&latencies, 50);
135 let p95 = percentile(&latencies, 95);
136 let p99 = percentile(&latencies, 99);
137
138 let mut operation_stats = HashMap::new();
140 for op_type in &[
141 OperationType::Put,
142 OperationType::Get,
143 OperationType::FindNode,
144 OperationType::Provide,
145 ] {
146 let op_points: Vec<_> = points.iter().filter(|p| p.operation == *op_type).collect();
147
148 if !op_points.is_empty() {
149 let total = op_points.len();
150 let successful = op_points.iter().filter(|p| p.success).count();
151 let success_rate = successful as f64 / total as f64;
152
153 let avg_hops =
154 op_points.iter().map(|p| p.hops).sum::<usize>() as f64 / total as f64;
155 let avg_latency = op_points
156 .iter()
157 .map(|p| p.duration.as_millis())
158 .sum::<u128>() as f64
159 / total as f64;
160
161 operation_stats.insert(
162 *op_type,
163 OperationStats {
164 total_operations: total,
165 success_rate,
166 avg_hops,
167 avg_latency_ms: avg_latency,
168 },
169 );
170 }
171 }
172
173 let uptime_minutes = self.start_time.elapsed().as_secs() / 60;
175 let churn_rate = if uptime_minutes > 0 {
176 points.len() as f64 / uptime_minutes as f64
177 } else {
178 0.0
179 };
180
181 TelemetryStats {
182 total_operations: points.len(),
183 p50_latency_ms: p50,
184 p95_latency_ms: p95,
185 p99_latency_ms: p99,
186 operation_stats,
187 churn_rate_per_minute: churn_rate,
188 uptime_seconds: self.start_time.elapsed().as_secs(),
189 }
190 }
191
192 pub async fn get_error_summary(&self) -> HashMap<String, usize> {
194 let points = self.points.read().await;
195 let mut errors = HashMap::new();
196
197 for point in points.iter() {
198 if !point.success {
199 if let Some(error_type) = &point.error_type {
200 *errors.entry(error_type.clone()).or_insert(0) += 1;
201 } else {
202 *errors.entry("unknown".to_string()).or_insert(0) += 1;
203 }
204 }
205 }
206
207 errors
208 }
209}
210
211#[derive(Debug, Clone)]
213pub struct TelemetryStats {
214 pub total_operations: usize,
215 pub p50_latency_ms: u64,
216 pub p95_latency_ms: u64,
217 pub p99_latency_ms: u64,
218 pub operation_stats: HashMap<OperationType, OperationStats>,
219 pub churn_rate_per_minute: f64,
220 pub uptime_seconds: u64,
221}
222
223impl Default for TelemetryStats {
224 fn default() -> Self {
225 Self {
226 total_operations: 0,
227 p50_latency_ms: 0,
228 p95_latency_ms: 0,
229 p99_latency_ms: 0,
230 operation_stats: HashMap::new(),
231 churn_rate_per_minute: 0.0,
232 uptime_seconds: 0,
233 }
234 }
235}
236
/// Aggregates for a single operation type, computed over the retained samples.
#[derive(Debug, Clone)]
pub struct OperationStats {
    /// Number of retained samples of this operation type.
    pub total_operations: usize,
    /// Fraction of those samples that succeeded, in `[0.0, 1.0]`.
    pub success_rate: f64,
    /// Mean hop count.
    pub avg_hops: f64,
    /// Mean latency in milliseconds.
    pub avg_latency_ms: f64,
}
245
/// Returns the `percentile`-th value of `sorted_data`, which must be sorted ascending.
///
/// The rank `percentile / 100 * (len - 1)` is rounded *up* to the next index and clamped to
/// the last element, so percentiles above 100 yield the maximum. An empty slice yields `0`.
fn percentile(sorted_data: &[u64], percentile: u8) -> u64 {
    let last = match sorted_data.len().checked_sub(1) {
        Some(idx) => idx,
        None => return 0,
    };

    let rank = (f64::from(percentile) / 100.0 * last as f64).ceil() as usize;
    sorted_data[rank.min(last)]
}
256
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_telemetry_recording() {
        let telemetry = DhtTelemetry::new(1000);

        telemetry
            .record_put(Duration::from_millis(50), 3, true, None)
            .await;
        telemetry
            .record_get(Duration::from_millis(100), 4, true, None)
            .await;
        telemetry
            .record_find_node(Duration::from_millis(25), 2, true, None)
            .await;
        telemetry
            .record_put(
                Duration::from_millis(200),
                5,
                false,
                Some("timeout".to_string()),
            )
            .await;

        let stats = telemetry.get_stats().await;

        assert_eq!(stats.total_operations, 4);
        // Sorted latencies are [25, 50, 100, 200] ms; ranks round up.
        assert_eq!(stats.p50_latency_ms, 100);
        assert_eq!(stats.p95_latency_ms, 200);
        assert_eq!(stats.p99_latency_ms, 200);

        assert!(stats.operation_stats.contains_key(&OperationType::Put));
        assert!(stats.operation_stats.contains_key(&OperationType::Get));
        assert!(stats.operation_stats.contains_key(&OperationType::FindNode));
        // Nothing of this kind was recorded, so no entry is produced.
        assert!(!stats.operation_stats.contains_key(&OperationType::Provide));

        let put_stats = &stats.operation_stats[&OperationType::Put];
        assert_eq!(put_stats.total_operations, 2);
        assert_eq!(put_stats.success_rate, 0.5);
        assert!((put_stats.avg_hops - 4.0).abs() < 1e-9);
        assert!((put_stats.avg_latency_ms - 125.0).abs() < 1e-9);

        let get_stats = &stats.operation_stats[&OperationType::Get];
        assert_eq!(get_stats.total_operations, 1);
        assert_eq!(get_stats.success_rate, 1.0);

        // A freshly created instance has not been up for a minute yet.
        assert_eq!(stats.churn_rate_per_minute, 0.0);
    }

    #[tokio::test]
    async fn test_buffer_evicts_oldest_points() {
        let telemetry = DhtTelemetry::new(2);

        telemetry
            .record_put(Duration::from_millis(10), 1, true, None)
            .await;
        telemetry
            .record_get(Duration::from_millis(20), 1, true, None)
            .await;
        telemetry
            .record_find_node(Duration::from_millis(30), 1, true, None)
            .await;

        let stats = telemetry.get_stats().await;

        assert_eq!(stats.total_operations, 2);
        // The Put sample was the oldest and has been evicted.
        assert!(!stats.operation_stats.contains_key(&OperationType::Put));
        assert!(stats.operation_stats.contains_key(&OperationType::Get));
        assert!(stats.operation_stats.contains_key(&OperationType::FindNode));
        assert_eq!(stats.p50_latency_ms, 30);
    }

    #[tokio::test]
    async fn test_empty_stats() {
        let telemetry = DhtTelemetry::new(10);

        let stats = telemetry.get_stats().await;

        assert_eq!(stats.total_operations, 0);
        assert_eq!(stats.p50_latency_ms, 0);
        assert_eq!(stats.p99_latency_ms, 0);
        assert!(stats.operation_stats.is_empty());
        assert_eq!(stats.churn_rate_per_minute, 0.0);
        assert!(telemetry.get_error_summary().await.is_empty());
    }

    #[test]
    fn test_percentile_calculation() {
        let data = [10, 20, 30, 40, 50];
        assert_eq!(percentile(&data, 50), 30);
        assert_eq!(percentile(&data, 95), 50);
        assert_eq!(percentile(&data, 0), 10);
        assert_eq!(percentile(&data, 100), 50);
        assert_eq!(percentile(&[], 50), 0);
        assert_eq!(percentile(&[7], 99), 7);
    }

    #[tokio::test]
    async fn test_error_summary() {
        let telemetry = DhtTelemetry::new(1000);

        telemetry
            .record_put(
                Duration::from_millis(100),
                3,
                false,
                Some("timeout".to_string()),
            )
            .await;
        telemetry
            .record_get(
                Duration::from_millis(100),
                3,
                false,
                Some("timeout".to_string()),
            )
            .await;
        telemetry
            .record_put(
                Duration::from_millis(100),
                3,
                false,
                Some("network_error".to_string()),
            )
            .await;
        // An unlabeled failure is bucketed as "unknown".
        telemetry
            .record_get(Duration::from_millis(100), 3, false, None)
            .await;
        // Successful samples are never counted, even with an error label.
        telemetry
            .record_find_node(
                Duration::from_millis(100),
                3,
                true,
                Some("ignored".to_string()),
            )
            .await;

        let errors = telemetry.get_error_summary().await;

        assert_eq!(errors.get("timeout").copied(), Some(2));
        assert_eq!(errors.get("network_error").copied(), Some(1));
        assert_eq!(errors.get("unknown").copied(), Some(1));
        assert_eq!(errors.get("ignored"), None);
        assert_eq!(errors.len(), 3);
    }
}