use crate::application::circuit_breaker::CircuitBreaker;
use crate::application::metrics::Metrics;
use crate::application::ports::Storage;
use crate::application::registry::SuppressionRegistry;
use crate::domain::{
policy::{PolicyDecision, RateLimitPolicy},
signature::EventSignature,
};
#[cfg(feature = "human-readable")]
use crate::domain::metadata::EventMetadata;
use std::panic;
use std::sync::Arc;
/// Outcome of a rate-limit check for a single event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LimitDecision {
    /// The event may proceed.
    Allow,
    /// The event should be dropped by the caller.
    Suppress,
}
/// Rate limiter that decides, per event signature, whether an event is
/// allowed or suppressed.
///
/// It combines a suppression registry (per-signature state), a metrics sink
/// (allowed/suppressed counters) and a shared circuit breaker that gates
/// access to the registry.
#[derive(Clone)]
pub struct RateLimiter<S>
where
    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
{
    /// Per-signature event state consulted on every check.
    registry: SuppressionRegistry<S>,
    /// Counters updated with every allow/suppress decision.
    metrics: Metrics,
    /// Shared breaker; when it rejects a request the limiter allows the event
    /// without touching the registry.
    circuit_breaker: Arc<CircuitBreaker>,
}
impl<S> RateLimiter<S>
where
    S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
{
    /// Builds a rate limiter from its three collaborators.
    ///
    /// * `registry` - per-signature state consulted on every check.
    /// * `metrics` - counters updated with each allow/suppress decision.
    /// * `circuit_breaker` - gate deciding whether the registry is consulted.
    pub fn new(
        registry: SuppressionRegistry<S>,
        metrics: Metrics,
        circuit_breaker: Arc<CircuitBreaker>,
    ) -> Self {
        Self {
            registry,
            metrics,
            circuit_breaker,
        }
    }

    /// Decides whether the event identified by `signature` is allowed.
    ///
    /// The event's policy is asked to register the event; a suppressed event
    /// is additionally recorded on the state's suppression counter. The call
    /// fails open: it returns [`LimitDecision::Allow`] when the circuit
    /// breaker rejects the request or when the registry callback panics.
    pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
        self.run_guarded(|| {
            self.registry.with_event_state(signature, |state, now| {
                let verdict = state.policy.register_event(now);
                match verdict {
                    PolicyDecision::Allow => LimitDecision::Allow,
                    PolicyDecision::Suppress => {
                        state.counter.record_suppression(now);
                        LimitDecision::Suppress
                    }
                }
            })
        })
    }

    /// Same as [`RateLimiter::check_event`], but first stores `metadata` on
    /// the event state before the policy is consulted.
    ///
    /// Only compiled when the `human-readable` feature is enabled. When the
    /// circuit breaker rejects the request the registry is not touched, so
    /// `metadata` is dropped without being stored.
    #[cfg(feature = "human-readable")]
    pub fn check_event_with_metadata(
        &self,
        signature: EventSignature,
        metadata: EventMetadata,
    ) -> LimitDecision {
        self.run_guarded(|| {
            self.registry.with_event_state(signature, |state, now| {
                state.set_metadata(metadata);
                let verdict = state.policy.register_event(now);
                match verdict {
                    PolicyDecision::Allow => LimitDecision::Allow,
                    PolicyDecision::Suppress => {
                        state.counter.record_suppression(now);
                        LimitDecision::Suppress
                    }
                }
            })
        })
    }

    /// Returns the suppression registry backing this limiter.
    pub fn registry(&self) -> &SuppressionRegistry<S> {
        &self.registry
    }

    /// Returns the metrics collector updated by this limiter.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    /// Returns the shared circuit breaker guarding the registry.
    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
        &self.circuit_breaker
    }

    /// Runs `evaluate` behind the circuit breaker and records the outcome.
    ///
    /// Shared by every `check_*` entry point so the fail-open rules and the
    /// bookkeeping stay identical across them:
    ///
    /// * breaker rejects the request: `evaluate` is not run, the event is
    ///   counted as allowed and [`LimitDecision::Allow`] is returned;
    /// * `evaluate` returns: a success is reported to the breaker and the
    ///   decision is counted in the metrics;
    /// * `evaluate` panics: the panic is caught, a failure is reported to the
    ///   breaker and the event is counted as allowed.
    fn run_guarded<F>(&self, evaluate: F) -> LimitDecision
    where
        F: FnOnce() -> LimitDecision,
    {
        if !self.circuit_breaker.allow_request() {
            self.metrics.record_allowed();
            return LimitDecision::Allow;
        }

        // AssertUnwindSafe: after a panic the only thing done with the
        // captured state is reporting the failure, mirroring the fail-open
        // contract above.
        let decision = match panic::catch_unwind(panic::AssertUnwindSafe(evaluate)) {
            Ok(decision) => {
                self.circuit_breaker.record_success();
                decision
            }
            Err(_) => {
                self.circuit_breaker.record_failure();
                LimitDecision::Allow
            }
        };

        match decision {
            LimitDecision::Allow => self.metrics.record_allowed(),
            LimitDecision::Suppress => self.metrics.record_suppressed(),
        }
        decision
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::application::circuit_breaker::CircuitBreakerConfig;
use crate::domain::policy::Policy;
use crate::infrastructure::clock::SystemClock;
use crate::infrastructure::mocks::MockClock;
use crate::infrastructure::storage::ShardedStorage;
use std::sync::Arc;
use std::time::Instant;
#[test]
fn test_rate_limiter_basic() {
    // With a count-based policy of 2, the first two checks of a signature are
    // expected to be allowed and the following ones suppressed.
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let two_events = Policy::count_based(2).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, two_events);
    let rate_limiter =
        RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
    let signature = EventSignature::simple("INFO", "Test message");

    let expected_sequence = [
        LimitDecision::Allow,
        LimitDecision::Allow,
        LimitDecision::Suppress,
        LimitDecision::Suppress,
    ];
    for expected in expected_sequence.iter().copied() {
        assert_eq!(rate_limiter.check_event(signature), expected);
    }
}
#[test]
fn test_rate_limiter_with_mock_clock() {
    use std::time::Duration;

    // Time-window policy: 2 events per 60 s, driven by a mock clock so the
    // window can be left without sleeping.
    let backing_store = Arc::new(ShardedStorage::new());
    let fake_clock = Arc::new(MockClock::new(Instant::now()));
    let windowed = Policy::time_window(2, Duration::from_secs(60)).unwrap();
    let registry = SuppressionRegistry::new(backing_store, fake_clock.clone(), windowed);
    let rate_limiter =
        RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
    let signature = EventSignature::simple("INFO", "Test");

    let inside_window = [
        LimitDecision::Allow,
        LimitDecision::Allow,
        LimitDecision::Suppress,
    ];
    for expected in inside_window.iter().copied() {
        assert_eq!(rate_limiter.check_event(signature), expected);
    }

    // Move past the 60 s window; the next event is expected to be allowed.
    fake_clock.advance(Duration::from_secs(61));
    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);
}
#[test]
fn test_rate_limiter_different_signatures() {
    // Two distinct signatures are expected to be limited independently.
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let rate_limiter =
        RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
    let first = EventSignature::simple("INFO", "Message 1");
    let second = EventSignature::simple("INFO", "Message 2");

    let steps = [
        (first, LimitDecision::Allow),
        (second, LimitDecision::Allow),
        (first, LimitDecision::Suppress),
        (second, LimitDecision::Suppress),
    ];
    for (signature, expected) in steps.iter().copied() {
        assert_eq!(rate_limiter.check_event(signature), expected);
    }
}
#[test]
fn test_rate_limiter_suppression_counting() {
    // One allowed event followed by three suppressed ones should leave the
    // signature's suppression counter at 3.
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let rate_limiter = RateLimiter::new(
        registry.clone(),
        Metrics::new(),
        Arc::new(CircuitBreaker::new()),
    );
    let signature = EventSignature::simple("INFO", "Test");

    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);
    for _ in 0..3 {
        assert_eq!(rate_limiter.check_event(signature), LimitDecision::Suppress);
    }

    let suppressed = registry.with_event_state(signature, |state, _now| state.counter.count());
    assert_eq!(suppressed, 3);
}
#[test]
fn test_concurrent_rate_limiting() {
    use std::thread;

    // 10 threads x 20 checks against a shared limiter with a count-based
    // policy of 50.
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let fifty_events = Policy::count_based(50).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, fifty_events);
    let limiter = Arc::new(RateLimiter::new(
        registry,
        Metrics::new(),
        Arc::new(CircuitBreaker::new()),
    ));
    let signature = EventSignature::simple("INFO", "Concurrent test");

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let worker_limiter = Arc::clone(&limiter);
            thread::spawn(move || {
                // Per-thread (allowed, suppressed) tally.
                (0..20).fold((0, 0), |(allowed, suppressed), _| {
                    match worker_limiter.check_event(signature) {
                        LimitDecision::Allow => (allowed + 1, suppressed),
                        LimitDecision::Suppress => (allowed, suppressed + 1),
                    }
                })
            })
        })
        .collect();

    let (total_allowed, total_suppressed) = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .fold((0, 0), |(allowed, suppressed), (a, s)| {
            (allowed + a, suppressed + s)
        });

    assert_eq!(total_allowed + total_suppressed, 200);
    assert!(total_allowed <= 50);
    assert!(total_suppressed >= 150);
}
#[test]
fn test_fail_open_when_circuit_breaker_open() {
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let breaker = Arc::new(CircuitBreaker::new());
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("ERROR", "Critical failure");

    // First event uses up the policy's single allowance.
    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);

    // Report ten failures; the breaker is then expected to reject requests.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });
    assert!(!breaker.allow_request(), "Circuit breaker should be open");

    // A second event would normally be suppressed, but must be let through
    // while the breaker is open.
    let while_open = rate_limiter.check_event(signature);
    assert_eq!(
        while_open,
        LimitDecision::Allow,
        "Should fail open when circuit breaker is open"
    );
    assert_eq!(rate_limiter.metrics().events_allowed(), 2);
}
#[test]
fn test_fail_open_updates_metrics() {
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let breaker = Arc::new(CircuitBreaker::new());
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("ERROR", "Test");

    // Report ten failures to the breaker before any event is checked.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });

    // Five checks afterwards: every one is expected to count as allowed.
    (0..5).for_each(|_| {
        rate_limiter.check_event(signature);
    });

    let metrics = rate_limiter.metrics();
    assert_eq!(metrics.events_allowed(), 5);
    assert_eq!(metrics.events_suppressed(), 0);
}
#[test]
fn test_circuit_breaker_half_open_allows_some_requests() {
    use std::time::Duration;

    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let breaker_config = CircuitBreakerConfig {
        failure_threshold: 5,
        recovery_timeout: Duration::from_millis(10),
    };
    let breaker = Arc::new(CircuitBreaker::with_config(breaker_config));
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("ERROR", "Test");

    // Exceed the failure threshold, then wait longer than the 10 ms recovery
    // timeout before checking an event.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });
    std::thread::sleep(Duration::from_millis(20));

    // The check is expected to go through and leave the breaker with no
    // consecutive failures.
    let after_recovery = rate_limiter.check_event(signature);
    assert_eq!(after_recovery, LimitDecision::Allow);
    assert_eq!(breaker.consecutive_failures(), 0);
}
#[test]
fn test_normal_operation_after_circuit_breaker_closes() {
    use std::time::Duration;

    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let two_events = Policy::count_based(2).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, two_events);
    let breaker_config = CircuitBreakerConfig {
        failure_threshold: 5,
        recovery_timeout: Duration::from_millis(10),
    };
    let breaker = Arc::new(CircuitBreaker::with_config(breaker_config));
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("INFO", "Test");

    // Exceed the failure threshold and wait out the recovery timeout.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });
    std::thread::sleep(Duration::from_millis(20));

    // First check after recovery: allowed, and the failure streak is cleared.
    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);
    assert_eq!(breaker.consecutive_failures(), 0);

    // From here the count-based policy of 2 is expected to apply normally:
    // one more allowed event, then suppression.
    let remaining = [LimitDecision::Allow, LimitDecision::Suppress];
    for expected in remaining.iter().copied() {
        assert_eq!(rate_limiter.check_event(signature), expected);
    }
}
#[test]
fn test_successful_operations_record_success_to_circuit_breaker() {
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let ten_events = Policy::count_based(10).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, ten_events);
    let breaker = Arc::new(CircuitBreaker::new());
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("INFO", "Test");

    // Five ordinary checks should leave the breaker without any consecutive
    // failures recorded.
    (0..5).for_each(|_| {
        rate_limiter.check_event(signature);
    });

    assert_eq!(breaker.consecutive_failures(), 0);
}
#[test]
fn test_concurrent_fail_open_behavior() {
    use std::thread;

    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let five_events = Policy::count_based(5).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, five_events);
    let breaker = Arc::new(CircuitBreaker::new());
    let limiter = Arc::new(RateLimiter::new(registry, Metrics::new(), breaker.clone()));

    // Report ten failures before any worker starts.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });

    let signature = EventSignature::simple("ERROR", "Concurrent fail-open test");

    // 5 threads x 10 checks; each thread reports whether all of its checks
    // were allowed. Every check is performed even after a mismatch.
    let workers: Vec<_> = (0..5)
        .map(|_| {
            let worker_limiter = Arc::clone(&limiter);
            thread::spawn(move || {
                (0..10).fold(true, |all_allowed, _| {
                    let allowed = worker_limiter.check_event(signature) == LimitDecision::Allow;
                    all_allowed && allowed
                })
            })
        })
        .collect();

    for worker in workers {
        assert!(
            worker.join().unwrap(),
            "All events should be allowed when circuit is open"
        );
    }

    let metrics = limiter.metrics();
    assert_eq!(metrics.events_allowed(), 50);
    assert_eq!(metrics.events_suppressed(), 0);
}
#[test]
fn test_metrics_consistency_during_fail_open() {
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let two_events = Policy::count_based(2).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, two_events);
    let breaker = Arc::new(CircuitBreaker::new());
    let rate_limiter = RateLimiter::new(registry, Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("INFO", "Test");

    // Normal operation: two allowed events, then one suppressed.
    let before_failures = [
        LimitDecision::Allow,
        LimitDecision::Allow,
        LimitDecision::Suppress,
    ];
    for expected in before_failures.iter().copied() {
        assert_eq!(rate_limiter.check_event(signature), expected);
    }

    // After ten reported failures the next event is expected to be allowed.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });
    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);

    // Totals: 2 normal allows + 1 fail-open allow, and 1 suppression.
    let totals = rate_limiter.metrics().snapshot();
    assert_eq!(totals.events_allowed, 3);
    assert_eq!(totals.events_suppressed, 1);
    assert_eq!(totals.total_events(), 4);
}
#[test]
fn test_registry_state_unaffected_by_circuit_breaker() {
    let backing_store = Arc::new(ShardedStorage::new());
    let wall_clock = Arc::new(SystemClock::new());
    let one_event = Policy::count_based(1).unwrap();
    let registry = SuppressionRegistry::new(backing_store, wall_clock, one_event);
    let breaker = Arc::new(CircuitBreaker::new());
    let rate_limiter = RateLimiter::new(registry.clone(), Metrics::new(), breaker.clone());
    let signature = EventSignature::simple("INFO", "Test");

    // One allowed event: nothing has been suppressed yet.
    assert_eq!(rate_limiter.check_event(signature), LimitDecision::Allow);
    let count_before = registry.with_event_state(signature, |state, _| state.counter.count());
    assert_eq!(count_before, 0);

    // Report ten failures, then check five more events.
    (0..10).for_each(|_| {
        breaker.record_failure();
    });
    (0..5).for_each(|_| {
        rate_limiter.check_event(signature);
    });

    // The suppression counter must be the same as before those checks.
    let count_after = registry.with_event_state(signature, |state, _| state.counter.count());
    assert_eq!(
        count_after, count_before,
        "Registry state should not change during fail-open"
    );
}
}