tracing_throttle/infrastructure/
layer.rs1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17
18use std::collections::BTreeMap;
19use std::sync::Arc;
20use std::time::Duration;
21use tracing::{Metadata, Subscriber};
22use tracing_subscriber::layer::Filter;
23use tracing_subscriber::{layer::Context, Layer};
24
25#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum BuildError {
28 ZeroMaxSignatures,
30 EmitterConfig(crate::application::emitter::EmitterConfigError),
32}
33
34impl std::fmt::Display for BuildError {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 BuildError::ZeroMaxSignatures => {
38 write!(f, "max_signatures must be greater than 0")
39 }
40 BuildError::EmitterConfig(e) => {
41 write!(f, "emitter configuration error: {}", e)
42 }
43 }
44 }
45}
46
47impl std::error::Error for BuildError {}
48
49impl From<crate::application::emitter::EmitterConfigError> for BuildError {
50 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
51 BuildError::EmitterConfig(e)
52 }
53}
54
55#[derive(Debug)]
57pub struct TracingRateLimitLayerBuilder {
58 policy: Policy,
59 summary_interval: Duration,
60 clock: Option<Arc<dyn Clock>>,
61 max_signatures: Option<usize>,
62}
63
64impl TracingRateLimitLayerBuilder {
65 pub fn with_policy(mut self, policy: Policy) -> Self {
67 self.policy = policy;
68 self
69 }
70
71 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
75 self.summary_interval = interval;
76 self
77 }
78
79 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
81 self.clock = Some(clock);
82 self
83 }
84
85 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
94 self.max_signatures = Some(max_signatures);
95 self
96 }
97
98 pub fn with_unlimited_signatures(mut self) -> Self {
104 self.max_signatures = None;
105 self
106 }
107
108 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
113 if let Some(max) = self.max_signatures {
115 if max == 0 {
116 return Err(BuildError::ZeroMaxSignatures);
117 }
118 }
119
120 let metrics = Metrics::new();
122 let circuit_breaker = Arc::new(CircuitBreaker::new());
123
124 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
125 let storage = if let Some(max) = self.max_signatures {
126 Arc::new(ShardedStorage::with_max_entries(max).with_metrics(metrics.clone()))
127 } else {
128 Arc::new(ShardedStorage::new().with_metrics(metrics.clone()))
129 };
130 let registry = SuppressionRegistry::new(storage, clock, self.policy);
131 let limiter = RateLimiter::new(registry, metrics.clone(), circuit_breaker);
132
133 let emitter_config = EmitterConfig::new(self.summary_interval)?;
135
136 Ok(TracingRateLimitLayer {
137 limiter,
138 _emitter_config: emitter_config,
139 })
140 }
141}
142
143#[derive(Clone)]
148pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
149where
150 S: Storage<EventSignature, EventState> + Clone,
151{
152 limiter: RateLimiter<S>,
153 _emitter_config: EmitterConfig,
154}
155
156impl<S> TracingRateLimitLayer<S>
157where
158 S: Storage<EventSignature, EventState> + Clone,
159{
160 fn compute_signature(
162 &self,
163 metadata: &Metadata,
164 _fields: &BTreeMap<String, String>,
165 ) -> EventSignature {
166 let level = metadata.level().as_str();
169 let message = metadata.name();
170 let target = Some(metadata.target());
171
172 let fields = BTreeMap::new();
175
176 EventSignature::new(level, message, &fields, target)
177 }
178
179 pub fn should_allow(&self, signature: EventSignature) -> bool {
181 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
182 }
183
184 pub fn limiter(&self) -> &RateLimiter<S> {
186 &self.limiter
187 }
188
189 pub fn metrics(&self) -> &Metrics {
196 self.limiter.metrics()
197 }
198
199 pub fn signature_count(&self) -> usize {
201 self.limiter.registry().len()
202 }
203
204 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
210 self.limiter.circuit_breaker()
211 }
212}
213
214impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
215 pub fn builder() -> TracingRateLimitLayerBuilder {
222 TracingRateLimitLayerBuilder {
223 policy: Policy::count_based(100).expect("default policy with 100 > 0 is always valid"),
225 summary_interval: Duration::from_secs(30),
226 clock: None,
227 max_signatures: Some(10_000),
228 }
229 }
230
231 pub fn new() -> Self {
243 Self::builder()
244 .build()
245 .expect("default configuration is always valid")
246 }
247}
248
249impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
257where
258 S: Storage<EventSignature, EventState> + Clone,
259 Sub: Subscriber,
260{
261 fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
262 let fields = BTreeMap::new();
264 let signature = self.compute_signature(meta, &fields);
265 self.should_allow(signature)
266 }
267}
268
269impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
270where
271 S: Storage<EventSignature, EventState> + Clone + 'static,
272 Sub: Subscriber,
273{
274 }
276
#[cfg(test)]
mod tests {
    use super::*;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    /// A custom policy and summary interval build into a layer with an empty registry.
    #[test]
    fn test_layer_builder() {
        let built = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(50).unwrap())
            .with_summary_interval(Duration::from_secs(60))
            .build();
        let layer = built.unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    /// The `Default` layer starts with an empty registry.
    #[test]
    fn test_layer_default() {
        let default_layer = TracingRateLimitLayer::default();
        assert!(default_layer.limiter().registry().is_empty());
    }

    /// Two simple signatures built from the same inputs compare equal.
    #[test]
    fn test_signature_computation() {
        let _layer = TracingRateLimitLayer::new();

        let first = EventSignature::simple("INFO", "test_event");
        let second = EventSignature::simple("INFO", "test_event");

        assert_eq!(first, second);
    }

    /// With a count limit of 2, the third identical event is rejected.
    #[test]
    fn test_basic_rate_limiting() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test_message");

        // The first two events are within the limit.
        for _ in 0..2 {
            assert!(layer.should_allow(sig));
        }

        // The third one exceeds it.
        assert!(!layer.should_allow(sig));
    }

    /// Events emitted through a real subscriber end up as one tracked signature.
    #[test]
    fn test_layer_integration() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        // Keep a clone so the registry can be inspected after `layer` is moved.
        let observer = layer.clone();

        let fmt_layer = tracing_subscriber::fmt::layer().with_filter(layer);
        let subscriber = tracing_subscriber::registry().with(fmt_layer);

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..10 {
                info!("test event");
            }
        });

        assert_eq!(observer.limiter().registry().len(), 1);
    }

    /// Out of ten identical events, exactly the configured three are allowed.
    #[test]
    fn test_layer_suppression_logic() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        let allowed = (0..10).filter(|_| layer.should_allow(sig)).count();

        assert_eq!(allowed, 3);
    }

    /// A zero summary interval is reported as an emitter configuration error.
    #[test]
    fn test_builder_zero_summary_interval() {
        let outcome = TracingRateLimitLayer::builder()
            .with_summary_interval(Duration::from_secs(0))
            .build();

        assert!(matches!(
            outcome,
            Err(BuildError::EmitterConfig(
                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
            ))
        ));
    }

    /// A signature limit of zero is rejected at build time.
    #[test]
    fn test_builder_zero_max_signatures() {
        let outcome = TracingRateLimitLayer::builder()
            .with_max_signatures(0)
            .build();

        assert!(matches!(outcome, Err(BuildError::ZeroMaxSignatures)));
    }

    /// A non-zero signature limit builds successfully.
    #[test]
    fn test_builder_valid_max_signatures() {
        let layer = TracingRateLimitLayer::builder()
            .with_max_signatures(100)
            .build()
            .unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    /// Allowed and suppressed counters follow the limiter's decisions.
    #[test]
    fn test_metrics_tracking() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        // Nothing recorded yet.
        assert_eq!(layer.metrics().events_allowed(), 0);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        // Two events within the limit.
        for _ in 0..2 {
            assert!(layer.should_allow(sig));
        }

        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        // One event over the limit.
        assert!(!layer.should_allow(sig));

        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 1);
    }

    /// The metrics snapshot reports totals and the suppression rate.
    #[test]
    fn test_metrics_snapshot() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        // Five events against a limit of three: 3 allowed, 2 suppressed.
        (0..5).for_each(|_| {
            layer.should_allow(sig);
        });

        let snap = layer.metrics().snapshot();
        assert_eq!(snap.events_allowed, 3);
        assert_eq!(snap.events_suppressed, 2);
        assert_eq!(snap.total_events(), 5);
        assert!((snap.suppression_rate() - 0.4).abs() < f64::EPSILON);
    }

    /// `signature_count` grows per distinct signature, not per event.
    #[test]
    fn test_signature_count() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        assert_eq!(layer.signature_count(), 0);

        let first = EventSignature::simple("INFO", "test1");
        let second = EventSignature::simple("INFO", "test2");

        layer.should_allow(first);
        assert_eq!(layer.signature_count(), 1);

        layer.should_allow(second);
        assert_eq!(layer.signature_count(), 2);

        // Repeating a known signature does not add an entry.
        layer.should_allow(first);
        assert_eq!(layer.signature_count(), 2);
    }

    /// Exceeding the signature limit evicts an entry and records it in the metrics.
    #[test]
    fn test_metrics_with_eviction() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(1).unwrap())
            .with_max_signatures(3)
            .build()
            .unwrap();

        // Fill the storage up to its limit.
        for i in 0..3 {
            let name = format!("test{}", i);
            layer.should_allow(EventSignature::simple("INFO", &name));
        }

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 0);

        // A fourth distinct signature forces one eviction.
        let overflow = EventSignature::simple("INFO", "test3");
        layer.should_allow(overflow);

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 1);
    }

    /// The circuit breaker is reachable from the layer and stays closed in normal use.
    #[test]
    fn test_circuit_breaker_observability() {
        use crate::application::circuit_breaker::CircuitState;

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let breaker = layer.circuit_breaker();
        assert_eq!(breaker.state(), CircuitState::Closed);
        assert_eq!(breaker.consecutive_failures(), 0);

        let sig = EventSignature::simple("INFO", "test");
        for _ in 0..3 {
            layer.should_allow(sig);
        }

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Once the circuit breaker is open, events are allowed despite the exhausted limit.
    #[test]
    fn test_circuit_breaker_fail_open_integration() {
        use crate::application::circuit_breaker::{
            CircuitBreaker, CircuitBreakerConfig, CircuitState,
        };
        use std::time::Duration;

        // Low threshold so two recorded failures open the breaker.
        let breaker_config = CircuitBreakerConfig {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(1),
        };
        let circuit_breaker = Arc::new(CircuitBreaker::with_config(breaker_config));

        // Assemble the layer by hand so the custom breaker can be injected.
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let metrics = Metrics::new();
        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());

        let emitter_config =
            crate::application::emitter::EmitterConfig::new(Duration::from_secs(30)).unwrap();
        let layer = TracingRateLimitLayer {
            limiter,
            _emitter_config: emitter_config,
        };

        let sig = EventSignature::simple("INFO", "test");

        // Breaker closed: normal limiting applies.
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(!layer.should_allow(sig));

        assert_eq!(circuit_breaker.state(), CircuitState::Closed);

        // Trip the breaker.
        for _ in 0..2 {
            circuit_breaker.record_failure();
        }

        assert_eq!(circuit_breaker.state(), CircuitState::Open);

        // Breaker open: every event is let through.
        for _ in 0..3 {
            assert!(layer.should_allow(sig));
        }

        let snap = layer.metrics().snapshot();
        assert!(snap.events_allowed >= 5);
    }
}