Skip to main content

heliosdb_proxy/rate_limit/
metrics.rs

1//! Rate Limit Metrics
2//!
3//! Metrics collection and statistics for rate limiting decisions.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::limiter::{LimiterKey, RateLimitResult};
13
14/// Rate limit metrics collector
15pub struct RateLimitMetrics {
16    /// Total requests checked
17    total_requests: AtomicU64,
18
19    /// Requests allowed
20    allowed: AtomicU64,
21
22    /// Requests queued
23    queued: AtomicU64,
24
25    /// Requests throttled
26    throttled: AtomicU64,
27
28    /// Requests warned
29    warned: AtomicU64,
30
31    /// Requests denied
32    denied: AtomicU64,
33
34    /// Per-key statistics
35    key_stats: DashMap<String, KeyStats>,
36
37    /// Decision timing (microseconds)
38    decision_times_us: RwLock<Vec<u64>>,
39
40    /// Maximum timing samples
41    max_timing_samples: usize,
42
43    /// Start time
44    started_at: Instant,
45}
46
47impl RateLimitMetrics {
48    /// Create new metrics collector
49    pub fn new() -> Self {
50        Self {
51            total_requests: AtomicU64::new(0),
52            allowed: AtomicU64::new(0),
53            queued: AtomicU64::new(0),
54            throttled: AtomicU64::new(0),
55            warned: AtomicU64::new(0),
56            denied: AtomicU64::new(0),
57            key_stats: DashMap::new(),
58            decision_times_us: RwLock::new(Vec::with_capacity(1000)),
59            max_timing_samples: 1000,
60            started_at: Instant::now(),
61        }
62    }
63
64    /// Record a rate limit decision
65    pub fn record_decision(&self, key: &LimiterKey, result: &RateLimitResult, elapsed: Duration) {
66        self.total_requests.fetch_add(1, Ordering::Relaxed);
67
68        match result {
69            RateLimitResult::Allowed => {
70                self.allowed.fetch_add(1, Ordering::Relaxed);
71            }
72            RateLimitResult::Queued(_) => {
73                self.queued.fetch_add(1, Ordering::Relaxed);
74            }
75            RateLimitResult::Throttled(_) => {
76                self.throttled.fetch_add(1, Ordering::Relaxed);
77            }
78            RateLimitResult::Warned(_) => {
79                self.warned.fetch_add(1, Ordering::Relaxed);
80            }
81            RateLimitResult::Denied(_) => {
82                self.denied.fetch_add(1, Ordering::Relaxed);
83            }
84        }
85
86        // Update per-key stats
87        let key_str = key.to_string();
88        self.key_stats
89            .entry(key_str)
90            .and_modify(|stats| stats.record(result))
91            .or_insert_with(|| {
92                let stats = KeyStats::new();
93                stats.record(result);
94                stats
95            });
96
97        // Record timing
98        self.record_timing(elapsed);
99    }
100
101    /// Record timing sample
102    fn record_timing(&self, elapsed: Duration) {
103        let us = elapsed.as_micros() as u64;
104        let mut times = self.decision_times_us.write();
105
106        if times.len() >= self.max_timing_samples {
107            times.drain(0..self.max_timing_samples / 2);
108        }
109        times.push(us);
110    }
111
112    /// Reset stats for a key
113    pub fn reset_key(&self, key: &LimiterKey) {
114        let key_str = key.to_string();
115        self.key_stats.remove(&key_str);
116    }
117
118    /// Get current statistics snapshot
119    pub fn get_stats(&self) -> RateLimitStats {
120        let total = self.total_requests.load(Ordering::Relaxed);
121        let allowed = self.allowed.load(Ordering::Relaxed);
122        let queued = self.queued.load(Ordering::Relaxed);
123        let throttled = self.throttled.load(Ordering::Relaxed);
124        let warned = self.warned.load(Ordering::Relaxed);
125        let denied = self.denied.load(Ordering::Relaxed);
126
127        // Calculate timing stats
128        let times = self.decision_times_us.read();
129        let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
130            (0, 0, 0)
131        } else {
132            let mut sorted = times.clone();
133            sorted.sort_unstable();
134
135            let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
136            let p50 = sorted[sorted.len() / 2];
137            let p99_idx = ((sorted.len() as f64) * 0.99) as usize;
138            let p99 = sorted.get(p99_idx).copied().unwrap_or(sorted[sorted.len() - 1]);
139
140            (avg, p50, p99)
141        };
142
143        // Collect per-key stats
144        let key_stats: HashMap<_, _> = self
145            .key_stats
146            .iter()
147            .map(|entry| (entry.key().clone(), entry.value().snapshot()))
148            .collect();
149
150        RateLimitStats {
151            total_requests: total,
152            allowed,
153            queued,
154            throttled,
155            warned,
156            denied,
157            avg_decision_time_us: avg_time_us,
158            p50_decision_time_us: p50_time_us,
159            p99_decision_time_us: p99_time_us,
160            key_stats,
161            uptime_secs: self.started_at.elapsed().as_secs(),
162        }
163    }
164
165    /// Get total requests
166    pub fn total_requests(&self) -> u64 {
167        self.total_requests.load(Ordering::Relaxed)
168    }
169
170    /// Get allowed count
171    pub fn allowed(&self) -> u64 {
172        self.allowed.load(Ordering::Relaxed)
173    }
174
175    /// Get denied count
176    pub fn denied(&self) -> u64 {
177        self.denied.load(Ordering::Relaxed)
178    }
179
180    /// Get denial rate (0.0 - 1.0)
181    pub fn denial_rate(&self) -> f64 {
182        let total = self.total_requests.load(Ordering::Relaxed);
183        let denied = self.denied.load(Ordering::Relaxed);
184
185        if total == 0 {
186            0.0
187        } else {
188            denied as f64 / total as f64
189        }
190    }
191
192    /// Get uptime
193    pub fn uptime(&self) -> Duration {
194        self.started_at.elapsed()
195    }
196
197    /// Reset all metrics
198    pub fn reset(&self) {
199        self.total_requests.store(0, Ordering::Relaxed);
200        self.allowed.store(0, Ordering::Relaxed);
201        self.queued.store(0, Ordering::Relaxed);
202        self.throttled.store(0, Ordering::Relaxed);
203        self.warned.store(0, Ordering::Relaxed);
204        self.denied.store(0, Ordering::Relaxed);
205        self.key_stats.clear();
206        self.decision_times_us.write().clear();
207    }
208}
209
210impl Default for RateLimitMetrics {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216impl std::fmt::Debug for RateLimitMetrics {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        f.debug_struct("RateLimitMetrics")
219            .field("total_requests", &self.total_requests.load(Ordering::Relaxed))
220            .field("denied", &self.denied.load(Ordering::Relaxed))
221            .field("key_count", &self.key_stats.len())
222            .finish()
223    }
224}
225
226/// Per-key statistics
227pub struct KeyStats {
228    /// Total requests for this key
229    total: AtomicU64,
230
231    /// Allowed requests
232    allowed: AtomicU64,
233
234    /// Denied requests
235    denied: AtomicU64,
236
237    /// Last request time (nanos since epoch)
238    last_request_ns: AtomicU64,
239
240    /// Epoch for time calculations
241    epoch: Instant,
242}
243
244impl KeyStats {
245    fn new() -> Self {
246        Self {
247            total: AtomicU64::new(0),
248            allowed: AtomicU64::new(0),
249            denied: AtomicU64::new(0),
250            last_request_ns: AtomicU64::new(0),
251            epoch: Instant::now(),
252        }
253    }
254
255    fn record(&self, result: &RateLimitResult) {
256        self.total.fetch_add(1, Ordering::Relaxed);
257
258        match result {
259            RateLimitResult::Allowed | RateLimitResult::Queued(_) |
260            RateLimitResult::Throttled(_) | RateLimitResult::Warned(_) => {
261                self.allowed.fetch_add(1, Ordering::Relaxed);
262            }
263            RateLimitResult::Denied(_) => {
264                self.denied.fetch_add(1, Ordering::Relaxed);
265            }
266        }
267
268        self.last_request_ns.store(
269            self.epoch.elapsed().as_nanos() as u64,
270            Ordering::Relaxed,
271        );
272    }
273
274    fn snapshot(&self) -> KeyStatsSnapshot {
275        let last_ns = self.last_request_ns.load(Ordering::Relaxed);
276        let last_request = if last_ns > 0 {
277            Some(Duration::from_nanos(last_ns))
278        } else {
279            None
280        };
281
282        KeyStatsSnapshot {
283            total: self.total.load(Ordering::Relaxed),
284            allowed: self.allowed.load(Ordering::Relaxed),
285            denied: self.denied.load(Ordering::Relaxed),
286            last_request_age: last_request,
287        }
288    }
289}
290
/// Point-in-time copy of the statistics kept for one key
#[derive(Debug, Clone)]
pub struct KeyStatsSnapshot {
    /// Number of requests seen for the key
    pub total: u64,

    /// Number of those requests counted as allowed
    pub allowed: u64,

    /// Number of those requests counted as denied
    pub denied: u64,

    /// Age of the last request (time since), if one was recorded
    pub last_request_age: Option<Duration>,
}

impl KeyStatsSnapshot {
    /// Fraction of this key's requests that were denied; 0.0 when the key has
    /// no requests.
    pub fn denial_rate(&self) -> f64 {
        match self.total {
            0 => 0.0,
            total => self.denied as f64 / total as f64,
        }
    }

    /// Fraction of this key's requests that were allowed; 0.0 when the key
    /// has no requests.
    pub fn allow_rate(&self) -> f64 {
        match self.total {
            0 => 0.0,
            total => self.allowed as f64 / total as f64,
        }
    }
}
326
327/// Overall rate limit statistics snapshot
328#[derive(Debug, Clone)]
329pub struct RateLimitStats {
330    /// Total requests checked
331    pub total_requests: u64,
332
333    /// Requests allowed
334    pub allowed: u64,
335
336    /// Requests queued
337    pub queued: u64,
338
339    /// Requests throttled
340    pub throttled: u64,
341
342    /// Requests warned
343    pub warned: u64,
344
345    /// Requests denied
346    pub denied: u64,
347
348    /// Average decision time (microseconds)
349    pub avg_decision_time_us: u64,
350
351    /// P50 decision time (microseconds)
352    pub p50_decision_time_us: u64,
353
354    /// P99 decision time (microseconds)
355    pub p99_decision_time_us: u64,
356
357    /// Per-key statistics
358    pub key_stats: HashMap<String, KeyStatsSnapshot>,
359
360    /// Uptime in seconds
361    pub uptime_secs: u64,
362}
363
364impl RateLimitStats {
365    /// Get overall denial rate
366    pub fn denial_rate(&self) -> f64 {
367        if self.total_requests == 0 {
368            0.0
369        } else {
370            self.denied as f64 / self.total_requests as f64
371        }
372    }
373
374    /// Get overall allow rate
375    pub fn allow_rate(&self) -> f64 {
376        if self.total_requests == 0 {
377            0.0
378        } else {
379            self.allowed as f64 / self.total_requests as f64
380        }
381    }
382
383    /// Get requests per second
384    pub fn requests_per_second(&self) -> f64 {
385        if self.uptime_secs == 0 {
386            0.0
387        } else {
388            self.total_requests as f64 / self.uptime_secs as f64
389        }
390    }
391
392    /// Get keys with highest denial rate
393    pub fn top_denied_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
394        let mut entries: Vec<_> = self.key_stats.iter().collect();
395        entries.sort_by(|a, b| {
396            b.1.denial_rate()
397                .partial_cmp(&a.1.denial_rate())
398                .unwrap_or(std::cmp::Ordering::Equal)
399        });
400        entries.truncate(n);
401        entries
402    }
403
404    /// Get keys with most requests
405    pub fn top_request_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
406        let mut entries: Vec<_> = self.key_stats.iter().collect();
407        entries.sort_by(|a, b| b.1.total.cmp(&a.1.total));
408        entries.truncate(n);
409        entries
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::super::limiter::{LimitType, RateLimitExceeded};
    use super::*;

    /// Decision latency used wherever a test does not care about timing.
    const DECISION_TIME: Duration = Duration::from_micros(10);

    /// Build a per-user limiter key.
    fn user_key(name: &str) -> LimiterKey {
        LimiterKey::User(name.to_string())
    }

    /// Build a `Denied` result for `key` with fixed placeholder details.
    fn denied_for(key: &LimiterKey) -> RateLimitResult {
        RateLimitResult::Denied(RateLimitExceeded {
            key: key.clone(),
            limit_type: LimitType::TokenBucket,
            current: 0,
            limit: 100,
            retry_after: Duration::from_secs(1),
            message: "test".to_string(),
        })
    }

    /// Build a per-key snapshot without a last-request age.
    fn snapshot(total: u64, allowed: u64, denied: u64) -> KeyStatsSnapshot {
        KeyStatsSnapshot {
            total,
            allowed,
            denied,
            last_request_age: None,
        }
    }

    #[test]
    fn test_metrics_creation() {
        let metrics = RateLimitMetrics::new();
        let stats = metrics.get_stats();

        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.denied, 0);
        assert!(stats.key_stats.is_empty());
        // No samples yet, so every timing figure is reported as zero.
        assert_eq!(stats.avg_decision_time_us, 0);
        assert_eq!(stats.p50_decision_time_us, 0);
        assert_eq!(stats.p99_decision_time_us, 0);
    }

    #[test]
    fn test_record_allowed() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        metrics.record_decision(&key, &RateLimitResult::Allowed, DECISION_TIME);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.allowed, 1);
        assert_eq!(stats.denied, 0);
        assert_eq!(stats.queued + stats.throttled + stats.warned, 0);
    }

    #[test]
    fn test_record_denied() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        metrics.record_decision(&key, &denied_for(&key), DECISION_TIME);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.denied, 1);
        assert_eq!(stats.allowed, 0);
    }

    #[test]
    fn test_record_queued_throttled_warned() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        metrics.record_decision(
            &key,
            &RateLimitResult::Queued(Duration::from_secs(1)),
            DECISION_TIME,
        );
        metrics.record_decision(
            &key,
            &RateLimitResult::Throttled(Duration::from_secs(1)),
            DECISION_TIME,
        );
        metrics.record_decision(
            &key,
            &RateLimitResult::Warned("test".to_string()),
            DECISION_TIME,
        );

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 3);
        assert_eq!(stats.queued, 1);
        assert_eq!(stats.throttled, 1);
        assert_eq!(stats.warned, 1);
        // Globally these outcomes have their own counters...
        assert_eq!(stats.allowed, 0);
        assert_eq!(stats.denied, 0);

        // ...but at key level anything that is not a denial counts as allowed.
        let key_stats = stats.key_stats.get("user:test").unwrap();
        assert_eq!(key_stats.total, 3);
        assert_eq!(key_stats.allowed, 3);
        assert_eq!(key_stats.denied, 0);
    }

    #[test]
    fn test_per_key_stats() {
        let metrics = RateLimitMetrics::new();
        let key1 = user_key("user1");
        let key2 = user_key("user2");

        metrics.record_decision(&key1, &RateLimitResult::Allowed, DECISION_TIME);
        metrics.record_decision(&key1, &RateLimitResult::Allowed, DECISION_TIME);
        metrics.record_decision(&key2, &RateLimitResult::Allowed, DECISION_TIME);

        let stats = metrics.get_stats();
        assert_eq!(stats.key_stats.len(), 2);

        let user1_stats = stats.key_stats.get("user:user1").unwrap();
        assert_eq!(user1_stats.total, 2);
        assert_eq!(user1_stats.allowed, 2);

        let user2_stats = stats.key_stats.get("user:user2").unwrap();
        assert_eq!(user2_stats.total, 1);
    }

    #[test]
    fn test_denial_rate() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        // Nothing recorded yet: the rate is defined as zero, not NaN.
        assert_eq!(metrics.denial_rate(), 0.0);

        // 3 allowed, 2 denied = 40% denial rate
        for _ in 0..3 {
            metrics.record_decision(&key, &RateLimitResult::Allowed, DECISION_TIME);
        }
        for _ in 0..2 {
            metrics.record_decision(&key, &denied_for(&key), DECISION_TIME);
        }

        let rate = metrics.denial_rate();
        assert!((rate - 0.4).abs() < 0.01);

        // The same ratio is visible through the per-key snapshot.
        let stats = metrics.get_stats();
        let key_stats = stats.key_stats.get("user:test").unwrap();
        assert!((key_stats.denial_rate() - 0.4).abs() < 0.01);
    }

    #[test]
    fn test_timing_stats() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        // Samples are 10, 20, ..., 1000 microseconds.
        for i in 1..=100 {
            metrics.record_decision(
                &key,
                &RateLimitResult::Allowed,
                Duration::from_micros(i * 10),
            );
        }

        let stats = metrics.get_stats();
        // Mean of 10..=1000 in steps of 10.
        assert_eq!(stats.avg_decision_time_us, 505);
        // p50 is the element at index len / 2 of the sorted samples.
        assert_eq!(stats.p50_decision_time_us, 510);
        assert!(stats.p99_decision_time_us >= stats.p50_decision_time_us);
        assert!(stats.p99_decision_time_us <= 1000);
    }

    #[test]
    fn test_reset() {
        let metrics = RateLimitMetrics::new();
        let key = user_key("test");

        metrics.record_decision(&key, &RateLimitResult::Allowed, DECISION_TIME);

        assert!(metrics.total_requests() > 0);

        metrics.reset();

        assert_eq!(metrics.total_requests(), 0);
        assert_eq!(metrics.allowed(), 0);
        assert_eq!(metrics.denied(), 0);

        // Per-key state and the timing window must be cleared as well.
        let stats = metrics.get_stats();
        assert!(stats.key_stats.is_empty());
        assert_eq!(stats.avg_decision_time_us, 0);
    }

    #[test]
    fn test_reset_key() {
        let metrics = RateLimitMetrics::new();
        let key1 = user_key("user1");
        let key2 = user_key("user2");

        metrics.record_decision(&key1, &RateLimitResult::Allowed, DECISION_TIME);
        metrics.record_decision(&key2, &RateLimitResult::Allowed, DECISION_TIME);

        assert_eq!(metrics.get_stats().key_stats.len(), 2);

        metrics.reset_key(&key1);

        let stats = metrics.get_stats();
        assert_eq!(stats.key_stats.len(), 1);
        assert!(!stats.key_stats.contains_key("user:user1"));
        assert!(stats.key_stats.contains_key("user:user2"));
        // Resetting one key leaves the global counters untouched.
        assert_eq!(stats.total_requests, 2);
    }

    #[test]
    fn test_stats_methods() {
        let stats = RateLimitStats {
            total_requests: 100,
            allowed: 80,
            queued: 5,
            throttled: 5,
            warned: 5,
            denied: 5,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            key_stats: HashMap::new(),
            uptime_secs: 10,
        };

        assert!((stats.denial_rate() - 0.05).abs() < 0.01);
        assert!((stats.allow_rate() - 0.80).abs() < 0.01);
        assert!((stats.requests_per_second() - 10.0).abs() < 0.1);
    }

    #[test]
    fn test_top_keys() {
        let mut key_stats = HashMap::new();
        key_stats.insert("user:high".to_string(), snapshot(100, 50, 50));
        key_stats.insert("user:low".to_string(), snapshot(100, 90, 10));
        key_stats.insert("user:most".to_string(), snapshot(1000, 900, 100));

        let stats = RateLimitStats {
            total_requests: 1200,
            allowed: 1040,
            queued: 0,
            throttled: 0,
            warned: 0,
            denied: 160,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            key_stats,
            uptime_secs: 60,
        };

        // Top denied should be "high" (50% denial rate)
        let top_denied = stats.top_denied_keys(1);
        assert_eq!(top_denied.len(), 1);
        assert_eq!(top_denied[0].0, "user:high");

        // Top requests should be "most" (1000 requests)
        let top_requests = stats.top_request_keys(1);
        assert_eq!(top_requests.len(), 1);
        assert_eq!(top_requests[0].0, "user:most");

        // Asking for more keys than exist returns all of them.
        assert_eq!(stats.top_request_keys(10).len(), 3);
    }

    #[test]
    fn test_key_stats_snapshot_rates() {
        let snapshot = KeyStatsSnapshot {
            total: 100,
            allowed: 80,
            denied: 20,
            last_request_age: Some(Duration::from_secs(5)),
        };

        assert!((snapshot.denial_rate() - 0.2).abs() < 0.01);
        assert!((snapshot.allow_rate() - 0.8).abs() < 0.01);
    }
}