tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17
18use std::collections::BTreeMap;
19use std::sync::Arc;
20use std::time::Duration;
21use tracing::{Metadata, Subscriber};
22use tracing_subscriber::layer::Filter;
23use tracing_subscriber::{layer::Context, Layer};
24
25/// Error returned when building a TracingRateLimitLayer fails.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum BuildError {
28    /// Maximum signatures must be greater than zero
29    ZeroMaxSignatures,
30    /// Emitter configuration validation failed
31    EmitterConfig(crate::application::emitter::EmitterConfigError),
32}
33
34impl std::fmt::Display for BuildError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            BuildError::ZeroMaxSignatures => {
38                write!(f, "max_signatures must be greater than 0")
39            }
40            BuildError::EmitterConfig(e) => {
41                write!(f, "emitter configuration error: {}", e)
42            }
43        }
44    }
45}
46
47impl std::error::Error for BuildError {}
48
49impl From<crate::application::emitter::EmitterConfigError> for BuildError {
50    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
51        BuildError::EmitterConfig(e)
52    }
53}
54
55/// Builder for constructing a `TracingRateLimitLayer`.
56#[derive(Debug)]
57pub struct TracingRateLimitLayerBuilder {
58    policy: Policy,
59    summary_interval: Duration,
60    clock: Option<Arc<dyn Clock>>,
61    max_signatures: Option<usize>,
62}
63
64impl TracingRateLimitLayerBuilder {
65    /// Set the rate limiting policy.
66    pub fn with_policy(mut self, policy: Policy) -> Self {
67        self.policy = policy;
68        self
69    }
70
71    /// Set the summary emission interval.
72    ///
73    /// The interval will be validated when `build()` is called.
74    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
75        self.summary_interval = interval;
76        self
77    }
78
79    /// Set a custom clock (mainly for testing).
80    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
81        self.clock = Some(clock);
82        self
83    }
84
85    /// Set the maximum number of unique event signatures to track.
86    ///
87    /// When this limit is reached, the least recently used signatures will be evicted.
88    /// This prevents unbounded memory growth in applications with high signature cardinality.
89    ///
90    /// Default: 10,000 signatures
91    ///
92    /// The value will be validated when `build()` is called.
93    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
94        self.max_signatures = Some(max_signatures);
95        self
96    }
97
98    /// Disable the signature limit, allowing unbounded growth.
99    ///
100    /// **Warning**: This can lead to unbounded memory usage in applications that generate
101    /// many unique event signatures. Only use this if you're certain your application has
102    /// bounded signature cardinality or you have external memory monitoring.
103    pub fn with_unlimited_signatures(mut self) -> Self {
104        self.max_signatures = None;
105        self
106    }
107
108    /// Build the layer.
109    ///
110    /// # Errors
111    /// Returns `BuildError` if the configuration is invalid.
112    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
113        // Validate max_signatures if set
114        if let Some(max) = self.max_signatures {
115            if max == 0 {
116                return Err(BuildError::ZeroMaxSignatures);
117            }
118        }
119
120        // Create shared metrics and circuit breaker
121        let metrics = Metrics::new();
122        let circuit_breaker = Arc::new(CircuitBreaker::new());
123
124        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
125        let storage = if let Some(max) = self.max_signatures {
126            Arc::new(ShardedStorage::with_max_entries(max).with_metrics(metrics.clone()))
127        } else {
128            Arc::new(ShardedStorage::new().with_metrics(metrics.clone()))
129        };
130        let registry = SuppressionRegistry::new(storage, clock, self.policy);
131        let limiter = RateLimiter::new(registry, metrics.clone(), circuit_breaker);
132
133        // Let EmitterConfig validate the interval
134        let emitter_config = EmitterConfig::new(self.summary_interval)?;
135
136        Ok(TracingRateLimitLayer {
137            limiter,
138            _emitter_config: emitter_config,
139        })
140    }
141}
142
143/// A `tracing::Layer` that applies rate limiting to events.
144///
145/// This layer intercepts events, computes their signature, and decides
146/// whether to allow or suppress them based on the configured policy.
147#[derive(Clone)]
148pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
149where
150    S: Storage<EventSignature, EventState> + Clone,
151{
152    limiter: RateLimiter<S>,
153    _emitter_config: EmitterConfig,
154}
155
156impl<S> TracingRateLimitLayer<S>
157where
158    S: Storage<EventSignature, EventState> + Clone,
159{
160    /// Compute event signature from tracing metadata and fields.
161    fn compute_signature(
162        &self,
163        metadata: &Metadata,
164        _fields: &BTreeMap<String, String>,
165    ) -> EventSignature {
166        // For MVP, we'll use a simple signature based on level and target
167        // In a full implementation, we'd extract and include field values
168        let level = metadata.level().as_str();
169        let message = metadata.name();
170        let target = Some(metadata.target());
171
172        // TODO: Extract actual field values from the event
173        // For now, use empty fields map
174        let fields = BTreeMap::new();
175
176        EventSignature::new(level, message, &fields, target)
177    }
178
179    /// Check if an event should be allowed through.
180    pub fn should_allow(&self, signature: EventSignature) -> bool {
181        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
182    }
183
184    /// Get a reference to the underlying limiter.
185    pub fn limiter(&self) -> &RateLimiter<S> {
186        &self.limiter
187    }
188
189    /// Get a reference to the metrics.
190    ///
191    /// Returns metrics about rate limiting behavior including:
192    /// - Events allowed
193    /// - Events suppressed
194    /// - Signatures evicted
195    pub fn metrics(&self) -> &Metrics {
196        self.limiter.metrics()
197    }
198
199    /// Get the current number of tracked signatures.
200    pub fn signature_count(&self) -> usize {
201        self.limiter.registry().len()
202    }
203
204    /// Get a reference to the circuit breaker.
205    ///
206    /// Use this to check the circuit breaker state and health:
207    /// - `circuit_breaker().state()` - Current circuit state
208    /// - `circuit_breaker().consecutive_failures()` - Failure count
209    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
210        self.limiter.circuit_breaker()
211    }
212}
213
214impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
215    /// Create a builder for configuring the layer.
216    ///
217    /// Defaults:
218    /// - Policy: count-based (100 events)
219    /// - Max signatures: 10,000 (with LRU eviction)
220    /// - Summary interval: 30 seconds
221    pub fn builder() -> TracingRateLimitLayerBuilder {
222        TracingRateLimitLayerBuilder {
223            // Safe unwrap: 100 > 0 is always valid
224            policy: Policy::count_based(100).expect("default policy with 100 > 0 is always valid"),
225            summary_interval: Duration::from_secs(30),
226            clock: None,
227            max_signatures: Some(10_000),
228        }
229    }
230
231    /// Create a layer with default settings.
232    ///
233    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
234    ///
235    /// Defaults:
236    /// - Policy: count-based (100 events)
237    /// - Max signatures: 10,000 (with LRU eviction)
238    /// - Summary interval: 30 seconds
239    ///
240    /// # Panics
241    /// This method cannot panic because all default values are valid.
242    pub fn new() -> Self {
243        Self::builder()
244            .build()
245            .expect("default configuration is always valid")
246    }
247}
248
249impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255// Implement the Filter trait for rate limiting
256impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
257where
258    S: Storage<EventSignature, EventState> + Clone,
259    Sub: Subscriber,
260{
261    fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
262        // Compute signature and check with rate limiter
263        let fields = BTreeMap::new();
264        let signature = self.compute_signature(meta, &fields);
265        self.should_allow(signature)
266    }
267}
268
269impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
270where
271    S: Storage<EventSignature, EventState> + Clone + 'static,
272    Sub: Subscriber,
273{
274    // Layer methods can be empty since filtering is handled by the Filter impl
275}
276
#[cfg(test)]
mod tests {
    // Unit tests for the builder, the limiter-facing helpers and the
    // `Filter` integration with `tracing_subscriber`.
    use super::*;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    #[test]
    fn test_layer_builder() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(50).unwrap())
            .with_summary_interval(Duration::from_secs(60))
            .build()
            .unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    #[test]
    fn test_layer_default() {
        let layer = TracingRateLimitLayer::default();
        assert!(layer.limiter().registry().is_empty());
    }

    #[test]
    fn test_signature_computation() {
        let layer = TracingRateLimitLayer::new();

        let sig1 = EventSignature::simple("INFO", "test_event");
        let sig2 = EventSignature::simple("INFO", "test_event");
        let other = EventSignature::simple("INFO", "other_event");

        // Same inputs should produce the same signature; different messages should not.
        assert_eq!(sig1, sig2);
        assert_ne!(sig1, other);

        // Equal signatures must map onto a single tracked entry.
        layer.should_allow(sig1);
        layer.should_allow(sig2);
        assert_eq!(layer.signature_count(), 1);
    }

    #[test]
    fn test_basic_rate_limiting() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test_message");

        // First two should be allowed
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        // Third should be suppressed
        assert!(!layer.should_allow(sig));
    }

    #[test]
    fn test_layer_integration() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        // Clone for use in subscriber, keep original for checking state
        let layer_for_check = layer.clone();

        let subscriber = tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::layer().with_filter(layer));

        // Test that the layer correctly tracks event signatures
        tracing::subscriber::with_default(subscriber, || {
            // Emit 10 identical events
            for _ in 0..10 {
                info!("test event");
            }
        });

        // All 10 events share one signature, so exactly one registry entry exists,
        // and only the first 3 are allowed by the count-based policy.
        assert_eq!(layer_for_check.limiter().registry().len(), 1);
        let metrics = layer_for_check.metrics();
        assert_eq!(metrics.events_allowed(), 3);
        assert_eq!(metrics.events_suppressed(), 7);
    }

    #[test]
    fn test_layer_suppression_logic() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        // Verify the suppression logic works correctly
        let mut allowed_count = 0;
        for _ in 0..10 {
            if layer.should_allow(sig) {
                allowed_count += 1;
            }
        }

        assert_eq!(allowed_count, 3);
    }

    #[test]
    fn test_builder_zero_summary_interval() {
        let result = TracingRateLimitLayer::builder()
            .with_summary_interval(Duration::from_secs(0))
            .build();

        assert!(matches!(
            result,
            Err(BuildError::EmitterConfig(
                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
            ))
        ));
    }

    #[test]
    fn test_builder_zero_max_signatures() {
        let result = TracingRateLimitLayer::builder()
            .with_max_signatures(0)
            .build();

        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
    }

    #[test]
    fn test_builder_valid_max_signatures() {
        let layer = TracingRateLimitLayer::builder()
            .with_max_signatures(100)
            .build()
            .unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    #[test]
    fn test_metrics_tracking() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        // Check initial metrics
        assert_eq!(layer.metrics().events_allowed(), 0);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        // Allow first two events
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        // Check metrics after allowed events
        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        // Suppress third event
        assert!(!layer.should_allow(sig));

        // Check metrics after suppressed event
        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 1);
    }

    #[test]
    fn test_metrics_snapshot() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        // Generate some events
        for _ in 0..5 {
            layer.should_allow(sig);
        }

        // Get snapshot
        let snapshot = layer.metrics().snapshot();
        assert_eq!(snapshot.events_allowed, 3);
        assert_eq!(snapshot.events_suppressed, 2);
        assert_eq!(snapshot.total_events(), 5);
        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
    }

    #[test]
    fn test_signature_count() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        assert_eq!(layer.signature_count(), 0);

        let sig1 = EventSignature::simple("INFO", "test1");
        let sig2 = EventSignature::simple("INFO", "test2");

        layer.should_allow(sig1);
        assert_eq!(layer.signature_count(), 1);

        layer.should_allow(sig2);
        assert_eq!(layer.signature_count(), 2);

        // Same signature shouldn't increase count
        layer.should_allow(sig1);
        assert_eq!(layer.signature_count(), 2);
    }

    #[test]
    fn test_metrics_with_eviction() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(1).unwrap())
            .with_max_signatures(3)
            .build()
            .unwrap();

        // Fill up to capacity
        for i in 0..3 {
            let sig = EventSignature::simple("INFO", &format!("test{}", i));
            layer.should_allow(sig);
        }

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 0);

        // Add one more, which should trigger eviction
        let sig = EventSignature::simple("INFO", "test3");
        layer.should_allow(sig);

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 1);
    }

    #[test]
    fn test_circuit_breaker_observability() {
        use crate::application::circuit_breaker::CircuitState;

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        // Check initial circuit breaker state
        let cb = layer.circuit_breaker();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert_eq!(cb.consecutive_failures(), 0);

        // Circuit breaker should remain closed during normal operation
        let sig = EventSignature::simple("INFO", "test");
        layer.should_allow(sig);
        layer.should_allow(sig);
        layer.should_allow(sig);

        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_fail_open_integration() {
        // `CircuitBreaker` and `Duration` already come in through `use super::*`.
        use crate::application::circuit_breaker::{CircuitBreakerConfig, CircuitState};

        // Create a circuit breaker with low threshold for testing
        let cb_config = CircuitBreakerConfig {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(1),
        };
        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));

        // Build layer with custom circuit breaker
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let metrics = Metrics::new();
        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());

        let layer = TracingRateLimitLayer {
            limiter,
            _emitter_config: EmitterConfig::new(Duration::from_secs(30)).unwrap(),
        };

        let sig = EventSignature::simple("INFO", "test");

        // Normal operation - first 2 events allowed, third suppressed
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(!layer.should_allow(sig));

        // Circuit should still be closed
        assert_eq!(circuit_breaker.state(), CircuitState::Closed);

        // Manually trigger circuit breaker failures to test fail-open
        circuit_breaker.record_failure();
        circuit_breaker.record_failure();

        // Circuit should now be open
        assert_eq!(circuit_breaker.state(), CircuitState::Open);

        // With circuit open, rate limiter should fail open (allow all events)
        // even though we've already hit the rate limit
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        // Metrics should show these as allowed (fail-open behavior)
        let snapshot = layer.metrics().snapshot();
        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
    }
}