tracing_throttle/application/
limiter.rs

1//! Rate limiter coordination logic.
2//!
3//! The rate limiter decides whether events should be allowed or suppressed
4//! based on policies and tracks suppression counts.
5
6use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11    policy::{PolicyDecision, RateLimitPolicy},
12    signature::EventSignature,
13};
14
15#[cfg(feature = "human-readable")]
16use crate::domain::metadata::EventMetadata;
17use std::panic;
18use std::sync::Arc;
19
/// Outcome of a rate-limiting check for a single event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LimitDecision {
    /// The event should be emitted.
    Allow,
    /// The event should be dropped (suppressed).
    Suppress,
}
28
29/// Coordinates rate limiting decisions.
30#[derive(Clone)]
31pub struct RateLimiter<S>
32where
33    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
34{
35    registry: SuppressionRegistry<S>,
36    metrics: Metrics,
37    circuit_breaker: Arc<CircuitBreaker>,
38}
39
40impl<S> RateLimiter<S>
41where
42    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
43{
44    /// Create a new rate limiter.
45    ///
46    /// # Arguments
47    /// * `registry` - The suppression registry (which contains the clock)
48    /// * `metrics` - Metrics tracker
49    /// * `circuit_breaker` - Circuit breaker for fail-safe operation
50    pub fn new(
51        registry: SuppressionRegistry<S>,
52        metrics: Metrics,
53        circuit_breaker: Arc<CircuitBreaker>,
54    ) -> Self {
55        Self {
56            registry,
57            metrics,
58            circuit_breaker,
59        }
60    }
61
62    /// Process an event and decide whether to allow or suppress it.
63    ///
64    /// # Arguments
65    /// * `signature` - The event signature
66    ///
67    /// # Returns
68    /// A `LimitDecision` indicating whether to allow or suppress the event.
69    ///
70    /// # Fail-Safe Behavior
71    /// If rate limiting operations fail (circuit breaker open), this method fails open
72    /// and allows all events through to preserve observability.
73    ///
74    /// # Performance
75    /// This method is designed for the hot path:
76    /// - Fast hash lookup in sharded map
77    /// - Lock-free atomic operations where possible
78    /// - No allocations in common case
79    pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
80        // Check circuit breaker state
81        if !self.circuit_breaker.allow_request() {
82            // Circuit is open, fail open (allow all events)
83            self.metrics.record_allowed();
84            return LimitDecision::Allow;
85        }
86
87        // Attempt rate limiting operation with panic protection
88        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
89            self.registry.with_event_state(signature, |state, now| {
90                // Ask the policy whether to allow this event
91                let decision = state.policy.register_event(now);
92
93                match decision {
94                    PolicyDecision::Allow => LimitDecision::Allow,
95                    PolicyDecision::Suppress => {
96                        // Record the suppression
97                        state.counter.record_suppression(now);
98                        LimitDecision::Suppress
99                    }
100                }
101            })
102        }));
103
104        let decision = match result {
105            Ok(decision) => {
106                // Operation succeeded
107                self.circuit_breaker.record_success();
108                decision
109            }
110            Err(_) => {
111                // Operation panicked, record failure and fail open
112                self.circuit_breaker.record_failure();
113                LimitDecision::Allow
114            }
115        };
116
117        // Record metrics
118        match decision {
119            LimitDecision::Allow => self.metrics.record_allowed(),
120            LimitDecision::Suppress => self.metrics.record_suppressed(),
121        }
122
123        decision
124    }
125
126    /// Process an event with metadata and decide whether to allow or suppress it.
127    ///
128    /// This method captures event metadata on first occurrence for human-readable summaries.
129    ///
130    /// **Note:** Only available with the `human-readable` feature flag.
131    ///
132    /// # Arguments
133    /// * `signature` - The event signature
134    /// * `metadata` - Event details (level, message, target, fields)
135    ///
136    /// # Returns
137    /// A `LimitDecision` indicating whether to allow or suppress the event.
138    ///
139    /// # Fail-Safe Behavior
140    /// Same as `check_event`: fails open if rate limiting operations fail.
141    #[cfg(feature = "human-readable")]
142    pub fn check_event_with_metadata(
143        &self,
144        signature: EventSignature,
145        metadata: EventMetadata,
146    ) -> LimitDecision {
147        // Check circuit breaker state
148        if !self.circuit_breaker.allow_request() {
149            // Circuit is open, fail open (allow all events)
150            self.metrics.record_allowed();
151            return LimitDecision::Allow;
152        }
153
154        // Attempt rate limiting operation with panic protection
155        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
156            self.registry.with_event_state(signature, |state, now| {
157                // Capture metadata on first occurrence
158                state.set_metadata(metadata);
159
160                // Ask the policy whether to allow this event
161                let decision = state.policy.register_event(now);
162
163                match decision {
164                    PolicyDecision::Allow => LimitDecision::Allow,
165                    PolicyDecision::Suppress => {
166                        // Record the suppression
167                        state.counter.record_suppression(now);
168                        LimitDecision::Suppress
169                    }
170                }
171            })
172        }));
173
174        let decision = match result {
175            Ok(decision) => {
176                // Operation succeeded
177                self.circuit_breaker.record_success();
178                decision
179            }
180            Err(_) => {
181                // Operation panicked, record failure and fail open
182                self.circuit_breaker.record_failure();
183                LimitDecision::Allow
184            }
185        };
186
187        // Record metrics
188        match decision {
189            LimitDecision::Allow => self.metrics.record_allowed(),
190            LimitDecision::Suppress => self.metrics.record_suppressed(),
191        }
192
193        decision
194    }
195
196    /// Get a reference to the registry.
197    pub fn registry(&self) -> &SuppressionRegistry<S> {
198        &self.registry
199    }
200
201    /// Get a reference to the metrics.
202    pub fn metrics(&self) -> &Metrics {
203        &self.metrics
204    }
205
206    /// Get a reference to the circuit breaker.
207    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
208        &self.circuit_breaker
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::circuit_breaker::CircuitBreakerConfig;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::mocks::MockClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Trip `breaker` open by recording ten consecutive failures.
    ///
    /// Ten is the count every fail-open test below relies on.
    /// `test_fail_open_when_circuit_breaker_open` asserts it is enough for the
    /// default configuration, and the explicit configs use a threshold of 5.
    fn trip_open(breaker: &CircuitBreaker) {
        for _ in 0..10 {
            breaker.record_failure();
        }
    }

    #[test]
    fn test_rate_limiter_basic() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_two = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_two);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let signature = EventSignature::simple("INFO", "Test message");

        // Two events fit under the limit; everything after that is suppressed.
        let decisions: Vec<LimitDecision> =
            (0..4).map(|_| limiter.check_event(signature)).collect();
        assert_eq!(
            decisions,
            [
                LimitDecision::Allow,
                LimitDecision::Allow,
                LimitDecision::Suppress,
                LimitDecision::Suppress,
            ]
        );
    }

    #[test]
    fn test_rate_limiter_with_mock_clock() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(MockClock::new(Instant::now()));
        let window = Policy::time_window(2, Duration::from_secs(60)).unwrap();
        let registry = SuppressionRegistry::new(store, clock.clone(), window);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let signature = EventSignature::simple("INFO", "Test");

        // The window admits two events...
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        // ...and suppresses the third.
        assert_eq!(limiter.check_event(signature), LimitDecision::Suppress);

        // Once the 60s window has fully elapsed, events are admitted again.
        clock.advance(Duration::from_secs(61));
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
    }

    #[test]
    fn test_rate_limiter_different_signatures() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_one = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_one);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let first = EventSignature::simple("INFO", "Message 1");
        let second = EventSignature::simple("INFO", "Message 2");

        // Limits are tracked per signature, so each one gets its own allowance.
        for signature in [first, second] {
            assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        }
        for signature in [first, second] {
            assert_eq!(limiter.check_event(signature), LimitDecision::Suppress);
        }
    }

    #[test]
    fn test_rate_limiter_suppression_counting() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_one = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_one);
        let limiter = RateLimiter::new(
            registry.clone(),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let signature = EventSignature::simple("INFO", "Test");

        // One allowed event, then three suppressed ones.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        for _ in 0..3 {
            assert_eq!(limiter.check_event(signature), LimitDecision::Suppress);
        }

        // Expected total: initial creation counts as 1, plus 3 recorded suppressions.
        registry.with_event_state(signature, |state, _now| {
            assert_eq!(state.counter.count(), 4);
        });
    }

    #[test]
    fn test_concurrent_rate_limiting() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit = Policy::count_based(50).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit);
        let limiter = Arc::new(RateLimiter::new(
            registry,
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        ));

        let signature = EventSignature::simple("INFO", "Concurrent test");

        // Spawn every worker before joining any, so they actually contend.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let shared = Arc::clone(&limiter);
                thread::spawn(move || {
                    (0..20).fold((0, 0), |(allowed, suppressed), _| {
                        match shared.check_event(signature) {
                            LimitDecision::Allow => (allowed + 1, suppressed),
                            LimitDecision::Suppress => (allowed, suppressed + 1),
                        }
                    })
                })
            })
            .collect();

        let (total_allowed, total_suppressed) = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .fold((0, 0), |(allowed, suppressed), (a, s)| {
                (allowed + a, suppressed + s)
            });

        // 10 threads x 20 events = 200 decisions in total.
        assert_eq!(total_allowed + total_suppressed, 200);
        // The policy admits at most 50 events...
        assert!(total_allowed <= 50);
        // ...so at least 150 must have been suppressed.
        assert!(total_suppressed >= 150);
    }

    #[test]
    fn test_fail_open_when_circuit_breaker_open() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let strict = Policy::count_based(1).unwrap(); // a single event per signature
        let registry = SuppressionRegistry::new(store, clk, strict);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("ERROR", "Critical failure");

        // Uses up the policy's only allowance.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);

        trip_open(&breaker);
        assert!(!breaker.allow_request(), "Circuit breaker should be open");

        // The policy alone would suppress this event, but an open breaker fails open.
        let decision = limiter.check_event(signature);
        assert_eq!(
            decision,
            LimitDecision::Allow,
            "Should fail open when circuit breaker is open"
        );

        // Both events were counted as allowed.
        assert_eq!(limiter.metrics().events_allowed(), 2);
    }

    #[test]
    fn test_fail_open_updates_metrics() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_one = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_one);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("ERROR", "Test");

        trip_open(&breaker);

        // Every event processed while the breaker is open is allowed.
        for _ in 0..5 {
            limiter.check_event(signature);
        }

        // ...and counted as allowed, never as suppressed.
        assert_eq!(limiter.metrics().events_allowed(), 5);
        assert_eq!(limiter.metrics().events_suppressed(), 0);
    }

    #[test]
    fn test_circuit_breaker_half_open_allows_some_requests() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_one = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_one);
        let breaker = Arc::new(CircuitBreaker::with_config(CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_millis(10),
        }));
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("ERROR", "Test");

        trip_open(&breaker);

        // Sleep past the 10ms recovery timeout so the breaker can go half-open.
        thread::sleep(Duration::from_millis(20));

        // The first (probe) request is let through...
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        // ...and, because it succeeded, the failure streak is reset.
        assert_eq!(breaker.consecutive_failures(), 0);
    }

    #[test]
    fn test_normal_operation_after_circuit_breaker_closes() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_two = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_two);
        let breaker = Arc::new(CircuitBreaker::with_config(CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_millis(10),
        }));
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("INFO", "Test");

        trip_open(&breaker);

        // Wait out the recovery timeout.
        thread::sleep(Duration::from_millis(20));

        // The probe succeeds and records a success...
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        // ...which clears the failure streak.
        assert_eq!(breaker.consecutive_failures(), 0);

        // Regular rate limiting resumes: one more allowance, then suppression.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        assert_eq!(limiter.check_event(signature), LimitDecision::Suppress);
    }

    #[test]
    fn test_successful_operations_record_success_to_circuit_breaker() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_ten = Policy::count_based(10).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_ten);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("INFO", "Test");

        (0..5).for_each(|_| {
            limiter.check_event(signature);
        });

        // No operation panicked, so no failures should be on record.
        assert_eq!(breaker.consecutive_failures(), 0);
    }

    #[test]
    fn test_concurrent_fail_open_behavior() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_five = Policy::count_based(5).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_five);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = Arc::new(RateLimiter::new(
            registry,
            Metrics::new(),
            Arc::clone(&breaker),
        ));

        trip_open(&breaker);

        let signature = EventSignature::simple("ERROR", "Concurrent fail-open test");

        // Each worker processes all 10 of its events (no short-circuit) and reports
        // whether every one of them was allowed.
        let workers: Vec<_> = (0..5)
            .map(|_| {
                let shared = Arc::clone(&limiter);
                thread::spawn(move || {
                    (0..10)
                        .map(|_| shared.check_event(signature))
                        .filter(|decision| *decision != LimitDecision::Allow)
                        .count()
                        == 0
                })
            })
            .collect();

        for worker in workers {
            assert!(
                worker.join().unwrap(),
                "All events should be allowed when circuit is open"
            );
        }

        // 5 threads x 10 events = 50, all allowed via fail-open.
        assert_eq!(limiter.metrics().events_allowed(), 50);
        assert_eq!(limiter.metrics().events_suppressed(), 0);
    }

    #[test]
    fn test_metrics_consistency_during_fail_open() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_two = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_two);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("INFO", "Test");

        // Regular operation: two allowed, one suppressed.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);
        assert_eq!(limiter.check_event(signature), LimitDecision::Suppress);

        trip_open(&breaker);

        // Fail-open: a third allowed event.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);

        let snapshot = limiter.metrics().snapshot();
        assert_eq!(snapshot.events_allowed, 3);
        assert_eq!(snapshot.events_suppressed, 1);
        assert_eq!(snapshot.total_events(), 4);
    }

    #[test]
    fn test_registry_state_unaffected_by_circuit_breaker() {
        let store = Arc::new(ShardedStorage::new());
        let clk = Arc::new(SystemClock::new());
        let limit_one = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clk, limit_one);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry.clone(), Metrics::new(), Arc::clone(&breaker));

        let signature = EventSignature::simple("INFO", "Test");

        // The first event creates the signature's state.
        assert_eq!(limiter.check_event(signature), LimitDecision::Allow);

        let count_before = registry.with_event_state(signature, |state, _| state.counter.count());
        assert_eq!(count_before, 1);

        trip_open(&breaker);

        // With the breaker open the limiter short-circuits before the registry.
        for _ in 0..5 {
            limiter.check_event(signature);
        }

        let count_after = registry.with_event_state(signature, |state, _| state.counter.count());
        assert_eq!(
            count_after, count_before,
            "Registry state should not change during fail-open"
        );
    }
}