tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
/// Callback type used to emit a suppression summary.
///
/// The callback receives a reference to a `SuppressionSummary` and is expected to emit it
/// as a tracing event. The callback itself decides on the log level and the format.
///
/// The callback is stored behind an `Arc` and must be `Send + Sync + 'static`.
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
/// Error returned when building a `TracingRateLimitLayer` fails.
///
/// Produced by `TracingRateLimitLayerBuilder::build` when the collected configuration
/// does not pass validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildError {
    /// `max_signatures` was explicitly set to zero; it must be greater than zero.
    ZeroMaxSignatures,
    /// The emitter configuration (created from the summary interval) failed validation.
    EmitterConfig(crate::application::emitter::EmitterConfigError),
}
51
52impl std::fmt::Display for BuildError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            BuildError::ZeroMaxSignatures => {
56                write!(f, "max_signatures must be greater than 0")
57            }
58            BuildError::EmitterConfig(e) => {
59                write!(f, "emitter configuration error: {}", e)
60            }
61        }
62    }
63}
64
// Marks `BuildError` as a standard error type. Only the trait's default methods are used;
// the wrapped emitter error is already included in the `Display` output.
impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69        BuildError::EmitterConfig(e)
70    }
71}
72
/// Builder for constructing a `TracingRateLimitLayer`.
///
/// Obtain one through `TracingRateLimitLayer::builder()`, adjust it with the `with_*`
/// methods, and finish with `build()`, which validates the collected configuration.
pub struct TracingRateLimitLayerBuilder {
    /// Rate limiting policy handed to the suppression registry.
    policy: Policy,
    /// Interval used to create the emitter configuration; validated in `build()`.
    summary_interval: Duration,
    /// Optional clock override; `build()` falls back to `SystemClock` when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Maximum number of tracked signatures; `None` means no limit is applied.
    max_signatures: Option<usize>,
    /// Whether `build()` starts the summary emitter (only read with the `async` feature).
    enable_active_emission: bool,
    /// Optional custom summary formatter; `build()` uses a WARN-level default when `None`.
    #[cfg(feature = "async")]
    summary_formatter: Option<SummaryFormatter>,
    /// Names of span fields to include in event signatures.
    span_context_fields: Vec<String>,
    /// Names of event fields to include in event signatures.
    event_fields: Vec<String>,
    /// Optional eviction strategy applied to the storage in `build()`.
    eviction_strategy:
        Option<crate::infrastructure::eviction::EvictionStrategy<EventSignature, EventState>>,
}
87
88impl TracingRateLimitLayerBuilder {
89    /// Set the rate limiting policy.
90    pub fn with_policy(mut self, policy: Policy) -> Self {
91        self.policy = policy;
92        self
93    }
94
95    /// Set the summary emission interval.
96    ///
97    /// The interval will be validated when `build()` is called.
98    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
99        self.summary_interval = interval;
100        self
101    }
102
103    /// Set a custom clock (mainly for testing).
104    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
105        self.clock = Some(clock);
106        self
107    }
108
109    /// Set the maximum number of unique event signatures to track.
110    ///
111    /// When this limit is reached, the least recently used signatures will be evicted.
112    /// This prevents unbounded memory growth in applications with high signature cardinality.
113    ///
114    /// Default: 10,000 signatures
115    ///
116    /// The value will be validated when `build()` is called.
117    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
118        self.max_signatures = Some(max_signatures);
119        self
120    }
121
122    /// Disable the signature limit, allowing unbounded growth.
123    ///
124    /// **Warning**: This can lead to unbounded memory usage in applications that generate
125    /// many unique event signatures. Only use this if you're certain your application has
126    /// bounded signature cardinality or you have external memory monitoring.
127    pub fn with_unlimited_signatures(mut self) -> Self {
128        self.max_signatures = None;
129        self
130    }
131
132    /// Enable active emission of suppression summaries.
133    ///
134    /// When enabled, the layer will automatically emit `WARN`-level tracing events
135    /// containing summaries of suppressed log events at the configured interval.
136    ///
137    /// **Requires the `async` feature** - this method has no effect without it.
138    ///
139    /// Default: disabled
140    ///
141    /// # Example
142    ///
143    /// ```no_run
144    /// # use tracing_throttle::TracingRateLimitLayer;
145    /// # use std::time::Duration;
146    /// let layer = TracingRateLimitLayer::builder()
147    ///     .with_active_emission(true)
148    ///     .with_summary_interval(Duration::from_secs(60))
149    ///     .build()
150    ///     .unwrap();
151    /// ```
152    pub fn with_active_emission(mut self, enabled: bool) -> Self {
153        self.enable_active_emission = enabled;
154        self
155    }
156
157    /// Set a custom formatter for suppression summaries.
158    ///
159    /// The formatter is responsible for emitting summaries as tracing events.
160    /// This allows full control over log level, message format, and structured fields.
161    ///
162    /// **Requires the `async` feature.**
163    ///
164    /// If not set, a default formatter is used that emits at WARN level with
165    /// `signature` and `count` fields.
166    ///
167    /// # Example
168    ///
169    /// ```no_run
170    /// # use tracing_throttle::TracingRateLimitLayer;
171    /// # use std::sync::Arc;
172    /// # use std::time::Duration;
173    /// let layer = TracingRateLimitLayer::builder()
174    ///     .with_active_emission(true)
175    ///     .with_summary_formatter(Arc::new(|summary| {
176    ///         tracing::info!(
177    ///             signature = %summary.signature,
178    ///             count = summary.count,
179    ///             duration_secs = summary.duration.as_secs(),
180    ///             "Suppression summary"
181    ///         );
182    ///     }))
183    ///     .build()
184    ///     .unwrap();
185    /// ```
186    #[cfg(feature = "async")]
187    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
188        self.summary_formatter = Some(formatter);
189        self
190    }
191
192    /// Include span context fields in event signatures.
193    ///
194    /// When specified, the layer will extract these fields from the current span
195    /// context and include them in the event signature. This enables rate limiting
196    /// per-user, per-tenant, per-request, or any other span-level context.
197    ///
198    /// Duplicate field names are automatically removed, and empty field names are filtered out.
199    ///
200    /// # Example
201    ///
202    /// ```no_run
203    /// # use tracing_throttle::TracingRateLimitLayer;
204    /// // Rate limit separately per user
205    /// let layer = TracingRateLimitLayer::builder()
206    ///     .with_span_context_fields(vec!["user_id".to_string()])
207    ///     .build()
208    ///     .unwrap();
209    ///
210    /// // Rate limit per user and tenant
211    /// let layer = TracingRateLimitLayer::builder()
212    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
213    ///     .build()
214    ///     .unwrap();
215    /// ```
216    ///
217    /// # Usage with Spans
218    ///
219    /// ```no_run
220    /// # use tracing::{info, info_span};
221    /// // Create a span with user context
222    /// let span = info_span!("request", user_id = "alice");
223    /// let _enter = span.enter();
224    ///
225    /// // These events will be rate limited separately per user
226    /// info!("Processing request");  // Limited for user "alice"
227    /// ```
228    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
229        // Deduplicate and filter out empty field names
230        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
231        self.span_context_fields = unique_fields.into_iter().collect();
232        self
233    }
234
235    /// Include event fields in event signatures.
236    ///
237    /// When specified, the layer will extract these fields from events themselves
238    /// and include them in the event signature. This enables rate limiting per
239    /// error code, status, endpoint, or any other event-level field.
240    ///
241    /// Duplicate field names are automatically removed, and empty field names are filtered out.
242    ///
243    /// # Example
244    ///
245    /// ```no_run
246    /// # use tracing_throttle::TracingRateLimitLayer;
247    /// // Rate limit separately per error_code
248    /// let layer = TracingRateLimitLayer::builder()
249    ///     .with_event_fields(vec!["error_code".to_string()])
250    ///     .build()
251    ///     .unwrap();
252    ///
253    /// // Rate limit per status and endpoint
254    /// let layer = TracingRateLimitLayer::builder()
255    ///     .with_event_fields(vec!["status".to_string(), "endpoint".to_string()])
256    ///     .build()
257    ///     .unwrap();
258    /// ```
259    ///
260    /// # Usage with Events
261    ///
262    /// ```no_run
263    /// # use tracing::error;
264    /// // Events with different error codes are rate limited independently
265    /// error!(error_code = "AUTH_FAILED", "Authentication failed");
266    /// error!(error_code = "TIMEOUT", "Request timeout");
267    /// ```
268    pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
269        // Deduplicate and filter out empty field names
270        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
271        self.event_fields = unique_fields.into_iter().collect();
272        self
273    }
274
275    /// Set a custom eviction strategy for signature management.
276    ///
277    /// Controls which signatures are evicted when storage limits are reached.
278    /// If not set, uses the default LRU (Least Recently Used) strategy.
279    ///
280    /// # Example: Priority-based eviction
281    ///
282    /// ```no_run
283    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
284    /// # use std::sync::Arc;
285    /// let layer = TracingRateLimitLayer::builder()
286    ///     .with_eviction_strategy(EvictionStrategy::Priority(Arc::new(|_sig, state| {
287    ///         // Keep ERROR events longer than INFO events
288    ///         match state.metadata.as_ref().map(|m| m.level.as_str()) {
289    ///             Some("ERROR") => 100,
290    ///             Some("WARN") => 50,
291    ///             Some("INFO") => 10,
292    ///             _ => 5,
293    ///         }
294    ///     })))
295    ///     .build()
296    ///     .unwrap();
297    /// ```
298    ///
299    /// # Example: Memory-based eviction
300    ///
301    /// ```no_run
302    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
303    /// // Evict when total memory exceeds 5MB
304    /// let layer = TracingRateLimitLayer::builder()
305    ///     .with_eviction_strategy(EvictionStrategy::Memory {
306    ///         max_bytes: 5 * 1024 * 1024,
307    ///     })
308    ///     .build()
309    ///     .unwrap();
310    /// ```
311    pub fn with_eviction_strategy(
312        mut self,
313        strategy: crate::infrastructure::eviction::EvictionStrategy<EventSignature, EventState>,
314    ) -> Self {
315        self.eviction_strategy = Some(strategy);
316        self
317    }
318
319    /// Build the layer.
320    ///
321    /// # Errors
322    /// Returns `BuildError` if the configuration is invalid.
323    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
324        // Validate max_signatures if set
325        if let Some(max) = self.max_signatures {
326            if max == 0 {
327                return Err(BuildError::ZeroMaxSignatures);
328            }
329        }
330
331        // Create shared metrics and circuit breaker
332        let metrics = Metrics::new();
333        let circuit_breaker = Arc::new(CircuitBreaker::new());
334
335        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
336        let mut storage = if let Some(max) = self.max_signatures {
337            ShardedStorage::with_max_entries(max).with_metrics(metrics.clone())
338        } else {
339            ShardedStorage::new().with_metrics(metrics.clone())
340        };
341
342        // Apply eviction strategy if configured
343        if let Some(strategy) = self.eviction_strategy {
344            storage = storage.with_eviction_strategy(strategy);
345        }
346
347        let storage = Arc::new(storage);
348        let registry = SuppressionRegistry::new(storage, clock, self.policy);
349        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
350
351        // Let EmitterConfig validate the interval
352        let emitter_config = EmitterConfig::new(self.summary_interval)?;
353
354        #[cfg(feature = "async")]
355        let emitter_handle = if self.enable_active_emission {
356            let emitter = SummaryEmitter::new(registry, emitter_config);
357
358            // Use custom formatter or default
359            let formatter = self.summary_formatter.unwrap_or_else(|| {
360                Arc::new(|summary: &SuppressionSummary| {
361                    tracing::warn!(
362                        signature = %summary.signature,
363                        count = summary.count,
364                        "{}",
365                        summary.format_message()
366                    );
367                })
368            });
369
370            let handle = emitter.start(
371                move |summaries| {
372                    for summary in summaries {
373                        formatter(&summary);
374                    }
375                },
376                false, // Don't emit final summaries on shutdown
377            );
378            Arc::new(Mutex::new(Some(handle)))
379        } else {
380            Arc::new(Mutex::new(None))
381        };
382
383        Ok(TracingRateLimitLayer {
384            limiter,
385            span_context_fields: Arc::new(self.span_context_fields),
386            event_fields: Arc::new(self.event_fields),
387            #[cfg(feature = "async")]
388            emitter_handle,
389            #[cfg(not(feature = "async"))]
390            _emitter_config: emitter_config,
391        })
392    }
393}
394
/// A `tracing::Layer` that applies rate limiting to events.
///
/// This layer intercepts events, computes their signature, and decides
/// whether to allow or suppress them based on the configured policy.
///
/// Optionally emits periodic summaries of suppressed events when active
/// emission is enabled (requires `async` feature).
///
/// The type parameter `S` is the storage backend; it defaults to an `Arc` around the
/// in-process `ShardedStorage`.
#[derive(Clone)]
pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    /// Makes the allow/suppress decision; also exposes the registry, metrics and
    /// circuit breaker used by the accessor methods.
    limiter: RateLimiter<S>,
    /// Names of span fields that are folded into the event signature.
    span_context_fields: Arc<Vec<String>>,
    /// Names of event fields that are folded into the event signature.
    event_fields: Arc<Vec<String>>,
    /// Handle of the summary emitter; `None` when active emission was not started or
    /// after `shutdown()` has taken the handle.
    #[cfg(feature = "async")]
    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
    /// Validated emitter configuration, kept when the `async` feature is disabled.
    #[cfg(not(feature = "async"))]
    _emitter_config: EmitterConfig,
}
415
416impl<S> TracingRateLimitLayer<S>
417where
418    S: Storage<EventSignature, EventState> + Clone,
419{
420    /// Extract span context fields from the current span.
421    fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
422    where
423        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
424    {
425        if self.span_context_fields.is_empty() {
426            return BTreeMap::new();
427        }
428
429        let mut context_fields = BTreeMap::new();
430
431        if let Some(span) = cx.lookup_current() {
432            for span_ref in span.scope() {
433                let extensions = span_ref.extensions();
434
435                if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
436                    for field_name in self.span_context_fields.as_ref() {
437                        if !context_fields.contains_key(field_name) {
438                            if let Some(value) = stored_fields.get(field_name) {
439                                context_fields.insert(field_name.clone(), value.clone());
440                            }
441                        }
442                    }
443                }
444
445                if context_fields.len() == self.span_context_fields.len() {
446                    break;
447                }
448            }
449        }
450
451        context_fields
452    }
453
454    /// Extract event fields from an event.
455    fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
456        if self.event_fields.is_empty() {
457            return BTreeMap::new();
458        }
459
460        let mut visitor = FieldVisitor::new();
461        event.record(&mut visitor);
462        let all_fields = visitor.into_fields();
463
464        // Filter to only the configured event fields
465        self.event_fields
466            .iter()
467            .filter_map(|field_name| {
468                all_fields
469                    .get(field_name)
470                    .map(|value| (field_name.clone(), value.clone()))
471            })
472            .collect()
473    }
474
475    /// Compute event signature from tracing metadata, span context, and event fields.
476    ///
477    /// The signature includes:
478    /// - Log level (INFO, WARN, ERROR, etc.)
479    /// - Message template
480    /// - Target module path
481    /// - Span context fields (if configured)
482    /// - Event fields (if configured)
483    fn compute_signature(
484        &self,
485        metadata: &Metadata,
486        combined_fields: &BTreeMap<String, String>,
487    ) -> EventSignature {
488        let level = metadata.level().as_str();
489        let message = metadata.name();
490        let target = Some(metadata.target());
491
492        // Use combined fields (span context + event fields) in signature
493        EventSignature::new(level, message, combined_fields, target)
494    }
495
496    /// Check if an event should be allowed through.
497    pub fn should_allow(&self, signature: EventSignature) -> bool {
498        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
499    }
500
501    /// Check if an event should be allowed through and capture metadata.
502    ///
503    /// This method stores event metadata on first occurrence so summaries
504    /// can show human-readable event details instead of just signature hashes.
505    ///
506    /// **Note:** Only available with the `human-readable` feature flag.
507    #[cfg(feature = "human-readable")]
508    pub fn should_allow_with_metadata(
509        &self,
510        signature: EventSignature,
511        metadata: crate::domain::metadata::EventMetadata,
512    ) -> bool {
513        matches!(
514            self.limiter.check_event_with_metadata(signature, metadata),
515            LimitDecision::Allow
516        )
517    }
518
519    /// Get a reference to the underlying limiter.
520    pub fn limiter(&self) -> &RateLimiter<S> {
521        &self.limiter
522    }
523
524    /// Get a reference to the metrics.
525    ///
526    /// Returns metrics about rate limiting behavior including:
527    /// - Events allowed
528    /// - Events suppressed
529    /// - Signatures evicted
530    pub fn metrics(&self) -> &Metrics {
531        self.limiter.metrics()
532    }
533
534    /// Get the current number of tracked signatures.
535    pub fn signature_count(&self) -> usize {
536        self.limiter.registry().len()
537    }
538
539    /// Get a reference to the circuit breaker.
540    ///
541    /// Use this to check the circuit breaker state and health:
542    /// - `circuit_breaker().state()` - Current circuit state
543    /// - `circuit_breaker().consecutive_failures()` - Failure count
544    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
545        self.limiter.circuit_breaker()
546    }
547
548    /// Shutdown the active suppression summary emitter, if running.
549    ///
550    /// This method gracefully stops the background emission task.  If active emission
551    /// is not enabled, this method does nothing.
552    ///
553    /// **Requires the `async` feature.**
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the emitter task fails to shut down gracefully.
558    ///
559    /// # Example
560    ///
561    /// ```no_run
562    /// # use tracing_throttle::TracingRateLimitLayer;
563    /// # async fn example() {
564    /// let layer = TracingRateLimitLayer::builder()
565    ///     .with_active_emission(true)
566    ///     .build()
567    ///     .unwrap();
568    ///
569    /// // Use the layer...
570    ///
571    /// // Shutdown before dropping
572    /// layer.shutdown().await.expect("shutdown failed");
573    /// # }
574    /// ```
575    #[cfg(feature = "async")]
576    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
577        // Take the handle while holding the lock, then release the lock before awaiting
578        let handle = {
579            let mut handle_guard = self.emitter_handle.lock().unwrap();
580            handle_guard.take()
581        };
582
583        if let Some(handle) = handle {
584            handle.shutdown().await?;
585        }
586        Ok(())
587    }
588}
589
590impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
591    /// Create a builder for configuring the layer.
592    ///
593    /// Defaults:
594    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
595    /// - Max signatures: 10,000 (with LRU eviction)
596    /// - Summary interval: 30 seconds
597    /// - Active emission: disabled
598    /// - Summary formatter: default (WARN level with signature and count)
599    pub fn builder() -> TracingRateLimitLayerBuilder {
600        TracingRateLimitLayerBuilder {
601            policy: Policy::token_bucket(50.0, 1.0)
602                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
603            summary_interval: Duration::from_secs(30),
604            clock: None,
605            max_signatures: Some(10_000),
606            enable_active_emission: false,
607            #[cfg(feature = "async")]
608            summary_formatter: None,
609            span_context_fields: Vec::new(),
610            event_fields: Vec::new(),
611            eviction_strategy: None,
612        }
613    }
614
615    /// Create a layer with default settings.
616    ///
617    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
618    ///
619    /// Defaults:
620    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
621    /// - Max signatures: 10,000 (with LRU eviction)
622    /// - Summary interval: 30 seconds
623    ///
624    /// # Panics
625    /// This method cannot panic because all default values are valid.
626    pub fn new() -> Self {
627        Self::builder()
628            .build()
629            .expect("default configuration is always valid")
630    }
631
632    /// Create a layer with custom storage backend.
633    ///
634    /// This allows using alternative storage implementations like Redis for distributed
635    /// rate limiting across multiple application instances.
636    ///
637    /// # Arguments
638    ///
639    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
640    /// * `policy` - Rate limiting policy to apply
641    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
642    ///
643    /// # Example with Redis
644    ///
645    /// ```rust,ignore
646    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
647    /// use std::sync::Arc;
648    ///
649    /// #[tokio::main]
650    /// async fn main() {
651    ///     let storage = Arc::new(
652    ///         RedisStorage::connect("redis://127.0.0.1/")
653    ///             .await
654    ///             .expect("Failed to connect")
655    ///     );
656    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
657    ///     let clock = Arc::new(SystemClock::new());
658    ///
659    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
660    /// }
661    /// ```
662    pub fn with_storage<ST>(
663        storage: ST,
664        policy: Policy,
665        clock: Arc<dyn Clock>,
666    ) -> TracingRateLimitLayer<ST>
667    where
668        ST: Storage<EventSignature, EventState> + Clone,
669    {
670        let metrics = Metrics::new();
671        let circuit_breaker = Arc::new(CircuitBreaker::new());
672        let registry = SuppressionRegistry::new(storage, clock, policy);
673        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
674
675        TracingRateLimitLayer {
676            limiter,
677            span_context_fields: Arc::new(Vec::new()),
678            event_fields: Arc::new(Vec::new()),
679            #[cfg(feature = "async")]
680            emitter_handle: Arc::new(Mutex::new(None)),
681            #[cfg(not(feature = "async"))]
682            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
683                .expect("30 seconds is valid"),
684        }
685    }
686}
687
688impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
689    fn default() -> Self {
690        Self::new()
691    }
692}
693
694// Implement the Filter trait for rate limiting
695impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
696where
697    S: Storage<EventSignature, EventState> + Clone,
698    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
699{
700    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
701        // Always return true - actual filtering happens in event_enabled
702        // This prevents double-checking in dual-layer setups
703        true
704    }
705
706    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
707        // Combine span context and event fields
708        let mut combined_fields = self.extract_span_context(cx);
709        let event_fields = self.extract_event_fields(event);
710        combined_fields.extend(event_fields);
711
712        let metadata_obj = event.metadata();
713        let signature = self.compute_signature(metadata_obj, &combined_fields);
714
715        #[cfg(feature = "human-readable")]
716        {
717            // Extract message from event for metadata
718            let mut visitor = FieldVisitor::new();
719            event.record(&mut visitor);
720            let all_fields = visitor.into_fields();
721            let message = all_fields
722                .get("message")
723                .cloned()
724                .unwrap_or_else(|| event.metadata().name().to_string());
725
726            // Create EventMetadata for this event
727            let event_metadata = crate::domain::metadata::EventMetadata::new(
728                metadata_obj.level().as_str().to_string(),
729                message,
730                metadata_obj.target().to_string(),
731                combined_fields,
732            );
733
734            self.should_allow_with_metadata(signature, event_metadata)
735        }
736
737        #[cfg(not(feature = "human-readable"))]
738        {
739            self.should_allow(signature)
740        }
741    }
742}
743
744impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
745where
746    S: Storage<EventSignature, EventState> + Clone + 'static,
747    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
748{
749    fn on_new_span(
750        &self,
751        attrs: &tracing::span::Attributes<'_>,
752        id: &tracing::span::Id,
753        ctx: Context<'_, Sub>,
754    ) {
755        if self.span_context_fields.is_empty() {
756            return;
757        }
758
759        let mut visitor = FieldVisitor::new();
760        attrs.record(&mut visitor);
761        let fields = visitor.into_fields();
762
763        if let Some(span) = ctx.span(id) {
764            let mut extensions = span.extensions_mut();
765            extensions.insert(fields);
766        }
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use tracing::info;
774    use tracing_subscriber::layer::SubscriberExt;
775
776    #[test]
777    fn test_layer_builder() {
778        let layer = TracingRateLimitLayer::builder()
779            .with_policy(Policy::count_based(50).unwrap())
780            .with_summary_interval(Duration::from_secs(60))
781            .build()
782            .unwrap();
783
784        assert!(layer.limiter().registry().is_empty());
785    }
786
787    #[test]
788    fn test_span_context_fields_deduplication() {
789        let layer = TracingRateLimitLayer::builder()
790            .with_span_context_fields(vec![
791                "user_id".to_string(),
792                "user_id".to_string(), // duplicate
793                "tenant_id".to_string(),
794                "".to_string(),        // empty, should be filtered
795                "user_id".to_string(), // another duplicate
796            ])
797            .build()
798            .unwrap();
799
800        // Should only have 2 unique fields: user_id and tenant_id
801        assert_eq!(layer.span_context_fields.len(), 2);
802        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
803        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
804    }
805
806    #[test]
807    fn test_event_fields_deduplication() {
808        let layer = TracingRateLimitLayer::builder()
809            .with_event_fields(vec![
810                "error_code".to_string(),
811                "error_code".to_string(), // duplicate
812                "status".to_string(),
813                "".to_string(),           // empty, should be filtered
814                "error_code".to_string(), // another duplicate
815            ])
816            .build()
817            .unwrap();
818
819        // Should only have 2 unique fields: error_code and status
820        assert_eq!(layer.event_fields.len(), 2);
821        assert!(layer.event_fields.iter().any(|f| f == "error_code"));
822        assert!(layer.event_fields.iter().any(|f| f == "status"));
823    }
824
825    #[test]
826    fn test_layer_default() {
827        let layer = TracingRateLimitLayer::default();
828        assert!(layer.limiter().registry().is_empty());
829    }
830
831    #[test]
832    fn test_signature_computation() {
833        let _layer = TracingRateLimitLayer::new();
834
835        // Use a simple signature test without metadata construction
836        let sig1 = EventSignature::simple("INFO", "test_event");
837        let sig2 = EventSignature::simple("INFO", "test_event");
838
839        // Same inputs should produce same signature
840        assert_eq!(sig1, sig2);
841    }
842
843    #[test]
844    fn test_basic_rate_limiting() {
845        let layer = TracingRateLimitLayer::builder()
846            .with_policy(Policy::count_based(2).unwrap())
847            .build()
848            .unwrap();
849
850        let sig = EventSignature::simple("INFO", "test_message");
851
852        // First two should be allowed
853        assert!(layer.should_allow(sig));
854        assert!(layer.should_allow(sig));
855
856        // Third should be suppressed
857        assert!(!layer.should_allow(sig));
858    }
859
860    #[test]
861    fn test_layer_integration() {
862        let layer = TracingRateLimitLayer::builder()
863            .with_policy(Policy::count_based(3).unwrap())
864            .build()
865            .unwrap();
866
867        // Clone for use in subscriber, keep original for checking state
868        let layer_for_check = layer.clone();
869
870        let subscriber = tracing_subscriber::registry()
871            .with(tracing_subscriber::fmt::layer().with_filter(layer));
872
873        // Test that the layer correctly tracks event signatures
874        tracing::subscriber::with_default(subscriber, || {
875            // Emit 10 identical events
876            for _ in 0..10 {
877                info!("test event");
878            }
879        });
880
881        // After emitting 10 events with the same signature, the layer should have
882        // tracked them and only the first 3 should have been marked as allowed
883        // The registry should contain one entry for this signature
884        assert_eq!(layer_for_check.limiter().registry().len(), 1);
885    }
886
887    #[test]
888    fn test_layer_suppression_logic() {
889        let layer = TracingRateLimitLayer::builder()
890            .with_policy(Policy::count_based(3).unwrap())
891            .build()
892            .unwrap();
893
894        let sig = EventSignature::simple("INFO", "test");
895
896        // Verify the suppression logic works correctly
897        let mut allowed_count = 0;
898        for _ in 0..10 {
899            if layer.should_allow(sig) {
900                allowed_count += 1;
901            }
902        }
903
904        assert_eq!(allowed_count, 3);
905    }
906
907    #[test]
908    fn test_builder_zero_summary_interval() {
909        let result = TracingRateLimitLayer::builder()
910            .with_summary_interval(Duration::from_secs(0))
911            .build();
912
913        assert!(matches!(
914            result,
915            Err(BuildError::EmitterConfig(
916                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
917            ))
918        ));
919    }
920
921    #[test]
922    fn test_builder_zero_max_signatures() {
923        let result = TracingRateLimitLayer::builder()
924            .with_max_signatures(0)
925            .build();
926
927        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
928    }
929
930    #[test]
931    fn test_builder_valid_max_signatures() {
932        let layer = TracingRateLimitLayer::builder()
933            .with_max_signatures(100)
934            .build()
935            .unwrap();
936
937        assert!(layer.limiter().registry().is_empty());
938    }
939
940    #[test]
941    fn test_metrics_tracking() {
942        let layer = TracingRateLimitLayer::builder()
943            .with_policy(Policy::count_based(2).unwrap())
944            .build()
945            .unwrap();
946
947        let sig = EventSignature::simple("INFO", "test");
948
949        // Check initial metrics
950        assert_eq!(layer.metrics().events_allowed(), 0);
951        assert_eq!(layer.metrics().events_suppressed(), 0);
952
953        // Allow first two events
954        assert!(layer.should_allow(sig));
955        assert!(layer.should_allow(sig));
956
957        // Check metrics after allowed events
958        assert_eq!(layer.metrics().events_allowed(), 2);
959        assert_eq!(layer.metrics().events_suppressed(), 0);
960
961        // Suppress third event
962        assert!(!layer.should_allow(sig));
963
964        // Check metrics after suppressed event
965        assert_eq!(layer.metrics().events_allowed(), 2);
966        assert_eq!(layer.metrics().events_suppressed(), 1);
967    }
968
969    #[test]
970    fn test_metrics_snapshot() {
971        let layer = TracingRateLimitLayer::builder()
972            .with_policy(Policy::count_based(3).unwrap())
973            .build()
974            .unwrap();
975
976        let sig = EventSignature::simple("INFO", "test");
977
978        // Generate some events
979        for _ in 0..5 {
980            layer.should_allow(sig);
981        }
982
983        // Get snapshot
984        let snapshot = layer.metrics().snapshot();
985        assert_eq!(snapshot.events_allowed, 3);
986        assert_eq!(snapshot.events_suppressed, 2);
987        assert_eq!(snapshot.total_events(), 5);
988        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
989    }
990
991    #[test]
992    fn test_signature_count() {
993        let layer = TracingRateLimitLayer::builder()
994            .with_policy(Policy::count_based(2).unwrap())
995            .build()
996            .unwrap();
997
998        assert_eq!(layer.signature_count(), 0);
999
1000        let sig1 = EventSignature::simple("INFO", "test1");
1001        let sig2 = EventSignature::simple("INFO", "test2");
1002
1003        layer.should_allow(sig1);
1004        assert_eq!(layer.signature_count(), 1);
1005
1006        layer.should_allow(sig2);
1007        assert_eq!(layer.signature_count(), 2);
1008
1009        // Same signature shouldn't increase count
1010        layer.should_allow(sig1);
1011        assert_eq!(layer.signature_count(), 2);
1012    }
1013
1014    #[test]
1015    fn test_metrics_with_eviction() {
1016        let layer = TracingRateLimitLayer::builder()
1017            .with_policy(Policy::count_based(1).unwrap())
1018            .with_max_signatures(3)
1019            .build()
1020            .unwrap();
1021
1022        // Fill up to capacity
1023        for i in 0..3 {
1024            let sig = EventSignature::simple("INFO", &format!("test{}", i));
1025            layer.should_allow(sig);
1026        }
1027
1028        assert_eq!(layer.signature_count(), 3);
1029        assert_eq!(layer.metrics().signatures_evicted(), 0);
1030
1031        // Add one more, which should trigger eviction
1032        let sig = EventSignature::simple("INFO", "test3");
1033        layer.should_allow(sig);
1034
1035        assert_eq!(layer.signature_count(), 3);
1036        assert_eq!(layer.metrics().signatures_evicted(), 1);
1037    }
1038
1039    #[test]
1040    fn test_circuit_breaker_observability() {
1041        use crate::application::circuit_breaker::CircuitState;
1042
1043        let layer = TracingRateLimitLayer::builder()
1044            .with_policy(Policy::count_based(2).unwrap())
1045            .build()
1046            .unwrap();
1047
1048        // Check initial circuit breaker state
1049        let cb = layer.circuit_breaker();
1050        assert_eq!(cb.state(), CircuitState::Closed);
1051        assert_eq!(cb.consecutive_failures(), 0);
1052
1053        // Circuit breaker should remain closed during normal operation
1054        let sig = EventSignature::simple("INFO", "test");
1055        layer.should_allow(sig);
1056        layer.should_allow(sig);
1057        layer.should_allow(sig);
1058
1059        assert_eq!(cb.state(), CircuitState::Closed);
1060    }
1061
1062    #[test]
1063    fn test_circuit_breaker_fail_open_integration() {
1064        use crate::application::circuit_breaker::{
1065            CircuitBreaker, CircuitBreakerConfig, CircuitState,
1066        };
1067        use std::time::Duration;
1068
1069        // Create a circuit breaker with low threshold for testing
1070        let cb_config = CircuitBreakerConfig {
1071            failure_threshold: 2,
1072            recovery_timeout: Duration::from_secs(1),
1073        };
1074        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1075
1076        // Build layer with custom circuit breaker
1077        let storage = Arc::new(ShardedStorage::new());
1078        let clock = Arc::new(SystemClock::new());
1079        let policy = Policy::count_based(2).unwrap();
1080        let registry = SuppressionRegistry::new(storage, clock, policy);
1081        let metrics = Metrics::new();
1082        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1083
1084        let layer = TracingRateLimitLayer {
1085            limiter,
1086            span_context_fields: Arc::new(Vec::new()),
1087            event_fields: Arc::new(Vec::new()),
1088            #[cfg(feature = "async")]
1089            emitter_handle: Arc::new(Mutex::new(None)),
1090            #[cfg(not(feature = "async"))]
1091            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1092                30,
1093            ))
1094            .unwrap(),
1095        };
1096
1097        let sig = EventSignature::simple("INFO", "test");
1098
1099        // Normal operation - first 2 events allowed, third suppressed
1100        assert!(layer.should_allow(sig));
1101        assert!(layer.should_allow(sig));
1102        assert!(!layer.should_allow(sig));
1103
1104        // Circuit should still be closed
1105        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1106
1107        // Manually trigger circuit breaker failures to test fail-open
1108        circuit_breaker.record_failure();
1109        circuit_breaker.record_failure();
1110
1111        // Circuit should now be open
1112        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1113
1114        // With circuit open, rate limiter should fail open (allow all events)
1115        // even though we've already hit the rate limit
1116        assert!(layer.should_allow(sig));
1117        assert!(layer.should_allow(sig));
1118        assert!(layer.should_allow(sig));
1119
1120        // Metrics should show these as allowed (fail-open behavior)
1121        let snapshot = layer.metrics().snapshot();
1122        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1123    }
1124
1125    #[cfg(feature = "async")]
1126    #[tokio::test]
1127    async fn test_active_emission_integration() {
1128        use std::sync::atomic::{AtomicUsize, Ordering};
1129        use std::time::Duration;
1130
1131        // Use an atomic counter to track emissions
1132        let emission_count = Arc::new(AtomicUsize::new(0));
1133        let count_clone = Arc::clone(&emission_count);
1134
1135        // Create a layer with a custom emitter that increments our counter
1136        let storage = Arc::new(ShardedStorage::new());
1137        let clock = Arc::new(SystemClock::new());
1138        let policy = Policy::count_based(2).unwrap();
1139        let registry = SuppressionRegistry::new(storage, clock, policy);
1140
1141        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1142        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1143
1144        // Start emitter with custom callback
1145        let handle = emitter.start(
1146            move |summaries| {
1147                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1148            },
1149            false,
1150        );
1151
1152        // Emit events that will be suppressed
1153        let sig = EventSignature::simple("INFO", "test_message");
1154        for _ in 0..10 {
1155            registry.with_event_state(sig, |state, now| {
1156                state.counter.record_suppression(now);
1157            });
1158        }
1159
1160        // Wait for at least two emission intervals
1161        tokio::time::sleep(Duration::from_millis(250)).await;
1162
1163        // Check that summaries were emitted
1164        let count = emission_count.load(Ordering::SeqCst);
1165        assert!(
1166            count > 0,
1167            "Expected at least one suppression summary to be emitted, got {}",
1168            count
1169        );
1170
1171        // Graceful shutdown
1172        handle.shutdown().await.expect("shutdown failed");
1173    }
1174
1175    #[cfg(feature = "async")]
1176    #[tokio::test]
1177    async fn test_active_emission_disabled() {
1178        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1179        use std::time::Duration;
1180
1181        // Create layer with active emission disabled (default)
1182        let layer = TracingRateLimitLayer::builder()
1183            .with_policy(Policy::count_based(2).unwrap())
1184            .with_summary_interval(Duration::from_millis(100))
1185            .build()
1186            .unwrap();
1187
1188        let mock = MockCaptureLayer::new();
1189        let mock_clone = mock.clone();
1190
1191        let subscriber = tracing_subscriber::registry()
1192            .with(mock)
1193            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1194
1195        tracing::subscriber::with_default(subscriber, || {
1196            let sig = EventSignature::simple("INFO", "test_message");
1197            for _ in 0..10 {
1198                layer.should_allow(sig);
1199            }
1200        });
1201
1202        // Wait to ensure no emissions occur
1203        tokio::time::sleep(Duration::from_millis(250)).await;
1204
1205        // Should not have emitted any summaries
1206        let events = mock_clone.get_captured();
1207        let summary_count = events
1208            .iter()
1209            .filter(|e| e.message.contains("suppressed"))
1210            .count();
1211
1212        assert_eq!(
1213            summary_count, 0,
1214            "Should not emit summaries when active emission is disabled"
1215        );
1216
1217        // Shutdown should succeed even when emitter was never started
1218        layer.shutdown().await.expect("shutdown failed");
1219    }
1220
1221    #[cfg(feature = "async")]
1222    #[tokio::test]
1223    async fn test_shutdown_without_emission() {
1224        // Test that shutdown works when emission was never enabled
1225        let layer = TracingRateLimitLayer::new();
1226
1227        // Should not error
1228        layer
1229            .shutdown()
1230            .await
1231            .expect("shutdown should succeed when emitter not running");
1232    }
1233
1234    #[cfg(feature = "async")]
1235    #[tokio::test]
1236    async fn test_custom_summary_formatter() {
1237        use std::sync::atomic::{AtomicUsize, Ordering};
1238        use std::time::Duration;
1239
1240        // Track formatter invocations
1241        let call_count = Arc::new(AtomicUsize::new(0));
1242        let count_clone = Arc::clone(&call_count);
1243
1244        // Track data passed to formatter
1245        let last_count = Arc::new(AtomicUsize::new(0));
1246        let last_count_clone = Arc::clone(&last_count);
1247
1248        // Create layer with custom formatter
1249        let layer = TracingRateLimitLayer::builder()
1250            .with_policy(Policy::count_based(2).unwrap())
1251            .with_active_emission(true)
1252            .with_summary_interval(Duration::from_millis(100))
1253            .with_summary_formatter(Arc::new(move |summary| {
1254                count_clone.fetch_add(1, Ordering::SeqCst);
1255                last_count_clone.store(summary.count, Ordering::SeqCst);
1256                // Custom format: emit at INFO level instead of WARN
1257                tracing::info!(
1258                    sig = %summary.signature,
1259                    suppressed = summary.count,
1260                    "Custom format"
1261                );
1262            }))
1263            .build()
1264            .unwrap();
1265
1266        // Emit events that will be suppressed
1267        let sig = EventSignature::simple("INFO", "test_message");
1268        for _ in 0..10 {
1269            layer.should_allow(sig);
1270        }
1271
1272        // Wait for emission
1273        tokio::time::sleep(Duration::from_millis(250)).await;
1274
1275        // Verify custom formatter was called
1276        let calls = call_count.load(Ordering::SeqCst);
1277        assert!(calls > 0, "Custom formatter should have been called");
1278
1279        // Verify formatter received correct data
1280        let count = last_count.load(Ordering::SeqCst);
1281        assert!(
1282            count >= 8,
1283            "Expected at least 8 suppressions, got {}",
1284            count
1285        );
1286
1287        layer.shutdown().await.expect("shutdown failed");
1288    }
1289
1290    #[cfg(feature = "async")]
1291    #[tokio::test]
1292    async fn test_default_formatter_used() {
1293        use std::sync::atomic::{AtomicUsize, Ordering};
1294        use std::time::Duration;
1295
1296        let emission_count = Arc::new(AtomicUsize::new(0));
1297        let count_clone = Arc::clone(&emission_count);
1298
1299        let storage = Arc::new(ShardedStorage::new());
1300        let clock = Arc::new(SystemClock::new());
1301        let policy = Policy::count_based(2).unwrap();
1302        let registry = SuppressionRegistry::new(storage, clock, policy);
1303
1304        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1305        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1306
1307        // Start without custom formatter - should use default
1308        let handle = emitter.start(
1309            move |summaries| {
1310                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1311            },
1312            false,
1313        );
1314
1315        let sig = EventSignature::simple("INFO", "test_message");
1316        for _ in 0..10 {
1317            registry.with_event_state(sig, |state, now| {
1318                state.counter.record_suppression(now);
1319            });
1320        }
1321
1322        tokio::time::sleep(Duration::from_millis(250)).await;
1323
1324        let count = emission_count.load(Ordering::SeqCst);
1325        assert!(count > 0, "Default formatter should have emitted summaries");
1326
1327        handle.shutdown().await.expect("shutdown failed");
1328    }
1329}