1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::limiter::{LimiterKey, RateLimitResult};
13
14pub struct RateLimitMetrics {
16 total_requests: AtomicU64,
18
19 allowed: AtomicU64,
21
22 queued: AtomicU64,
24
25 throttled: AtomicU64,
27
28 warned: AtomicU64,
30
31 denied: AtomicU64,
33
34 key_stats: DashMap<String, KeyStats>,
36
37 decision_times_us: RwLock<Vec<u64>>,
39
40 max_timing_samples: usize,
42
43 started_at: Instant,
45}
46
47impl RateLimitMetrics {
48 pub fn new() -> Self {
50 Self {
51 total_requests: AtomicU64::new(0),
52 allowed: AtomicU64::new(0),
53 queued: AtomicU64::new(0),
54 throttled: AtomicU64::new(0),
55 warned: AtomicU64::new(0),
56 denied: AtomicU64::new(0),
57 key_stats: DashMap::new(),
58 decision_times_us: RwLock::new(Vec::with_capacity(1000)),
59 max_timing_samples: 1000,
60 started_at: Instant::now(),
61 }
62 }
63
64 pub fn record_decision(&self, key: &LimiterKey, result: &RateLimitResult, elapsed: Duration) {
66 self.total_requests.fetch_add(1, Ordering::Relaxed);
67
68 match result {
69 RateLimitResult::Allowed => {
70 self.allowed.fetch_add(1, Ordering::Relaxed);
71 }
72 RateLimitResult::Queued(_) => {
73 self.queued.fetch_add(1, Ordering::Relaxed);
74 }
75 RateLimitResult::Throttled(_) => {
76 self.throttled.fetch_add(1, Ordering::Relaxed);
77 }
78 RateLimitResult::Warned(_) => {
79 self.warned.fetch_add(1, Ordering::Relaxed);
80 }
81 RateLimitResult::Denied(_) => {
82 self.denied.fetch_add(1, Ordering::Relaxed);
83 }
84 }
85
86 let key_str = key.to_string();
88 self.key_stats
89 .entry(key_str)
90 .and_modify(|stats| stats.record(result))
91 .or_insert_with(|| {
92 let stats = KeyStats::new();
93 stats.record(result);
94 stats
95 });
96
97 self.record_timing(elapsed);
99 }
100
101 fn record_timing(&self, elapsed: Duration) {
103 let us = elapsed.as_micros() as u64;
104 let mut times = self.decision_times_us.write();
105
106 if times.len() >= self.max_timing_samples {
107 times.drain(0..self.max_timing_samples / 2);
108 }
109 times.push(us);
110 }
111
112 pub fn reset_key(&self, key: &LimiterKey) {
114 let key_str = key.to_string();
115 self.key_stats.remove(&key_str);
116 }
117
118 pub fn get_stats(&self) -> RateLimitStats {
120 let total = self.total_requests.load(Ordering::Relaxed);
121 let allowed = self.allowed.load(Ordering::Relaxed);
122 let queued = self.queued.load(Ordering::Relaxed);
123 let throttled = self.throttled.load(Ordering::Relaxed);
124 let warned = self.warned.load(Ordering::Relaxed);
125 let denied = self.denied.load(Ordering::Relaxed);
126
127 let times = self.decision_times_us.read();
129 let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
130 (0, 0, 0)
131 } else {
132 let mut sorted = times.clone();
133 sorted.sort_unstable();
134
135 let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
136 let p50 = sorted[sorted.len() / 2];
137 let p99_idx = ((sorted.len() as f64) * 0.99) as usize;
138 let p99 = sorted
139 .get(p99_idx)
140 .copied()
141 .unwrap_or(sorted[sorted.len() - 1]);
142
143 (avg, p50, p99)
144 };
145
146 let key_stats: HashMap<_, _> = self
148 .key_stats
149 .iter()
150 .map(|entry| (entry.key().clone(), entry.value().snapshot()))
151 .collect();
152
153 RateLimitStats {
154 total_requests: total,
155 allowed,
156 queued,
157 throttled,
158 warned,
159 denied,
160 avg_decision_time_us: avg_time_us,
161 p50_decision_time_us: p50_time_us,
162 p99_decision_time_us: p99_time_us,
163 key_stats,
164 uptime_secs: self.started_at.elapsed().as_secs(),
165 }
166 }
167
168 pub fn total_requests(&self) -> u64 {
170 self.total_requests.load(Ordering::Relaxed)
171 }
172
173 pub fn allowed(&self) -> u64 {
175 self.allowed.load(Ordering::Relaxed)
176 }
177
178 pub fn denied(&self) -> u64 {
180 self.denied.load(Ordering::Relaxed)
181 }
182
183 pub fn denial_rate(&self) -> f64 {
185 let total = self.total_requests.load(Ordering::Relaxed);
186 let denied = self.denied.load(Ordering::Relaxed);
187
188 if total == 0 {
189 0.0
190 } else {
191 denied as f64 / total as f64
192 }
193 }
194
195 pub fn uptime(&self) -> Duration {
197 self.started_at.elapsed()
198 }
199
200 pub fn reset(&self) {
202 self.total_requests.store(0, Ordering::Relaxed);
203 self.allowed.store(0, Ordering::Relaxed);
204 self.queued.store(0, Ordering::Relaxed);
205 self.throttled.store(0, Ordering::Relaxed);
206 self.warned.store(0, Ordering::Relaxed);
207 self.denied.store(0, Ordering::Relaxed);
208 self.key_stats.clear();
209 self.decision_times_us.write().clear();
210 }
211}
212
213impl Default for RateLimitMetrics {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl std::fmt::Debug for RateLimitMetrics {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("RateLimitMetrics")
222 .field(
223 "total_requests",
224 &self.total_requests.load(Ordering::Relaxed),
225 )
226 .field("denied", &self.denied.load(Ordering::Relaxed))
227 .field("key_count", &self.key_stats.len())
228 .finish()
229 }
230}
231
232pub struct KeyStats {
234 total: AtomicU64,
236
237 allowed: AtomicU64,
239
240 denied: AtomicU64,
242
243 last_request_ns: AtomicU64,
245
246 epoch: Instant,
248}
249
250impl KeyStats {
251 fn new() -> Self {
252 Self {
253 total: AtomicU64::new(0),
254 allowed: AtomicU64::new(0),
255 denied: AtomicU64::new(0),
256 last_request_ns: AtomicU64::new(0),
257 epoch: Instant::now(),
258 }
259 }
260
261 fn record(&self, result: &RateLimitResult) {
262 self.total.fetch_add(1, Ordering::Relaxed);
263
264 match result {
265 RateLimitResult::Allowed
266 | RateLimitResult::Queued(_)
267 | RateLimitResult::Throttled(_)
268 | RateLimitResult::Warned(_) => {
269 self.allowed.fetch_add(1, Ordering::Relaxed);
270 }
271 RateLimitResult::Denied(_) => {
272 self.denied.fetch_add(1, Ordering::Relaxed);
273 }
274 }
275
276 self.last_request_ns
277 .store(self.epoch.elapsed().as_nanos() as u64, Ordering::Relaxed);
278 }
279
280 fn snapshot(&self) -> KeyStatsSnapshot {
281 let last_ns = self.last_request_ns.load(Ordering::Relaxed);
282 let last_request = if last_ns > 0 {
283 Some(Duration::from_nanos(last_ns))
284 } else {
285 None
286 };
287
288 KeyStatsSnapshot {
289 total: self.total.load(Ordering::Relaxed),
290 allowed: self.allowed.load(Ordering::Relaxed),
291 denied: self.denied.load(Ordering::Relaxed),
292 last_request_age: last_request,
293 }
294 }
295}
296
297#[derive(Debug, Clone)]
299pub struct KeyStatsSnapshot {
300 pub total: u64,
302
303 pub allowed: u64,
305
306 pub denied: u64,
308
309 pub last_request_age: Option<Duration>,
311}
312
313impl KeyStatsSnapshot {
314 pub fn denial_rate(&self) -> f64 {
316 if self.total == 0 {
317 0.0
318 } else {
319 self.denied as f64 / self.total as f64
320 }
321 }
322
323 pub fn allow_rate(&self) -> f64 {
325 if self.total == 0 {
326 0.0
327 } else {
328 self.allowed as f64 / self.total as f64
329 }
330 }
331}
332
333#[derive(Debug, Clone)]
335pub struct RateLimitStats {
336 pub total_requests: u64,
338
339 pub allowed: u64,
341
342 pub queued: u64,
344
345 pub throttled: u64,
347
348 pub warned: u64,
350
351 pub denied: u64,
353
354 pub avg_decision_time_us: u64,
356
357 pub p50_decision_time_us: u64,
359
360 pub p99_decision_time_us: u64,
362
363 pub key_stats: HashMap<String, KeyStatsSnapshot>,
365
366 pub uptime_secs: u64,
368}
369
370impl RateLimitStats {
371 pub fn denial_rate(&self) -> f64 {
373 if self.total_requests == 0 {
374 0.0
375 } else {
376 self.denied as f64 / self.total_requests as f64
377 }
378 }
379
380 pub fn allow_rate(&self) -> f64 {
382 if self.total_requests == 0 {
383 0.0
384 } else {
385 self.allowed as f64 / self.total_requests as f64
386 }
387 }
388
389 pub fn requests_per_second(&self) -> f64 {
391 if self.uptime_secs == 0 {
392 0.0
393 } else {
394 self.total_requests as f64 / self.uptime_secs as f64
395 }
396 }
397
398 pub fn top_denied_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
400 let mut entries: Vec<_> = self.key_stats.iter().collect();
401 entries.sort_by(|a, b| {
402 b.1.denial_rate()
403 .partial_cmp(&a.1.denial_rate())
404 .unwrap_or(std::cmp::Ordering::Equal)
405 });
406 entries.truncate(n);
407 entries
408 }
409
410 pub fn top_request_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
412 let mut entries: Vec<_> = self.key_stats.iter().collect();
413 entries.sort_by_key(|b| std::cmp::Reverse(b.1.total));
414 entries.truncate(n);
415 entries
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use super::*;
422
423 #[test]
424 fn test_metrics_creation() {
425 let metrics = RateLimitMetrics::new();
426 let stats = metrics.get_stats();
427
428 assert_eq!(stats.total_requests, 0);
429 assert_eq!(stats.denied, 0);
430 }
431
432 #[test]
433 fn test_record_allowed() {
434 let metrics = RateLimitMetrics::new();
435 let key = LimiterKey::User("test".to_string());
436
437 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
438
439 let stats = metrics.get_stats();
440 assert_eq!(stats.total_requests, 1);
441 assert_eq!(stats.allowed, 1);
442 assert_eq!(stats.denied, 0);
443 }
444
445 #[test]
446 fn test_record_denied() {
447 let metrics = RateLimitMetrics::new();
448 let key = LimiterKey::User("test".to_string());
449
450 let error = super::super::limiter::RateLimitExceeded {
451 key: key.clone(),
452 limit_type: super::super::limiter::LimitType::TokenBucket,
453 current: 0,
454 limit: 100,
455 retry_after: Duration::from_secs(1),
456 message: "test".to_string(),
457 };
458
459 metrics.record_decision(
460 &key,
461 &RateLimitResult::Denied(error),
462 Duration::from_micros(10),
463 );
464
465 let stats = metrics.get_stats();
466 assert_eq!(stats.total_requests, 1);
467 assert_eq!(stats.denied, 1);
468 }
469
470 #[test]
471 fn test_record_queued_throttled_warned() {
472 let metrics = RateLimitMetrics::new();
473 let key = LimiterKey::User("test".to_string());
474
475 metrics.record_decision(
476 &key,
477 &RateLimitResult::Queued(Duration::from_secs(1)),
478 Duration::from_micros(10),
479 );
480 metrics.record_decision(
481 &key,
482 &RateLimitResult::Throttled(Duration::from_secs(1)),
483 Duration::from_micros(10),
484 );
485 metrics.record_decision(
486 &key,
487 &RateLimitResult::Warned("test".to_string()),
488 Duration::from_micros(10),
489 );
490
491 let stats = metrics.get_stats();
492 assert_eq!(stats.total_requests, 3);
493 assert_eq!(stats.queued, 1);
494 assert_eq!(stats.throttled, 1);
495 assert_eq!(stats.warned, 1);
496 }
497
498 #[test]
499 fn test_per_key_stats() {
500 let metrics = RateLimitMetrics::new();
501 let key1 = LimiterKey::User("user1".to_string());
502 let key2 = LimiterKey::User("user2".to_string());
503
504 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
505 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
506 metrics.record_decision(&key2, &RateLimitResult::Allowed, Duration::from_micros(10));
507
508 let stats = metrics.get_stats();
509 assert_eq!(stats.key_stats.len(), 2);
510
511 let user1_stats = stats.key_stats.get("user:user1").unwrap();
512 assert_eq!(user1_stats.total, 2);
513 }
514
515 #[test]
516 fn test_denial_rate() {
517 let metrics = RateLimitMetrics::new();
518 let key = LimiterKey::User("test".to_string());
519
520 for _ in 0..3 {
522 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
523 }
524
525 let error = super::super::limiter::RateLimitExceeded {
526 key: key.clone(),
527 limit_type: super::super::limiter::LimitType::TokenBucket,
528 current: 0,
529 limit: 100,
530 retry_after: Duration::from_secs(1),
531 message: "test".to_string(),
532 };
533
534 for _ in 0..2 {
535 metrics.record_decision(
536 &key,
537 &RateLimitResult::Denied(error.clone()),
538 Duration::from_micros(10),
539 );
540 }
541
542 let rate = metrics.denial_rate();
543 assert!((rate - 0.4).abs() < 0.01);
544 }
545
546 #[test]
547 fn test_timing_stats() {
548 let metrics = RateLimitMetrics::new();
549 let key = LimiterKey::User("test".to_string());
550
551 for i in 1..=100 {
552 metrics.record_decision(
553 &key,
554 &RateLimitResult::Allowed,
555 Duration::from_micros(i * 10),
556 );
557 }
558
559 let stats = metrics.get_stats();
560 assert!(stats.avg_decision_time_us > 0);
561 assert!(stats.p50_decision_time_us > 0);
562 assert!(stats.p99_decision_time_us >= stats.p50_decision_time_us);
563 }
564
565 #[test]
566 fn test_reset() {
567 let metrics = RateLimitMetrics::new();
568 let key = LimiterKey::User("test".to_string());
569
570 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
571
572 assert!(metrics.total_requests() > 0);
573
574 metrics.reset();
575
576 assert_eq!(metrics.total_requests(), 0);
577 assert_eq!(metrics.denied(), 0);
578 }
579
580 #[test]
581 fn test_reset_key() {
582 let metrics = RateLimitMetrics::new();
583 let key1 = LimiterKey::User("user1".to_string());
584 let key2 = LimiterKey::User("user2".to_string());
585
586 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
587 metrics.record_decision(&key2, &RateLimitResult::Allowed, Duration::from_micros(10));
588
589 assert_eq!(metrics.get_stats().key_stats.len(), 2);
590
591 metrics.reset_key(&key1);
592
593 let stats = metrics.get_stats();
594 assert_eq!(stats.key_stats.len(), 1);
595 assert!(!stats.key_stats.contains_key("user:user1"));
596 assert!(stats.key_stats.contains_key("user:user2"));
597 }
598
599 #[test]
600 fn test_stats_methods() {
601 let stats = RateLimitStats {
602 total_requests: 100,
603 allowed: 80,
604 queued: 5,
605 throttled: 5,
606 warned: 5,
607 denied: 5,
608 avg_decision_time_us: 50,
609 p50_decision_time_us: 45,
610 p99_decision_time_us: 100,
611 key_stats: HashMap::new(),
612 uptime_secs: 10,
613 };
614
615 assert!((stats.denial_rate() - 0.05).abs() < 0.01);
616 assert!((stats.allow_rate() - 0.80).abs() < 0.01);
617 assert!((stats.requests_per_second() - 10.0).abs() < 0.1);
618 }
619
620 #[test]
621 fn test_top_keys() {
622 let mut key_stats = HashMap::new();
623
624 key_stats.insert(
625 "user:high".to_string(),
626 KeyStatsSnapshot {
627 total: 100,
628 allowed: 50,
629 denied: 50,
630 last_request_age: None,
631 },
632 );
633
634 key_stats.insert(
635 "user:low".to_string(),
636 KeyStatsSnapshot {
637 total: 100,
638 allowed: 90,
639 denied: 10,
640 last_request_age: None,
641 },
642 );
643
644 key_stats.insert(
645 "user:most".to_string(),
646 KeyStatsSnapshot {
647 total: 1000,
648 allowed: 900,
649 denied: 100,
650 last_request_age: None,
651 },
652 );
653
654 let stats = RateLimitStats {
655 total_requests: 1200,
656 allowed: 1040,
657 queued: 0,
658 throttled: 0,
659 warned: 0,
660 denied: 160,
661 avg_decision_time_us: 50,
662 p50_decision_time_us: 45,
663 p99_decision_time_us: 100,
664 key_stats,
665 uptime_secs: 60,
666 };
667
668 let top_denied = stats.top_denied_keys(1);
670 assert_eq!(top_denied[0].0, "user:high");
671
672 let top_requests = stats.top_request_keys(1);
674 assert_eq!(top_requests[0].0, "user:most");
675 }
676
677 #[test]
678 fn test_key_stats_snapshot_rates() {
679 let snapshot = KeyStatsSnapshot {
680 total: 100,
681 allowed: 80,
682 denied: 20,
683 last_request_age: Some(Duration::from_secs(5)),
684 };
685
686 assert!((snapshot.denial_rate() - 0.2).abs() < 0.01);
687 assert!((snapshot.allow_rate() - 0.8).abs() < 0.01);
688 }
689}