1use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11 policy::{PolicyDecision, RateLimitPolicy},
12 signature::EventSignature,
13};
14
15#[cfg(feature = "human-readable")]
16use crate::domain::metadata::EventMetadata;
17use std::panic;
18use std::sync::Arc;
19
/// Outcome of a single rate-limit check.
///
/// The type is a small `Copy` value so callers can freely pass it around and
/// compare it with `==`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LimitDecision {
    /// The event may proceed.
    Allow,
    /// The event should be suppressed.
    Suppress,
}
28
29#[derive(Clone)]
31pub struct RateLimiter<S>
32where
33 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
34{
35 registry: SuppressionRegistry<S>,
36 metrics: Metrics,
37 circuit_breaker: Arc<CircuitBreaker>,
38}
39
40impl<S> RateLimiter<S>
41where
42 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
43{
44 pub fn new(
51 registry: SuppressionRegistry<S>,
52 metrics: Metrics,
53 circuit_breaker: Arc<CircuitBreaker>,
54 ) -> Self {
55 Self {
56 registry,
57 metrics,
58 circuit_breaker,
59 }
60 }
61
62 pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
80 if !self.circuit_breaker.allow_request() {
82 self.metrics.record_allowed();
84 return LimitDecision::Allow;
85 }
86
87 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
89 self.registry.with_event_state(signature, |state, now| {
90 let decision = state.policy.register_event(now);
92
93 match decision {
94 PolicyDecision::Allow => LimitDecision::Allow,
95 PolicyDecision::Suppress => {
96 state.counter.record_suppression(now);
98 LimitDecision::Suppress
99 }
100 }
101 })
102 }));
103
104 let decision = match result {
105 Ok(decision) => {
106 self.circuit_breaker.record_success();
108 decision
109 }
110 Err(_) => {
111 self.circuit_breaker.record_failure();
113 LimitDecision::Allow
114 }
115 };
116
117 match decision {
119 LimitDecision::Allow => self.metrics.record_allowed(),
120 LimitDecision::Suppress => self.metrics.record_suppressed(),
121 }
122
123 decision
124 }
125
126 #[cfg(feature = "human-readable")]
142 pub fn check_event_with_metadata(
143 &self,
144 signature: EventSignature,
145 metadata: EventMetadata,
146 ) -> LimitDecision {
147 if !self.circuit_breaker.allow_request() {
149 self.metrics.record_allowed();
151 return LimitDecision::Allow;
152 }
153
154 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
156 self.registry.with_event_state(signature, |state, now| {
157 state.set_metadata(metadata);
159
160 let decision = state.policy.register_event(now);
162
163 match decision {
164 PolicyDecision::Allow => LimitDecision::Allow,
165 PolicyDecision::Suppress => {
166 state.counter.record_suppression(now);
168 LimitDecision::Suppress
169 }
170 }
171 })
172 }));
173
174 let decision = match result {
175 Ok(decision) => {
176 self.circuit_breaker.record_success();
178 decision
179 }
180 Err(_) => {
181 self.circuit_breaker.record_failure();
183 LimitDecision::Allow
184 }
185 };
186
187 match decision {
189 LimitDecision::Allow => self.metrics.record_allowed(),
190 LimitDecision::Suppress => self.metrics.record_suppressed(),
191 }
192
193 decision
194 }
195
196 pub fn registry(&self) -> &SuppressionRegistry<S> {
198 &self.registry
199 }
200
201 pub fn metrics(&self) -> &Metrics {
203 &self.metrics
204 }
205
206 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
208 &self.circuit_breaker
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::circuit_breaker::CircuitBreakerConfig;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::mocks::MockClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Number of failures the fail-open tests record on a breaker.
    const TRIP_FAILURES: usize = 10;

    /// Records `TRIP_FAILURES` consecutive failures on `breaker`.
    fn trip(breaker: &CircuitBreaker) {
        for _ in 0..TRIP_FAILURES {
            breaker.record_failure();
        }
    }

    /// A count-based policy of two allows two events and suppresses the rest.
    #[test]
    fn test_rate_limiter_basic() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let limiter = RateLimiter::new(
            SuppressionRegistry::new(store, clock, policy),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let sig = EventSignature::simple("INFO", "Test message");

        for _ in 0..2 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        }

        for _ in 0..2 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
        }
    }

    /// A time-window policy allows events again once the mock clock has moved
    /// past the window.
    #[test]
    fn test_rate_limiter_with_mock_clock() {
        let store = Arc::new(ShardedStorage::new());
        let mock_clock = Arc::new(MockClock::new(Instant::now()));
        let policy = Policy::time_window(2, Duration::from_secs(60)).unwrap();
        let limiter = RateLimiter::new(
            SuppressionRegistry::new(store, mock_clock.clone(), policy),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let sig = EventSignature::simple("INFO", "Test");

        for _ in 0..2 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        }

        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);

        mock_clock.advance(Duration::from_secs(61));

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
    }

    /// Two distinct signatures are limited independently of each other.
    #[test]
    fn test_rate_limiter_different_signatures() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let limiter = RateLimiter::new(
            SuppressionRegistry::new(store, clock, policy),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let first = EventSignature::simple("INFO", "Message 1");
        let second = EventSignature::simple("INFO", "Message 2");

        // Round one: both signatures are allowed; round two: both suppressed.
        for expected in [LimitDecision::Allow, LimitDecision::Suppress] {
            assert_eq!(limiter.check_event(first), expected);
            assert_eq!(limiter.check_event(second), expected);
        }
    }

    /// The per-event counter reflects the allowed event plus every suppression.
    #[test]
    fn test_rate_limiter_suppression_counting() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let limiter = RateLimiter::new(
            registry.clone(),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        );

        let sig = EventSignature::simple("INFO", "Test");

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        for _ in 0..3 {
            assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
        }

        let recorded = registry.with_event_state(sig, |state, _now| state.counter.count());
        assert_eq!(recorded, 4);
    }

    /// Ten threads issuing twenty checks each never exceed the budget of fifty.
    #[test]
    fn test_concurrent_rate_limiting() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(50).unwrap();
        let limiter = Arc::new(RateLimiter::new(
            SuppressionRegistry::new(store, clock, policy),
            Metrics::new(),
            Arc::new(CircuitBreaker::new()),
        ));

        let sig = EventSignature::simple("INFO", "Concurrent test");

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let worker_limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    // (allowed, suppressed) seen by this worker.
                    let mut tally = (0_i32, 0_i32);

                    for _ in 0..20 {
                        match worker_limiter.check_event(sig) {
                            LimitDecision::Allow => tally.0 += 1,
                            LimitDecision::Suppress => tally.1 += 1,
                        }
                    }

                    tally
                })
            })
            .collect();

        let (total_allowed, total_suppressed) = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .fold((0_i32, 0_i32), |(allowed, suppressed), (a, s)| {
                (allowed + a, suppressed + s)
            });

        assert_eq!(total_allowed + total_suppressed, 200);

        assert!(total_allowed <= 50);

        assert!(total_suppressed >= 150);
    }

    /// With the breaker open, an event that would be suppressed is allowed.
    #[test]
    fn test_fail_open_when_circuit_breaker_open() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("ERROR", "Critical failure");

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        trip(&cb);
        assert!(!cb.allow_request(), "Circuit breaker should be open");

        let outcome = limiter.check_event(sig);
        assert_eq!(
            outcome,
            LimitDecision::Allow,
            "Should fail open when circuit breaker is open"
        );

        assert_eq!(limiter.metrics().events_allowed(), 2);
    }

    /// Fail-open decisions are counted as allowed, never as suppressed.
    #[test]
    fn test_fail_open_updates_metrics() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("ERROR", "Test");

        trip(&cb);

        (0..5).for_each(|_| {
            limiter.check_event(sig);
        });

        let metrics = limiter.metrics();
        assert_eq!(metrics.events_allowed(), 5);
        assert_eq!(metrics.events_suppressed(), 0);
    }

    /// After the recovery timeout a successful check resets the failure count.
    #[test]
    fn test_circuit_breaker_half_open_allows_some_requests() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::with_config(CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_millis(10),
        }));
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("ERROR", "Test");

        trip(&cb);

        // Wait longer than the configured recovery timeout.
        thread::sleep(Duration::from_millis(20));

        let outcome = limiter.check_event(sig);
        assert_eq!(outcome, LimitDecision::Allow);

        assert_eq!(cb.consecutive_failures(), 0);
    }

    /// Once the breaker has recovered, the policy is applied normally again.
    #[test]
    fn test_normal_operation_after_circuit_breaker_closes() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::with_config(CircuitBreakerConfig {
            failure_threshold: 5,
            recovery_timeout: Duration::from_millis(10),
        }));
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("INFO", "Test");

        trip(&cb);

        // Wait longer than the configured recovery timeout.
        thread::sleep(Duration::from_millis(20));

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        assert_eq!(cb.consecutive_failures(), 0);

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
    }

    /// Ordinary checks leave the breaker without consecutive failures.
    #[test]
    fn test_successful_operations_record_success_to_circuit_breaker() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(10).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("INFO", "Test");

        (0..5).for_each(|_| {
            limiter.check_event(sig);
        });

        assert_eq!(cb.consecutive_failures(), 0);
    }

    /// With the breaker open, every check from every thread is allowed.
    #[test]
    fn test_concurrent_fail_open_behavior() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(5).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = Arc::new(RateLimiter::new(
            registry,
            Metrics::new(),
            Arc::clone(&cb),
        ));

        trip(&cb);

        let sig = EventSignature::simple("ERROR", "Concurrent fail-open test");

        let workers: Vec<_> = (0..5)
            .map(|_| {
                let worker_limiter = Arc::clone(&limiter);
                thread::spawn(move || {
                    // Count instead of short-circuiting so each worker always
                    // issues all ten checks.
                    let not_allowed = (0..10)
                        .filter(|_| worker_limiter.check_event(sig) != LimitDecision::Allow)
                        .count();
                    not_allowed == 0
                })
            })
            .collect();

        for worker in workers {
            assert!(
                worker.join().unwrap(),
                "All events should be allowed when circuit is open"
            );
        }

        assert_eq!(limiter.metrics().events_allowed(), 50);
        assert_eq!(limiter.metrics().events_suppressed(), 0);
    }

    /// Metrics stay consistent across normal and fail-open decisions.
    #[test]
    fn test_metrics_consistency_during_fail_open() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry, Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("INFO", "Test");

        // Normal operation: two allowed, one suppressed.
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
        assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);

        trip(&cb);

        // Fail-open: allowed even though the policy budget is exhausted.
        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        let snapshot = limiter.metrics().snapshot();
        assert_eq!(snapshot.events_allowed, 3);
        assert_eq!(snapshot.events_suppressed, 1);
        assert_eq!(snapshot.total_events(), 4);
    }

    /// Fail-open checks do not change the registry's per-event counter.
    #[test]
    fn test_registry_state_unaffected_by_circuit_breaker() {
        let store = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(1).unwrap();
        let registry = SuppressionRegistry::new(store, clock, policy);
        let cb = Arc::new(CircuitBreaker::new());
        let limiter = RateLimiter::new(registry.clone(), Metrics::new(), Arc::clone(&cb));

        let sig = EventSignature::simple("INFO", "Test");

        assert_eq!(limiter.check_event(sig), LimitDecision::Allow);

        let count_before = registry.with_event_state(sig, |state, _| state.counter.count());
        assert_eq!(count_before, 1);

        trip(&cb);

        (0..5).for_each(|_| {
            limiter.check_event(sig);
        });

        let count_after = registry.with_event_state(sig, |state, _| state.counter.count());
        assert_eq!(
            count_after, count_before,
            "Registry state should not change during fail-open"
        );
    }
}