tracing_throttle/application/
limiter.rs

1//! Rate limiter coordination logic.
2//!
3//! The rate limiter decides whether events should be allowed or suppressed
4//! based on policies and tracks suppression counts.
5
6use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11    policy::{PolicyDecision, RateLimitPolicy},
12    signature::EventSignature,
13};
14
15#[cfg(feature = "human-readable")]
16use crate::domain::metadata::EventMetadata;
17use std::panic;
18use std::sync::Arc;
19
20/// Decision about how to handle an event.
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum LimitDecision {
23    /// Allow the event to pass through
24    Allow,
25    /// Suppress the event
26    Suppress,
27}
28
29/// Coordinates rate limiting decisions.
30#[derive(Clone)]
31pub struct RateLimiter<S>
32where
33    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
34{
35    registry: SuppressionRegistry<S>,
36    metrics: Metrics,
37    circuit_breaker: Arc<CircuitBreaker>,
38}
39
40impl<S> RateLimiter<S>
41where
42    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
43{
44    /// Create a new rate limiter.
45    ///
46    /// # Arguments
47    /// * `registry` - The suppression registry (which contains the clock)
48    /// * `metrics` - Metrics tracker
49    /// * `circuit_breaker` - Circuit breaker for fail-safe operation
50    pub fn new(
51        registry: SuppressionRegistry<S>,
52        metrics: Metrics,
53        circuit_breaker: Arc<CircuitBreaker>,
54    ) -> Self {
55        Self {
56            registry,
57            metrics,
58            circuit_breaker,
59        }
60    }
61
62    /// Process an event and decide whether to allow or suppress it.
63    ///
64    /// # Arguments
65    /// * `signature` - The event signature
66    ///
67    /// # Returns
68    /// A `LimitDecision` indicating whether to allow or suppress the event.
69    ///
70    /// # Fail-Safe Behavior
71    /// If rate limiting operations fail (circuit breaker open), this method fails open
72    /// and allows all events through to preserve observability.
73    ///
74    /// # Performance
75    /// This method is designed for the hot path:
76    /// - Fast hash lookup in sharded map
77    /// - Lock-free atomic operations where possible
78    /// - No allocations in common case
79    pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
80        // Check circuit breaker state
81        if !self.circuit_breaker.allow_request() {
82            // Circuit is open, fail open (allow all events)
83            self.metrics.record_allowed();
84            return LimitDecision::Allow;
85        }
86
87        // Attempt rate limiting operation with panic protection
88        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
89            self.registry.with_event_state(signature, |state, now| {
90                // Ask the policy whether to allow this event
91                let decision = state.policy.register_event(now);
92
93                match decision {
94                    PolicyDecision::Allow => LimitDecision::Allow,
95                    PolicyDecision::Suppress => {
96                        // Record the suppression
97                        state.counter.record_suppression(now);
98                        LimitDecision::Suppress
99                    }
100                }
101            })
102        }));
103
104        let decision = match result {
105            Ok(decision) => {
106                // Operation succeeded
107                self.circuit_breaker.record_success();
108                decision
109            }
110            Err(_) => {
111                // Operation panicked, record failure and fail open
112                self.circuit_breaker.record_failure();
113                LimitDecision::Allow
114            }
115        };
116
117        // Record metrics
118        match decision {
119            LimitDecision::Allow => self.metrics.record_allowed(),
120            LimitDecision::Suppress => self.metrics.record_suppressed(),
121        }
122
123        decision
124    }
125
126    /// Process an event with metadata and decide whether to allow or suppress it.
127    ///
128    /// This method captures event metadata on first occurrence for human-readable summaries.
129    ///
130    /// **Note:** Only available with the `human-readable` feature flag.
131    ///
132    /// # Arguments
133    /// * `signature` - The event signature
134    /// * `metadata` - Event details (level, message, target, fields)
135    ///
136    /// # Returns
137    /// A `LimitDecision` indicating whether to allow or suppress the event.
138    ///
139    /// # Fail-Safe Behavior
140    /// Same as `check_event`: fails open if rate limiting operations fail.
141    #[cfg(feature = "human-readable")]
142    pub fn check_event_with_metadata(
143        &self,
144        signature: EventSignature,
145        metadata: EventMetadata,
146    ) -> LimitDecision {
147        // Check circuit breaker state
148        if !self.circuit_breaker.allow_request() {
149            // Circuit is open, fail open (allow all events)
150            self.metrics.record_allowed();
151            return LimitDecision::Allow;
152        }
153
154        // Attempt rate limiting operation with panic protection
155        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
156            self.registry.with_event_state(signature, |state, now| {
157                // Capture metadata on first occurrence
158                state.set_metadata(metadata);
159
160                // Ask the policy whether to allow this event
161                let decision = state.policy.register_event(now);
162
163                match decision {
164                    PolicyDecision::Allow => LimitDecision::Allow,
165                    PolicyDecision::Suppress => {
166                        // Record the suppression
167                        state.counter.record_suppression(now);
168                        LimitDecision::Suppress
169                    }
170                }
171            })
172        }));
173
174        let decision = match result {
175            Ok(decision) => {
176                // Operation succeeded
177                self.circuit_breaker.record_success();
178                decision
179            }
180            Err(_) => {
181                // Operation panicked, record failure and fail open
182                self.circuit_breaker.record_failure();
183                LimitDecision::Allow
184            }
185        };
186
187        // Record metrics
188        match decision {
189            LimitDecision::Allow => self.metrics.record_allowed(),
190            LimitDecision::Suppress => self.metrics.record_suppressed(),
191        }
192
193        decision
194    }
195
196    /// Get a reference to the registry.
197    pub fn registry(&self) -> &SuppressionRegistry<S> {
198        &self.registry
199    }
200
201    /// Get a reference to the metrics.
202    pub fn metrics(&self) -> &Metrics {
203        &self.metrics
204    }
205
206    /// Get a reference to the circuit breaker.
207    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
208        &self.circuit_breaker
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use crate::domain::policy::Policy;
216    use crate::infrastructure::clock::SystemClock;
217    use crate::infrastructure::mocks::MockClock;
218    use crate::infrastructure::storage::ShardedStorage;
219    use std::sync::Arc;
220    use std::time::Instant;
221
222    #[test]
223    fn test_rate_limiter_basic() {
224        let storage = Arc::new(ShardedStorage::new());
225        let clock = Arc::new(SystemClock::new());
226        let policy = Policy::count_based(2).unwrap();
227        let registry = SuppressionRegistry::new(storage, clock, policy);
228        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
229
230        let sig = EventSignature::simple("INFO", "Test message");
231
232        // First two events allowed
233        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
234        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
235
236        // Third and beyond suppressed
237        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
238        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
239    }
240
241    #[test]
242    fn test_rate_limiter_with_mock_clock() {
243        use std::time::Duration;
244
245        let storage = Arc::new(ShardedStorage::new());
246        let mock_clock = Arc::new(MockClock::new(Instant::now()));
247        let policy = Policy::time_window(2, Duration::from_secs(60)).unwrap();
248        let registry = SuppressionRegistry::new(storage, mock_clock.clone(), policy);
249        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
250
251        let sig = EventSignature::simple("INFO", "Test");
252
253        // First 2 allowed
254        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
255        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
256
257        // 3rd suppressed
258        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
259
260        // Advance time by 61 seconds
261        mock_clock.advance(Duration::from_secs(61));
262
263        // Should allow again
264        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
265    }
266
267    #[test]
268    fn test_rate_limiter_different_signatures() {
269        let storage = Arc::new(ShardedStorage::new());
270        let clock = Arc::new(SystemClock::new());
271        let policy = Policy::count_based(1).unwrap();
272        let registry = SuppressionRegistry::new(storage, clock, policy);
273        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
274
275        let sig1 = EventSignature::simple("INFO", "Message 1");
276        let sig2 = EventSignature::simple("INFO", "Message 2");
277
278        // Each signature has independent limits
279        assert_eq!(limiter.check_event(sig1), LimitDecision::Allow);
280        assert_eq!(limiter.check_event(sig2), LimitDecision::Allow);
281
282        assert_eq!(limiter.check_event(sig1), LimitDecision::Suppress);
283        assert_eq!(limiter.check_event(sig2), LimitDecision::Suppress);
284    }
285
286    #[test]
287    fn test_rate_limiter_suppression_counting() {
288        let storage = Arc::new(ShardedStorage::new());
289        let clock = Arc::new(SystemClock::new());
290        let policy = Policy::count_based(1).unwrap();
291        let registry = SuppressionRegistry::new(storage, clock, policy);
292        let limiter = RateLimiter::new(
293            registry.clone(),
294            Metrics::new(),
295            Arc::new(CircuitBreaker::new()),
296        );
297
298        let sig = EventSignature::simple("INFO", "Test");
299
300        // Allow first
301        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
302
303        // Suppress next 3
304        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
305        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
306        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
307
308        // Check counter - initial creation counts as 1, plus 3 recorded = 4 total
309        registry.with_event_state(sig, |state, _now| {
310            assert_eq!(state.counter.count(), 4);
311        });
312    }
313
314    #[test]
315    fn test_concurrent_rate_limiting() {
316        use std::thread;
317
318        let storage = Arc::new(ShardedStorage::new());
319        let clock = Arc::new(SystemClock::new());
320        let policy = Policy::count_based(50).unwrap();
321        let registry = SuppressionRegistry::new(storage, clock, policy);
322        let limiter = Arc::new(RateLimiter::new(
323            registry,
324            Metrics::new(),
325            Arc::new(CircuitBreaker::new()),
326        ));
327
328        let sig = EventSignature::simple("INFO", "Concurrent test");
329        let mut handles = vec![];
330
331        for _ in 0..10 {
332            let limiter_clone = Arc::clone(&limiter);
333            let handle = thread::spawn(move || {
334                let mut allowed = 0;
335                let mut suppressed = 0;
336
337                for _ in 0..20 {
338                    match limiter_clone.check_event(sig) {
339                        LimitDecision::Allow => allowed += 1,
340                        LimitDecision::Suppress => suppressed += 1,
341                    }
342                }
343
344                (allowed, suppressed)
345            });
346            handles.push(handle);
347        }
348
349        let mut total_allowed = 0;
350        let mut total_suppressed = 0;
351
352        for handle in handles {
353            let (allowed, suppressed) = handle.join().unwrap();
354            total_allowed += allowed;
355            total_suppressed += suppressed;
356        }
357
358        // Total events = 10 threads * 20 events = 200
359        assert_eq!(total_allowed + total_suppressed, 200);
360
361        // Should have allowed at most 50 (policy limit)
362        assert!(total_allowed <= 50);
363
364        // Should have suppressed the rest
365        assert!(total_suppressed >= 150);
366    }
367}