tracing_throttle/application/
limiter.rs1use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11 policy::{PolicyDecision, RateLimitPolicy},
12 signature::EventSignature,
13};
14
15#[cfg(feature = "human-readable")]
16use crate::domain::metadata::EventMetadata;
17use std::panic;
18use std::sync::Arc;
19
/// Outcome of a rate-limit check for a single event.
///
/// The type is a plain two-state value: it is `Copy`, comparable with `==`,
/// and printable with `{:?}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LimitDecision {
    /// The event passed the check.
    Allow,
    /// The event did not pass the check and should be suppressed.
    Suppress,
}
28
29#[derive(Clone)]
31pub struct RateLimiter<S>
32where
33 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
34{
35 registry: SuppressionRegistry<S>,
36 metrics: Metrics,
37 circuit_breaker: Arc<CircuitBreaker>,
38}
39
40impl<S> RateLimiter<S>
41where
42 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
43{
44 pub fn new(
51 registry: SuppressionRegistry<S>,
52 metrics: Metrics,
53 circuit_breaker: Arc<CircuitBreaker>,
54 ) -> Self {
55 Self {
56 registry,
57 metrics,
58 circuit_breaker,
59 }
60 }
61
62 pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
80 if !self.circuit_breaker.allow_request() {
82 self.metrics.record_allowed();
84 return LimitDecision::Allow;
85 }
86
87 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
89 self.registry.with_event_state(signature, |state, now| {
90 let decision = state.policy.register_event(now);
92
93 match decision {
94 PolicyDecision::Allow => LimitDecision::Allow,
95 PolicyDecision::Suppress => {
96 state.counter.record_suppression(now);
98 LimitDecision::Suppress
99 }
100 }
101 })
102 }));
103
104 let decision = match result {
105 Ok(decision) => {
106 self.circuit_breaker.record_success();
108 decision
109 }
110 Err(_) => {
111 self.circuit_breaker.record_failure();
113 LimitDecision::Allow
114 }
115 };
116
117 match decision {
119 LimitDecision::Allow => self.metrics.record_allowed(),
120 LimitDecision::Suppress => self.metrics.record_suppressed(),
121 }
122
123 decision
124 }
125
126 #[cfg(feature = "human-readable")]
142 pub fn check_event_with_metadata(
143 &self,
144 signature: EventSignature,
145 metadata: EventMetadata,
146 ) -> LimitDecision {
147 if !self.circuit_breaker.allow_request() {
149 self.metrics.record_allowed();
151 return LimitDecision::Allow;
152 }
153
154 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
156 self.registry.with_event_state(signature, |state, now| {
157 state.set_metadata(metadata);
159
160 let decision = state.policy.register_event(now);
162
163 match decision {
164 PolicyDecision::Allow => LimitDecision::Allow,
165 PolicyDecision::Suppress => {
166 state.counter.record_suppression(now);
168 LimitDecision::Suppress
169 }
170 }
171 })
172 }));
173
174 let decision = match result {
175 Ok(decision) => {
176 self.circuit_breaker.record_success();
178 decision
179 }
180 Err(_) => {
181 self.circuit_breaker.record_failure();
183 LimitDecision::Allow
184 }
185 };
186
187 match decision {
189 LimitDecision::Allow => self.metrics.record_allowed(),
190 LimitDecision::Suppress => self.metrics.record_suppressed(),
191 }
192
193 decision
194 }
195
196 pub fn registry(&self) -> &SuppressionRegistry<S> {
198 &self.registry
199 }
200
201 pub fn metrics(&self) -> &Metrics {
203 &self.metrics
204 }
205
206 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
208 &self.circuit_breaker
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::mocks::MockClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// With a count-based limit of 2, the first two events pass and later
    /// ones are suppressed.
    #[test]
    fn test_rate_limiter_basic() {
        let backing_store = Arc::new(ShardedStorage::new());
        let wall_clock = Arc::new(SystemClock::new());
        let two_per_signature = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(backing_store, wall_clock, two_per_signature);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let sig = EventSignature::simple("INFO", "Test message");

        // Two allowed events, then two suppressed ones.
        for expected in [LimitDecision::Allow, LimitDecision::Suppress] {
            for _ in 0..2 {
                assert_eq!(limiter.check_event(sig), expected);
            }
        }
    }

    /// With a time-window policy, advancing the mock clock past the window
    /// lets events through again.
    #[test]
    fn test_rate_limiter_with_mock_clock() {
        let backing_store = Arc::new(ShardedStorage::new());
        let mock_clock = Arc::new(MockClock::new(Instant::now()));
        let two_per_minute = Policy::time_window(2, Duration::from_secs(60)).unwrap();
        let registry = SuppressionRegistry::new(backing_store, mock_clock.clone(), two_per_minute);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let sig = EventSignature::simple("INFO", "Test");

        // The window admits two events.
        for _ in 0..2 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        }

        // The third event inside the same window is suppressed.
        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);

        // Move past the 60 second window.
        mock_clock.advance(Duration::from_secs(61));

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
    }

    /// Each signature is limited independently of the others.
    #[test]
    fn test_rate_limiter_different_signatures() {
        let backing_store = Arc::new(ShardedStorage::new());
        let wall_clock = Arc::new(SystemClock::new());
        let one_per_signature = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(backing_store, wall_clock, one_per_signature);
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));

        let first = EventSignature::simple("INFO", "Message 1");
        let second = EventSignature::simple("INFO", "Message 2");

        // Round one: both signatures are allowed. Round two: both suppressed.
        for expected in [LimitDecision::Allow, LimitDecision::Suppress] {
            for sig in [first, second] {
                assert_eq!(limiter.check_event(sig), expected);
            }
        }
    }

    /// Suppressed events are tallied on the per-signature counter.
    #[test]
    fn test_rate_limiter_suppression_counting() {
        let backing_store = Arc::new(ShardedStorage::new());
        let wall_clock = Arc::new(SystemClock::new());
        let one_per_signature = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(backing_store, wall_clock, one_per_signature);
        let limiter = RateLimiter::new(
            registry.clone(),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let sig = EventSignature::simple("INFO", "Test");

        // The single permitted event.
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        // Three further events, all suppressed.
        for _ in 0..3 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
        }

        // NOTE(review): only three suppressions were recorded above, so the
        // expected 4 presumably includes an initial count of 1 set when the
        // state is created. Confirm against the counter's constructor.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 4);
        });
    }

    /// Ten threads share one limiter; the global limit of 50 must hold across
    /// all 200 checks.
    #[test]
    fn test_concurrent_rate_limiting() {
        let backing_store = Arc::new(ShardedStorage::new());
        let wall_clock = Arc::new(SystemClock::new());
        let fifty_per_signature = Policy::count_based(50).unwrap();
        let registry = SuppressionRegistry::new(backing_store, wall_clock, fifty_per_signature);
        let limiter = Arc::new(RateLimiter::new(
            registry,
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        ));

        let sig = EventSignature::simple("INFO", "Concurrent test");

        // Spawn every worker before joining any, so the checks overlap.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let worker_limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    let allowed = (0..20)
                        .filter(|_| worker_limiter.check_event(sig) == LimitDecision::Allow)
                        .count();

                    // Each of the 20 checks is either allowed or suppressed.
                    (allowed, 20 - allowed)
                })
            })
            .collect();

        let (total_allowed, total_suppressed) =
            workers
                .into_iter()
                .fold((0, 0), |(allowed_sum, suppressed_sum), worker| {
                    let (allowed, suppressed) = worker.join().unwrap();
                    (allowed_sum + allowed, suppressed_sum + suppressed)
                });

        // Every check produced exactly one decision.
        assert_eq!(total_allowed + total_suppressed, 200);

        // The limit is never exceeded.
        assert!(total_allowed <= 50);

        // So at least the remainder was suppressed.
        assert!(total_suppressed >= 150);
    }
}