tracing_throttle/application/
limiter.rs1use crate::application::circuit_breaker::CircuitBreaker;
7use crate::application::metrics::Metrics;
8use crate::application::ports::Storage;
9use crate::application::registry::SuppressionRegistry;
10use crate::domain::{
11 policy::{PolicyDecision, RateLimitPolicy},
12 signature::EventSignature,
13};
14use std::panic;
15use std::sync::Arc;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum LimitDecision {
20 Allow,
22 Suppress,
24}
25
26#[derive(Clone)]
28pub struct RateLimiter<S>
29where
30 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
31{
32 registry: SuppressionRegistry<S>,
33 metrics: Metrics,
34 circuit_breaker: Arc<CircuitBreaker>,
35}
36
37impl<S> RateLimiter<S>
38where
39 S: Storage<EventSignature, crate::application::registry::EventState> + Clone,
40{
41 pub fn new(
48 registry: SuppressionRegistry<S>,
49 metrics: Metrics,
50 circuit_breaker: Arc<CircuitBreaker>,
51 ) -> Self {
52 Self {
53 registry,
54 metrics,
55 circuit_breaker,
56 }
57 }
58
59 pub fn check_event(&self, signature: EventSignature) -> LimitDecision {
77 if !self.circuit_breaker.allow_request() {
79 self.metrics.record_allowed();
81 return LimitDecision::Allow;
82 }
83
84 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
86 self.registry.with_event_state(signature, |state, now| {
87 let decision = state.policy.register_event(now);
89
90 match decision {
91 PolicyDecision::Allow => LimitDecision::Allow,
92 PolicyDecision::Suppress => {
93 state.counter.record_suppression(now);
95 LimitDecision::Suppress
96 }
97 }
98 })
99 }));
100
101 let decision = match result {
102 Ok(decision) => {
103 self.circuit_breaker.record_success();
105 decision
106 }
107 Err(_) => {
108 self.circuit_breaker.record_failure();
110 LimitDecision::Allow
111 }
112 };
113
114 match decision {
116 LimitDecision::Allow => self.metrics.record_allowed(),
117 LimitDecision::Suppress => self.metrics.record_suppressed(),
118 }
119
120 decision
121 }
122
123 pub fn registry(&self) -> &SuppressionRegistry<S> {
125 &self.registry
126 }
127
128 pub fn metrics(&self) -> &Metrics {
130 &self.metrics
131 }
132
133 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
135 &self.circuit_breaker
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use crate::domain::policy::Policy;
143 use crate::infrastructure::clock::SystemClock;
144 use crate::infrastructure::mocks::MockClock;
145 use crate::infrastructure::storage::ShardedStorage;
146 use std::sync::Arc;
147 use std::time::Instant;
148
149 #[test]
150 fn test_rate_limiter_basic() {
151 let storage = Arc::new(ShardedStorage::new());
152 let clock = Arc::new(SystemClock::new());
153 let policy = Policy::count_based(2).unwrap();
154 let registry = SuppressionRegistry::new(storage, clock, policy);
155 let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
156
157 let sig = EventSignature::simple("INFO", "Test message");
158
159 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
161 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
162
163 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
165 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
166 }
167
168 #[test]
169 fn test_rate_limiter_with_mock_clock() {
170 use std::time::Duration;
171
172 let storage = Arc::new(ShardedStorage::new());
173 let mock_clock = Arc::new(MockClock::new(Instant::now()));
174 let policy = Policy::time_window(2, Duration::from_secs(60)).unwrap();
175 let registry = SuppressionRegistry::new(storage, mock_clock.clone(), policy);
176 let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
177
178 let sig = EventSignature::simple("INFO", "Test");
179
180 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
182 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
183
184 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
186
187 mock_clock.advance(Duration::from_secs(61));
189
190 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
192 }
193
194 #[test]
195 fn test_rate_limiter_different_signatures() {
196 let storage = Arc::new(ShardedStorage::new());
197 let clock = Arc::new(SystemClock::new());
198 let policy = Policy::count_based(1).unwrap();
199 let registry = SuppressionRegistry::new(storage, clock, policy);
200 let limiter = RateLimiter::new(registry, Metrics::new(), Arc::new(CircuitBreaker::new()));
201
202 let sig1 = EventSignature::simple("INFO", "Message 1");
203 let sig2 = EventSignature::simple("INFO", "Message 2");
204
205 assert_eq!(limiter.check_event(sig1), LimitDecision::Allow);
207 assert_eq!(limiter.check_event(sig2), LimitDecision::Allow);
208
209 assert_eq!(limiter.check_event(sig1), LimitDecision::Suppress);
210 assert_eq!(limiter.check_event(sig2), LimitDecision::Suppress);
211 }
212
213 #[test]
214 fn test_rate_limiter_suppression_counting() {
215 let storage = Arc::new(ShardedStorage::new());
216 let clock = Arc::new(SystemClock::new());
217 let policy = Policy::count_based(1).unwrap();
218 let registry = SuppressionRegistry::new(storage, clock, policy);
219 let limiter = RateLimiter::new(
220 registry.clone(),
221 Metrics::new(),
222 Arc::new(CircuitBreaker::new()),
223 );
224
225 let sig = EventSignature::simple("INFO", "Test");
226
227 assert_eq!(limiter.check_event(sig), LimitDecision::Allow);
229
230 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
232 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
233 assert_eq!(limiter.check_event(sig), LimitDecision::Suppress);
234
235 registry.with_event_state(sig, |state, _now| {
237 assert_eq!(state.counter.count(), 4);
238 });
239 }
240
241 #[test]
242 fn test_concurrent_rate_limiting() {
243 use std::thread;
244
245 let storage = Arc::new(ShardedStorage::new());
246 let clock = Arc::new(SystemClock::new());
247 let policy = Policy::count_based(50).unwrap();
248 let registry = SuppressionRegistry::new(storage, clock, policy);
249 let limiter = Arc::new(RateLimiter::new(
250 registry,
251 Metrics::new(),
252 Arc::new(CircuitBreaker::new()),
253 ));
254
255 let sig = EventSignature::simple("INFO", "Concurrent test");
256 let mut handles = vec![];
257
258 for _ in 0..10 {
259 let limiter_clone = Arc::clone(&limiter);
260 let handle = thread::spawn(move || {
261 let mut allowed = 0;
262 let mut suppressed = 0;
263
264 for _ in 0..20 {
265 match limiter_clone.check_event(sig) {
266 LimitDecision::Allow => allowed += 1,
267 LimitDecision::Suppress => suppressed += 1,
268 }
269 }
270
271 (allowed, suppressed)
272 });
273 handles.push(handle);
274 }
275
276 let mut total_allowed = 0;
277 let mut total_suppressed = 0;
278
279 for handle in handles {
280 let (allowed, suppressed) = handle.join().unwrap();
281 total_allowed += allowed;
282 total_suppressed += suppressed;
283 }
284
285 assert_eq!(total_allowed + total_suppressed, 200);
287
288 assert!(total_allowed <= 50);
290
291 assert!(total_suppressed >= 150);
293 }
294}