1use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct LatencySample {
14 pub timestamp: DateTime<Utc>,
16 pub latency_ms: u64,
18 pub endpoint: Option<String>,
20 pub method: Option<String>,
22 pub status_code: Option<u16>,
24 pub error: Option<String>,
26}
27
28#[derive(Debug, Clone)]
32pub struct LatencyRecorder {
33 samples: Arc<RwLock<VecDeque<LatencySample>>>,
35 max_samples: usize,
37 max_age_seconds: u64,
39}
40
41impl LatencyRecorder {
42 pub fn new(max_samples: usize, max_age_seconds: u64) -> Self {
44 Self {
45 samples: Arc::new(RwLock::new(VecDeque::new())),
46 max_samples,
47 max_age_seconds,
48 }
49 }
50
51 pub async fn record(
53 &self,
54 latency_ms: u64,
55 endpoint: Option<String>,
56 method: Option<String>,
57 status_code: Option<u16>,
58 error: Option<String>,
59 ) {
60 let sample = LatencySample {
61 timestamp: Utc::now(),
62 latency_ms,
63 endpoint,
64 method,
65 status_code,
66 error,
67 };
68
69 let mut samples = self.samples.write().await;
70 samples.push_back(sample);
71
72 self.cleanup_old_samples(&mut samples).await;
74 }
75
76 pub async fn get_samples(&self) -> Vec<LatencySample> {
78 let mut samples = self.samples.write().await;
79 self.cleanup_old_samples(&mut samples).await;
80 samples.iter().cloned().collect()
81 }
82
83 pub async fn get_samples_for_endpoint(&self, endpoint: &str) -> Vec<LatencySample> {
85 let samples = self.get_samples().await;
86 samples
87 .into_iter()
88 .filter(|s| s.endpoint.as_ref().map(|e| e == endpoint).unwrap_or(false))
89 .collect()
90 }
91
92 pub async fn get_samples_in_range(
94 &self,
95 start: DateTime<Utc>,
96 end: DateTime<Utc>,
97 ) -> Vec<LatencySample> {
98 let samples = self.get_samples().await;
99 samples
100 .into_iter()
101 .filter(|s| s.timestamp >= start && s.timestamp <= end)
102 .collect()
103 }
104
105 async fn cleanup_old_samples(&self, samples: &mut VecDeque<LatencySample>) {
107 let now = Utc::now();
108 let cutoff = now
109 .checked_sub_signed(chrono::Duration::seconds(self.max_age_seconds as i64))
110 .unwrap_or(now);
111
112 while samples.front().map(|s| s.timestamp < cutoff).unwrap_or(false) {
114 samples.pop_front();
115 }
116
117 while samples.len() > self.max_samples {
119 samples.pop_front();
120 }
121 }
122
123 pub async fn clear(&self) {
125 let mut samples = self.samples.write().await;
126 samples.clear();
127 }
128
129 pub async fn sample_count(&self) -> usize {
131 let samples = self.samples.read().await;
132 samples.len()
133 }
134}
135
136#[derive(Debug, Clone)]
140pub struct LatencyAnalyzer {
141 recorder: Arc<LatencyRecorder>,
142}
143
144impl LatencyAnalyzer {
145 pub fn new(recorder: Arc<LatencyRecorder>) -> Self {
147 Self { recorder }
148 }
149
150 pub async fn calculate_stats(&self) -> LatencyStats {
152 let samples = self.recorder.get_samples().await;
153 self.calculate_stats_from_samples(&samples)
154 }
155
156 pub async fn calculate_stats_for_endpoint(&self, endpoint: &str) -> LatencyStats {
158 let samples = self.recorder.get_samples_for_endpoint(endpoint).await;
159 self.calculate_stats_from_samples(&samples)
160 }
161
162 fn calculate_stats_from_samples(&self, samples: &[LatencySample]) -> LatencyStats {
164 if samples.is_empty() {
165 return LatencyStats::default();
166 }
167
168 let mut latencies: Vec<u64> = samples.iter().map(|s| s.latency_ms).collect();
169 latencies.sort();
170
171 let count = latencies.len();
172 let sum: u64 = latencies.iter().sum();
173 let avg = sum as f64 / count as f64;
174
175 let min = latencies[0];
176 let max = latencies[count - 1];
177 let median = if count % 2 == 0 {
178 (latencies[count / 2 - 1] + latencies[count / 2]) as f64 / 2.0
179 } else {
180 latencies[count / 2] as f64
181 };
182
183 let p95 = if count > 0 {
184 latencies[(count as f64 * 0.95) as usize]
185 } else {
186 0
187 };
188
189 let p99 = if count > 0 {
190 latencies[(count as f64 * 0.99) as usize]
191 } else {
192 0
193 };
194
195 let error_count = samples
197 .iter()
198 .filter(|s| s.error.is_some() || s.status_code.map(|c| c >= 400).unwrap_or(false))
199 .count();
200 let error_rate = error_count as f64 / count as f64;
201
202 LatencyStats {
203 count,
204 min,
205 max,
206 avg,
207 median,
208 p95,
209 p99,
210 error_rate,
211 }
212 }
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize, Default)]
217pub struct LatencyStats {
218 pub count: usize,
220 pub min: u64,
222 pub max: u64,
224 pub avg: f64,
226 pub median: f64,
228 pub p95: u64,
230 pub p99: u64,
232 pub error_rate: f64,
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sample stamped with the current time from borrowed string fields.
    fn make_sample(
        latency_ms: u64,
        endpoint: Option<&str>,
        method: Option<&str>,
        status_code: Option<u16>,
        error: Option<&str>,
    ) -> LatencySample {
        LatencySample {
            timestamp: Utc::now(),
            latency_ms,
            endpoint: endpoint.map(str::to_string),
            method: method.map(str::to_string),
            status_code,
            error: error.map(str::to_string),
        }
    }

    /// Creates a recorder and an analyzer that reads from it.
    fn recorder_with_analyzer(
        max_samples: usize,
        max_age_seconds: u64,
    ) -> (Arc<LatencyRecorder>, LatencyAnalyzer) {
        let recorder = Arc::new(LatencyRecorder::new(max_samples, max_age_seconds));
        let analyzer = LatencyAnalyzer::new(Arc::clone(&recorder));
        (recorder, analyzer)
    }

    /// Records each latency in order with no endpoint, method, status, or error.
    async fn record_all(recorder: &LatencyRecorder, latencies: impl IntoIterator<Item = u64>) {
        for latency_ms in latencies {
            recorder.record(latency_ms, None, None, None, None).await;
        }
    }

    /// Records one sample for `endpoint` with no method, status, or error.
    async fn record_for_endpoint(recorder: &LatencyRecorder, latency_ms: u64, endpoint: &str) {
        recorder
            .record(latency_ms, Some(endpoint.to_string()), None, None, None)
            .await;
    }

    /// Cloning a sample preserves its fields.
    #[test]
    fn test_latency_sample_clone() {
        let original = make_sample(100, Some("/api/users"), Some("GET"), Some(200), None);

        let copy = original.clone();
        assert_eq!(original.latency_ms, copy.latency_ms);
        assert_eq!(original.endpoint, copy.endpoint);
    }

    /// The `Debug` output names the type and includes the latency.
    #[test]
    fn test_latency_sample_debug() {
        let sample = make_sample(150, None, None, None, None);

        let rendered = format!("{:?}", sample);
        assert!(rendered.contains("LatencySample"));
        assert!(rendered.contains("150"));
    }

    /// A sample serializes to JSON with its fields present.
    #[test]
    fn test_latency_sample_serialize() {
        let sample = make_sample(200, Some("/api/test"), Some("POST"), Some(201), None);

        let json = serde_json::to_string(&sample).unwrap();
        assert!(json.contains("\"latency_ms\":200"));
        assert!(json.contains("POST"));
    }

    /// Error details are stored on the sample as given.
    #[test]
    fn test_latency_sample_with_error() {
        let sample = make_sample(
            500,
            Some("/api/users"),
            Some("GET"),
            Some(500),
            Some("Internal Server Error"),
        );

        assert!(sample.error.is_some());
        assert_eq!(sample.status_code, Some(500));
    }

    /// Recorded samples are returned by `get_samples`.
    #[tokio::test]
    async fn test_latency_recorder() {
        let recorder = LatencyRecorder::new(1000, 300);

        for latency_ms in [100, 150] {
            recorder
                .record(
                    latency_ms,
                    Some("/api/users".to_string()),
                    Some("GET".to_string()),
                    Some(200),
                    None,
                )
                .await;
        }

        let samples = recorder.get_samples().await;
        assert_eq!(samples.len(), 2);
    }

    /// The recorder's `Debug` output names the type.
    #[tokio::test]
    async fn test_latency_recorder_debug() {
        let recorder = LatencyRecorder::new(100, 60);
        let rendered = format!("{:?}", recorder);
        assert!(rendered.contains("LatencyRecorder"));
    }

    /// The recorder can be cloned.
    #[tokio::test]
    async fn test_latency_recorder_clone() {
        let recorder = LatencyRecorder::new(100, 60);
        let _copy = recorder.clone();
    }

    /// `sample_count` tracks the number of recorded samples.
    #[tokio::test]
    async fn test_latency_recorder_sample_count() {
        let recorder = LatencyRecorder::new(100, 300);

        assert_eq!(recorder.sample_count().await, 0);

        recorder.record(100, None, None, None, None).await;
        assert_eq!(recorder.sample_count().await, 1);

        recorder.record(200, None, None, None, None).await;
        assert_eq!(recorder.sample_count().await, 2);
    }

    /// `clear` empties the recorder.
    #[tokio::test]
    async fn test_latency_recorder_clear() {
        let recorder = LatencyRecorder::new(100, 300);

        record_all(&recorder, [100, 200]).await;
        assert_eq!(recorder.sample_count().await, 2);

        recorder.clear().await;
        assert_eq!(recorder.sample_count().await, 0);
    }

    /// Endpoint filtering returns only the samples recorded for that endpoint.
    #[tokio::test]
    async fn test_latency_recorder_get_samples_for_endpoint() {
        let recorder = LatencyRecorder::new(100, 300);

        record_for_endpoint(&recorder, 100, "/api/users").await;
        record_for_endpoint(&recorder, 150, "/api/users").await;
        record_for_endpoint(&recorder, 200, "/api/orders").await;

        let for_users = recorder.get_samples_for_endpoint("/api/users").await;
        assert_eq!(for_users.len(), 2);

        let for_orders = recorder.get_samples_for_endpoint("/api/orders").await;
        assert_eq!(for_orders.len(), 1);

        let for_unknown = recorder.get_samples_for_endpoint("/api/unknown").await;
        assert_eq!(for_unknown.len(), 0);
    }

    /// A range spanning one second either side of "now" contains fresh samples.
    #[tokio::test]
    async fn test_latency_recorder_get_samples_in_range() {
        let recorder = LatencyRecorder::new(100, 300);

        let reference = Utc::now();

        record_all(&recorder, [100, 200]).await;

        let one_second = chrono::Duration::seconds(1);
        let start = reference - one_second;
        let end = reference + one_second;

        let in_range = recorder.get_samples_in_range(start, end).await;
        assert_eq!(in_range.len(), 2);
    }

    /// The recorder never retains more than `max_samples` samples.
    #[tokio::test]
    async fn test_latency_recorder_max_samples() {
        let recorder = LatencyRecorder::new(5, 300);

        record_all(&recorder, (0..10).map(|i| i * 10)).await;

        let retained = recorder.get_samples().await;
        assert!(retained.len() <= 5);
    }

    /// Count, min and max are computed over all recorded samples.
    #[tokio::test]
    async fn test_latency_analyzer() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        record_all(&recorder, [100, 150, 200, 250, 300]).await;

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.count, 5);
        assert_eq!(stats.min, 100);
        assert_eq!(stats.max, 300);
    }

    /// The analyzer's `Debug` output names the type.
    #[test]
    fn test_latency_analyzer_debug() {
        let (_recorder, analyzer) = recorder_with_analyzer(100, 60);
        let rendered = format!("{:?}", analyzer);
        assert!(rendered.contains("LatencyAnalyzer"));
    }

    /// The analyzer can be cloned.
    #[test]
    fn test_latency_analyzer_clone() {
        let (_recorder, analyzer) = recorder_with_analyzer(100, 60);
        let _copy = analyzer.clone();
    }

    /// With no samples, the statistics are all zero.
    #[tokio::test]
    async fn test_latency_analyzer_empty_stats() {
        let (_recorder, analyzer) = recorder_with_analyzer(100, 300);

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.count, 0);
        assert_eq!(stats.min, 0);
        assert_eq!(stats.max, 0);
        assert_eq!(stats.avg, 0.0);
    }

    /// Per-endpoint statistics ignore samples from other endpoints.
    #[tokio::test]
    async fn test_latency_analyzer_calculate_stats_for_endpoint() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        record_for_endpoint(&recorder, 100, "/api/users").await;
        record_for_endpoint(&recorder, 200, "/api/users").await;
        record_for_endpoint(&recorder, 500, "/api/orders").await;

        let user_stats = analyzer.calculate_stats_for_endpoint("/api/users").await;
        assert_eq!(user_stats.count, 2);
        assert_eq!(user_stats.min, 100);
        assert_eq!(user_stats.max, 200);
    }

    /// Status codes of 400 and above count towards the error rate.
    #[tokio::test]
    async fn test_latency_analyzer_error_rate() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        // Three successes followed by two failures: 2 / 5 = 0.4.
        for status in [200, 200, 200, 500, 404] {
            recorder.record(100, None, None, Some(status), None).await;
        }

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.count, 5);
        assert_eq!(stats.error_rate, 0.4);
    }

    /// A sample with an error message counts as an error even without a status code.
    #[tokio::test]
    async fn test_latency_analyzer_error_rate_with_error_message() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        recorder.record(100, None, None, None, None).await;
        recorder
            .record(100, None, None, None, Some("Timeout".to_string()))
            .await;

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.error_rate, 0.5);
    }

    /// The default statistics are all zero.
    #[test]
    fn test_latency_stats_default() {
        let stats = LatencyStats::default();
        assert_eq!(stats.count, 0);
        assert_eq!(stats.min, 0);
        assert_eq!(stats.max, 0);
        assert_eq!(stats.avg, 0.0);
        assert_eq!(stats.median, 0.0);
        assert_eq!(stats.p95, 0);
        assert_eq!(stats.p99, 0);
        assert_eq!(stats.error_rate, 0.0);
    }

    /// Cloning statistics preserves their fields.
    #[test]
    fn test_latency_stats_clone() {
        let original = LatencyStats {
            count: 100,
            min: 10,
            max: 500,
            avg: 150.0,
            median: 140.0,
            p95: 400,
            p99: 480,
            error_rate: 0.05,
        };

        let copy = original.clone();
        assert_eq!(original.count, copy.count);
        assert_eq!(original.min, copy.min);
        assert_eq!(original.avg, copy.avg);
    }

    /// The statistics' `Debug` output names the type.
    #[test]
    fn test_latency_stats_debug() {
        let rendered = format!("{:?}", LatencyStats::default());
        assert!(rendered.contains("LatencyStats"));
    }

    /// Statistics serialize to JSON with their fields present.
    #[test]
    fn test_latency_stats_serialize() {
        let stats = LatencyStats {
            count: 50,
            min: 10,
            max: 200,
            avg: 100.0,
            median: 95.0,
            p95: 180,
            p99: 195,
            error_rate: 0.02,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("\"count\":50"));
        assert!(json.contains("\"p95\":180"));
    }

    /// Percentiles over 1..=100 land in the upper tail.
    #[tokio::test]
    async fn test_latency_analyzer_percentiles() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        record_all(&recorder, 1..=100).await;

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.count, 100);
        assert_eq!(stats.min, 1);
        assert_eq!(stats.max, 100);
        assert!(stats.p95 >= 90);
        assert!(stats.p99 >= 95);
    }

    /// With an odd number of samples the median is the middle value.
    #[tokio::test]
    async fn test_latency_analyzer_median_odd_count() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        record_all(&recorder, [10, 20, 30, 40, 50]).await;

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.median, 30.0);
    }

    /// With an even number of samples the median is the mean of the two middle values.
    #[tokio::test]
    async fn test_latency_analyzer_median_even_count() {
        let (recorder, analyzer) = recorder_with_analyzer(1000, 300);

        record_all(&recorder, [10, 20, 30, 40]).await;

        let stats = analyzer.calculate_stats().await;
        assert_eq!(stats.median, 25.0);
    }
}