Skip to main content

heliosdb_proxy/rate_limit/
metrics.rs

1//! Rate Limit Metrics
2//!
3//! Metrics collection and statistics for rate limiting decisions.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::limiter::{LimiterKey, RateLimitResult};
13
/// Rate limit metrics collector.
///
/// Aggregates global outcome counters, per-key counters and a bounded buffer
/// of decision latencies. Counters are atomics and the per-key map is a
/// `DashMap`, so all recording goes through `&self`.
pub struct RateLimitMetrics {
    /// Total requests checked (one per recorded decision)
    total_requests: AtomicU64,

    /// Requests allowed (the `RateLimitResult::Allowed` outcome only)
    allowed: AtomicU64,

    /// Requests queued
    queued: AtomicU64,

    /// Requests throttled
    throttled: AtomicU64,

    /// Requests warned
    warned: AtomicU64,

    /// Requests denied
    denied: AtomicU64,

    /// Per-key statistics, keyed by the limiter key's `to_string()` form
    key_stats: DashMap<String, KeyStats>,

    /// Recent decision latencies in microseconds, oldest first
    decision_times_us: RwLock<Vec<u64>>,

    /// Buffer length at which the oldest timing samples are discarded
    max_timing_samples: usize,

    /// Creation time, used to report uptime; `reset()` does not change it
    started_at: Instant,
}
46
47impl RateLimitMetrics {
48    /// Create new metrics collector
49    pub fn new() -> Self {
50        Self {
51            total_requests: AtomicU64::new(0),
52            allowed: AtomicU64::new(0),
53            queued: AtomicU64::new(0),
54            throttled: AtomicU64::new(0),
55            warned: AtomicU64::new(0),
56            denied: AtomicU64::new(0),
57            key_stats: DashMap::new(),
58            decision_times_us: RwLock::new(Vec::with_capacity(1000)),
59            max_timing_samples: 1000,
60            started_at: Instant::now(),
61        }
62    }
63
64    /// Record a rate limit decision
65    pub fn record_decision(&self, key: &LimiterKey, result: &RateLimitResult, elapsed: Duration) {
66        self.total_requests.fetch_add(1, Ordering::Relaxed);
67
68        match result {
69            RateLimitResult::Allowed => {
70                self.allowed.fetch_add(1, Ordering::Relaxed);
71            }
72            RateLimitResult::Queued(_) => {
73                self.queued.fetch_add(1, Ordering::Relaxed);
74            }
75            RateLimitResult::Throttled(_) => {
76                self.throttled.fetch_add(1, Ordering::Relaxed);
77            }
78            RateLimitResult::Warned(_) => {
79                self.warned.fetch_add(1, Ordering::Relaxed);
80            }
81            RateLimitResult::Denied(_) => {
82                self.denied.fetch_add(1, Ordering::Relaxed);
83            }
84        }
85
86        // Update per-key stats
87        let key_str = key.to_string();
88        self.key_stats
89            .entry(key_str)
90            .and_modify(|stats| stats.record(result))
91            .or_insert_with(|| {
92                let stats = KeyStats::new();
93                stats.record(result);
94                stats
95            });
96
97        // Record timing
98        self.record_timing(elapsed);
99    }
100
101    /// Record timing sample
102    fn record_timing(&self, elapsed: Duration) {
103        let us = elapsed.as_micros() as u64;
104        let mut times = self.decision_times_us.write();
105
106        if times.len() >= self.max_timing_samples {
107            times.drain(0..self.max_timing_samples / 2);
108        }
109        times.push(us);
110    }
111
112    /// Reset stats for a key
113    pub fn reset_key(&self, key: &LimiterKey) {
114        let key_str = key.to_string();
115        self.key_stats.remove(&key_str);
116    }
117
118    /// Get current statistics snapshot
119    pub fn get_stats(&self) -> RateLimitStats {
120        let total = self.total_requests.load(Ordering::Relaxed);
121        let allowed = self.allowed.load(Ordering::Relaxed);
122        let queued = self.queued.load(Ordering::Relaxed);
123        let throttled = self.throttled.load(Ordering::Relaxed);
124        let warned = self.warned.load(Ordering::Relaxed);
125        let denied = self.denied.load(Ordering::Relaxed);
126
127        // Calculate timing stats
128        let times = self.decision_times_us.read();
129        let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
130            (0, 0, 0)
131        } else {
132            let mut sorted = times.clone();
133            sorted.sort_unstable();
134
135            let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
136            let p50 = sorted[sorted.len() / 2];
137            let p99_idx = ((sorted.len() as f64) * 0.99) as usize;
138            let p99 = sorted
139                .get(p99_idx)
140                .copied()
141                .unwrap_or(sorted[sorted.len() - 1]);
142
143            (avg, p50, p99)
144        };
145
146        // Collect per-key stats
147        let key_stats: HashMap<_, _> = self
148            .key_stats
149            .iter()
150            .map(|entry| (entry.key().clone(), entry.value().snapshot()))
151            .collect();
152
153        RateLimitStats {
154            total_requests: total,
155            allowed,
156            queued,
157            throttled,
158            warned,
159            denied,
160            avg_decision_time_us: avg_time_us,
161            p50_decision_time_us: p50_time_us,
162            p99_decision_time_us: p99_time_us,
163            key_stats,
164            uptime_secs: self.started_at.elapsed().as_secs(),
165        }
166    }
167
168    /// Get total requests
169    pub fn total_requests(&self) -> u64 {
170        self.total_requests.load(Ordering::Relaxed)
171    }
172
173    /// Get allowed count
174    pub fn allowed(&self) -> u64 {
175        self.allowed.load(Ordering::Relaxed)
176    }
177
178    /// Get denied count
179    pub fn denied(&self) -> u64 {
180        self.denied.load(Ordering::Relaxed)
181    }
182
183    /// Get denial rate (0.0 - 1.0)
184    pub fn denial_rate(&self) -> f64 {
185        let total = self.total_requests.load(Ordering::Relaxed);
186        let denied = self.denied.load(Ordering::Relaxed);
187
188        if total == 0 {
189            0.0
190        } else {
191            denied as f64 / total as f64
192        }
193    }
194
195    /// Get uptime
196    pub fn uptime(&self) -> Duration {
197        self.started_at.elapsed()
198    }
199
200    /// Reset all metrics
201    pub fn reset(&self) {
202        self.total_requests.store(0, Ordering::Relaxed);
203        self.allowed.store(0, Ordering::Relaxed);
204        self.queued.store(0, Ordering::Relaxed);
205        self.throttled.store(0, Ordering::Relaxed);
206        self.warned.store(0, Ordering::Relaxed);
207        self.denied.store(0, Ordering::Relaxed);
208        self.key_stats.clear();
209        self.decision_times_us.write().clear();
210    }
211}
212
213impl Default for RateLimitMetrics {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl std::fmt::Debug for RateLimitMetrics {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("RateLimitMetrics")
222            .field(
223                "total_requests",
224                &self.total_requests.load(Ordering::Relaxed),
225            )
226            .field("denied", &self.denied.load(Ordering::Relaxed))
227            .field("key_count", &self.key_stats.len())
228            .finish()
229    }
230}
231
/// Per-key statistics.
///
/// All counters are atomics, so they can be updated through a shared reference.
pub struct KeyStats {
    /// Total requests for this key
    total: AtomicU64,

    /// Requests that were not denied (allowed, queued, throttled or warned)
    allowed: AtomicU64,

    /// Denied requests
    denied: AtomicU64,

    /// Time of the last request, as nanoseconds elapsed since `epoch` (this
    /// struct's own creation instant, not the Unix epoch); `0` means no request
    /// has been recorded
    last_request_ns: AtomicU64,

    /// Reference instant for `last_request_ns`, captured at construction
    epoch: Instant,
}
249
250impl KeyStats {
251    fn new() -> Self {
252        Self {
253            total: AtomicU64::new(0),
254            allowed: AtomicU64::new(0),
255            denied: AtomicU64::new(0),
256            last_request_ns: AtomicU64::new(0),
257            epoch: Instant::now(),
258        }
259    }
260
261    fn record(&self, result: &RateLimitResult) {
262        self.total.fetch_add(1, Ordering::Relaxed);
263
264        match result {
265            RateLimitResult::Allowed
266            | RateLimitResult::Queued(_)
267            | RateLimitResult::Throttled(_)
268            | RateLimitResult::Warned(_) => {
269                self.allowed.fetch_add(1, Ordering::Relaxed);
270            }
271            RateLimitResult::Denied(_) => {
272                self.denied.fetch_add(1, Ordering::Relaxed);
273            }
274        }
275
276        self.last_request_ns
277            .store(self.epoch.elapsed().as_nanos() as u64, Ordering::Relaxed);
278    }
279
280    fn snapshot(&self) -> KeyStatsSnapshot {
281        let last_ns = self.last_request_ns.load(Ordering::Relaxed);
282        let last_request = if last_ns > 0 {
283            Some(Duration::from_nanos(last_ns))
284        } else {
285            None
286        };
287
288        KeyStatsSnapshot {
289            total: self.total.load(Ordering::Relaxed),
290            allowed: self.allowed.load(Ordering::Relaxed),
291            denied: self.denied.load(Ordering::Relaxed),
292            last_request_age: last_request,
293        }
294    }
295}
296
/// Snapshot of per-key statistics.
///
/// Plain values produced by `KeyStats::snapshot`, safe to keep after the live
/// counters change.
#[derive(Debug, Clone)]
pub struct KeyStatsSnapshot {
    /// Total requests recorded for the key
    pub total: u64,

    /// Requests that were not denied (allowed, queued, throttled or warned)
    pub allowed: u64,

    /// Denied requests
    pub denied: u64,

    /// Age of the most recent request (time since it was recorded); `None`
    /// when no request has been recorded
    pub last_request_age: Option<Duration>,
}
312
313impl KeyStatsSnapshot {
314    /// Get denial rate for this key
315    pub fn denial_rate(&self) -> f64 {
316        if self.total == 0 {
317            0.0
318        } else {
319            self.denied as f64 / self.total as f64
320        }
321    }
322
323    /// Get allow rate for this key
324    pub fn allow_rate(&self) -> f64 {
325        if self.total == 0 {
326            0.0
327        } else {
328            self.allowed as f64 / self.total as f64
329        }
330    }
331}
332
/// Overall rate limit statistics snapshot.
///
/// Produced by `RateLimitMetrics::get_stats`; all values are plain copies taken
/// at that moment.
#[derive(Debug, Clone)]
pub struct RateLimitStats {
    /// Total requests checked
    pub total_requests: u64,

    /// Requests with the `Allowed` outcome (queued, throttled and warned
    /// requests are counted in their own fields)
    pub allowed: u64,

    /// Requests queued
    pub queued: u64,

    /// Requests throttled
    pub throttled: u64,

    /// Requests warned
    pub warned: u64,

    /// Requests denied
    pub denied: u64,

    /// Average decision time over the retained samples (microseconds)
    pub avg_decision_time_us: u64,

    /// P50 decision time over the retained samples (microseconds)
    pub p50_decision_time_us: u64,

    /// P99 decision time over the retained samples (microseconds)
    pub p99_decision_time_us: u64,

    /// Per-key statistics, keyed by the limiter key's string form
    pub key_stats: HashMap<String, KeyStatsSnapshot>,

    /// Whole seconds since the metrics collector was created
    pub uptime_secs: u64,
}
369
370impl RateLimitStats {
371    /// Get overall denial rate
372    pub fn denial_rate(&self) -> f64 {
373        if self.total_requests == 0 {
374            0.0
375        } else {
376            self.denied as f64 / self.total_requests as f64
377        }
378    }
379
380    /// Get overall allow rate
381    pub fn allow_rate(&self) -> f64 {
382        if self.total_requests == 0 {
383            0.0
384        } else {
385            self.allowed as f64 / self.total_requests as f64
386        }
387    }
388
389    /// Get requests per second
390    pub fn requests_per_second(&self) -> f64 {
391        if self.uptime_secs == 0 {
392            0.0
393        } else {
394            self.total_requests as f64 / self.uptime_secs as f64
395        }
396    }
397
398    /// Get keys with highest denial rate
399    pub fn top_denied_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
400        let mut entries: Vec<_> = self.key_stats.iter().collect();
401        entries.sort_by(|a, b| {
402            b.1.denial_rate()
403                .partial_cmp(&a.1.denial_rate())
404                .unwrap_or(std::cmp::Ordering::Equal)
405        });
406        entries.truncate(n);
407        entries
408    }
409
410    /// Get keys with most requests
411    pub fn top_request_keys(&self, n: usize) -> Vec<(&String, &KeyStatsSnapshot)> {
412        let mut entries: Vec<_> = self.key_stats.iter().collect();
413        entries.sort_by_key(|b| std::cmp::Reverse(b.1.total));
414        entries.truncate(n);
415        entries
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::super::limiter::{LimitType, RateLimitExceeded};
    use super::*;

    /// Decision latency used where the exact value does not matter.
    const TICK: Duration = Duration::from_micros(10);

    /// Shorthand for a `User` limiter key.
    fn user(name: &str) -> LimiterKey {
        LimiterKey::User(name.to_string())
    }

    /// Build a `Denied` result for `key` with fixed, arbitrary limit details.
    fn denied_result(key: &LimiterKey) -> RateLimitResult {
        RateLimitResult::Denied(RateLimitExceeded {
            key: key.clone(),
            limit_type: LimitType::TokenBucket,
            current: 0,
            limit: 100,
            retry_after: Duration::from_secs(1),
            message: "test".to_string(),
        })
    }

    #[test]
    fn test_metrics_creation() {
        let metrics = RateLimitMetrics::new();
        let stats = metrics.get_stats();

        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.denied, 0);
    }

    #[test]
    fn test_record_allowed() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        metrics.record_decision(&key, &RateLimitResult::Allowed, TICK);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.allowed, 1);
        assert_eq!(stats.denied, 0);
    }

    #[test]
    fn test_record_denied() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        metrics.record_decision(&key, &denied_result(&key), TICK);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 1);
        assert_eq!(stats.denied, 1);
    }

    #[test]
    fn test_record_queued_throttled_warned() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        metrics.record_decision(&key, &RateLimitResult::Queued(Duration::from_secs(1)), TICK);
        metrics.record_decision(&key, &RateLimitResult::Throttled(Duration::from_secs(1)), TICK);
        metrics.record_decision(&key, &RateLimitResult::Warned("test".to_string()), TICK);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_requests, 3);
        assert_eq!(stats.queued, 1);
        assert_eq!(stats.throttled, 1);
        assert_eq!(stats.warned, 1);
        // Globally these are not `Allowed`...
        assert_eq!(stats.allowed, 0);
        // ...but per key every non-denied outcome counts as allowed.
        let per_key = &stats.key_stats["user:test"];
        assert_eq!(per_key.allowed, 3);
        assert_eq!(per_key.denied, 0);
    }

    #[test]
    fn test_per_key_stats() {
        let metrics = RateLimitMetrics::new();
        let key1 = user("user1");
        let key2 = user("user2");

        metrics.record_decision(&key1, &RateLimitResult::Allowed, TICK);
        metrics.record_decision(&key1, &RateLimitResult::Allowed, TICK);
        metrics.record_decision(&key2, &RateLimitResult::Allowed, TICK);

        let stats = metrics.get_stats();
        assert_eq!(stats.key_stats.len(), 2);

        let user1_stats = stats.key_stats.get("user:user1").unwrap();
        assert_eq!(user1_stats.total, 2);
    }

    #[test]
    fn test_denial_rate() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        // 3 allowed, 2 denied = 40% denial rate
        for _ in 0..3 {
            metrics.record_decision(&key, &RateLimitResult::Allowed, TICK);
        }
        for _ in 0..2 {
            metrics.record_decision(&key, &denied_result(&key), TICK);
        }

        assert!((metrics.denial_rate() - 0.4).abs() < 0.01);

        let stats = metrics.get_stats();
        let per_key = stats.key_stats.get("user:test").unwrap();
        assert!((per_key.denial_rate() - 0.4).abs() < 0.01);
    }

    #[test]
    fn test_timing_stats() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        // Samples 10, 20, ..., 1000 microseconds.
        for i in 1..=100 {
            metrics.record_decision(&key, &RateLimitResult::Allowed, Duration::from_micros(i * 10));
        }

        let stats = metrics.get_stats();
        assert_eq!(stats.avg_decision_time_us, 505);
        assert!(stats.p50_decision_time_us > 0);
        assert!(stats.p99_decision_time_us >= stats.p50_decision_time_us);
    }

    #[test]
    fn test_timing_buffer_is_bounded() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        for _ in 0..2500 {
            metrics.record_decision(&key, &RateLimitResult::Allowed, TICK);
        }

        assert!(metrics.decision_times_us.read().len() <= metrics.max_timing_samples);
        assert_eq!(metrics.get_stats().total_requests, 2500);
    }

    #[test]
    fn test_reset() {
        let metrics = RateLimitMetrics::new();
        let key = user("test");

        metrics.record_decision(&key, &RateLimitResult::Allowed, TICK);

        assert!(metrics.total_requests() > 0);

        metrics.reset();

        assert_eq!(metrics.total_requests(), 0);
        assert_eq!(metrics.denied(), 0);
        let stats = metrics.get_stats();
        assert!(stats.key_stats.is_empty());
        assert_eq!(stats.avg_decision_time_us, 0);
    }

    #[test]
    fn test_reset_key() {
        let metrics = RateLimitMetrics::new();
        let key1 = user("user1");
        let key2 = user("user2");

        metrics.record_decision(&key1, &RateLimitResult::Allowed, TICK);
        metrics.record_decision(&key2, &RateLimitResult::Allowed, TICK);

        assert_eq!(metrics.get_stats().key_stats.len(), 2);

        metrics.reset_key(&key1);

        let stats = metrics.get_stats();
        assert_eq!(stats.key_stats.len(), 1);
        assert!(!stats.key_stats.contains_key("user:user1"));
        assert!(stats.key_stats.contains_key("user:user2"));
    }

    #[test]
    fn test_default_and_debug() {
        let metrics = RateLimitMetrics::default();
        assert_eq!(metrics.total_requests(), 0);

        let rendered = format!("{metrics:?}");
        assert!(rendered.contains("RateLimitMetrics"));
        assert!(rendered.contains("key_count: 0"));
    }

    #[test]
    fn test_stats_methods() {
        let stats = RateLimitStats {
            total_requests: 100,
            allowed: 80,
            queued: 5,
            throttled: 5,
            warned: 5,
            denied: 5,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            key_stats: HashMap::new(),
            uptime_secs: 10,
        };

        assert!((stats.denial_rate() - 0.05).abs() < 0.01);
        assert!((stats.allow_rate() - 0.80).abs() < 0.01);
        assert!((stats.requests_per_second() - 10.0).abs() < 0.1);
    }

    #[test]
    fn test_top_keys() {
        let snapshot = |total, allowed, denied| KeyStatsSnapshot {
            total,
            allowed,
            denied,
            last_request_age: None,
        };

        let mut key_stats = HashMap::new();
        key_stats.insert("user:high".to_string(), snapshot(100, 50, 50));
        key_stats.insert("user:low".to_string(), snapshot(100, 90, 10));
        key_stats.insert("user:most".to_string(), snapshot(1000, 900, 100));

        let stats = RateLimitStats {
            total_requests: 1200,
            allowed: 1040,
            queued: 0,
            throttled: 0,
            warned: 0,
            denied: 160,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            key_stats,
            uptime_secs: 60,
        };

        // Top denied should be "high" (50% denial rate)
        let top_denied = stats.top_denied_keys(1);
        assert_eq!(top_denied[0].0, "user:high");

        // Top requests should be "most" (1000 requests)
        let top_requests = stats.top_request_keys(1);
        assert_eq!(top_requests[0].0, "user:most");
    }

    #[test]
    fn test_key_stats_snapshot_rates() {
        let snapshot = KeyStatsSnapshot {
            total: 100,
            allowed: 80,
            denied: 20,
            last_request_age: Some(Duration::from_secs(5)),
        };

        assert!((snapshot.denial_rate() - 0.2).abs() < 0.01);
        assert!((snapshot.allow_rate() - 0.8).abs() < 0.01);
    }
}