tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::borrow::Cow;
20use std::collections::{BTreeMap, BTreeSet};
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{Metadata, Subscriber};
24use tracing_subscriber::layer::Filter;
25use tracing_subscriber::registry::LookupSpan;
26use tracing_subscriber::{layer::Context, Layer};
27
28#[cfg(feature = "async")]
29use crate::application::emitter::{EmitterHandle, SummaryEmitter};
30
31#[cfg(feature = "async")]
32use crate::domain::summary::SuppressionSummary;
33
34#[cfg(feature = "async")]
35use std::sync::Mutex;
36
37/// Function type for formatting suppression summaries.
38///
39/// Takes a reference to a `SuppressionSummary` and emits it as a tracing event.
40/// The function is responsible for choosing the log level and format.
41#[cfg(feature = "async")]
42pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
43
44/// Error returned when building a TracingRateLimitLayer fails.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum BuildError {
47    /// Maximum signatures must be greater than zero
48    ZeroMaxSignatures,
49    /// Emitter configuration validation failed
50    EmitterConfig(crate::application::emitter::EmitterConfigError),
51}
52
53impl std::fmt::Display for BuildError {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            BuildError::ZeroMaxSignatures => {
57                write!(f, "max_signatures must be greater than 0")
58            }
59            BuildError::EmitterConfig(e) => {
60                write!(f, "emitter configuration error: {}", e)
61            }
62        }
63    }
64}
65
66impl std::error::Error for BuildError {}
67
68impl From<crate::application::emitter::EmitterConfigError> for BuildError {
69    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
70        BuildError::EmitterConfig(e)
71    }
72}
73
74/// Builder for constructing a `TracingRateLimitLayer`.
75pub struct TracingRateLimitLayerBuilder {
76    policy: Policy,
77    summary_interval: Duration,
78    clock: Option<Arc<dyn Clock>>,
79    max_signatures: Option<usize>,
80    enable_active_emission: bool,
81    #[cfg(feature = "async")]
82    summary_formatter: Option<SummaryFormatter>,
83    span_context_fields: Vec<String>,
84    excluded_fields: BTreeSet<String>,
85    eviction_strategy: Option<EvictionStrategy>,
86    exempt_targets: BTreeSet<String>,
87}
88
89/// Eviction strategy configuration for the rate limit layer.
90///
91/// This enum provides a user-friendly API that internally creates
92/// the appropriate EvictionPolicy adapter.
93#[derive(Clone)]
94pub enum EvictionStrategy {
95    /// LRU (Least Recently Used) eviction with entry count limit.
96    Lru {
97        /// Maximum number of entries
98        max_entries: usize,
99    },
100    /// Priority-based eviction using a custom function.
101    Priority {
102        /// Maximum number of entries
103        max_entries: usize,
104        /// Priority calculation function
105        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
106    },
107    /// Memory-based eviction with byte limit.
108    Memory {
109        /// Maximum memory usage in bytes
110        max_bytes: usize,
111    },
112    /// Combined priority and memory limits.
113    PriorityWithMemory {
114        /// Maximum number of entries
115        max_entries: usize,
116        /// Priority calculation function
117        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
118        /// Maximum memory usage in bytes
119        max_bytes: usize,
120    },
121}
122
123impl EvictionStrategy {
124    /// Check if this strategy tracks memory usage.
125    pub fn tracks_memory(&self) -> bool {
126        matches!(
127            self,
128            EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
129        )
130    }
131
132    /// Get the memory limit if this strategy uses one.
133    pub fn memory_limit(&self) -> Option<usize> {
134        match self {
135            EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
136            EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
137            _ => None,
138        }
139    }
140
141    /// Check if this strategy uses priority-based eviction.
142    pub fn uses_priority(&self) -> bool {
143        matches!(
144            self,
145            EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
146        )
147    }
148}
149
150impl std::fmt::Debug for EvictionStrategy {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        match self {
153            EvictionStrategy::Lru { max_entries } => f
154                .debug_struct("Lru")
155                .field("max_entries", max_entries)
156                .finish(),
157            EvictionStrategy::Priority {
158                max_entries,
159                priority_fn: _,
160            } => f
161                .debug_struct("Priority")
162                .field("max_entries", max_entries)
163                .field("priority_fn", &"<fn>")
164                .finish(),
165            EvictionStrategy::Memory { max_bytes } => f
166                .debug_struct("Memory")
167                .field("max_bytes", max_bytes)
168                .finish(),
169            EvictionStrategy::PriorityWithMemory {
170                max_entries,
171                priority_fn: _,
172                max_bytes,
173            } => f
174                .debug_struct("PriorityWithMemory")
175                .field("max_entries", max_entries)
176                .field("priority_fn", &"<fn>")
177                .field("max_bytes", max_bytes)
178                .finish(),
179        }
180    }
181}
182
183impl TracingRateLimitLayerBuilder {
184    /// Set the rate limiting policy.
185    pub fn with_policy(mut self, policy: Policy) -> Self {
186        self.policy = policy;
187        self
188    }
189
190    /// Set the summary emission interval.
191    ///
192    /// The interval will be validated when `build()` is called.
193    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
194        self.summary_interval = interval;
195        self
196    }
197
198    /// Set a custom clock (mainly for testing).
199    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
200        self.clock = Some(clock);
201        self
202    }
203
204    /// Set the maximum number of unique event signatures to track.
205    ///
206    /// When this limit is reached, the least recently used signatures will be evicted.
207    /// This prevents unbounded memory growth in applications with high signature cardinality.
208    ///
209    /// Default: 10,000 signatures
210    ///
211    /// The value will be validated when `build()` is called.
212    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
213        self.max_signatures = Some(max_signatures);
214        self
215    }
216
217    /// Disable the signature limit, allowing unbounded growth.
218    ///
219    /// **Warning**: This can lead to unbounded memory usage in applications that generate
220    /// many unique event signatures. Only use this if you're certain your application has
221    /// bounded signature cardinality or you have external memory monitoring.
222    pub fn with_unlimited_signatures(mut self) -> Self {
223        self.max_signatures = None;
224        self
225    }
226
227    /// Enable active emission of suppression summaries.
228    ///
229    /// When enabled, the layer will automatically emit `WARN`-level tracing events
230    /// containing summaries of suppressed log events at the configured interval.
231    ///
232    /// **Requires the `async` feature** - this method has no effect without it.
233    ///
234    /// Default: disabled
235    ///
236    /// # Example
237    ///
238    /// ```no_run
239    /// # use tracing_throttle::TracingRateLimitLayer;
240    /// # use std::time::Duration;
241    /// let layer = TracingRateLimitLayer::builder()
242    ///     .with_active_emission(true)
243    ///     .with_summary_interval(Duration::from_secs(60))
244    ///     .build()
245    ///     .unwrap();
246    /// ```
247    pub fn with_active_emission(mut self, enabled: bool) -> Self {
248        self.enable_active_emission = enabled;
249        self
250    }
251
252    /// Set a custom formatter for suppression summaries.
253    ///
254    /// The formatter is responsible for emitting summaries as tracing events.
255    /// This allows full control over log level, message format, and structured fields.
256    ///
257    /// **Requires the `async` feature.**
258    ///
259    /// If not set, a default formatter is used that emits at WARN level with
260    /// `signature` and `count` fields.
261    ///
262    /// # Example
263    ///
264    /// ```no_run
265    /// # use tracing_throttle::TracingRateLimitLayer;
266    /// # use std::sync::Arc;
267    /// # use std::time::Duration;
268    /// let layer = TracingRateLimitLayer::builder()
269    ///     .with_active_emission(true)
270    ///     .with_summary_formatter(Arc::new(|summary| {
271    ///         tracing::info!(
272    ///             signature = %summary.signature,
273    ///             count = summary.count,
274    ///             duration_secs = summary.duration.as_secs(),
275    ///             "Suppression summary"
276    ///         );
277    ///     }))
278    ///     .build()
279    ///     .unwrap();
280    /// ```
281    #[cfg(feature = "async")]
282    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
283        self.summary_formatter = Some(formatter);
284        self
285    }
286
287    /// Include span context fields in event signatures.
288    ///
289    /// When specified, the layer will extract these fields from the current span
290    /// context and include them in the event signature. This enables rate limiting
291    /// per-user, per-tenant, per-request, or any other span-level context.
292    ///
293    /// Duplicate field names are automatically removed, and empty field names are filtered out.
294    ///
295    /// # Example
296    ///
297    /// ```no_run
298    /// # use tracing_throttle::TracingRateLimitLayer;
299    /// // Rate limit separately per user
300    /// let layer = TracingRateLimitLayer::builder()
301    ///     .with_span_context_fields(vec!["user_id".to_string()])
302    ///     .build()
303    ///     .unwrap();
304    ///
305    /// // Rate limit per user and tenant
306    /// let layer = TracingRateLimitLayer::builder()
307    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
308    ///     .build()
309    ///     .unwrap();
310    /// ```
311    ///
312    /// # Usage with Spans
313    ///
314    /// ```no_run
315    /// # use tracing::{info, info_span};
316    /// // Create a span with user context
317    /// let span = info_span!("request", user_id = "alice");
318    /// let _enter = span.enter();
319    ///
320    /// // These events will be rate limited separately per user
321    /// info!("Processing request");  // Limited for user "alice"
322    /// ```
323    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
324        // Deduplicate and filter out empty field names
325        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
326        self.span_context_fields = unique_fields.into_iter().collect();
327        self
328    }
329
330    /// Exclude specific fields from event signatures.
331    ///
332    /// By default, ALL event fields are included in signatures. This ensures that
333    /// events with different field values are treated as distinct events, preventing
334    /// accidental loss of meaningful log data.
335    ///
336    /// Use this method to exclude high-cardinality fields that don't change the
337    /// semantic meaning of the event (e.g., request_id, timestamp, trace_id).
338    ///
339    /// Duplicate field names are automatically removed, and empty field names are filtered out.
340    ///
341    /// # Example
342    ///
343    /// ```no_run
344    /// # use tracing_throttle::TracingRateLimitLayer;
345    /// // Exclude request_id so events with same user_id are deduplicated
346    /// let layer = TracingRateLimitLayer::builder()
347    ///     .with_excluded_fields(vec!["request_id".to_string(), "trace_id".to_string()])
348    ///     .build()
349    ///     .unwrap();
350    /// ```
351    ///
352    /// # Default Behavior (ALL fields included)
353    ///
354    /// ```no_run
355    /// # use tracing::error;
356    /// // These are DIFFERENT events (different user_id values)
357    /// error!(user_id = 123, "Failed to fetch user");
358    /// error!(user_id = 456, "Failed to fetch user");
359    /// // Both are logged - they have distinct signatures
360    /// ```
361    ///
362    /// # With Exclusions
363    ///
364    /// ```no_run
365    /// # use tracing::error;
366    /// // Exclude request_id from signature
367    /// error!(user_id = 123, request_id = "abc", "Failed to fetch user");
368    /// error!(user_id = 123, request_id = "def", "Failed to fetch user");
369    /// // Second is throttled - same user_id, request_id excluded from signature
370    /// ```
371    pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
372        // Deduplicate and filter out empty field names
373        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
374        self.excluded_fields = unique_fields;
375        self
376    }
377
378    /// Exempt specific targets from rate limiting.
379    ///
380    /// Events from the specified targets will bypass rate limiting entirely and
381    /// always be allowed through. This is useful for ensuring critical logs (e.g.,
382    /// security events, audit logs) are never suppressed.
383    ///
384    /// Targets are matched exactly against the event's target (module path).
385    /// Duplicate targets are automatically removed, and empty targets are filtered out.
386    ///
387    /// # Example
388    ///
389    /// ```no_run
390    /// # use tracing_throttle::TracingRateLimitLayer;
391    /// // Never throttle security or audit logs
392    /// let layer = TracingRateLimitLayer::builder()
393    ///     .with_exempt_targets(vec![
394    ///         "myapp::security".to_string(),
395    ///         "myapp::audit".to_string(),
396    ///     ])
397    ///     .build()
398    ///     .unwrap();
399    /// ```
400    ///
401    /// # Usage with Events
402    ///
403    /// ```no_run
404    /// # use tracing::error;
405    /// // Explicitly set target - this event bypasses throttling
406    /// error!(target: "myapp::security", "Security breach detected");
407    ///
408    /// // Or use the module's default target
409    /// mod security {
410    ///     use tracing::warn;
411    ///     pub fn check() {
412    ///         // Target is automatically "myapp::security" - bypasses throttling
413    ///         warn!("Suspicious activity detected");
414    ///     }
415    /// }
416    /// ```
417    pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
418        // Deduplicate and filter out empty targets
419        let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
420        self.exempt_targets = unique_targets;
421        self
422    }
423
424    /// Set a custom eviction strategy for signature management.
425    ///
426    /// Controls which signatures are evicted when storage limits are reached.
427    /// If not set, uses LRU eviction with the configured max_signatures limit.
428    ///
429    /// # Example: Priority-based eviction
430    ///
431    /// ```no_run
432    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
433    /// # use std::sync::Arc;
434    /// let layer = TracingRateLimitLayer::builder()
435    ///     .with_eviction_strategy(EvictionStrategy::Priority {
436    ///         max_entries: 5_000,
437    ///         priority_fn: Arc::new(|_sig, state| {
438    ///             // Keep ERROR events longer than INFO events
439    ///             match state.metadata.as_ref().map(|m| m.level.as_str()) {
440    ///                 Some("ERROR") => 100,
441    ///                 Some("WARN") => 50,
442    ///                 Some("INFO") => 10,
443    ///                 _ => 5,
444    ///             }
445    ///         })
446    ///     })
447    ///     .build()
448    ///     .unwrap();
449    /// ```
450    ///
451    /// # Example: Memory-based eviction
452    ///
453    /// ```no_run
454    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
455    /// // Evict when total memory exceeds 5MB
456    /// let layer = TracingRateLimitLayer::builder()
457    ///     .with_eviction_strategy(EvictionStrategy::Memory {
458    ///         max_bytes: 5 * 1024 * 1024,
459    ///     })
460    ///     .build()
461    ///     .unwrap();
462    /// ```
463    pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
464        self.eviction_strategy = Some(strategy);
465        self
466    }
467
468    /// Build the layer.
469    ///
470    /// # Errors
471    /// Returns `BuildError` if the configuration is invalid.
472    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
473        // Validate max_signatures if set
474        if let Some(max) = self.max_signatures {
475            if max == 0 {
476                return Err(BuildError::ZeroMaxSignatures);
477            }
478        }
479
480        // Create shared metrics and circuit breaker
481        let metrics = Metrics::new();
482        let circuit_breaker = Arc::new(CircuitBreaker::new());
483
484        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
485        let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
486
487        // Convert eviction strategy to adapter, or use default LRU with max_signatures
488        let eviction_policy: Option<
489            Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
490        > = match self.eviction_strategy {
491            Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
492                crate::infrastructure::eviction::LruEviction::new(max_entries),
493            )),
494            Some(EvictionStrategy::Priority {
495                max_entries,
496                priority_fn,
497            }) => Some(Arc::new(
498                crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
499            )),
500            Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
501                crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
502            )),
503            Some(EvictionStrategy::PriorityWithMemory {
504                max_entries,
505                priority_fn,
506                max_bytes,
507            }) => Some(Arc::new(
508                crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
509                    max_entries,
510                    priority_fn,
511                    max_bytes,
512                ),
513            )),
514            None => {
515                // Use default LRU with max_signatures if configured
516                self.max_signatures.map(|max| {
517                    Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
518                        as Arc<
519                            dyn crate::application::ports::EvictionPolicy<
520                                EventSignature,
521                                EventState,
522                            >,
523                        >
524                })
525            }
526        };
527
528        if let Some(policy) = eviction_policy {
529            storage = storage.with_eviction_policy(policy);
530        }
531
532        let storage = Arc::new(storage);
533        let registry = SuppressionRegistry::new(storage, clock, self.policy);
534        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
535
536        // Let EmitterConfig validate the interval
537        let emitter_config = EmitterConfig::new(self.summary_interval)?;
538
539        #[cfg(feature = "async")]
540        let emitter_handle = if self.enable_active_emission {
541            let emitter = SummaryEmitter::new(registry, emitter_config);
542
543            // Use custom formatter or default
544            let formatter = self.summary_formatter.unwrap_or_else(|| {
545                Arc::new(|summary: &SuppressionSummary| {
546                    tracing::warn!(
547                        signature = %summary.signature,
548                        count = summary.count,
549                        "{}",
550                        summary.format_message()
551                    );
552                })
553            });
554
555            let handle = emitter.start(
556                move |summaries| {
557                    for summary in summaries {
558                        formatter(&summary);
559                    }
560                },
561                false, // Don't emit final summaries on shutdown
562            );
563            Arc::new(Mutex::new(Some(handle)))
564        } else {
565            Arc::new(Mutex::new(None))
566        };
567
568        Ok(TracingRateLimitLayer {
569            limiter,
570            span_context_fields: Arc::new(self.span_context_fields),
571            excluded_fields: Arc::new(self.excluded_fields),
572            exempt_targets: Arc::new(self.exempt_targets),
573            #[cfg(feature = "async")]
574            emitter_handle,
575            #[cfg(not(feature = "async"))]
576            _emitter_config: emitter_config,
577        })
578    }
579}
580
581/// A `tracing::Layer` that applies rate limiting to events.
582///
583/// This layer intercepts events, computes their signature, and decides
584/// whether to allow or suppress them based on the configured policy.
585///
586/// Optionally emits periodic summaries of suppressed events when active
587/// emission is enabled (requires `async` feature).
588#[derive(Clone)]
589pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
590where
591    S: Storage<EventSignature, EventState> + Clone,
592{
593    limiter: RateLimiter<S>,
594    span_context_fields: Arc<Vec<String>>,
595    excluded_fields: Arc<BTreeSet<String>>,
596    exempt_targets: Arc<BTreeSet<String>>,
597    #[cfg(feature = "async")]
598    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
599    #[cfg(not(feature = "async"))]
600    _emitter_config: EmitterConfig,
601}
602
603impl<S> TracingRateLimitLayer<S>
604where
605    S: Storage<EventSignature, EventState> + Clone,
606{
607    /// Extract span context fields from the current span.
608    fn extract_span_context<Sub>(
609        &self,
610        cx: &Context<'_, Sub>,
611    ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>>
612    where
613        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
614    {
615        if self.span_context_fields.is_empty() {
616            return BTreeMap::new();
617        }
618
619        let mut context_fields = BTreeMap::new();
620
621        if let Some(span) = cx.lookup_current() {
622            for span_ref in span.scope() {
623                let extensions = span_ref.extensions();
624
625                if let Some(stored_fields) =
626                    extensions.get::<BTreeMap<Cow<'static, str>, Cow<'static, str>>>()
627                {
628                    for field_name in self.span_context_fields.as_ref() {
629                        // Create an owned Cow since we can't guarantee 'static lifetime from the String
630                        let field_key: Cow<'static, str> = Cow::Owned(field_name.clone());
631                        if let std::collections::btree_map::Entry::Vacant(e) =
632                            context_fields.entry(field_key.clone())
633                        {
634                            if let Some(value) = stored_fields.get(&field_key) {
635                                e.insert(value.clone());
636                            }
637                        }
638                    }
639                }
640
641                if context_fields.len() == self.span_context_fields.len() {
642                    break;
643                }
644            }
645        }
646
647        context_fields
648    }
649
650    /// Extract event fields from an event.
651    ///
652    /// Extracts ALL fields from the event, then excludes any fields in the
653    /// excluded_fields set. This ensures that field values are included in
654    /// event signatures by default, preventing accidental deduplication of
655    /// semantically different events.
656    fn extract_event_fields(
657        &self,
658        event: &tracing::Event<'_>,
659    ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>> {
660        let mut visitor = FieldVisitor::new();
661        event.record(&mut visitor);
662        let all_fields = visitor.into_fields();
663
664        // Exclude configured fields (e.g., high-cardinality fields like request_id)
665        if self.excluded_fields.is_empty() {
666            all_fields
667        } else {
668            all_fields
669                .into_iter()
670                .filter(|(field_name, _)| !self.excluded_fields.contains(field_name.as_ref()))
671                .collect()
672        }
673    }
674
675    /// Compute event signature from tracing metadata, span context, and event fields.
676    ///
677    /// The signature includes:
678    /// - Log level (INFO, WARN, ERROR, etc.)
679    /// - Message template
680    /// - Target module path
681    /// - Span context fields (if configured)
682    /// - Event fields (if configured)
683    fn compute_signature(
684        &self,
685        metadata: &Metadata,
686        combined_fields: &BTreeMap<Cow<'static, str>, Cow<'static, str>>,
687    ) -> EventSignature {
688        let level = metadata.level().as_str();
689        let message = metadata.name();
690        let target = Some(metadata.target());
691
692        // Use combined fields (span context + event fields) in signature
693        EventSignature::new(level, message, combined_fields, target)
694    }
695
696    /// Check if an event should be allowed through.
697    pub fn should_allow(&self, signature: EventSignature) -> bool {
698        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
699    }
700
701    /// Check if an event should be allowed through and capture metadata.
702    ///
703    /// This method stores event metadata on first occurrence so summaries
704    /// can show human-readable event details instead of just signature hashes.
705    ///
706    /// **Note:** Only available with the `human-readable` feature flag.
707    #[cfg(feature = "human-readable")]
708    pub fn should_allow_with_metadata(
709        &self,
710        signature: EventSignature,
711        metadata: crate::domain::metadata::EventMetadata,
712    ) -> bool {
713        matches!(
714            self.limiter.check_event_with_metadata(signature, metadata),
715            LimitDecision::Allow
716        )
717    }
718
719    /// Get a reference to the underlying limiter.
720    pub fn limiter(&self) -> &RateLimiter<S> {
721        &self.limiter
722    }
723
724    /// Get a reference to the metrics.
725    ///
726    /// Returns metrics about rate limiting behavior including:
727    /// - Events allowed
728    /// - Events suppressed
729    /// - Signatures evicted
730    pub fn metrics(&self) -> &Metrics {
731        self.limiter.metrics()
732    }
733
734    /// Get the current number of tracked signatures.
735    pub fn signature_count(&self) -> usize {
736        self.limiter.registry().len()
737    }
738
739    /// Get a reference to the circuit breaker.
740    ///
741    /// Use this to check the circuit breaker state and health:
742    /// - `circuit_breaker().state()` - Current circuit state
743    /// - `circuit_breaker().consecutive_failures()` - Failure count
744    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
745        self.limiter.circuit_breaker()
746    }
747
748    /// Shutdown the active suppression summary emitter, if running.
749    ///
750    /// This method gracefully stops the background emission task.  If active emission
751    /// is not enabled, this method does nothing.
752    ///
753    /// **Requires the `async` feature.**
754    ///
755    /// # Errors
756    ///
757    /// Returns an error if the emitter task fails to shut down gracefully.
758    ///
759    /// # Example
760    ///
761    /// ```no_run
762    /// # use tracing_throttle::TracingRateLimitLayer;
763    /// # async fn example() {
764    /// let layer = TracingRateLimitLayer::builder()
765    ///     .with_active_emission(true)
766    ///     .build()
767    ///     .unwrap();
768    ///
769    /// // Use the layer...
770    ///
771    /// // Shutdown before dropping
772    /// layer.shutdown().await.expect("shutdown failed");
773    /// # }
774    /// ```
775    #[cfg(feature = "async")]
776    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
777        // Take the handle while holding the lock, then release the lock before awaiting
778        let handle = {
779            let mut handle_guard = self.emitter_handle.lock().unwrap();
780            handle_guard.take()
781        };
782
783        if let Some(handle) = handle {
784            handle.shutdown().await?;
785        }
786        Ok(())
787    }
788}
789
790impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
791    /// Create a builder for configuring the layer.
792    ///
793    /// Defaults:
794    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
795    /// - Max signatures: 10,000 (with LRU eviction)
796    /// - Summary interval: 30 seconds
797    /// - Active emission: disabled
798    /// - Summary formatter: default (WARN level with signature and count)
799    pub fn builder() -> TracingRateLimitLayerBuilder {
800        TracingRateLimitLayerBuilder {
801            policy: Policy::token_bucket(50.0, 1.0)
802                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
803            summary_interval: Duration::from_secs(30),
804            clock: None,
805            max_signatures: Some(10_000),
806            enable_active_emission: false,
807            #[cfg(feature = "async")]
808            summary_formatter: None,
809            span_context_fields: Vec::new(),
810            excluded_fields: BTreeSet::new(),
811            eviction_strategy: None,
812            exempt_targets: BTreeSet::new(),
813        }
814    }
815
816    /// Create a layer with default settings.
817    ///
818    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
819    ///
820    /// Defaults:
821    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
822    /// - Max signatures: 10,000 (with LRU eviction)
823    /// - Summary interval: 30 seconds
824    ///
825    /// # Panics
826    /// This method cannot panic because all default values are valid.
827    pub fn new() -> Self {
828        Self::builder()
829            .build()
830            .expect("default configuration is always valid")
831    }
832
833    /// Create a layer with custom storage backend.
834    ///
835    /// This allows using alternative storage implementations like Redis for distributed
836    /// rate limiting across multiple application instances.
837    ///
838    /// # Arguments
839    ///
840    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
841    /// * `policy` - Rate limiting policy to apply
842    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
843    ///
844    /// # Example with Redis
845    ///
846    /// ```rust,ignore
847    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
848    /// use std::sync::Arc;
849    ///
850    /// #[tokio::main]
851    /// async fn main() {
852    ///     let storage = Arc::new(
853    ///         RedisStorage::connect("redis://127.0.0.1/")
854    ///             .await
855    ///             .expect("Failed to connect")
856    ///     );
857    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
858    ///     let clock = Arc::new(SystemClock::new());
859    ///
860    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
861    /// }
862    /// ```
863    pub fn with_storage<ST>(
864        storage: ST,
865        policy: Policy,
866        clock: Arc<dyn Clock>,
867    ) -> TracingRateLimitLayer<ST>
868    where
869        ST: Storage<EventSignature, EventState> + Clone,
870    {
871        let metrics = Metrics::new();
872        let circuit_breaker = Arc::new(CircuitBreaker::new());
873        let registry = SuppressionRegistry::new(storage, clock, policy);
874        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
875
876        TracingRateLimitLayer {
877            limiter,
878            span_context_fields: Arc::new(Vec::new()),
879            excluded_fields: Arc::new(BTreeSet::new()),
880            exempt_targets: Arc::new(BTreeSet::new()),
881            #[cfg(feature = "async")]
882            emitter_handle: Arc::new(Mutex::new(None)),
883            #[cfg(not(feature = "async"))]
884            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
885                .expect("30 seconds is valid"),
886        }
887    }
888}
889
890impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
891    fn default() -> Self {
892        Self::new()
893    }
894}
895
896// Implement the Filter trait for rate limiting
897impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
898where
899    S: Storage<EventSignature, EventState> + Clone,
900    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
901{
902    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
903        // Always return true - actual filtering happens in event_enabled
904        // This prevents double-checking in dual-layer setups
905        true
906    }
907
908    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
909        let metadata_obj = event.metadata();
910
911        // Check if this target is exempt from rate limiting
912        // Skip the lookup if no exempt targets are configured (common case)
913        if !self.exempt_targets.is_empty() && self.exempt_targets.contains(metadata_obj.target()) {
914            // Exempt targets bypass rate limiting entirely
915            self.limiter.metrics().record_allowed();
916            return true;
917        }
918
919        // Combine span context and event fields
920        let mut combined_fields = self.extract_span_context(cx);
921        let event_fields = self.extract_event_fields(event);
922        combined_fields.extend(event_fields);
923
924        let signature = self.compute_signature(metadata_obj, &combined_fields);
925
926        #[cfg(feature = "human-readable")]
927        {
928            // Extract message from event for metadata
929            let mut visitor = FieldVisitor::new();
930            event.record(&mut visitor);
931            let all_fields = visitor.into_fields();
932            let message = all_fields
933                .get(&Cow::Borrowed("message"))
934                .map(|v| v.to_string())
935                .unwrap_or_else(|| event.metadata().name().to_string());
936
937            // Create EventMetadata for this event
938            let event_metadata = crate::domain::metadata::EventMetadata::new(
939                metadata_obj.level().as_str().to_string(),
940                message,
941                metadata_obj.target().to_string(),
942                combined_fields,
943            );
944
945            self.should_allow_with_metadata(signature, event_metadata)
946        }
947
948        #[cfg(not(feature = "human-readable"))]
949        {
950            self.should_allow(signature)
951        }
952    }
953}
954
955impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
956where
957    S: Storage<EventSignature, EventState> + Clone + 'static,
958    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
959{
960    fn on_new_span(
961        &self,
962        attrs: &tracing::span::Attributes<'_>,
963        id: &tracing::span::Id,
964        ctx: Context<'_, Sub>,
965    ) {
966        if self.span_context_fields.is_empty() {
967            return;
968        }
969
970        let mut visitor = FieldVisitor::new();
971        attrs.record(&mut visitor);
972        let fields = visitor.into_fields();
973
974        if let Some(span) = ctx.span(id) {
975            let mut extensions = span.extensions_mut();
976            extensions.insert(fields);
977        }
978    }
979}
980
981#[cfg(test)]
982mod tests {
983    use super::*;
984    use tracing::info;
985    use tracing_subscriber::layer::SubscriberExt;
986
987    #[test]
988    fn test_layer_builder() {
989        let layer = TracingRateLimitLayer::builder()
990            .with_policy(Policy::count_based(50).unwrap())
991            .with_summary_interval(Duration::from_secs(60))
992            .build()
993            .unwrap();
994
995        assert!(layer.limiter().registry().is_empty());
996    }
997
998    #[test]
999    fn test_span_context_fields_deduplication() {
1000        let layer = TracingRateLimitLayer::builder()
1001            .with_span_context_fields(vec![
1002                "user_id".to_string(),
1003                "user_id".to_string(), // duplicate
1004                "tenant_id".to_string(),
1005                "".to_string(),        // empty, should be filtered
1006                "user_id".to_string(), // another duplicate
1007            ])
1008            .build()
1009            .unwrap();
1010
1011        // Should only have 2 unique fields: user_id and tenant_id
1012        assert_eq!(layer.span_context_fields.len(), 2);
1013        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
1014        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
1015    }
1016
1017    #[test]
1018    fn test_excluded_fields_deduplication() {
1019        let layer = TracingRateLimitLayer::builder()
1020            .with_excluded_fields(vec![
1021                "request_id".to_string(),
1022                "request_id".to_string(), // duplicate
1023                "trace_id".to_string(),
1024                "".to_string(),           // empty, should be filtered
1025                "request_id".to_string(), // another duplicate
1026            ])
1027            .build()
1028            .unwrap();
1029
1030        // Should only have 2 unique fields: request_id and trace_id
1031        assert_eq!(layer.excluded_fields.len(), 2);
1032        assert!(layer.excluded_fields.contains("request_id"));
1033        assert!(layer.excluded_fields.contains("trace_id"));
1034    }
1035
1036    #[test]
1037    fn test_exempt_targets_deduplication() {
1038        let layer = TracingRateLimitLayer::builder()
1039            .with_exempt_targets(vec![
1040                "myapp::security".to_string(),
1041                "myapp::security".to_string(), // duplicate
1042                "myapp::audit".to_string(),
1043                "".to_string(),                // empty, should be filtered
1044                "myapp::security".to_string(), // another duplicate
1045            ])
1046            .build()
1047            .unwrap();
1048
1049        // Should only have 2 unique targets: security and audit
1050        assert_eq!(layer.exempt_targets.len(), 2);
1051        assert!(layer.exempt_targets.contains("myapp::security"));
1052        assert!(layer.exempt_targets.contains("myapp::audit"));
1053    }
1054
1055    #[test]
1056    fn test_exempt_targets_bypass_rate_limiting() {
1057        let rate_limit = TracingRateLimitLayer::builder()
1058            .with_policy(Policy::count_based(2).unwrap())
1059            .with_exempt_targets(vec!["myapp::security".to_string()])
1060            .build()
1061            .unwrap();
1062
1063        let subscriber = tracing_subscriber::registry()
1064            .with(tracing_subscriber::fmt::layer().with_filter(rate_limit.clone()));
1065
1066        tracing::subscriber::with_default(subscriber, || {
1067            // Regular logs get throttled after 2 (same callsite = same signature)
1068            for _ in 0..3 {
1069                info!("Regular log"); // First 2 allowed, 3rd suppressed
1070            }
1071
1072            // Security logs are never throttled (exempt target)
1073            for _ in 0..4 {
1074                info!(target: "myapp::security", "Security event"); // All 4 allowed
1075            }
1076        });
1077
1078        // Verify metrics
1079        let metrics = rate_limit.metrics();
1080        assert_eq!(metrics.events_allowed(), 6); // 2 regular + 4 exempt
1081        assert_eq!(metrics.events_suppressed(), 1); // 1 regular suppressed
1082    }
1083
1084    #[test]
1085    fn test_layer_default() {
1086        let layer = TracingRateLimitLayer::default();
1087        assert!(layer.limiter().registry().is_empty());
1088    }
1089
1090    #[test]
1091    fn test_signature_computation() {
1092        let _layer = TracingRateLimitLayer::new();
1093
1094        // Use a simple signature test without metadata construction
1095        let sig1 = EventSignature::simple("INFO", "test_event");
1096        let sig2 = EventSignature::simple("INFO", "test_event");
1097
1098        // Same inputs should produce same signature
1099        assert_eq!(sig1, sig2);
1100    }
1101
1102    #[test]
1103    fn test_basic_rate_limiting() {
1104        let layer = TracingRateLimitLayer::builder()
1105            .with_policy(Policy::count_based(2).unwrap())
1106            .build()
1107            .unwrap();
1108
1109        let sig = EventSignature::simple("INFO", "test_message");
1110
1111        // First two should be allowed
1112        assert!(layer.should_allow(sig));
1113        assert!(layer.should_allow(sig));
1114
1115        // Third should be suppressed
1116        assert!(!layer.should_allow(sig));
1117    }
1118
1119    #[test]
1120    fn test_layer_integration() {
1121        let layer = TracingRateLimitLayer::builder()
1122            .with_policy(Policy::count_based(3).unwrap())
1123            .build()
1124            .unwrap();
1125
1126        // Clone for use in subscriber, keep original for checking state
1127        let layer_for_check = layer.clone();
1128
1129        let subscriber = tracing_subscriber::registry()
1130            .with(tracing_subscriber::fmt::layer().with_filter(layer));
1131
1132        // Test that the layer correctly tracks event signatures
1133        tracing::subscriber::with_default(subscriber, || {
1134            // Emit 10 identical events
1135            for _ in 0..10 {
1136                info!("test event");
1137            }
1138        });
1139
1140        // After emitting 10 events with the same signature, the layer should have
1141        // tracked them and only the first 3 should have been marked as allowed
1142        // The registry should contain one entry for this signature
1143        assert_eq!(layer_for_check.limiter().registry().len(), 1);
1144    }
1145
1146    #[test]
1147    fn test_layer_suppression_logic() {
1148        let layer = TracingRateLimitLayer::builder()
1149            .with_policy(Policy::count_based(3).unwrap())
1150            .build()
1151            .unwrap();
1152
1153        let sig = EventSignature::simple("INFO", "test");
1154
1155        // Verify the suppression logic works correctly
1156        let mut allowed_count = 0;
1157        for _ in 0..10 {
1158            if layer.should_allow(sig) {
1159                allowed_count += 1;
1160            }
1161        }
1162
1163        assert_eq!(allowed_count, 3);
1164    }
1165
1166    #[test]
1167    fn test_builder_zero_summary_interval() {
1168        let result = TracingRateLimitLayer::builder()
1169            .with_summary_interval(Duration::from_secs(0))
1170            .build();
1171
1172        assert!(matches!(
1173            result,
1174            Err(BuildError::EmitterConfig(
1175                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1176            ))
1177        ));
1178    }
1179
1180    #[test]
1181    fn test_builder_zero_max_signatures() {
1182        let result = TracingRateLimitLayer::builder()
1183            .with_max_signatures(0)
1184            .build();
1185
1186        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1187    }
1188
1189    #[test]
1190    fn test_builder_valid_max_signatures() {
1191        let layer = TracingRateLimitLayer::builder()
1192            .with_max_signatures(100)
1193            .build()
1194            .unwrap();
1195
1196        assert!(layer.limiter().registry().is_empty());
1197    }
1198
1199    #[test]
1200    fn test_metrics_tracking() {
1201        let layer = TracingRateLimitLayer::builder()
1202            .with_policy(Policy::count_based(2).unwrap())
1203            .build()
1204            .unwrap();
1205
1206        let sig = EventSignature::simple("INFO", "test");
1207
1208        // Check initial metrics
1209        assert_eq!(layer.metrics().events_allowed(), 0);
1210        assert_eq!(layer.metrics().events_suppressed(), 0);
1211
1212        // Allow first two events
1213        assert!(layer.should_allow(sig));
1214        assert!(layer.should_allow(sig));
1215
1216        // Check metrics after allowed events
1217        assert_eq!(layer.metrics().events_allowed(), 2);
1218        assert_eq!(layer.metrics().events_suppressed(), 0);
1219
1220        // Suppress third event
1221        assert!(!layer.should_allow(sig));
1222
1223        // Check metrics after suppressed event
1224        assert_eq!(layer.metrics().events_allowed(), 2);
1225        assert_eq!(layer.metrics().events_suppressed(), 1);
1226    }
1227
1228    #[test]
1229    fn test_metrics_snapshot() {
1230        let layer = TracingRateLimitLayer::builder()
1231            .with_policy(Policy::count_based(3).unwrap())
1232            .build()
1233            .unwrap();
1234
1235        let sig = EventSignature::simple("INFO", "test");
1236
1237        // Generate some events
1238        for _ in 0..5 {
1239            layer.should_allow(sig);
1240        }
1241
1242        // Get snapshot
1243        let snapshot = layer.metrics().snapshot();
1244        assert_eq!(snapshot.events_allowed, 3);
1245        assert_eq!(snapshot.events_suppressed, 2);
1246        assert_eq!(snapshot.total_events(), 5);
1247        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1248    }
1249
1250    #[test]
1251    fn test_signature_count() {
1252        let layer = TracingRateLimitLayer::builder()
1253            .with_policy(Policy::count_based(2).unwrap())
1254            .build()
1255            .unwrap();
1256
1257        assert_eq!(layer.signature_count(), 0);
1258
1259        let sig1 = EventSignature::simple("INFO", "test1");
1260        let sig2 = EventSignature::simple("INFO", "test2");
1261
1262        layer.should_allow(sig1);
1263        assert_eq!(layer.signature_count(), 1);
1264
1265        layer.should_allow(sig2);
1266        assert_eq!(layer.signature_count(), 2);
1267
1268        // Same signature shouldn't increase count
1269        layer.should_allow(sig1);
1270        assert_eq!(layer.signature_count(), 2);
1271    }
1272
1273    #[test]
1274    fn test_metrics_with_eviction() {
1275        let layer = TracingRateLimitLayer::builder()
1276            .with_policy(Policy::count_based(1).unwrap())
1277            .with_max_signatures(3)
1278            .build()
1279            .unwrap();
1280
1281        // Fill up to capacity
1282        for i in 0..3 {
1283            let sig = EventSignature::simple("INFO", &format!("test{}", i));
1284            layer.should_allow(sig);
1285        }
1286
1287        assert_eq!(layer.signature_count(), 3);
1288        assert_eq!(layer.metrics().signatures_evicted(), 0);
1289
1290        // Add one more, which should trigger eviction
1291        let sig = EventSignature::simple("INFO", "test3");
1292        layer.should_allow(sig);
1293
1294        assert_eq!(layer.signature_count(), 3);
1295        assert_eq!(layer.metrics().signatures_evicted(), 1);
1296    }
1297
1298    #[test]
1299    fn test_circuit_breaker_observability() {
1300        use crate::application::circuit_breaker::CircuitState;
1301
1302        let layer = TracingRateLimitLayer::builder()
1303            .with_policy(Policy::count_based(2).unwrap())
1304            .build()
1305            .unwrap();
1306
1307        // Check initial circuit breaker state
1308        let cb = layer.circuit_breaker();
1309        assert_eq!(cb.state(), CircuitState::Closed);
1310        assert_eq!(cb.consecutive_failures(), 0);
1311
1312        // Circuit breaker should remain closed during normal operation
1313        let sig = EventSignature::simple("INFO", "test");
1314        layer.should_allow(sig);
1315        layer.should_allow(sig);
1316        layer.should_allow(sig);
1317
1318        assert_eq!(cb.state(), CircuitState::Closed);
1319    }
1320
1321    #[test]
1322    fn test_circuit_breaker_fail_open_integration() {
1323        use crate::application::circuit_breaker::{
1324            CircuitBreaker, CircuitBreakerConfig, CircuitState,
1325        };
1326        use std::time::Duration;
1327
1328        // Create a circuit breaker with low threshold for testing
1329        let cb_config = CircuitBreakerConfig {
1330            failure_threshold: 2,
1331            recovery_timeout: Duration::from_secs(1),
1332        };
1333        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1334
1335        // Build layer with custom circuit breaker
1336        let storage = Arc::new(ShardedStorage::new());
1337        let clock = Arc::new(SystemClock::new());
1338        let policy = Policy::count_based(2).unwrap();
1339        let registry = SuppressionRegistry::new(storage, clock, policy);
1340        let metrics = Metrics::new();
1341        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1342
1343        let layer = TracingRateLimitLayer {
1344            limiter,
1345            span_context_fields: Arc::new(Vec::new()),
1346            excluded_fields: Arc::new(BTreeSet::new()),
1347            exempt_targets: Arc::new(BTreeSet::new()),
1348            #[cfg(feature = "async")]
1349            emitter_handle: Arc::new(Mutex::new(None)),
1350            #[cfg(not(feature = "async"))]
1351            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1352                30,
1353            ))
1354            .unwrap(),
1355        };
1356
1357        let sig = EventSignature::simple("INFO", "test");
1358
1359        // Normal operation - first 2 events allowed, third suppressed
1360        assert!(layer.should_allow(sig));
1361        assert!(layer.should_allow(sig));
1362        assert!(!layer.should_allow(sig));
1363
1364        // Circuit should still be closed
1365        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1366
1367        // Manually trigger circuit breaker failures to test fail-open
1368        circuit_breaker.record_failure();
1369        circuit_breaker.record_failure();
1370
1371        // Circuit should now be open
1372        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1373
1374        // With circuit open, rate limiter should fail open (allow all events)
1375        // even though we've already hit the rate limit
1376        assert!(layer.should_allow(sig));
1377        assert!(layer.should_allow(sig));
1378        assert!(layer.should_allow(sig));
1379
1380        // Metrics should show these as allowed (fail-open behavior)
1381        let snapshot = layer.metrics().snapshot();
1382        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1383    }
1384
1385    #[cfg(feature = "async")]
1386    #[tokio::test]
1387    async fn test_active_emission_integration() {
1388        use std::sync::atomic::{AtomicUsize, Ordering};
1389        use std::time::Duration;
1390
1391        // Use an atomic counter to track emissions
1392        let emission_count = Arc::new(AtomicUsize::new(0));
1393        let count_clone = Arc::clone(&emission_count);
1394
1395        // Create a layer with a custom emitter that increments our counter
1396        let storage = Arc::new(ShardedStorage::new());
1397        let clock = Arc::new(SystemClock::new());
1398        let policy = Policy::count_based(2).unwrap();
1399        let registry = SuppressionRegistry::new(storage, clock, policy);
1400
1401        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1402        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1403
1404        // Start emitter with custom callback
1405        let handle = emitter.start(
1406            move |summaries| {
1407                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1408            },
1409            false,
1410        );
1411
1412        // Emit events that will be suppressed
1413        let sig = EventSignature::simple("INFO", "test_message");
1414        for _ in 0..10 {
1415            registry.with_event_state(sig, |state, now| {
1416                state.counter.record_suppression(now);
1417            });
1418        }
1419
1420        // Wait for at least two emission intervals
1421        tokio::time::sleep(Duration::from_millis(250)).await;
1422
1423        // Check that summaries were emitted
1424        let count = emission_count.load(Ordering::SeqCst);
1425        assert!(
1426            count > 0,
1427            "Expected at least one suppression summary to be emitted, got {}",
1428            count
1429        );
1430
1431        // Graceful shutdown
1432        handle.shutdown().await.expect("shutdown failed");
1433    }
1434
1435    #[cfg(feature = "async")]
1436    #[tokio::test]
1437    async fn test_active_emission_disabled() {
1438        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1439        use std::time::Duration;
1440
1441        // Create layer with active emission disabled (default)
1442        let layer = TracingRateLimitLayer::builder()
1443            .with_policy(Policy::count_based(2).unwrap())
1444            .with_summary_interval(Duration::from_millis(100))
1445            .build()
1446            .unwrap();
1447
1448        let mock = MockCaptureLayer::new();
1449        let mock_clone = mock.clone();
1450
1451        let subscriber = tracing_subscriber::registry()
1452            .with(mock)
1453            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1454
1455        tracing::subscriber::with_default(subscriber, || {
1456            let sig = EventSignature::simple("INFO", "test_message");
1457            for _ in 0..10 {
1458                layer.should_allow(sig);
1459            }
1460        });
1461
1462        // Wait to ensure no emissions occur
1463        tokio::time::sleep(Duration::from_millis(250)).await;
1464
1465        // Should not have emitted any summaries
1466        let events = mock_clone.get_captured();
1467        let summary_count = events
1468            .iter()
1469            .filter(|e| e.message.contains("suppressed"))
1470            .count();
1471
1472        assert_eq!(
1473            summary_count, 0,
1474            "Should not emit summaries when active emission is disabled"
1475        );
1476
1477        // Shutdown should succeed even when emitter was never started
1478        layer.shutdown().await.expect("shutdown failed");
1479    }
1480
1481    #[cfg(feature = "async")]
1482    #[tokio::test]
1483    async fn test_shutdown_without_emission() {
1484        // Test that shutdown works when emission was never enabled
1485        let layer = TracingRateLimitLayer::new();
1486
1487        // Should not error
1488        layer
1489            .shutdown()
1490            .await
1491            .expect("shutdown should succeed when emitter not running");
1492    }
1493
1494    #[cfg(feature = "async")]
1495    #[tokio::test]
1496    async fn test_custom_summary_formatter() {
1497        use std::sync::atomic::{AtomicUsize, Ordering};
1498        use std::time::Duration;
1499
1500        // Track formatter invocations
1501        let call_count = Arc::new(AtomicUsize::new(0));
1502        let count_clone = Arc::clone(&call_count);
1503
1504        // Track data passed to formatter
1505        let last_count = Arc::new(AtomicUsize::new(0));
1506        let last_count_clone = Arc::clone(&last_count);
1507
1508        // Create layer with custom formatter
1509        let layer = TracingRateLimitLayer::builder()
1510            .with_policy(Policy::count_based(2).unwrap())
1511            .with_active_emission(true)
1512            .with_summary_interval(Duration::from_millis(100))
1513            .with_summary_formatter(Arc::new(move |summary| {
1514                count_clone.fetch_add(1, Ordering::SeqCst);
1515                last_count_clone.store(summary.count, Ordering::SeqCst);
1516                // Custom format: emit at INFO level instead of WARN
1517                tracing::info!(
1518                    sig = %summary.signature,
1519                    suppressed = summary.count,
1520                    "Custom format"
1521                );
1522            }))
1523            .build()
1524            .unwrap();
1525
1526        // Emit events that will be suppressed
1527        let sig = EventSignature::simple("INFO", "test_message");
1528        for _ in 0..10 {
1529            layer.should_allow(sig);
1530        }
1531
1532        // Wait for emission
1533        tokio::time::sleep(Duration::from_millis(250)).await;
1534
1535        // Verify custom formatter was called
1536        let calls = call_count.load(Ordering::SeqCst);
1537        assert!(calls > 0, "Custom formatter should have been called");
1538
1539        // Verify formatter received correct data
1540        let count = last_count.load(Ordering::SeqCst);
1541        assert!(
1542            count >= 8,
1543            "Expected at least 8 suppressions, got {}",
1544            count
1545        );
1546
1547        layer.shutdown().await.expect("shutdown failed");
1548    }
1549
1550    #[cfg(feature = "async")]
1551    #[tokio::test]
1552    async fn test_default_formatter_used() {
1553        use std::sync::atomic::{AtomicUsize, Ordering};
1554        use std::time::Duration;
1555
1556        let emission_count = Arc::new(AtomicUsize::new(0));
1557        let count_clone = Arc::clone(&emission_count);
1558
1559        let storage = Arc::new(ShardedStorage::new());
1560        let clock = Arc::new(SystemClock::new());
1561        let policy = Policy::count_based(2).unwrap();
1562        let registry = SuppressionRegistry::new(storage, clock, policy);
1563
1564        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1565        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1566
1567        // Start without custom formatter - should use default
1568        let handle = emitter.start(
1569            move |summaries| {
1570                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1571            },
1572            false,
1573        );
1574
1575        let sig = EventSignature::simple("INFO", "test_message");
1576        for _ in 0..10 {
1577            registry.with_event_state(sig, |state, now| {
1578                state.counter.record_suppression(now);
1579            });
1580        }
1581
1582        tokio::time::sleep(Duration::from_millis(250)).await;
1583
1584        let count = emission_count.load(Ordering::SeqCst);
1585        assert!(count > 0, "Default formatter should have emitted summaries");
1586
1587        handle.shutdown().await.expect("shutdown failed");
1588    }
1589}