tracing_throttle/application/
limiter.rs

1//! Rate limiter coordination logic.
2//!
3//! The rate limiter decides whether events should be allowed or suppressed
4//! based on policies and tracks suppression counts.
5
6use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11    policy::{PolicyDecision, RateLimitPolicy},
12    signature::EventSignature,
13};
14use std::panic;
15use std::sync::Arc;
16
/// Outcome of a rate-limit check for a single event.
///
/// The type is a plain `Copy` value, so it can be compared, passed around
/// and matched on without any ownership concerns.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LimitDecision {
    /// The event may pass through.
    Allow,
    /// The event must be suppressed.
    Suppress,
}
25
26/// Coordinates rate limiting decisions.
27#[derive(Clone)]
28pub struct RateLimiter<S>
29where
30    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
31{
32    registry: SuppressionRegistry<S>,
33    metrics: Metrics,
34    circuit_breaker: Arc<CircuitBreaker>,
35}
36
37impl<S> RateLimiter<S>
38where
39    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
40{
41    /// Create a new rate limiter.
42    ///
43    /// # Arguments
44    /// * `registry` - The suppression registry (which contains the clock)
45    /// * `metrics` - Metrics tracker
46    /// * `circuit_breaker` - Circuit breaker for fail-safe operation
47    pub fn new(
48        registry: SuppressionRegistry<S>,
49        metrics: Metrics,
50        circuit_breaker: Arc<CircuitBreaker>,
51    ) -> Self {
52        Self {
53            registry,
54            metrics,
55            circuit_breaker,
56        }
57    }
58
59    /// Process an event and decide whether to allow or suppress it.
60    ///
61    /// # Arguments
62    /// * `signature` - The event signature
63    ///
64    /// # Returns
65    /// A `LimitDecision` indicating whether to allow or suppress the event.
66    ///
67    /// # Fail-Safe Behavior
68    /// If rate limiting operations fail (circuit breaker open), this method fails open
69    /// and allows all events through to preserve observability.
70    ///
71    /// # Performance
72    /// This method is designed for the hot path:
73    /// - Fast hash lookup in sharded map
74    /// - Lock-free atomic operations where possible
75    /// - No allocations in common case
76    pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
77        // Check circuit breaker state
78        if !self.circuit_breaker.allow_request() {
79            // Circuit is open, fail open (allow all events)
80            self.metrics.record_allowed();
81            return LimitDecision::Allow;
82        }
83
84        // Attempt rate limiting operation with panic protection
85        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
86            self.registry.with_event_state(signature, |state, now| {
87                // Ask the policy whether to allow this event
88                let decision = state.policy.register_event(now);
89
90                match decision {
91                    PolicyDecision::Allow => LimitDecision::Allow,
92                    PolicyDecision::Suppress => {
93                        // Record the suppression
94                        state.counter.record_suppression(now);
95                        LimitDecision::Suppress
96                    }
97                }
98            })
99        }));
100
101        let decision = match result {
102            Ok(decision) => {
103                // Operation succeeded
104                self.circuit_breaker.record_success();
105                decision
106            }
107            Err(_) => {
108                // Operation panicked, record failure and fail open
109                self.circuit_breaker.record_failure();
110                LimitDecision::Allow
111            }
112        };
113
114        // Record metrics
115        match decision {
116            LimitDecision::Allow => self.metrics.record_allowed(),
117            LimitDecision::Suppress => self.metrics.record_suppressed(),
118        }
119
120        decision
121    }
122
123    /// Get a reference to the registry.
124    pub fn registry(&self) -> &SuppressionRegistry<S> {
125        &self.registry
126    }
127
128    /// Get a reference to the metrics.
129    pub fn metrics(&self) -> &Metrics {
130        &self.metrics
131    }
132
133    /// Get a reference to the circuit breaker.
134    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
135        &self.circuit_breaker
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::mocks::MockClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    // A count-based limit of 2 lets two events through and suppresses the rest.
    #[test]
    fn test_rate_limiter_basic() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let registry = SuppressionRegistry::new(storage, clock, Policy::count_based(2).unwrap());
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), breaker);

        let sig = EventSignature::simple("INFO", "Test message");

        // Four checks in a row: the first two pass, the third and beyond do not.
        let observed: Vec<LimitDecision> = (0..4).map(|_| limiter.check_event(sig)).collect();
        assert_eq!(
            observed,
            [
                LimitDecision::Allow,
                LimitDecision::Allow,
                LimitDecision::Suppress,
                LimitDecision::Suppress,
            ]
        );
    }

    // A time-window limit starts allowing again once the mock clock moves past the window.
    #[test]
    fn test_rate_limiter_with_mock_clock() {
        let storage = Arc::new(ShardedStorage::new());
        let mock_clock = Arc::new(MockClock::new(Instant::now()));
        let window_policy = Policy::time_window(2, Duration::from_secs(60)).unwrap();
        let registry = SuppressionRegistry::new(storage, mock_clock.clone(), window_policy);
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), breaker);

        let sig = EventSignature::simple("INFO", "Test");

        // The window admits exactly two events...
        for _ in 0..2 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        }

        // ...and rejects the third.
        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);

        // Jump 61 seconds ahead, i.e. beyond the 60 second window.
        mock_clock.advance(Duration::from_secs(61));

        // Events are admitted again.
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
    }

    // Limits are tracked per signature, not globally.
    #[test]
    fn test_rate_limiter_different_signatures() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let registry = SuppressionRegistry::new(storage, clock, Policy::count_based(1).unwrap());
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), breaker);

        let sig1 = EventSignature::simple("INFO", "Message 1");
        let sig2 = EventSignature::simple("INFO", "Message 2");

        // First round: each signature still has its own budget of one.
        // Second round: both budgets are used up.
        for expected in [LimitDecision::Allow, LimitDecision::Suppress] {
            assert_eq!(limiter.check_event(sig1), expected);
            assert_eq!(limiter.check_event(sig2), expected);
        }
    }

    // Suppressions made through the limiter are visible on the registry's counter.
    #[test]
    fn test_rate_limiter_suppression_counting() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let registry = SuppressionRegistry::new(storage, clock, Policy::count_based(1).unwrap());
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry.clone(), Metrics::new(), breaker);

        let sig = EventSignature::simple("INFO", "Test");

        // The single permitted event.
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        // Three further events, all suppressed.
        for _ in 0..3 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
        }

        // The counter reads 4: initial creation counts as 1, plus the 3 recorded above.
        registry.with_event_state(sig, |state, _| {
            assert_eq!(state.counter.count(), 4);
        });
    }

    // Ten threads hammering one signature never exceed the policy limit in total.
    #[test]
    fn test_concurrent_rate_limiting() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let registry = SuppressionRegistry::new(storage, clock, Policy::count_based(50).unwrap());
        let breaker = Arc::new(CircuitBreaker::new());
        let limiter = Arc::new(RateLimiter::new(registry, Metrics::new(), breaker));

        let sig = EventSignature::simple("INFO", "Concurrent test");

        // Spawn 10 workers; each performs 20 checks and reports its own tallies.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let shared = Arc::clone(&limiter);
                thread::spawn(move || {
                    let mut allowed = 0;
                    let mut suppressed = 0;

                    for _ in 0..20 {
                        match shared.check_event(sig) {
                            LimitDecision::Allow => allowed += 1,
                            LimitDecision::Suppress => suppressed += 1,
                        }
                    }

                    (allowed, suppressed)
                })
            })
            .collect();

        // Sum the per-thread tallies; a panicked worker fails the test via `unwrap`.
        let (total_allowed, total_suppressed) = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .fold((0, 0), |(allowed, suppressed), (a, s)| {
                (allowed + a, suppressed + s)
            });

        // Total events = 10 threads * 20 events = 200
        assert_eq!(total_allowed + total_suppressed, 200);

        // Should have allowed at most 50 (policy limit)
        assert!(total_allowed <= 50);

        // Should have suppressed the rest
        assert!(total_suppressed >= 150);
    }
}