1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::limiter::{LimiterKey, RateLimitResult};
13
14pub struct RateLimitMetrics {
16 total_requests: AtomicU64,
18
19 allowed: AtomicU64,
21
22 queued: AtomicU64,
24
25 throttled: AtomicU64,
27
28 warned: AtomicU64,
30
31 denied: AtomicU64,
33
34 key_stats: DashMap<String, KeyStats>,
36
37 decision_times_us: RwLock<Vec<u64>>,
39
40 max_timing_samples: usize,
42
43 started_at: Instant,
45}
46
47impl RateLimitMetrics {
48 pub fn new() -> Self {
50 Self {
51 total_requests: AtomicU64::new(0),
52 allowed: AtomicU64::new(0),
53 queued: AtomicU64::new(0),
54 throttled: AtomicU64::new(0),
55 warned: AtomicU64::new(0),
56 denied: AtomicU64::new(0),
57 key_stats: DashMap::new(),
58 decision_times_us: RwLock::new(Vec::with_capacity(1000)),
59 max_timing_samples: 1000,
60 started_at: Instant::now(),
61 }
62 }
63
64 pub fn record_decision(&self, key: &LimiterKey, result: &RateLimitResult, elapsed: Duration) {
66 self.total_requests.fetch_add(1, Ordering::Relaxed);
67
68 match result {
69 RateLimitResult::Allowed => {
70 self.allowed.fetch_add(1, Ordering::Relaxed);
71 }
72 RateLimitResult::Queued(_) => {
73 self.queued.fetch_add(1, Ordering::Relaxed);
74 }
75 RateLimitResult::Throttled(_) => {
76 self.throttled.fetch_add(1, Ordering::Relaxed);
77 }
78 RateLimitResult::Warned(_) => {
79 self.warned.fetch_add(1, Ordering::Relaxed);
80 }
81 RateLimitResult::Denied(_) => {
82 self.denied.fetch_add(1, Ordering::Relaxed);
83 }
84 }
85
86 let key_str = key.to_string();
88 self.key_stats
89 .entry(key_str)
90 .and_modify(|stats| stats.record(result))
91 .or_insert_with(|| {
92 let stats = KeyStats::new();
93 stats.record(result);
94 stats
95 });
96
97 self.record_timing(elapsed);
99 }
100
101 fn record_timing(&self, elapsed: Duration) {
103 let us = elapsed.as_micros() as u64;
104 let mut times = self.decision_times_us.write();
105
106 if times.len() >= self.max_timing_samples {
107 times.drain(0..self.max_timing_samples / 2);
108 }
109 times.push(us);
110 }
111
112 pub fn reset_key(&self, key: &LimiterKey) {
114 let key_str = key.to_string();
115 self.key_stats.remove(&key_str);
116 }
117
118 pub fn get_stats(&self) -> RateLimitStats {
120 let total = self.total_requests.load(Ordering::Relaxed);
121 let allowed = self.allowed.load(Ordering::Relaxed);
122 let queued = self.queued.load(Ordering::Relaxed);
123 let throttled = self.throttled.load(Ordering::Relaxed);
124 let warned = self.warned.load(Ordering::Relaxed);
125 let denied = self.denied.load(Ordering::Relaxed);
126
127 let times = self.decision_times_us.read();
129 let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
130 (0, 0, 0)
131 } else {
132 let mut sorted = times.clone();
133 sorted.sort_unstable();
134
135 let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
136 let p50 = sorted[sorted.len() / 2];
137 let p99_idx = ((sorted.len() as f64) * 0.99) as usize;
138 let p99 = sorted.get(p99_idx).copied().unwrap_or(sorted[sorted.len() - 1]);
139
140 (avg, p50, p99)
141 };
142
143 let key_stats: HashMap<_, _> = self
145 .key_stats
146 .iter()
147 .map(|entry| (entry.key().clone(), entry.value().snapshot()))
148 .collect();
149
150 RateLimitStats {
151 total_requests: total,
152 allowed,
153 queued,
154 throttled,
155 warned,
156 denied,
157 avg_decision_time_us: avg_time_us,
158 p50_decision_time_us: p50_time_us,
159 p99_decision_time_us: p99_time_us,
160 key_stats,
161 uptime_secs: self.started_at.elapsed().as_secs(),
162 }
163 }
164
165 pub fn total_requests(&self) -> u64 {
167 self.total_requests.load(Ordering::Relaxed)
168 }
169
170 pub fn allowed(&self) -> u64 {
172 self.allowed.load(Ordering::Relaxed)
173 }
174
175 pub fn denied(&self) -> u64 {
177 self.denied.load(Ordering::Relaxed)
178 }
179
180 pub fn denial_rate(&self) -> f64 {
182 let total = self.total_requests.load(Ordering::Relaxed);
183 let denied = self.denied.load(Ordering::Relaxed);
184
185 if total == 0 {
186 0.0
187 } else {
188 denied as f64 / total as f64
189 }
190 }
191
192 pub fn uptime(&self) -> Duration {
194 self.started_at.elapsed()
195 }
196
197 pub fn reset(&self) {
199 self.total_requests.store(0, Ordering::Relaxed);
200 self.allowed.store(0, Ordering::Relaxed);
201 self.queued.store(0, Ordering::Relaxed);
202 self.throttled.store(0, Ordering::Relaxed);
203 self.warned.store(0, Ordering::Relaxed);
204 self.denied.store(0, Ordering::Relaxed);
205 self.key_stats.clear();
206 self.decision_times_us.write().clear();
207 }
208}
209
210impl Default for RateLimitMetrics {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl std::fmt::Debug for RateLimitMetrics {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.debug_struct("RateLimitMetrics")
219 .field("total_requests", &self.total_requests.load(Ordering::Relaxed))
220 .field("denied", &self.denied.load(Ordering::Relaxed))
221 .field("key_count", &self.key_stats.len())
222 .finish()
223 }
224}
225
226pub struct KeyStats {
228 total: AtomicU64,
230
231 allowed: AtomicU64,
233
234 denied: AtomicU64,
236
237 last_request_ns: AtomicU64,
239
240 epoch: Instant,
242}
243
244impl KeyStats {
245 fn new() -> Self {
246 Self {
247 total: AtomicU64::new(0),
248 allowed: AtomicU64::new(0),
249 denied: AtomicU64::new(0),
250 last_request_ns: AtomicU64::new(0),
251 epoch: Instant::now(),
252 }
253 }
254
255 fn record(&self, result: &RateLimitResult) {
256 self.total.fetch_add(1, Ordering::Relaxed);
257
258 match result {
259 RateLimitResult::Allowed | RateLimitResult::Queued(_) |
260 RateLimitResult::Throttled(_) | RateLimitResult::Warned(_) => {
261 self.allowed.fetch_add(1, Ordering::Relaxed);
262 }
263 RateLimitResult::Denied(_) => {
264 self.denied.fetch_add(1, Ordering::Relaxed);
265 }
266 }
267
268 self.last_request_ns.store(
269 self.epoch.elapsed().as_nanos() as u64,
270 Ordering::Relaxed,
271 );
272 }
273
274 fn snapshot(&self) -> KeyStatsSnapshot {
275 let last_ns = self.last_request_ns.load(Ordering::Relaxed);
276 let last_request = if last_ns > 0 {
277 Some(Duration::from_nanos(last_ns))
278 } else {
279 None
280 };
281
282 KeyStatsSnapshot {
283 total: self.total.load(Ordering::Relaxed),
284 allowed: self.allowed.load(Ordering::Relaxed),
285 denied: self.denied.load(Ordering::Relaxed),
286 last_request_age: last_request,
287 }
288 }
289}
290
291#[derive(Debug, Clone)]
293pub struct KeyStatsSnapshot {
294 pub total: u64,
296
297 pub allowed: u64,
299
300 pub denied: u64,
302
303 pub last_request_age: Option<Duration>,
305}
306
307impl KeyStatsSnapshot {
308 pub fn denial_rate(&self) -> f64 {
310 if self.total == 0 {
311 0.0
312 } else {
313 self.denied as f64 / self.total as f64
314 }
315 }
316
317 pub fn allow_rate(&self) -> f64 {
319 if self.total == 0 {
320 0.0
321 } else {
322 self.allowed as f64 / self.total as f64
323 }
324 }
325}
326
327#[derive(Debug, Clone)]
329pub struct RateLimitStats {
330 pub total_requests: u64,
332
333 pub allowed: u64,
335
336 pub queued: u64,
338
339 pub throttled: u64,
341
342 pub warned: u64,
344
345 pub denied: u64,
347
348 pub avg_decision_time_us: u64,
350
351 pub p50_decision_time_us: u64,
353
354 pub p99_decision_time_us: u64,
356
357 pub key_stats: HashMap<String, KeyStatsSnapshot>,
359
360 pub uptime_secs: u64,
362}
363
364impl RateLimitStats {
365 pub fn denial_rate(&self) -> f64 {
367 if self.total_requests == 0 {
368 0.0
369 } else {
370 self.denied as f64 / self.total_requests as f64
371 }
372 }
373
374 pub fn allow_rate(&self) -> f64 {
376 if self.total_requests == 0 {
377 0.0
378 } else {
379 self.allowed as f64 / self.total_requests as f64
380 }
381 }
382
383 pub fn requests_per_second(&self) -> f64 {
385 if self.uptime_secs == 0 {
386 0.0
387 } else {
388 self.total_requests as f64 / self.uptime_secs as f64
389 }
390 }
391
392 pub fn top_denied_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
394 let mut entries: Vec<_> = self.key_stats.iter().collect();
395 entries.sort_by(|a, b| {
396 b.1.denial_rate()
397 .partial_cmp(&a.1.denial_rate())
398 .unwrap_or(std::cmp::Ordering::Equal)
399 });
400 entries.truncate(n);
401 entries
402 }
403
404 pub fn top_request_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
406 let mut entries: Vec<_> = self.key_stats.iter().collect();
407 entries.sort_by(|a, b| b.1.total.cmp(&a.1.total));
408 entries.truncate(n);
409 entries
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use super::*;
416
417 #[test]
418 fn test_metrics_creation() {
419 let metrics = RateLimitMetrics::new();
420 let stats = metrics.get_stats();
421
422 assert_eq!(stats.total_requests, 0);
423 assert_eq!(stats.denied, 0);
424 }
425
426 #[test]
427 fn test_record_allowed() {
428 let metrics = RateLimitMetrics::new();
429 let key = LimiterKey::User("test".to_string());
430
431 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
432
433 let stats = metrics.get_stats();
434 assert_eq!(stats.total_requests, 1);
435 assert_eq!(stats.allowed, 1);
436 assert_eq!(stats.denied, 0);
437 }
438
439 #[test]
440 fn test_record_denied() {
441 let metrics = RateLimitMetrics::new();
442 let key = LimiterKey::User("test".to_string());
443
444 let error = super::super::limiter::RateLimitExceeded {
445 key: key.clone(),
446 limit_type: super::super::limiter::LimitType::TokenBucket,
447 current: 0,
448 limit: 100,
449 retry_after: Duration::from_secs(1),
450 message: "test".to_string(),
451 };
452
453 metrics.record_decision(&key, &RateLimitResult::Denied(error), Duration::from_micros(10));
454
455 let stats = metrics.get_stats();
456 assert_eq!(stats.total_requests, 1);
457 assert_eq!(stats.denied, 1);
458 }
459
460 #[test]
461 fn test_record_queued_throttled_warned() {
462 let metrics = RateLimitMetrics::new();
463 let key = LimiterKey::User("test".to_string());
464
465 metrics.record_decision(&key, &RateLimitResult::Queued(Duration::from_secs(1)), Duration::from_micros(10));
466 metrics.record_decision(&key, &RateLimitResult::Throttled(Duration::from_secs(1)), Duration::from_micros(10));
467 metrics.record_decision(&key, &RateLimitResult::Warned("test".to_string()), Duration::from_micros(10));
468
469 let stats = metrics.get_stats();
470 assert_eq!(stats.total_requests, 3);
471 assert_eq!(stats.queued, 1);
472 assert_eq!(stats.throttled, 1);
473 assert_eq!(stats.warned, 1);
474 }
475
476 #[test]
477 fn test_per_key_stats() {
478 let metrics = RateLimitMetrics::new();
479 let key1 = LimiterKey::User("user1".to_string());
480 let key2 = LimiterKey::User("user2".to_string());
481
482 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
483 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
484 metrics.record_decision(&key2, &RateLimitResult::Allowed, Duration::from_micros(10));
485
486 let stats = metrics.get_stats();
487 assert_eq!(stats.key_stats.len(), 2);
488
489 let user1_stats = stats.key_stats.get("user:user1").unwrap();
490 assert_eq!(user1_stats.total, 2);
491 }
492
493 #[test]
494 fn test_denial_rate() {
495 let metrics = RateLimitMetrics::new();
496 let key = LimiterKey::User("test".to_string());
497
498 for _ in 0..3 {
500 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
501 }
502
503 let error = super::super::limiter::RateLimitExceeded {
504 key: key.clone(),
505 limit_type: super::super::limiter::LimitType::TokenBucket,
506 current: 0,
507 limit: 100,
508 retry_after: Duration::from_secs(1),
509 message: "test".to_string(),
510 };
511
512 for _ in 0..2 {
513 metrics.record_decision(&key, &RateLimitResult::Denied(error.clone()), Duration::from_micros(10));
514 }
515
516 let rate = metrics.denial_rate();
517 assert!((rate - 0.4).abs() < 0.01);
518 }
519
520 #[test]
521 fn test_timing_stats() {
522 let metrics = RateLimitMetrics::new();
523 let key = LimiterKey::User("test".to_string());
524
525 for i in 1..=100 {
526 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(i * 10));
527 }
528
529 let stats = metrics.get_stats();
530 assert!(stats.avg_decision_time_us > 0);
531 assert!(stats.p50_decision_time_us > 0);
532 assert!(stats.p99_decision_time_us >= stats.p50_decision_time_us);
533 }
534
535 #[test]
536 fn test_reset() {
537 let metrics = RateLimitMetrics::new();
538 let key = LimiterKey::User("test".to_string());
539
540 metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(10));
541
542 assert!(metrics.total_requests() > 0);
543
544 metrics.reset();
545
546 assert_eq!(metrics.total_requests(), 0);
547 assert_eq!(metrics.denied(), 0);
548 }
549
550 #[test]
551 fn test_reset_key() {
552 let metrics = RateLimitMetrics::new();
553 let key1 = LimiterKey::User("user1".to_string());
554 let key2 = LimiterKey::User("user2".to_string());
555
556 metrics.record_decision(&key1, &RateLimitResult::Allowed, Duration::from_micros(10));
557 metrics.record_decision(&key2, &RateLimitResult::Allowed, Duration::from_micros(10));
558
559 assert_eq!(metrics.get_stats().key_stats.len(), 2);
560
561 metrics.reset_key(&key1);
562
563 let stats = metrics.get_stats();
564 assert_eq!(stats.key_stats.len(), 1);
565 assert!(!stats.key_stats.contains_key("user:user1"));
566 assert!(stats.key_stats.contains_key("user:user2"));
567 }
568
569 #[test]
570 fn test_stats_methods() {
571 let stats = RateLimitStats {
572 total_requests: 100,
573 allowed: 80,
574 queued: 5,
575 throttled: 5,
576 warned: 5,
577 denied: 5,
578 avg_decision_time_us: 50,
579 p50_decision_time_us: 45,
580 p99_decision_time_us: 100,
581 key_stats: HashMap::new(),
582 uptime_secs: 10,
583 };
584
585 assert!((stats.denial_rate() - 0.05).abs() < 0.01);
586 assert!((stats.allow_rate() - 0.80).abs() < 0.01);
587 assert!((stats.requests_per_second() - 10.0).abs() < 0.1);
588 }
589
590 #[test]
591 fn test_top_keys() {
592 let mut key_stats = HashMap::new();
593
594 key_stats.insert("user:high".to_string(), KeyStatsSnapshot {
595 total: 100,
596 allowed: 50,
597 denied: 50,
598 last_request_age: None,
599 });
600
601 key_stats.insert("user:low".to_string(), KeyStatsSnapshot {
602 total: 100,
603 allowed: 90,
604 denied: 10,
605 last_request_age: None,
606 });
607
608 key_stats.insert("user:most".to_string(), KeyStatsSnapshot {
609 total: 1000,
610 allowed: 900,
611 denied: 100,
612 last_request_age: None,
613 });
614
615 let stats = RateLimitStats {
616 total_requests: 1200,
617 allowed: 1040,
618 queued: 0,
619 throttled: 0,
620 warned: 0,
621 denied: 160,
622 avg_decision_time_us: 50,
623 p50_decision_time_us: 45,
624 p99_decision_time_us: 100,
625 key_stats,
626 uptime_secs: 60,
627 };
628
629 let top_denied = stats.top_denied_keys(1);
631 assert_eq!(top_denied[0].0, "user:high");
632
633 let top_requests = stats.top_request_keys(1);
635 assert_eq!(top_requests[0].0, "user:most");
636 }
637
638 #[test]
639 fn test_key_stats_snapshot_rates() {
640 let snapshot = KeyStatsSnapshot {
641 total: 100,
642 allowed: 80,
643 denied: 20,
644 last_request_age: Some(Duration::from_secs(5)),
645 };
646
647 assert!((snapshot.denial_rate() - 0.2).abs() < 0.01);
648 assert!((snapshot.allow_rate() - 0.8).abs() < 0.01);
649 }
650}