tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
36/// Function type for formatting suppression summaries.
37///
38/// Takes a reference to a `SuppressionSummary` and emits it as a tracing event.
39/// The function is responsible for choosing the log level and format.
40#[cfg(feature = "async")]
41pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
43/// Error returned when building a TracingRateLimitLayer fails.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum BuildError {
46    /// Maximum signatures must be greater than zero
47    ZeroMaxSignatures,
48    /// Emitter configuration validation failed
49    EmitterConfig(crate::application::emitter::EmitterConfigError),
50}
51
52impl std::fmt::Display for BuildError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            BuildError::ZeroMaxSignatures => {
56                write!(f, "max_signatures must be greater than 0")
57            }
58            BuildError::EmitterConfig(e) => {
59                write!(f, "emitter configuration error: {}", e)
60            }
61        }
62    }
63}
64
65impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69        BuildError::EmitterConfig(e)
70    }
71}
72
73/// Builder for constructing a `TracingRateLimitLayer`.
74pub struct TracingRateLimitLayerBuilder {
75    policy: Policy,
76    summary_interval: Duration,
77    clock: Option<Arc<dyn Clock>>,
78    max_signatures: Option<usize>,
79    enable_active_emission: bool,
80    #[cfg(feature = "async")]
81    summary_formatter: Option<SummaryFormatter>,
82    span_context_fields: Vec<String>,
83    excluded_fields: BTreeSet<String>,
84    eviction_strategy: Option<EvictionStrategy>,
85    exempt_targets: BTreeSet<String>,
86}
87
88/// Eviction strategy configuration for the rate limit layer.
89///
90/// This enum provides a user-friendly API that internally creates
91/// the appropriate EvictionPolicy adapter.
92#[derive(Clone)]
93pub enum EvictionStrategy {
94    /// LRU (Least Recently Used) eviction with entry count limit.
95    Lru {
96        /// Maximum number of entries
97        max_entries: usize,
98    },
99    /// Priority-based eviction using a custom function.
100    Priority {
101        /// Maximum number of entries
102        max_entries: usize,
103        /// Priority calculation function
104        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
105    },
106    /// Memory-based eviction with byte limit.
107    Memory {
108        /// Maximum memory usage in bytes
109        max_bytes: usize,
110    },
111    /// Combined priority and memory limits.
112    PriorityWithMemory {
113        /// Maximum number of entries
114        max_entries: usize,
115        /// Priority calculation function
116        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
117        /// Maximum memory usage in bytes
118        max_bytes: usize,
119    },
120}
121
122impl EvictionStrategy {
123    /// Check if this strategy tracks memory usage.
124    pub fn tracks_memory(&self) -> bool {
125        matches!(
126            self,
127            EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
128        )
129    }
130
131    /// Get the memory limit if this strategy uses one.
132    pub fn memory_limit(&self) -> Option<usize> {
133        match self {
134            EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
135            EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
136            _ => None,
137        }
138    }
139
140    /// Check if this strategy uses priority-based eviction.
141    pub fn uses_priority(&self) -> bool {
142        matches!(
143            self,
144            EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
145        )
146    }
147}
148
149impl std::fmt::Debug for EvictionStrategy {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        match self {
152            EvictionStrategy::Lru { max_entries } => f
153                .debug_struct("Lru")
154                .field("max_entries", max_entries)
155                .finish(),
156            EvictionStrategy::Priority {
157                max_entries,
158                priority_fn: _,
159            } => f
160                .debug_struct("Priority")
161                .field("max_entries", max_entries)
162                .field("priority_fn", &"<fn>")
163                .finish(),
164            EvictionStrategy::Memory { max_bytes } => f
165                .debug_struct("Memory")
166                .field("max_bytes", max_bytes)
167                .finish(),
168            EvictionStrategy::PriorityWithMemory {
169                max_entries,
170                priority_fn: _,
171                max_bytes,
172            } => f
173                .debug_struct("PriorityWithMemory")
174                .field("max_entries", max_entries)
175                .field("priority_fn", &"<fn>")
176                .field("max_bytes", max_bytes)
177                .finish(),
178        }
179    }
180}
181
182impl TracingRateLimitLayerBuilder {
183    /// Set the rate limiting policy.
184    pub fn with_policy(mut self, policy: Policy) -> Self {
185        self.policy = policy;
186        self
187    }
188
189    /// Set the summary emission interval.
190    ///
191    /// The interval will be validated when `build()` is called.
192    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
193        self.summary_interval = interval;
194        self
195    }
196
197    /// Set a custom clock (mainly for testing).
198    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
199        self.clock = Some(clock);
200        self
201    }
202
203    /// Set the maximum number of unique event signatures to track.
204    ///
205    /// When this limit is reached, the least recently used signatures will be evicted.
206    /// This prevents unbounded memory growth in applications with high signature cardinality.
207    ///
208    /// Default: 10,000 signatures
209    ///
210    /// The value will be validated when `build()` is called.
211    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
212        self.max_signatures = Some(max_signatures);
213        self
214    }
215
216    /// Disable the signature limit, allowing unbounded growth.
217    ///
218    /// **Warning**: This can lead to unbounded memory usage in applications that generate
219    /// many unique event signatures. Only use this if you're certain your application has
220    /// bounded signature cardinality or you have external memory monitoring.
221    pub fn with_unlimited_signatures(mut self) -> Self {
222        self.max_signatures = None;
223        self
224    }
225
226    /// Enable active emission of suppression summaries.
227    ///
228    /// When enabled, the layer will automatically emit `WARN`-level tracing events
229    /// containing summaries of suppressed log events at the configured interval.
230    ///
231    /// **Requires the `async` feature** - this method has no effect without it.
232    ///
233    /// Default: disabled
234    ///
235    /// # Example
236    ///
237    /// ```no_run
238    /// # use tracing_throttle::TracingRateLimitLayer;
239    /// # use std::time::Duration;
240    /// let layer = TracingRateLimitLayer::builder()
241    ///     .with_active_emission(true)
242    ///     .with_summary_interval(Duration::from_secs(60))
243    ///     .build()
244    ///     .unwrap();
245    /// ```
246    pub fn with_active_emission(mut self, enabled: bool) -> Self {
247        self.enable_active_emission = enabled;
248        self
249    }
250
251    /// Set a custom formatter for suppression summaries.
252    ///
253    /// The formatter is responsible for emitting summaries as tracing events.
254    /// This allows full control over log level, message format, and structured fields.
255    ///
256    /// **Requires the `async` feature.**
257    ///
258    /// If not set, a default formatter is used that emits at WARN level with
259    /// `signature` and `count` fields.
260    ///
261    /// # Example
262    ///
263    /// ```no_run
264    /// # use tracing_throttle::TracingRateLimitLayer;
265    /// # use std::sync::Arc;
266    /// # use std::time::Duration;
267    /// let layer = TracingRateLimitLayer::builder()
268    ///     .with_active_emission(true)
269    ///     .with_summary_formatter(Arc::new(|summary| {
270    ///         tracing::info!(
271    ///             signature = %summary.signature,
272    ///             count = summary.count,
273    ///             duration_secs = summary.duration.as_secs(),
274    ///             "Suppression summary"
275    ///         );
276    ///     }))
277    ///     .build()
278    ///     .unwrap();
279    /// ```
280    #[cfg(feature = "async")]
281    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
282        self.summary_formatter = Some(formatter);
283        self
284    }
285
286    /// Include span context fields in event signatures.
287    ///
288    /// When specified, the layer will extract these fields from the current span
289    /// context and include them in the event signature. This enables rate limiting
290    /// per-user, per-tenant, per-request, or any other span-level context.
291    ///
292    /// Duplicate field names are automatically removed, and empty field names are filtered out.
293    ///
294    /// # Example
295    ///
296    /// ```no_run
297    /// # use tracing_throttle::TracingRateLimitLayer;
298    /// // Rate limit separately per user
299    /// let layer = TracingRateLimitLayer::builder()
300    ///     .with_span_context_fields(vec!["user_id".to_string()])
301    ///     .build()
302    ///     .unwrap();
303    ///
304    /// // Rate limit per user and tenant
305    /// let layer = TracingRateLimitLayer::builder()
306    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
307    ///     .build()
308    ///     .unwrap();
309    /// ```
310    ///
311    /// # Usage with Spans
312    ///
313    /// ```no_run
314    /// # use tracing::{info, info_span};
315    /// // Create a span with user context
316    /// let span = info_span!("request", user_id = "alice");
317    /// let _enter = span.enter();
318    ///
319    /// // These events will be rate limited separately per user
320    /// info!("Processing request");  // Limited for user "alice"
321    /// ```
322    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
323        // Deduplicate and filter out empty field names
324        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
325        self.span_context_fields = unique_fields.into_iter().collect();
326        self
327    }
328
329    /// Exclude specific fields from event signatures.
330    ///
331    /// By default, ALL event fields are included in signatures. This ensures that
332    /// events with different field values are treated as distinct events, preventing
333    /// accidental loss of meaningful log data.
334    ///
335    /// Use this method to exclude high-cardinality fields that don't change the
336    /// semantic meaning of the event (e.g., request_id, timestamp, trace_id).
337    ///
338    /// Duplicate field names are automatically removed, and empty field names are filtered out.
339    ///
340    /// # Example
341    ///
342    /// ```no_run
343    /// # use tracing_throttle::TracingRateLimitLayer;
344    /// // Exclude request_id so events with same user_id are deduplicated
345    /// let layer = TracingRateLimitLayer::builder()
346    ///     .with_excluded_fields(vec!["request_id".to_string(), "trace_id".to_string()])
347    ///     .build()
348    ///     .unwrap();
349    /// ```
350    ///
351    /// # Default Behavior (ALL fields included)
352    ///
353    /// ```no_run
354    /// # use tracing::error;
355    /// // These are DIFFERENT events (different user_id values)
356    /// error!(user_id = 123, "Failed to fetch user");
357    /// error!(user_id = 456, "Failed to fetch user");
358    /// // Both are logged - they have distinct signatures
359    /// ```
360    ///
361    /// # With Exclusions
362    ///
363    /// ```no_run
364    /// # use tracing::error;
365    /// // Exclude request_id from signature
366    /// error!(user_id = 123, request_id = "abc", "Failed to fetch user");
367    /// error!(user_id = 123, request_id = "def", "Failed to fetch user");
368    /// // Second is throttled - same user_id, request_id excluded from signature
369    /// ```
370    pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
371        // Deduplicate and filter out empty field names
372        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
373        self.excluded_fields = unique_fields;
374        self
375    }
376
377    /// Exempt specific targets from rate limiting.
378    ///
379    /// Events from the specified targets will bypass rate limiting entirely and
380    /// always be allowed through. This is useful for ensuring critical logs (e.g.,
381    /// security events, audit logs) are never suppressed.
382    ///
383    /// Targets are matched exactly against the event's target (module path).
384    /// Duplicate targets are automatically removed, and empty targets are filtered out.
385    ///
386    /// # Example
387    ///
388    /// ```no_run
389    /// # use tracing_throttle::TracingRateLimitLayer;
390    /// // Never throttle security or audit logs
391    /// let layer = TracingRateLimitLayer::builder()
392    ///     .with_exempt_targets(vec![
393    ///         "myapp::security".to_string(),
394    ///         "myapp::audit".to_string(),
395    ///     ])
396    ///     .build()
397    ///     .unwrap();
398    /// ```
399    ///
400    /// # Usage with Events
401    ///
402    /// ```no_run
403    /// # use tracing::error;
404    /// // Explicitly set target - this event bypasses throttling
405    /// error!(target: "myapp::security", "Security breach detected");
406    ///
407    /// // Or use the module's default target
408    /// mod security {
409    ///     use tracing::warn;
410    ///     pub fn check() {
411    ///         // Target is automatically "myapp::security" - bypasses throttling
412    ///         warn!("Suspicious activity detected");
413    ///     }
414    /// }
415    /// ```
416    pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
417        // Deduplicate and filter out empty targets
418        let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
419        self.exempt_targets = unique_targets;
420        self
421    }
422
423    /// Set a custom eviction strategy for signature management.
424    ///
425    /// Controls which signatures are evicted when storage limits are reached.
426    /// If not set, uses LRU eviction with the configured max_signatures limit.
427    ///
428    /// # Example: Priority-based eviction
429    ///
430    /// ```no_run
431    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
432    /// # use std::sync::Arc;
433    /// let layer = TracingRateLimitLayer::builder()
434    ///     .with_eviction_strategy(EvictionStrategy::Priority {
435    ///         max_entries: 5_000,
436    ///         priority_fn: Arc::new(|_sig, state| {
437    ///             // Keep ERROR events longer than INFO events
438    ///             match state.metadata.as_ref().map(|m| m.level.as_str()) {
439    ///                 Some("ERROR") => 100,
440    ///                 Some("WARN") => 50,
441    ///                 Some("INFO") => 10,
442    ///                 _ => 5,
443    ///             }
444    ///         })
445    ///     })
446    ///     .build()
447    ///     .unwrap();
448    /// ```
449    ///
450    /// # Example: Memory-based eviction
451    ///
452    /// ```no_run
453    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
454    /// // Evict when total memory exceeds 5MB
455    /// let layer = TracingRateLimitLayer::builder()
456    ///     .with_eviction_strategy(EvictionStrategy::Memory {
457    ///         max_bytes: 5 * 1024 * 1024,
458    ///     })
459    ///     .build()
460    ///     .unwrap();
461    /// ```
462    pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
463        self.eviction_strategy = Some(strategy);
464        self
465    }
466
467    /// Build the layer.
468    ///
469    /// # Errors
470    /// Returns `BuildError` if the configuration is invalid.
471    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
472        // Validate max_signatures if set
473        if let Some(max) = self.max_signatures {
474            if max == 0 {
475                return Err(BuildError::ZeroMaxSignatures);
476            }
477        }
478
479        // Create shared metrics and circuit breaker
480        let metrics = Metrics::new();
481        let circuit_breaker = Arc::new(CircuitBreaker::new());
482
483        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
484        let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
485
486        // Convert eviction strategy to adapter, or use default LRU with max_signatures
487        let eviction_policy: Option<
488            Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
489        > = match self.eviction_strategy {
490            Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
491                crate::infrastructure::eviction::LruEviction::new(max_entries),
492            )),
493            Some(EvictionStrategy::Priority {
494                max_entries,
495                priority_fn,
496            }) => Some(Arc::new(
497                crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
498            )),
499            Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
500                crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
501            )),
502            Some(EvictionStrategy::PriorityWithMemory {
503                max_entries,
504                priority_fn,
505                max_bytes,
506            }) => Some(Arc::new(
507                crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
508                    max_entries,
509                    priority_fn,
510                    max_bytes,
511                ),
512            )),
513            None => {
514                // Use default LRU with max_signatures if configured
515                self.max_signatures.map(|max| {
516                    Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
517                        as Arc<
518                            dyn crate::application::ports::EvictionPolicy<
519                                EventSignature,
520                                EventState,
521                            >,
522                        >
523                })
524            }
525        };
526
527        if let Some(policy) = eviction_policy {
528            storage = storage.with_eviction_policy(policy);
529        }
530
531        let storage = Arc::new(storage);
532        let registry = SuppressionRegistry::new(storage, clock, self.policy);
533        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
534
535        // Let EmitterConfig validate the interval
536        let emitter_config = EmitterConfig::new(self.summary_interval)?;
537
538        #[cfg(feature = "async")]
539        let emitter_handle = if self.enable_active_emission {
540            let emitter = SummaryEmitter::new(registry, emitter_config);
541
542            // Use custom formatter or default
543            let formatter = self.summary_formatter.unwrap_or_else(|| {
544                Arc::new(|summary: &SuppressionSummary| {
545                    tracing::warn!(
546                        signature = %summary.signature,
547                        count = summary.count,
548                        "{}",
549                        summary.format_message()
550                    );
551                })
552            });
553
554            let handle = emitter.start(
555                move |summaries| {
556                    for summary in summaries {
557                        formatter(&summary);
558                    }
559                },
560                false, // Don't emit final summaries on shutdown
561            );
562            Arc::new(Mutex::new(Some(handle)))
563        } else {
564            Arc::new(Mutex::new(None))
565        };
566
567        Ok(TracingRateLimitLayer {
568            limiter,
569            span_context_fields: Arc::new(self.span_context_fields),
570            excluded_fields: Arc::new(self.excluded_fields),
571            exempt_targets: Arc::new(self.exempt_targets),
572            #[cfg(feature = "async")]
573            emitter_handle,
574            #[cfg(not(feature = "async"))]
575            _emitter_config: emitter_config,
576        })
577    }
578}
579
580/// A `tracing::Layer` that applies rate limiting to events.
581///
582/// This layer intercepts events, computes their signature, and decides
583/// whether to allow or suppress them based on the configured policy.
584///
585/// Optionally emits periodic summaries of suppressed events when active
586/// emission is enabled (requires `async` feature).
587#[derive(Clone)]
588pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
589where
590    S: Storage<EventSignature, EventState> + Clone,
591{
592    limiter: RateLimiter<S>,
593    span_context_fields: Arc<Vec<String>>,
594    excluded_fields: Arc<BTreeSet<String>>,
595    exempt_targets: Arc<BTreeSet<String>>,
596    #[cfg(feature = "async")]
597    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
598    #[cfg(not(feature = "async"))]
599    _emitter_config: EmitterConfig,
600}
601
602impl<S> TracingRateLimitLayer<S>
603where
604    S: Storage<EventSignature, EventState> + Clone,
605{
606    /// Extract span context fields from the current span.
607    fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
608    where
609        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
610    {
611        if self.span_context_fields.is_empty() {
612            return BTreeMap::new();
613        }
614
615        let mut context_fields = BTreeMap::new();
616
617        if let Some(span) = cx.lookup_current() {
618            for span_ref in span.scope() {
619                let extensions = span_ref.extensions();
620
621                if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
622                    for field_name in self.span_context_fields.as_ref() {
623                        if !context_fields.contains_key(field_name) {
624                            if let Some(value) = stored_fields.get(field_name) {
625                                context_fields.insert(field_name.clone(), value.clone());
626                            }
627                        }
628                    }
629                }
630
631                if context_fields.len() == self.span_context_fields.len() {
632                    break;
633                }
634            }
635        }
636
637        context_fields
638    }
639
640    /// Extract event fields from an event.
641    ///
642    /// Extracts ALL fields from the event, then excludes any fields in the
643    /// excluded_fields set. This ensures that field values are included in
644    /// event signatures by default, preventing accidental deduplication of
645    /// semantically different events.
646    fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
647        let mut visitor = FieldVisitor::new();
648        event.record(&mut visitor);
649        let all_fields = visitor.into_fields();
650
651        // Exclude configured fields (e.g., high-cardinality fields like request_id)
652        if self.excluded_fields.is_empty() {
653            all_fields
654        } else {
655            all_fields
656                .into_iter()
657                .filter(|(field_name, _)| !self.excluded_fields.contains(field_name))
658                .collect()
659        }
660    }
661
662    /// Compute event signature from tracing metadata, span context, and event fields.
663    ///
664    /// The signature includes:
665    /// - Log level (INFO, WARN, ERROR, etc.)
666    /// - Message template
667    /// - Target module path
668    /// - Span context fields (if configured)
669    /// - Event fields (if configured)
670    fn compute_signature(
671        &self,
672        metadata: &Metadata,
673        combined_fields: &BTreeMap<String, String>,
674    ) -> EventSignature {
675        let level = metadata.level().as_str();
676        let message = metadata.name();
677        let target = Some(metadata.target());
678
679        // Use combined fields (span context + event fields) in signature
680        EventSignature::new(level, message, combined_fields, target)
681    }
682
683    /// Check if an event should be allowed through.
684    pub fn should_allow(&self, signature: EventSignature) -> bool {
685        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
686    }
687
688    /// Check if an event should be allowed through and capture metadata.
689    ///
690    /// This method stores event metadata on first occurrence so summaries
691    /// can show human-readable event details instead of just signature hashes.
692    ///
693    /// **Note:** Only available with the `human-readable` feature flag.
694    #[cfg(feature = "human-readable")]
695    pub fn should_allow_with_metadata(
696        &self,
697        signature: EventSignature,
698        metadata: crate::domain::metadata::EventMetadata,
699    ) -> bool {
700        matches!(
701            self.limiter.check_event_with_metadata(signature, metadata),
702            LimitDecision::Allow
703        )
704    }
705
706    /// Get a reference to the underlying limiter.
707    pub fn limiter(&self) -> &RateLimiter<S> {
708        &self.limiter
709    }
710
711    /// Get a reference to the metrics.
712    ///
713    /// Returns metrics about rate limiting behavior including:
714    /// - Events allowed
715    /// - Events suppressed
716    /// - Signatures evicted
717    pub fn metrics(&self) -> &Metrics {
718        self.limiter.metrics()
719    }
720
721    /// Get the current number of tracked signatures.
722    pub fn signature_count(&self) -> usize {
723        self.limiter.registry().len()
724    }
725
726    /// Get a reference to the circuit breaker.
727    ///
728    /// Use this to check the circuit breaker state and health:
729    /// - `circuit_breaker().state()` - Current circuit state
730    /// - `circuit_breaker().consecutive_failures()` - Failure count
731    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
732        self.limiter.circuit_breaker()
733    }
734
735    /// Shutdown the active suppression summary emitter, if running.
736    ///
737    /// This method gracefully stops the background emission task.  If active emission
738    /// is not enabled, this method does nothing.
739    ///
740    /// **Requires the `async` feature.**
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if the emitter task fails to shut down gracefully.
745    ///
746    /// # Example
747    ///
748    /// ```no_run
749    /// # use tracing_throttle::TracingRateLimitLayer;
750    /// # async fn example() {
751    /// let layer = TracingRateLimitLayer::builder()
752    ///     .with_active_emission(true)
753    ///     .build()
754    ///     .unwrap();
755    ///
756    /// // Use the layer...
757    ///
758    /// // Shutdown before dropping
759    /// layer.shutdown().await.expect("shutdown failed");
760    /// # }
761    /// ```
762    #[cfg(feature = "async")]
763    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
764        // Take the handle while holding the lock, then release the lock before awaiting
765        let handle = {
766            let mut handle_guard = self.emitter_handle.lock().unwrap();
767            handle_guard.take()
768        };
769
770        if let Some(handle) = handle {
771            handle.shutdown().await?;
772        }
773        Ok(())
774    }
775}
776
777impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
778    /// Create a builder for configuring the layer.
779    ///
780    /// Defaults:
781    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
782    /// - Max signatures: 10,000 (with LRU eviction)
783    /// - Summary interval: 30 seconds
784    /// - Active emission: disabled
785    /// - Summary formatter: default (WARN level with signature and count)
786    pub fn builder() -> TracingRateLimitLayerBuilder {
787        TracingRateLimitLayerBuilder {
788            policy: Policy::token_bucket(50.0, 1.0)
789                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
790            summary_interval: Duration::from_secs(30),
791            clock: None,
792            max_signatures: Some(10_000),
793            enable_active_emission: false,
794            #[cfg(feature = "async")]
795            summary_formatter: None,
796            span_context_fields: Vec::new(),
797            excluded_fields: BTreeSet::new(),
798            eviction_strategy: None,
799            exempt_targets: BTreeSet::new(),
800        }
801    }
802
803    /// Create a layer with default settings.
804    ///
805    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
806    ///
807    /// Defaults:
808    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
809    /// - Max signatures: 10,000 (with LRU eviction)
810    /// - Summary interval: 30 seconds
811    ///
812    /// # Panics
813    /// This method cannot panic because all default values are valid.
814    pub fn new() -> Self {
815        Self::builder()
816            .build()
817            .expect("default configuration is always valid")
818    }
819
820    /// Create a layer with custom storage backend.
821    ///
822    /// This allows using alternative storage implementations like Redis for distributed
823    /// rate limiting across multiple application instances.
824    ///
825    /// # Arguments
826    ///
827    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
828    /// * `policy` - Rate limiting policy to apply
829    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
830    ///
831    /// # Example with Redis
832    ///
833    /// ```rust,ignore
834    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
835    /// use std::sync::Arc;
836    ///
837    /// #[tokio::main]
838    /// async fn main() {
839    ///     let storage = Arc::new(
840    ///         RedisStorage::connect("redis://127.0.0.1/")
841    ///             .await
842    ///             .expect("Failed to connect")
843    ///     );
844    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
845    ///     let clock = Arc::new(SystemClock::new());
846    ///
847    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
848    /// }
849    /// ```
850    pub fn with_storage<ST>(
851        storage: ST,
852        policy: Policy,
853        clock: Arc<dyn Clock>,
854    ) -> TracingRateLimitLayer<ST>
855    where
856        ST: Storage<EventSignature, EventState> + Clone,
857    {
858        let metrics = Metrics::new();
859        let circuit_breaker = Arc::new(CircuitBreaker::new());
860        let registry = SuppressionRegistry::new(storage, clock, policy);
861        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
862
863        TracingRateLimitLayer {
864            limiter,
865            span_context_fields: Arc::new(Vec::new()),
866            excluded_fields: Arc::new(BTreeSet::new()),
867            exempt_targets: Arc::new(BTreeSet::new()),
868            #[cfg(feature = "async")]
869            emitter_handle: Arc::new(Mutex::new(None)),
870            #[cfg(not(feature = "async"))]
871            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
872                .expect("30 seconds is valid"),
873        }
874    }
875}
876
877impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
878    fn default() -> Self {
879        Self::new()
880    }
881}
882
883// Implement the Filter trait for rate limiting
884impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
885where
886    S: Storage<EventSignature, EventState> + Clone,
887    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
888{
889    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
890        // Always return true - actual filtering happens in event_enabled
891        // This prevents double-checking in dual-layer setups
892        true
893    }
894
895    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
896        let metadata_obj = event.metadata();
897
898        // Check if this target is exempt from rate limiting
899        // Skip the lookup if no exempt targets are configured (common case)
900        if !self.exempt_targets.is_empty() && self.exempt_targets.contains(metadata_obj.target()) {
901            // Exempt targets bypass rate limiting entirely
902            self.limiter.metrics().record_allowed();
903            return true;
904        }
905
906        // Combine span context and event fields
907        let mut combined_fields = self.extract_span_context(cx);
908        let event_fields = self.extract_event_fields(event);
909        combined_fields.extend(event_fields);
910
911        let signature = self.compute_signature(metadata_obj, &combined_fields);
912
913        #[cfg(feature = "human-readable")]
914        {
915            // Extract message from event for metadata
916            let mut visitor = FieldVisitor::new();
917            event.record(&mut visitor);
918            let all_fields = visitor.into_fields();
919            let message = all_fields
920                .get("message")
921                .cloned()
922                .unwrap_or_else(|| event.metadata().name().to_string());
923
924            // Create EventMetadata for this event
925            let event_metadata = crate::domain::metadata::EventMetadata::new(
926                metadata_obj.level().as_str().to_string(),
927                message,
928                metadata_obj.target().to_string(),
929                combined_fields,
930            );
931
932            self.should_allow_with_metadata(signature, event_metadata)
933        }
934
935        #[cfg(not(feature = "human-readable"))]
936        {
937            self.should_allow(signature)
938        }
939    }
940}
941
942impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
943where
944    S: Storage<EventSignature, EventState> + Clone + 'static,
945    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
946{
947    fn on_new_span(
948        &self,
949        attrs: &tracing::span::Attributes<'_>,
950        id: &tracing::span::Id,
951        ctx: Context<'_, Sub>,
952    ) {
953        if self.span_context_fields.is_empty() {
954            return;
955        }
956
957        let mut visitor = FieldVisitor::new();
958        attrs.record(&mut visitor);
959        let fields = visitor.into_fields();
960
961        if let Some(span) = ctx.span(id) {
962            let mut extensions = span.extensions_mut();
963            extensions.insert(fields);
964        }
965    }
966}
967
968#[cfg(test)]
969mod tests {
970    use super::*;
971    use tracing::info;
972    use tracing_subscriber::layer::SubscriberExt;
973
974    #[test]
975    fn test_layer_builder() {
976        let layer = TracingRateLimitLayer::builder()
977            .with_policy(Policy::count_based(50).unwrap())
978            .with_summary_interval(Duration::from_secs(60))
979            .build()
980            .unwrap();
981
982        assert!(layer.limiter().registry().is_empty());
983    }
984
985    #[test]
986    fn test_span_context_fields_deduplication() {
987        let layer = TracingRateLimitLayer::builder()
988            .with_span_context_fields(vec![
989                "user_id".to_string(),
990                "user_id".to_string(), // duplicate
991                "tenant_id".to_string(),
992                "".to_string(),        // empty, should be filtered
993                "user_id".to_string(), // another duplicate
994            ])
995            .build()
996            .unwrap();
997
998        // Should only have 2 unique fields: user_id and tenant_id
999        assert_eq!(layer.span_context_fields.len(), 2);
1000        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
1001        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
1002    }
1003
1004    #[test]
1005    fn test_excluded_fields_deduplication() {
1006        let layer = TracingRateLimitLayer::builder()
1007            .with_excluded_fields(vec![
1008                "request_id".to_string(),
1009                "request_id".to_string(), // duplicate
1010                "trace_id".to_string(),
1011                "".to_string(),           // empty, should be filtered
1012                "request_id".to_string(), // another duplicate
1013            ])
1014            .build()
1015            .unwrap();
1016
1017        // Should only have 2 unique fields: request_id and trace_id
1018        assert_eq!(layer.excluded_fields.len(), 2);
1019        assert!(layer.excluded_fields.contains("request_id"));
1020        assert!(layer.excluded_fields.contains("trace_id"));
1021    }
1022
1023    #[test]
1024    fn test_exempt_targets_deduplication() {
1025        let layer = TracingRateLimitLayer::builder()
1026            .with_exempt_targets(vec![
1027                "myapp::security".to_string(),
1028                "myapp::security".to_string(), // duplicate
1029                "myapp::audit".to_string(),
1030                "".to_string(),                // empty, should be filtered
1031                "myapp::security".to_string(), // another duplicate
1032            ])
1033            .build()
1034            .unwrap();
1035
1036        // Should only have 2 unique targets: security and audit
1037        assert_eq!(layer.exempt_targets.len(), 2);
1038        assert!(layer.exempt_targets.contains("myapp::security"));
1039        assert!(layer.exempt_targets.contains("myapp::audit"));
1040    }
1041
1042    #[test]
1043    fn test_exempt_targets_bypass_rate_limiting() {
1044        let rate_limit = TracingRateLimitLayer::builder()
1045            .with_policy(Policy::count_based(2).unwrap())
1046            .with_exempt_targets(vec!["myapp::security".to_string()])
1047            .build()
1048            .unwrap();
1049
1050        let subscriber = tracing_subscriber::registry()
1051            .with(tracing_subscriber::fmt::layer().with_filter(rate_limit.clone()));
1052
1053        tracing::subscriber::with_default(subscriber, || {
1054            // Regular logs get throttled after 2 (same callsite = same signature)
1055            for _ in 0..3 {
1056                info!("Regular log"); // First 2 allowed, 3rd suppressed
1057            }
1058
1059            // Security logs are never throttled (exempt target)
1060            for _ in 0..4 {
1061                info!(target: "myapp::security", "Security event"); // All 4 allowed
1062            }
1063        });
1064
1065        // Verify metrics
1066        let metrics = rate_limit.metrics();
1067        assert_eq!(metrics.events_allowed(), 6); // 2 regular + 4 exempt
1068        assert_eq!(metrics.events_suppressed(), 1); // 1 regular suppressed
1069    }
1070
1071    #[test]
1072    fn test_layer_default() {
1073        let layer = TracingRateLimitLayer::default();
1074        assert!(layer.limiter().registry().is_empty());
1075    }
1076
1077    #[test]
1078    fn test_signature_computation() {
1079        let _layer = TracingRateLimitLayer::new();
1080
1081        // Use a simple signature test without metadata construction
1082        let sig1 = EventSignature::simple("INFO", "test_event");
1083        let sig2 = EventSignature::simple("INFO", "test_event");
1084
1085        // Same inputs should produce same signature
1086        assert_eq!(sig1, sig2);
1087    }
1088
1089    #[test]
1090    fn test_basic_rate_limiting() {
1091        let layer = TracingRateLimitLayer::builder()
1092            .with_policy(Policy::count_based(2).unwrap())
1093            .build()
1094            .unwrap();
1095
1096        let sig = EventSignature::simple("INFO", "test_message");
1097
1098        // First two should be allowed
1099        assert!(layer.should_allow(sig));
1100        assert!(layer.should_allow(sig));
1101
1102        // Third should be suppressed
1103        assert!(!layer.should_allow(sig));
1104    }
1105
1106    #[test]
1107    fn test_layer_integration() {
1108        let layer = TracingRateLimitLayer::builder()
1109            .with_policy(Policy::count_based(3).unwrap())
1110            .build()
1111            .unwrap();
1112
1113        // Clone for use in subscriber, keep original for checking state
1114        let layer_for_check = layer.clone();
1115
1116        let subscriber = tracing_subscriber::registry()
1117            .with(tracing_subscriber::fmt::layer().with_filter(layer));
1118
1119        // Test that the layer correctly tracks event signatures
1120        tracing::subscriber::with_default(subscriber, || {
1121            // Emit 10 identical events
1122            for _ in 0..10 {
1123                info!("test event");
1124            }
1125        });
1126
1127        // After emitting 10 events with the same signature, the layer should have
1128        // tracked them and only the first 3 should have been marked as allowed
1129        // The registry should contain one entry for this signature
1130        assert_eq!(layer_for_check.limiter().registry().len(), 1);
1131    }
1132
1133    #[test]
1134    fn test_layer_suppression_logic() {
1135        let layer = TracingRateLimitLayer::builder()
1136            .with_policy(Policy::count_based(3).unwrap())
1137            .build()
1138            .unwrap();
1139
1140        let sig = EventSignature::simple("INFO", "test");
1141
1142        // Verify the suppression logic works correctly
1143        let mut allowed_count = 0;
1144        for _ in 0..10 {
1145            if layer.should_allow(sig) {
1146                allowed_count += 1;
1147            }
1148        }
1149
1150        assert_eq!(allowed_count, 3);
1151    }
1152
1153    #[test]
1154    fn test_builder_zero_summary_interval() {
1155        let result = TracingRateLimitLayer::builder()
1156            .with_summary_interval(Duration::from_secs(0))
1157            .build();
1158
1159        assert!(matches!(
1160            result,
1161            Err(BuildError::EmitterConfig(
1162                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1163            ))
1164        ));
1165    }
1166
1167    #[test]
1168    fn test_builder_zero_max_signatures() {
1169        let result = TracingRateLimitLayer::builder()
1170            .with_max_signatures(0)
1171            .build();
1172
1173        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1174    }
1175
1176    #[test]
1177    fn test_builder_valid_max_signatures() {
1178        let layer = TracingRateLimitLayer::builder()
1179            .with_max_signatures(100)
1180            .build()
1181            .unwrap();
1182
1183        assert!(layer.limiter().registry().is_empty());
1184    }
1185
1186    #[test]
1187    fn test_metrics_tracking() {
1188        let layer = TracingRateLimitLayer::builder()
1189            .with_policy(Policy::count_based(2).unwrap())
1190            .build()
1191            .unwrap();
1192
1193        let sig = EventSignature::simple("INFO", "test");
1194
1195        // Check initial metrics
1196        assert_eq!(layer.metrics().events_allowed(), 0);
1197        assert_eq!(layer.metrics().events_suppressed(), 0);
1198
1199        // Allow first two events
1200        assert!(layer.should_allow(sig));
1201        assert!(layer.should_allow(sig));
1202
1203        // Check metrics after allowed events
1204        assert_eq!(layer.metrics().events_allowed(), 2);
1205        assert_eq!(layer.metrics().events_suppressed(), 0);
1206
1207        // Suppress third event
1208        assert!(!layer.should_allow(sig));
1209
1210        // Check metrics after suppressed event
1211        assert_eq!(layer.metrics().events_allowed(), 2);
1212        assert_eq!(layer.metrics().events_suppressed(), 1);
1213    }
1214
1215    #[test]
1216    fn test_metrics_snapshot() {
1217        let layer = TracingRateLimitLayer::builder()
1218            .with_policy(Policy::count_based(3).unwrap())
1219            .build()
1220            .unwrap();
1221
1222        let sig = EventSignature::simple("INFO", "test");
1223
1224        // Generate some events
1225        for _ in 0..5 {
1226            layer.should_allow(sig);
1227        }
1228
1229        // Get snapshot
1230        let snapshot = layer.metrics().snapshot();
1231        assert_eq!(snapshot.events_allowed, 3);
1232        assert_eq!(snapshot.events_suppressed, 2);
1233        assert_eq!(snapshot.total_events(), 5);
1234        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1235    }
1236
1237    #[test]
1238    fn test_signature_count() {
1239        let layer = TracingRateLimitLayer::builder()
1240            .with_policy(Policy::count_based(2).unwrap())
1241            .build()
1242            .unwrap();
1243
1244        assert_eq!(layer.signature_count(), 0);
1245
1246        let sig1 = EventSignature::simple("INFO", "test1");
1247        let sig2 = EventSignature::simple("INFO", "test2");
1248
1249        layer.should_allow(sig1);
1250        assert_eq!(layer.signature_count(), 1);
1251
1252        layer.should_allow(sig2);
1253        assert_eq!(layer.signature_count(), 2);
1254
1255        // Same signature shouldn't increase count
1256        layer.should_allow(sig1);
1257        assert_eq!(layer.signature_count(), 2);
1258    }
1259
1260    #[test]
1261    fn test_metrics_with_eviction() {
1262        let layer = TracingRateLimitLayer::builder()
1263            .with_policy(Policy::count_based(1).unwrap())
1264            .with_max_signatures(3)
1265            .build()
1266            .unwrap();
1267
1268        // Fill up to capacity
1269        for i in 0..3 {
1270            let sig = EventSignature::simple("INFO", &format!("test{}", i));
1271            layer.should_allow(sig);
1272        }
1273
1274        assert_eq!(layer.signature_count(), 3);
1275        assert_eq!(layer.metrics().signatures_evicted(), 0);
1276
1277        // Add one more, which should trigger eviction
1278        let sig = EventSignature::simple("INFO", "test3");
1279        layer.should_allow(sig);
1280
1281        assert_eq!(layer.signature_count(), 3);
1282        assert_eq!(layer.metrics().signatures_evicted(), 1);
1283    }
1284
1285    #[test]
1286    fn test_circuit_breaker_observability() {
1287        use crate::application::circuit_breaker::CircuitState;
1288
1289        let layer = TracingRateLimitLayer::builder()
1290            .with_policy(Policy::count_based(2).unwrap())
1291            .build()
1292            .unwrap();
1293
1294        // Check initial circuit breaker state
1295        let cb = layer.circuit_breaker();
1296        assert_eq!(cb.state(), CircuitState::Closed);
1297        assert_eq!(cb.consecutive_failures(), 0);
1298
1299        // Circuit breaker should remain closed during normal operation
1300        let sig = EventSignature::simple("INFO", "test");
1301        layer.should_allow(sig);
1302        layer.should_allow(sig);
1303        layer.should_allow(sig);
1304
1305        assert_eq!(cb.state(), CircuitState::Closed);
1306    }
1307
1308    #[test]
1309    fn test_circuit_breaker_fail_open_integration() {
1310        use crate::application::circuit_breaker::{
1311            CircuitBreaker, CircuitBreakerConfig, CircuitState,
1312        };
1313        use std::time::Duration;
1314
1315        // Create a circuit breaker with low threshold for testing
1316        let cb_config = CircuitBreakerConfig {
1317            failure_threshold: 2,
1318            recovery_timeout: Duration::from_secs(1),
1319        };
1320        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1321
1322        // Build layer with custom circuit breaker
1323        let storage = Arc::new(ShardedStorage::new());
1324        let clock = Arc::new(SystemClock::new());
1325        let policy = Policy::count_based(2).unwrap();
1326        let registry = SuppressionRegistry::new(storage, clock, policy);
1327        let metrics = Metrics::new();
1328        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1329
1330        let layer = TracingRateLimitLayer {
1331            limiter,
1332            span_context_fields: Arc::new(Vec::new()),
1333            excluded_fields: Arc::new(BTreeSet::new()),
1334            exempt_targets: Arc::new(BTreeSet::new()),
1335            #[cfg(feature = "async")]
1336            emitter_handle: Arc::new(Mutex::new(None)),
1337            #[cfg(not(feature = "async"))]
1338            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1339                30,
1340            ))
1341            .unwrap(),
1342        };
1343
1344        let sig = EventSignature::simple("INFO", "test");
1345
1346        // Normal operation - first 2 events allowed, third suppressed
1347        assert!(layer.should_allow(sig));
1348        assert!(layer.should_allow(sig));
1349        assert!(!layer.should_allow(sig));
1350
1351        // Circuit should still be closed
1352        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1353
1354        // Manually trigger circuit breaker failures to test fail-open
1355        circuit_breaker.record_failure();
1356        circuit_breaker.record_failure();
1357
1358        // Circuit should now be open
1359        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1360
1361        // With circuit open, rate limiter should fail open (allow all events)
1362        // even though we've already hit the rate limit
1363        assert!(layer.should_allow(sig));
1364        assert!(layer.should_allow(sig));
1365        assert!(layer.should_allow(sig));
1366
1367        // Metrics should show these as allowed (fail-open behavior)
1368        let snapshot = layer.metrics().snapshot();
1369        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1370    }
1371
1372    #[cfg(feature = "async")]
1373    #[tokio::test]
1374    async fn test_active_emission_integration() {
1375        use std::sync::atomic::{AtomicUsize, Ordering};
1376        use std::time::Duration;
1377
1378        // Use an atomic counter to track emissions
1379        let emission_count = Arc::new(AtomicUsize::new(0));
1380        let count_clone = Arc::clone(&emission_count);
1381
1382        // Create a layer with a custom emitter that increments our counter
1383        let storage = Arc::new(ShardedStorage::new());
1384        let clock = Arc::new(SystemClock::new());
1385        let policy = Policy::count_based(2).unwrap();
1386        let registry = SuppressionRegistry::new(storage, clock, policy);
1387
1388        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1389        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1390
1391        // Start emitter with custom callback
1392        let handle = emitter.start(
1393            move |summaries| {
1394                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1395            },
1396            false,
1397        );
1398
1399        // Emit events that will be suppressed
1400        let sig = EventSignature::simple("INFO", "test_message");
1401        for _ in 0..10 {
1402            registry.with_event_state(sig, |state, now| {
1403                state.counter.record_suppression(now);
1404            });
1405        }
1406
1407        // Wait for at least two emission intervals
1408        tokio::time::sleep(Duration::from_millis(250)).await;
1409
1410        // Check that summaries were emitted
1411        let count = emission_count.load(Ordering::SeqCst);
1412        assert!(
1413            count > 0,
1414            "Expected at least one suppression summary to be emitted, got {}",
1415            count
1416        );
1417
1418        // Graceful shutdown
1419        handle.shutdown().await.expect("shutdown failed");
1420    }
1421
1422    #[cfg(feature = "async")]
1423    #[tokio::test]
1424    async fn test_active_emission_disabled() {
1425        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1426        use std::time::Duration;
1427
1428        // Create layer with active emission disabled (default)
1429        let layer = TracingRateLimitLayer::builder()
1430            .with_policy(Policy::count_based(2).unwrap())
1431            .with_summary_interval(Duration::from_millis(100))
1432            .build()
1433            .unwrap();
1434
1435        let mock = MockCaptureLayer::new();
1436        let mock_clone = mock.clone();
1437
1438        let subscriber = tracing_subscriber::registry()
1439            .with(mock)
1440            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1441
1442        tracing::subscriber::with_default(subscriber, || {
1443            let sig = EventSignature::simple("INFO", "test_message");
1444            for _ in 0..10 {
1445                layer.should_allow(sig);
1446            }
1447        });
1448
1449        // Wait to ensure no emissions occur
1450        tokio::time::sleep(Duration::from_millis(250)).await;
1451
1452        // Should not have emitted any summaries
1453        let events = mock_clone.get_captured();
1454        let summary_count = events
1455            .iter()
1456            .filter(|e| e.message.contains("suppressed"))
1457            .count();
1458
1459        assert_eq!(
1460            summary_count, 0,
1461            "Should not emit summaries when active emission is disabled"
1462        );
1463
1464        // Shutdown should succeed even when emitter was never started
1465        layer.shutdown().await.expect("shutdown failed");
1466    }
1467
1468    #[cfg(feature = "async")]
1469    #[tokio::test]
1470    async fn test_shutdown_without_emission() {
1471        // Test that shutdown works when emission was never enabled
1472        let layer = TracingRateLimitLayer::new();
1473
1474        // Should not error
1475        layer
1476            .shutdown()
1477            .await
1478            .expect("shutdown should succeed when emitter not running");
1479    }
1480
1481    #[cfg(feature = "async")]
1482    #[tokio::test]
1483    async fn test_custom_summary_formatter() {
1484        use std::sync::atomic::{AtomicUsize, Ordering};
1485        use std::time::Duration;
1486
1487        // Track formatter invocations
1488        let call_count = Arc::new(AtomicUsize::new(0));
1489        let count_clone = Arc::clone(&call_count);
1490
1491        // Track data passed to formatter
1492        let last_count = Arc::new(AtomicUsize::new(0));
1493        let last_count_clone = Arc::clone(&last_count);
1494
1495        // Create layer with custom formatter
1496        let layer = TracingRateLimitLayer::builder()
1497            .with_policy(Policy::count_based(2).unwrap())
1498            .with_active_emission(true)
1499            .with_summary_interval(Duration::from_millis(100))
1500            .with_summary_formatter(Arc::new(move |summary| {
1501                count_clone.fetch_add(1, Ordering::SeqCst);
1502                last_count_clone.store(summary.count, Ordering::SeqCst);
1503                // Custom format: emit at INFO level instead of WARN
1504                tracing::info!(
1505                    sig = %summary.signature,
1506                    suppressed = summary.count,
1507                    "Custom format"
1508                );
1509            }))
1510            .build()
1511            .unwrap();
1512
1513        // Emit events that will be suppressed
1514        let sig = EventSignature::simple("INFO", "test_message");
1515        for _ in 0..10 {
1516            layer.should_allow(sig);
1517        }
1518
1519        // Wait for emission
1520        tokio::time::sleep(Duration::from_millis(250)).await;
1521
1522        // Verify custom formatter was called
1523        let calls = call_count.load(Ordering::SeqCst);
1524        assert!(calls > 0, "Custom formatter should have been called");
1525
1526        // Verify formatter received correct data
1527        let count = last_count.load(Ordering::SeqCst);
1528        assert!(
1529            count >= 8,
1530            "Expected at least 8 suppressions, got {}",
1531            count
1532        );
1533
1534        layer.shutdown().await.expect("shutdown failed");
1535    }
1536
1537    #[cfg(feature = "async")]
1538    #[tokio::test]
1539    async fn test_default_formatter_used() {
1540        use std::sync::atomic::{AtomicUsize, Ordering};
1541        use std::time::Duration;
1542
1543        let emission_count = Arc::new(AtomicUsize::new(0));
1544        let count_clone = Arc::clone(&emission_count);
1545
1546        let storage = Arc::new(ShardedStorage::new());
1547        let clock = Arc::new(SystemClock::new());
1548        let policy = Policy::count_based(2).unwrap();
1549        let registry = SuppressionRegistry::new(storage, clock, policy);
1550
1551        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1552        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1553
1554        // Start without custom formatter - should use default
1555        let handle = emitter.start(
1556            move |summaries| {
1557                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1558            },
1559            false,
1560        );
1561
1562        let sig = EventSignature::simple("INFO", "test_message");
1563        for _ in 0..10 {
1564            registry.with_event_state(sig, |state, now| {
1565                state.counter.record_suppression(now);
1566            });
1567        }
1568
1569        tokio::time::sleep(Duration::from_millis(250)).await;
1570
1571        let count = emission_count.load(Ordering::SeqCst);
1572        assert!(count > 0, "Default formatter should have emitted summaries");
1573
1574        handle.shutdown().await.expect("shutdown failed");
1575    }
1576}