Skip to main content

tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::borrow::Cow;
20use std::collections::{BTreeMap, BTreeSet};
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{Metadata, Subscriber};
24use tracing_subscriber::layer::Filter;
25use tracing_subscriber::registry::LookupSpan;
26use tracing_subscriber::{layer::Context, Layer};
27
/// Target string attached to the summary events this crate emits itself.
///
/// Summary events carry this target so they can be recognised and exempted from
/// throttling; otherwise a summary message could itself be suppressed. The value
/// is an internal detail and deliberately not part of the public API.
#[cfg(feature = "async")]
const SUMMARY_TARGET: &str = "tracing_throttle::summary";
35
36#[cfg(feature = "async")]
37use crate::application::emitter::{EmitterHandle, SummaryEmitter};
38
39#[cfg(feature = "async")]
40use crate::domain::summary::SuppressionSummary;
41
42#[cfg(feature = "async")]
43use std::sync::Mutex;
44
/// Callback that turns a `SuppressionSummary` into a tracing event.
///
/// The callback receives each summary by reference and decides on its own which
/// log level and format to use when emitting it.
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
51
52/// Error returned when building a TracingRateLimitLayer fails.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum BuildError {
55    /// Maximum signatures must be greater than zero
56    ZeroMaxSignatures,
57    /// Emitter configuration validation failed
58    EmitterConfig(crate::application::emitter::EmitterConfigError),
59}
60
61impl std::fmt::Display for BuildError {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            BuildError::ZeroMaxSignatures => {
65                write!(f, "max_signatures must be greater than 0")
66            }
67            BuildError::EmitterConfig(e) => {
68                write!(f, "emitter configuration error: {}", e)
69            }
70        }
71    }
72}
73
74impl std::error::Error for BuildError {}
75
76impl From<crate::application::emitter::EmitterConfigError> for BuildError {
77    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
78        BuildError::EmitterConfig(e)
79    }
80}
81
82/// Builder for constructing a `TracingRateLimitLayer`.
83pub struct TracingRateLimitLayerBuilder {
84    policy: Policy,
85    summary_interval: Duration,
86    clock: Option<Arc<dyn Clock>>,
87    max_signatures: Option<usize>,
88    enable_active_emission: bool,
89    #[cfg(feature = "async")]
90    summary_formatter: Option<SummaryFormatter>,
91    span_context_fields: Vec<String>,
92    excluded_fields: BTreeSet<String>,
93    eviction_strategy: Option<EvictionStrategy>,
94    exempt_targets: BTreeSet<String>,
95}
96
97/// Eviction strategy configuration for the rate limit layer.
98///
99/// This enum provides a user-friendly API that internally creates
100/// the appropriate EvictionPolicy adapter.
101#[derive(Clone)]
102pub enum EvictionStrategy {
103    /// LRU (Least Recently Used) eviction with entry count limit.
104    Lru {
105        /// Maximum number of entries
106        max_entries: usize,
107    },
108    /// Priority-based eviction using a custom function.
109    Priority {
110        /// Maximum number of entries
111        max_entries: usize,
112        /// Priority calculation function
113        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
114    },
115    /// Memory-based eviction with byte limit.
116    Memory {
117        /// Maximum memory usage in bytes
118        max_bytes: usize,
119    },
120    /// Combined priority and memory limits.
121    PriorityWithMemory {
122        /// Maximum number of entries
123        max_entries: usize,
124        /// Priority calculation function
125        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
126        /// Maximum memory usage in bytes
127        max_bytes: usize,
128    },
129}
130
131impl EvictionStrategy {
132    /// Check if this strategy tracks memory usage.
133    pub fn tracks_memory(&self) -> bool {
134        matches!(
135            self,
136            EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
137        )
138    }
139
140    /// Get the memory limit if this strategy uses one.
141    pub fn memory_limit(&self) -> Option<usize> {
142        match self {
143            EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
144            EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
145            _ => None,
146        }
147    }
148
149    /// Check if this strategy uses priority-based eviction.
150    pub fn uses_priority(&self) -> bool {
151        matches!(
152            self,
153            EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
154        )
155    }
156}
157
158impl std::fmt::Debug for EvictionStrategy {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            EvictionStrategy::Lru { max_entries } => f
162                .debug_struct("Lru")
163                .field("max_entries", max_entries)
164                .finish(),
165            EvictionStrategy::Priority {
166                max_entries,
167                priority_fn: _,
168            } => f
169                .debug_struct("Priority")
170                .field("max_entries", max_entries)
171                .field("priority_fn", &"<fn>")
172                .finish(),
173            EvictionStrategy::Memory { max_bytes } => f
174                .debug_struct("Memory")
175                .field("max_bytes", max_bytes)
176                .finish(),
177            EvictionStrategy::PriorityWithMemory {
178                max_entries,
179                priority_fn: _,
180                max_bytes,
181            } => f
182                .debug_struct("PriorityWithMemory")
183                .field("max_entries", max_entries)
184                .field("priority_fn", &"<fn>")
185                .field("max_bytes", max_bytes)
186                .finish(),
187        }
188    }
189}
190
191impl TracingRateLimitLayerBuilder {
192    /// Set the rate limiting policy.
193    pub fn with_policy(mut self, policy: Policy) -> Self {
194        self.policy = policy;
195        self
196    }
197
198    /// Set the summary emission interval.
199    ///
200    /// The interval will be validated when `build()` is called.
201    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
202        self.summary_interval = interval;
203        self
204    }
205
206    /// Set a custom clock (mainly for testing).
207    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
208        self.clock = Some(clock);
209        self
210    }
211
212    /// Set the maximum number of unique event signatures to track.
213    ///
214    /// When this limit is reached, the least recently used signatures will be evicted.
215    /// This prevents unbounded memory growth in applications with high signature cardinality.
216    ///
217    /// Default: 10,000 signatures
218    ///
219    /// The value will be validated when `build()` is called.
220    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
221        self.max_signatures = Some(max_signatures);
222        self
223    }
224
225    /// Disable the signature limit, allowing unbounded growth.
226    ///
227    /// **Warning**: This can lead to unbounded memory usage in applications that generate
228    /// many unique event signatures. Only use this if you're certain your application has
229    /// bounded signature cardinality or you have external memory monitoring.
230    pub fn with_unlimited_signatures(mut self) -> Self {
231        self.max_signatures = None;
232        self
233    }
234
235    /// Enable active emission of suppression summaries.
236    ///
237    /// When enabled, the layer will automatically emit `WARN`-level tracing events
238    /// containing summaries of suppressed log events at the configured interval.
239    ///
240    /// **Requires the `async` feature** - this method has no effect without it.
241    ///
242    /// Default: disabled
243    ///
244    /// # Example
245    ///
246    /// ```no_run
247    /// # use tracing_throttle::TracingRateLimitLayer;
248    /// # use std::time::Duration;
249    /// let layer = TracingRateLimitLayer::builder()
250    ///     .with_active_emission(true)
251    ///     .with_summary_interval(Duration::from_secs(60))
252    ///     .build()
253    ///     .unwrap();
254    /// ```
255    pub fn with_active_emission(mut self, enabled: bool) -> Self {
256        self.enable_active_emission = enabled;
257        self
258    }
259
260    /// Set a custom formatter for suppression summaries.
261    ///
262    /// The formatter is responsible for emitting summaries as tracing events.
263    /// This allows full control over log level, message format, and structured fields.
264    ///
265    /// **Requires the `async` feature.**
266    ///
267    /// If not set, a default formatter is used that emits at WARN level with
268    /// `signature` and `count` fields.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// # use tracing_throttle::TracingRateLimitLayer;
274    /// # use std::sync::Arc;
275    /// # use std::time::Duration;
276    /// let layer = TracingRateLimitLayer::builder()
277    ///     .with_active_emission(true)
278    ///     .with_summary_formatter(Arc::new(|summary| {
279    ///         tracing::info!(
280    ///             signature = %summary.signature,
281    ///             count = summary.count,
282    ///             duration_secs = summary.duration.as_secs(),
283    ///             "Suppression summary"
284    ///         );
285    ///     }))
286    ///     .build()
287    ///     .unwrap();
288    /// ```
289    #[cfg(feature = "async")]
290    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
291        self.summary_formatter = Some(formatter);
292        self
293    }
294
295    /// Include span context fields in event signatures.
296    ///
297    /// When specified, the layer will extract these fields from the current span
298    /// context and include them in the event signature. This enables rate limiting
299    /// per-user, per-tenant, per-request, or any other span-level context.
300    ///
301    /// Duplicate field names are automatically removed, and empty field names are filtered out.
302    ///
303    /// # Example
304    ///
305    /// ```no_run
306    /// # use tracing_throttle::TracingRateLimitLayer;
307    /// // Rate limit separately per user
308    /// let layer = TracingRateLimitLayer::builder()
309    ///     .with_span_context_fields(vec!["user_id".to_string()])
310    ///     .build()
311    ///     .unwrap();
312    ///
313    /// // Rate limit per user and tenant
314    /// let layer = TracingRateLimitLayer::builder()
315    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
316    ///     .build()
317    ///     .unwrap();
318    /// ```
319    ///
320    /// # Usage with Spans
321    ///
322    /// ```no_run
323    /// # use tracing::{info, info_span};
324    /// // Create a span with user context
325    /// let span = info_span!("request", user_id = "alice");
326    /// let _enter = span.enter();
327    ///
328    /// // These events will be rate limited separately per user
329    /// info!("Processing request");  // Limited for user "alice"
330    /// ```
331    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
332        // Deduplicate and filter out empty field names
333        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
334        self.span_context_fields = unique_fields.into_iter().collect();
335        self
336    }
337
338    /// Exclude specific fields from event signatures.
339    ///
340    /// By default, ALL event fields are included in signatures. This ensures that
341    /// events with different field values are treated as distinct events, preventing
342    /// accidental loss of meaningful log data.
343    ///
344    /// Use this method to exclude high-cardinality fields that don't change the
345    /// semantic meaning of the event (e.g., request_id, timestamp, trace_id).
346    ///
347    /// Duplicate field names are automatically removed, and empty field names are filtered out.
348    ///
349    /// # Example
350    ///
351    /// ```no_run
352    /// # use tracing_throttle::TracingRateLimitLayer;
353    /// // Exclude request_id so events with same user_id are deduplicated
354    /// let layer = TracingRateLimitLayer::builder()
355    ///     .with_excluded_fields(vec!["request_id".to_string(), "trace_id".to_string()])
356    ///     .build()
357    ///     .unwrap();
358    /// ```
359    ///
360    /// # Default Behavior (ALL fields included)
361    ///
362    /// ```no_run
363    /// # use tracing::error;
364    /// // These are DIFFERENT events (different user_id values)
365    /// error!(user_id = 123, "Failed to fetch user");
366    /// error!(user_id = 456, "Failed to fetch user");
367    /// // Both are logged - they have distinct signatures
368    /// ```
369    ///
370    /// # With Exclusions
371    ///
372    /// ```no_run
373    /// # use tracing::error;
374    /// // Exclude request_id from signature
375    /// error!(user_id = 123, request_id = "abc", "Failed to fetch user");
376    /// error!(user_id = 123, request_id = "def", "Failed to fetch user");
377    /// // Second is throttled - same user_id, request_id excluded from signature
378    /// ```
379    pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
380        // Deduplicate and filter out empty field names
381        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
382        self.excluded_fields = unique_fields;
383        self
384    }
385
386    /// Exempt specific targets from rate limiting.
387    ///
388    /// Events from the specified targets will bypass rate limiting entirely and
389    /// always be allowed through. This is useful for ensuring critical logs (e.g.,
390    /// security events, audit logs) are never suppressed.
391    ///
392    /// Targets are matched exactly against the event's target (module path).
393    /// Duplicate targets are automatically removed, and empty targets are filtered out.
394    ///
395    /// # Example
396    ///
397    /// ```no_run
398    /// # use tracing_throttle::TracingRateLimitLayer;
399    /// // Never throttle security or audit logs
400    /// let layer = TracingRateLimitLayer::builder()
401    ///     .with_exempt_targets(vec![
402    ///         "myapp::security".to_string(),
403    ///         "myapp::audit".to_string(),
404    ///     ])
405    ///     .build()
406    ///     .unwrap();
407    /// ```
408    ///
409    /// # Usage with Events
410    ///
411    /// ```no_run
412    /// # use tracing::error;
413    /// // Explicitly set target - this event bypasses throttling
414    /// error!(target: "myapp::security", "Security breach detected");
415    ///
416    /// // Or use the module's default target
417    /// mod security {
418    ///     use tracing::warn;
419    ///     pub fn check() {
420    ///         // Target is automatically "myapp::security" - bypasses throttling
421    ///         warn!("Suspicious activity detected");
422    ///     }
423    /// }
424    /// ```
425    pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
426        // Deduplicate and filter out empty targets
427        let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
428        self.exempt_targets = unique_targets;
429        self
430    }
431
432    /// Set a custom eviction strategy for signature management.
433    ///
434    /// Controls which signatures are evicted when storage limits are reached.
435    /// If not set, uses LRU eviction with the configured max_signatures limit.
436    ///
437    /// # Example: Priority-based eviction
438    ///
439    /// ```no_run
440    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
441    /// # use std::sync::Arc;
442    /// let layer = TracingRateLimitLayer::builder()
443    ///     .with_eviction_strategy(EvictionStrategy::Priority {
444    ///         max_entries: 5_000,
445    ///         priority_fn: Arc::new(|_sig, state| {
446    ///             // Keep ERROR events longer than INFO events
447    ///             match state.metadata.as_ref().map(|m| m.level.as_str()) {
448    ///                 Some("ERROR") => 100,
449    ///                 Some("WARN") => 50,
450    ///                 Some("INFO") => 10,
451    ///                 _ => 5,
452    ///             }
453    ///         })
454    ///     })
455    ///     .build()
456    ///     .unwrap();
457    /// ```
458    ///
459    /// # Example: Memory-based eviction
460    ///
461    /// ```no_run
462    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
463    /// // Evict when total memory exceeds 5MB
464    /// let layer = TracingRateLimitLayer::builder()
465    ///     .with_eviction_strategy(EvictionStrategy::Memory {
466    ///         max_bytes: 5 * 1024 * 1024,
467    ///     })
468    ///     .build()
469    ///     .unwrap();
470    /// ```
471    pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
472        self.eviction_strategy = Some(strategy);
473        self
474    }
475
476    /// Build the layer.
477    ///
478    /// # Errors
479    /// Returns `BuildError` if the configuration is invalid.
480    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
481        // Validate max_signatures if set
482        if let Some(max) = self.max_signatures {
483            if max == 0 {
484                return Err(BuildError::ZeroMaxSignatures);
485            }
486        }
487
488        // Create shared metrics and circuit breaker
489        let metrics = Metrics::new();
490        let circuit_breaker = Arc::new(CircuitBreaker::new());
491
492        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
493        let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
494
495        // Convert eviction strategy to adapter, or use default LRU with max_signatures
496        let eviction_policy: Option<
497            Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
498        > = match self.eviction_strategy {
499            Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
500                crate::infrastructure::eviction::LruEviction::new(max_entries),
501            )),
502            Some(EvictionStrategy::Priority {
503                max_entries,
504                priority_fn,
505            }) => Some(Arc::new(
506                crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
507            )),
508            Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
509                crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
510            )),
511            Some(EvictionStrategy::PriorityWithMemory {
512                max_entries,
513                priority_fn,
514                max_bytes,
515            }) => Some(Arc::new(
516                crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
517                    max_entries,
518                    priority_fn,
519                    max_bytes,
520                ),
521            )),
522            None => {
523                // Use default LRU with max_signatures if configured
524                self.max_signatures.map(|max| {
525                    Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
526                        as Arc<
527                            dyn crate::application::ports::EvictionPolicy<
528                                EventSignature,
529                                EventState,
530                            >,
531                        >
532                })
533            }
534        };
535
536        if let Some(policy) = eviction_policy {
537            storage = storage.with_eviction_policy(policy);
538        }
539
540        let storage = Arc::new(storage);
541        let registry = SuppressionRegistry::new(storage, clock, self.policy);
542        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
543
544        // Let EmitterConfig validate the interval
545        let emitter_config = EmitterConfig::new(self.summary_interval)?;
546
547        #[cfg(feature = "async")]
548        let emitter_handle = if self.enable_active_emission {
549            let emitter = SummaryEmitter::new(registry, emitter_config);
550
551            // Use custom formatter or default
552            let formatter = self.summary_formatter.unwrap_or_else(|| {
553                Arc::new(|summary: &SuppressionSummary| {
554                    tracing::warn!(
555                        target: SUMMARY_TARGET,
556                        signature = %summary.signature,
557                        count = summary.count,
558                        "{}",
559                        summary.format_message()
560                    );
561                })
562            });
563
564            let handle = emitter.start(
565                move |summaries| {
566                    for summary in summaries {
567                        formatter(&summary);
568                    }
569                },
570                false, // Don't emit final summaries on shutdown
571            );
572            Arc::new(Mutex::new(Some(handle)))
573        } else {
574            Arc::new(Mutex::new(None))
575        };
576
577        Ok(TracingRateLimitLayer {
578            limiter,
579            span_context_fields: Arc::new(self.span_context_fields),
580            excluded_fields: Arc::new(self.excluded_fields),
581            exempt_targets: Arc::new(self.exempt_targets),
582            #[cfg(feature = "async")]
583            emitter_handle,
584            #[cfg(not(feature = "async"))]
585            _emitter_config: emitter_config,
586        })
587    }
588}
589
590/// A `tracing::Layer` that applies rate limiting to events.
591///
592/// This layer intercepts events, computes their signature, and decides
593/// whether to allow or suppress them based on the configured policy.
594///
595/// Optionally emits periodic summaries of suppressed events when active
596/// emission is enabled (requires `async` feature).
597#[derive(Clone)]
598pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
599where
600    S: Storage<EventSignature, EventState> + Clone,
601{
602    limiter: RateLimiter<S>,
603    span_context_fields: Arc<Vec<String>>,
604    excluded_fields: Arc<BTreeSet<String>>,
605    exempt_targets: Arc<BTreeSet<String>>,
606    #[cfg(feature = "async")]
607    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
608    #[cfg(not(feature = "async"))]
609    _emitter_config: EmitterConfig,
610}
611
612impl<S> TracingRateLimitLayer<S>
613where
614    S: Storage<EventSignature, EventState> + Clone,
615{
616    /// Extract span context fields from the current span.
617    fn extract_span_context<Sub>(
618        &self,
619        cx: &Context<'_, Sub>,
620    ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>>
621    where
622        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
623    {
624        if self.span_context_fields.is_empty() {
625            return BTreeMap::new();
626        }
627
628        let mut context_fields = BTreeMap::new();
629
630        if let Some(span) = cx.lookup_current() {
631            for span_ref in span.scope() {
632                let extensions = span_ref.extensions();
633
634                if let Some(stored_fields) =
635                    extensions.get::<BTreeMap<Cow<'static, str>, Cow<'static, str>>>()
636                {
637                    for field_name in self.span_context_fields.as_ref() {
638                        // Create an owned Cow since we can't guarantee 'static lifetime from the String
639                        let field_key: Cow<'static, str> = Cow::Owned(field_name.clone());
640                        if let std::collections::btree_map::Entry::Vacant(e) =
641                            context_fields.entry(field_key.clone())
642                        {
643                            if let Some(value) = stored_fields.get(&field_key) {
644                                e.insert(value.clone());
645                            }
646                        }
647                    }
648                }
649
650                if context_fields.len() == self.span_context_fields.len() {
651                    break;
652                }
653            }
654        }
655
656        context_fields
657    }
658
659    /// Extract event fields from an event.
660    ///
661    /// Extracts ALL fields from the event, then excludes any fields in the
662    /// excluded_fields set. This ensures that field values are included in
663    /// event signatures by default, preventing accidental deduplication of
664    /// semantically different events.
665    fn extract_event_fields(
666        &self,
667        event: &tracing::Event<'_>,
668    ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>> {
669        let mut visitor = FieldVisitor::new();
670        event.record(&mut visitor);
671        let all_fields = visitor.into_fields();
672
673        // Exclude configured fields (e.g., high-cardinality fields like request_id)
674        if self.excluded_fields.is_empty() {
675            all_fields
676        } else {
677            all_fields
678                .into_iter()
679                .filter(|(field_name, _)| !self.excluded_fields.contains(field_name.as_ref()))
680                .collect()
681        }
682    }
683
684    /// Compute event signature from tracing metadata, span context, and event fields.
685    ///
686    /// The signature includes:
687    /// - Log level (INFO, WARN, ERROR, etc.)
688    /// - Message template
689    /// - Target module path
690    /// - Span context fields (if configured)
691    /// - Event fields (if configured)
692    fn compute_signature(
693        &self,
694        metadata: &Metadata,
695        combined_fields: &BTreeMap<Cow<'static, str>, Cow<'static, str>>,
696    ) -> EventSignature {
697        let level = metadata.level().as_str();
698        let message = metadata.name();
699        let target = Some(metadata.target());
700
701        // Use combined fields (span context + event fields) in signature
702        EventSignature::new(level, message, combined_fields, target)
703    }
704
705    /// Check if an event should be allowed through.
706    pub fn should_allow(&self, signature: EventSignature) -> bool {
707        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
708    }
709
710    /// Check if an event should be allowed through and capture metadata.
711    ///
712    /// This method stores event metadata on first occurrence so summaries
713    /// can show human-readable event details instead of just signature hashes.
714    ///
715    /// **Note:** Only available with the `human-readable` feature flag.
716    #[cfg(feature = "human-readable")]
717    pub fn should_allow_with_metadata(
718        &self,
719        signature: EventSignature,
720        metadata: crate::domain::metadata::EventMetadata,
721    ) -> bool {
722        matches!(
723            self.limiter.check_event_with_metadata(signature, metadata),
724            LimitDecision::Allow
725        )
726    }
727
728    /// Get a reference to the underlying limiter.
729    pub fn limiter(&self) -> &RateLimiter<S> {
730        &self.limiter
731    }
732
733    /// Get a reference to the metrics.
734    ///
735    /// Returns metrics about rate limiting behavior including:
736    /// - Events allowed
737    /// - Events suppressed
738    /// - Signatures evicted
739    pub fn metrics(&self) -> &Metrics {
740        self.limiter.metrics()
741    }
742
743    /// Get the current number of tracked signatures.
744    pub fn signature_count(&self) -> usize {
745        self.limiter.registry().len()
746    }
747
748    /// Get a reference to the circuit breaker.
749    ///
750    /// Use this to check the circuit breaker state and health:
751    /// - `circuit_breaker().state()` - Current circuit state
752    /// - `circuit_breaker().consecutive_failures()` - Failure count
753    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
754        self.limiter.circuit_breaker()
755    }
756
757    /// Shutdown the active suppression summary emitter, if running.
758    ///
759    /// This method gracefully stops the background emission task.  If active emission
760    /// is not enabled, this method does nothing.
761    ///
762    /// **Requires the `async` feature.**
763    ///
764    /// # Errors
765    ///
766    /// Returns an error if the emitter task fails to shut down gracefully.
767    ///
768    /// # Example
769    ///
770    /// ```no_run
771    /// # use tracing_throttle::TracingRateLimitLayer;
772    /// # async fn example() {
773    /// let layer = TracingRateLimitLayer::builder()
774    ///     .with_active_emission(true)
775    ///     .build()
776    ///     .unwrap();
777    ///
778    /// // Use the layer...
779    ///
780    /// // Shutdown before dropping
781    /// layer.shutdown().await.expect("shutdown failed");
782    /// # }
783    /// ```
784    #[cfg(feature = "async")]
785    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
786        // Take the handle while holding the lock, then release the lock before awaiting
787        let handle = {
788            let mut handle_guard = self.emitter_handle.lock().unwrap();
789            handle_guard.take()
790        };
791
792        if let Some(handle) = handle {
793            handle.shutdown().await?;
794        }
795        Ok(())
796    }
797}
798
799impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
800    /// Create a builder for configuring the layer.
801    ///
802    /// Defaults:
803    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
804    /// - Max signatures: 10,000 (with LRU eviction)
805    /// - Summary interval: 30 seconds
806    /// - Active emission: disabled
807    /// - Summary formatter: default (WARN level with signature and count)
808    pub fn builder() -> TracingRateLimitLayerBuilder {
809        TracingRateLimitLayerBuilder {
810            policy: Policy::token_bucket(50.0, 1.0)
811                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
812            summary_interval: Duration::from_secs(30),
813            clock: None,
814            max_signatures: Some(10_000),
815            enable_active_emission: false,
816            #[cfg(feature = "async")]
817            summary_formatter: None,
818            span_context_fields: Vec::new(),
819            excluded_fields: BTreeSet::new(),
820            eviction_strategy: None,
821            exempt_targets: BTreeSet::new(),
822        }
823    }
824
825    /// Create a layer with default settings.
826    ///
827    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
828    ///
829    /// Defaults:
830    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
831    /// - Max signatures: 10,000 (with LRU eviction)
832    /// - Summary interval: 30 seconds
833    ///
834    /// # Panics
835    /// This method cannot panic because all default values are valid.
836    pub fn new() -> Self {
837        Self::builder()
838            .build()
839            .expect("default configuration is always valid")
840    }
841
842    /// Create a layer with custom storage backend.
843    ///
844    /// This allows using alternative storage implementations like Redis for distributed
845    /// rate limiting across multiple application instances.
846    ///
847    /// # Arguments
848    ///
849    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
850    /// * `policy` - Rate limiting policy to apply
851    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
852    ///
853    /// # Example with Redis
854    ///
855    /// ```rust,ignore
856    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
857    /// use std::sync::Arc;
858    ///
859    /// #[tokio::main]
860    /// async fn main() {
861    ///     let storage = Arc::new(
862    ///         RedisStorage::connect("redis://127.0.0.1/")
863    ///             .await
864    ///             .expect("Failed to connect")
865    ///     );
866    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
867    ///     let clock = Arc::new(SystemClock::new());
868    ///
869    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
870    /// }
871    /// ```
872    pub fn with_storage<ST>(
873        storage: ST,
874        policy: Policy,
875        clock: Arc<dyn Clock>,
876    ) -> TracingRateLimitLayer<ST>
877    where
878        ST: Storage<EventSignature, EventState> + Clone,
879    {
880        let metrics = Metrics::new();
881        let circuit_breaker = Arc::new(CircuitBreaker::new());
882        let registry = SuppressionRegistry::new(storage, clock, policy);
883        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
884
885        TracingRateLimitLayer {
886            limiter,
887            span_context_fields: Arc::new(Vec::new()),
888            excluded_fields: Arc::new(BTreeSet::new()),
889            exempt_targets: Arc::new(BTreeSet::new()),
890            #[cfg(feature = "async")]
891            emitter_handle: Arc::new(Mutex::new(None)),
892            #[cfg(not(feature = "async"))]
893            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
894                .expect("30 seconds is valid"),
895        }
896    }
897}
898
899impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
900    fn default() -> Self {
901        Self::new()
902    }
903}
904
905// Implement the Filter trait for rate limiting
906impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
907where
908    S: Storage<EventSignature, EventState> + Clone,
909    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
910{
911    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
912        // Always return true - actual filtering happens in event_enabled
913        // This prevents double-checking in dual-layer setups
914        true
915    }
916
917    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
918        let metadata_obj = event.metadata();
919        let target = metadata_obj.target();
920
921        // Exempt our internal summary emission target to prevent recursive throttling.
922        // Only the default formatter uses this target; custom formatters are the user's
923        // responsibility and are intentionally subject to normal throttling rules.
924        #[cfg(feature = "async")]
925        if target == SUMMARY_TARGET {
926            return true;
927        }
928
929        // Check if this target is exempt from rate limiting
930        // Skip the lookup if no exempt targets are configured (common case)
931        if !self.exempt_targets.is_empty() && self.exempt_targets.contains(target) {
932            // Exempt targets bypass rate limiting entirely
933            self.limiter.metrics().record_allowed();
934            return true;
935        }
936
937        // Combine span context and event fields
938        let mut combined_fields = self.extract_span_context(cx);
939        let event_fields = self.extract_event_fields(event);
940        combined_fields.extend(event_fields);
941
942        let signature = self.compute_signature(metadata_obj, &combined_fields);
943
944        #[cfg(feature = "human-readable")]
945        {
946            // Extract message from event for metadata
947            let mut visitor = FieldVisitor::new();
948            event.record(&mut visitor);
949            let all_fields = visitor.into_fields();
950            let message = all_fields
951                .get(&Cow::Borrowed("message"))
952                .map(|v| v.to_string())
953                .unwrap_or_else(|| event.metadata().name().to_string());
954
955            // Create EventMetadata for this event
956            let event_metadata = crate::domain::metadata::EventMetadata::new(
957                metadata_obj.level().as_str().to_string(),
958                message,
959                target.to_string(),
960                combined_fields,
961            );
962
963            self.should_allow_with_metadata(signature, event_metadata)
964        }
965
966        #[cfg(not(feature = "human-readable"))]
967        {
968            self.should_allow(signature)
969        }
970    }
971}
972
973impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
974where
975    S: Storage<EventSignature, EventState> + Clone + 'static,
976    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
977{
978    fn on_new_span(
979        &self,
980        attrs: &tracing::span::Attributes<'_>,
981        id: &tracing::span::Id,
982        ctx: Context<'_, Sub>,
983    ) {
984        if self.span_context_fields.is_empty() {
985            return;
986        }
987
988        let mut visitor = FieldVisitor::new();
989        attrs.record(&mut visitor);
990        let fields = visitor.into_fields();
991
992        if let Some(span) = ctx.span(id) {
993            let mut extensions = span.extensions_mut();
994            extensions.insert(fields);
995        }
996    }
997}
998
999#[cfg(test)]
1000mod tests {
1001    use super::*;
1002    use tracing::info;
1003    use tracing_subscriber::layer::SubscriberExt;
1004
1005    #[test]
1006    fn test_layer_builder() {
1007        let layer = TracingRateLimitLayer::builder()
1008            .with_policy(Policy::count_based(50).unwrap())
1009            .with_summary_interval(Duration::from_secs(60))
1010            .build()
1011            .unwrap();
1012
1013        assert!(layer.limiter().registry().is_empty());
1014    }
1015
1016    #[test]
1017    fn test_span_context_fields_deduplication() {
1018        let layer = TracingRateLimitLayer::builder()
1019            .with_span_context_fields(vec![
1020                "user_id".to_string(),
1021                "user_id".to_string(), // duplicate
1022                "tenant_id".to_string(),
1023                "".to_string(),        // empty, should be filtered
1024                "user_id".to_string(), // another duplicate
1025            ])
1026            .build()
1027            .unwrap();
1028
1029        // Should only have 2 unique fields: user_id and tenant_id
1030        assert_eq!(layer.span_context_fields.len(), 2);
1031        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
1032        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
1033    }
1034
1035    #[test]
1036    fn test_excluded_fields_deduplication() {
1037        let layer = TracingRateLimitLayer::builder()
1038            .with_excluded_fields(vec![
1039                "request_id".to_string(),
1040                "request_id".to_string(), // duplicate
1041                "trace_id".to_string(),
1042                "".to_string(),           // empty, should be filtered
1043                "request_id".to_string(), // another duplicate
1044            ])
1045            .build()
1046            .unwrap();
1047
1048        // Should only have 2 unique fields: request_id and trace_id
1049        assert_eq!(layer.excluded_fields.len(), 2);
1050        assert!(layer.excluded_fields.contains("request_id"));
1051        assert!(layer.excluded_fields.contains("trace_id"));
1052    }
1053
1054    #[test]
1055    fn test_exempt_targets_deduplication() {
1056        let layer = TracingRateLimitLayer::builder()
1057            .with_exempt_targets(vec![
1058                "myapp::security".to_string(),
1059                "myapp::security".to_string(), // duplicate
1060                "myapp::audit".to_string(),
1061                "".to_string(),                // empty, should be filtered
1062                "myapp::security".to_string(), // another duplicate
1063            ])
1064            .build()
1065            .unwrap();
1066
1067        // Should only have 2 unique targets: security and audit
1068        assert_eq!(layer.exempt_targets.len(), 2);
1069        assert!(layer.exempt_targets.contains("myapp::security"));
1070        assert!(layer.exempt_targets.contains("myapp::audit"));
1071    }
1072
1073    #[test]
1074    fn test_exempt_targets_bypass_rate_limiting() {
1075        let rate_limit = TracingRateLimitLayer::builder()
1076            .with_policy(Policy::count_based(2).unwrap())
1077            .with_exempt_targets(vec!["myapp::security".to_string()])
1078            .build()
1079            .unwrap();
1080
1081        let subscriber = tracing_subscriber::registry()
1082            .with(tracing_subscriber::fmt::layer().with_filter(rate_limit.clone()));
1083
1084        tracing::subscriber::with_default(subscriber, || {
1085            // Regular logs get throttled after 2 (same callsite = same signature)
1086            for _ in 0..3 {
1087                info!("Regular log"); // First 2 allowed, 3rd suppressed
1088            }
1089
1090            // Security logs are never throttled (exempt target)
1091            for _ in 0..4 {
1092                info!(target: "myapp::security", "Security event"); // All 4 allowed
1093            }
1094        });
1095
1096        // Verify metrics
1097        let metrics = rate_limit.metrics();
1098        assert_eq!(metrics.events_allowed(), 6); // 2 regular + 4 exempt
1099        assert_eq!(metrics.events_suppressed(), 1); // 1 regular suppressed
1100    }
1101
1102    #[test]
1103    fn test_layer_default() {
1104        let layer = TracingRateLimitLayer::default();
1105        assert!(layer.limiter().registry().is_empty());
1106    }
1107
1108    #[test]
1109    fn test_signature_computation() {
1110        let _layer = TracingRateLimitLayer::new();
1111
1112        // Use a simple signature test without metadata construction
1113        let sig1 = EventSignature::simple("INFO", "test_event");
1114        let sig2 = EventSignature::simple("INFO", "test_event");
1115
1116        // Same inputs should produce same signature
1117        assert_eq!(sig1, sig2);
1118    }
1119
1120    #[test]
1121    fn test_basic_rate_limiting() {
1122        let layer = TracingRateLimitLayer::builder()
1123            .with_policy(Policy::count_based(2).unwrap())
1124            .build()
1125            .unwrap();
1126
1127        let sig = EventSignature::simple("INFO", "test_message");
1128
1129        // First two should be allowed
1130        assert!(layer.should_allow(sig));
1131        assert!(layer.should_allow(sig));
1132
1133        // Third should be suppressed
1134        assert!(!layer.should_allow(sig));
1135    }
1136
1137    #[test]
1138    fn test_layer_integration() {
1139        let layer = TracingRateLimitLayer::builder()
1140            .with_policy(Policy::count_based(3).unwrap())
1141            .build()
1142            .unwrap();
1143
1144        // Clone for use in subscriber, keep original for checking state
1145        let layer_for_check = layer.clone();
1146
1147        let subscriber = tracing_subscriber::registry()
1148            .with(tracing_subscriber::fmt::layer().with_filter(layer));
1149
1150        // Test that the layer correctly tracks event signatures
1151        tracing::subscriber::with_default(subscriber, || {
1152            // Emit 10 identical events
1153            for _ in 0..10 {
1154                info!("test event");
1155            }
1156        });
1157
1158        // After emitting 10 events with the same signature, the layer should have
1159        // tracked them and only the first 3 should have been marked as allowed
1160        // The registry should contain one entry for this signature
1161        assert_eq!(layer_for_check.limiter().registry().len(), 1);
1162    }
1163
1164    #[test]
1165    fn test_layer_suppression_logic() {
1166        let layer = TracingRateLimitLayer::builder()
1167            .with_policy(Policy::count_based(3).unwrap())
1168            .build()
1169            .unwrap();
1170
1171        let sig = EventSignature::simple("INFO", "test");
1172
1173        // Verify the suppression logic works correctly
1174        let mut allowed_count = 0;
1175        for _ in 0..10 {
1176            if layer.should_allow(sig) {
1177                allowed_count += 1;
1178            }
1179        }
1180
1181        assert_eq!(allowed_count, 3);
1182    }
1183
1184    #[test]
1185    fn test_builder_zero_summary_interval() {
1186        let result = TracingRateLimitLayer::builder()
1187            .with_summary_interval(Duration::from_secs(0))
1188            .build();
1189
1190        assert!(matches!(
1191            result,
1192            Err(BuildError::EmitterConfig(
1193                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1194            ))
1195        ));
1196    }
1197
1198    #[test]
1199    fn test_builder_zero_max_signatures() {
1200        let result = TracingRateLimitLayer::builder()
1201            .with_max_signatures(0)
1202            .build();
1203
1204        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1205    }
1206
1207    #[test]
1208    fn test_builder_valid_max_signatures() {
1209        let layer = TracingRateLimitLayer::builder()
1210            .with_max_signatures(100)
1211            .build()
1212            .unwrap();
1213
1214        assert!(layer.limiter().registry().is_empty());
1215    }
1216
1217    #[test]
1218    fn test_metrics_tracking() {
1219        let layer = TracingRateLimitLayer::builder()
1220            .with_policy(Policy::count_based(2).unwrap())
1221            .build()
1222            .unwrap();
1223
1224        let sig = EventSignature::simple("INFO", "test");
1225
1226        // Check initial metrics
1227        assert_eq!(layer.metrics().events_allowed(), 0);
1228        assert_eq!(layer.metrics().events_suppressed(), 0);
1229
1230        // Allow first two events
1231        assert!(layer.should_allow(sig));
1232        assert!(layer.should_allow(sig));
1233
1234        // Check metrics after allowed events
1235        assert_eq!(layer.metrics().events_allowed(), 2);
1236        assert_eq!(layer.metrics().events_suppressed(), 0);
1237
1238        // Suppress third event
1239        assert!(!layer.should_allow(sig));
1240
1241        // Check metrics after suppressed event
1242        assert_eq!(layer.metrics().events_allowed(), 2);
1243        assert_eq!(layer.metrics().events_suppressed(), 1);
1244    }
1245
1246    #[test]
1247    fn test_metrics_snapshot() {
1248        let layer = TracingRateLimitLayer::builder()
1249            .with_policy(Policy::count_based(3).unwrap())
1250            .build()
1251            .unwrap();
1252
1253        let sig = EventSignature::simple("INFO", "test");
1254
1255        // Generate some events
1256        for _ in 0..5 {
1257            layer.should_allow(sig);
1258        }
1259
1260        // Get snapshot
1261        let snapshot = layer.metrics().snapshot();
1262        assert_eq!(snapshot.events_allowed, 3);
1263        assert_eq!(snapshot.events_suppressed, 2);
1264        assert_eq!(snapshot.total_events(), 5);
1265        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1266    }
1267
1268    #[test]
1269    fn test_signature_count() {
1270        let layer = TracingRateLimitLayer::builder()
1271            .with_policy(Policy::count_based(2).unwrap())
1272            .build()
1273            .unwrap();
1274
1275        assert_eq!(layer.signature_count(), 0);
1276
1277        let sig1 = EventSignature::simple("INFO", "test1");
1278        let sig2 = EventSignature::simple("INFO", "test2");
1279
1280        layer.should_allow(sig1);
1281        assert_eq!(layer.signature_count(), 1);
1282
1283        layer.should_allow(sig2);
1284        assert_eq!(layer.signature_count(), 2);
1285
1286        // Same signature shouldn't increase count
1287        layer.should_allow(sig1);
1288        assert_eq!(layer.signature_count(), 2);
1289    }
1290
1291    #[test]
1292    fn test_metrics_with_eviction() {
1293        let layer = TracingRateLimitLayer::builder()
1294            .with_policy(Policy::count_based(1).unwrap())
1295            .with_max_signatures(3)
1296            .build()
1297            .unwrap();
1298
1299        // Fill up to capacity
1300        for i in 0..3 {
1301            let sig = EventSignature::simple("INFO", &format!("test{}", i));
1302            layer.should_allow(sig);
1303        }
1304
1305        assert_eq!(layer.signature_count(), 3);
1306        assert_eq!(layer.metrics().signatures_evicted(), 0);
1307
1308        // Add one more, which should trigger eviction
1309        let sig = EventSignature::simple("INFO", "test3");
1310        layer.should_allow(sig);
1311
1312        assert_eq!(layer.signature_count(), 3);
1313        assert_eq!(layer.metrics().signatures_evicted(), 1);
1314    }
1315
1316    #[test]
1317    fn test_circuit_breaker_observability() {
1318        use crate::application::circuit_breaker::CircuitState;
1319
1320        let layer = TracingRateLimitLayer::builder()
1321            .with_policy(Policy::count_based(2).unwrap())
1322            .build()
1323            .unwrap();
1324
1325        // Check initial circuit breaker state
1326        let cb = layer.circuit_breaker();
1327        assert_eq!(cb.state(), CircuitState::Closed);
1328        assert_eq!(cb.consecutive_failures(), 0);
1329
1330        // Circuit breaker should remain closed during normal operation
1331        let sig = EventSignature::simple("INFO", "test");
1332        layer.should_allow(sig);
1333        layer.should_allow(sig);
1334        layer.should_allow(sig);
1335
1336        assert_eq!(cb.state(), CircuitState::Closed);
1337    }
1338
1339    #[test]
1340    fn test_circuit_breaker_fail_open_integration() {
1341        use crate::application::circuit_breaker::{
1342            CircuitBreaker, CircuitBreakerConfig, CircuitState,
1343        };
1344        use std::time::Duration;
1345
1346        // Create a circuit breaker with low threshold for testing
1347        let cb_config = CircuitBreakerConfig {
1348            failure_threshold: 2,
1349            recovery_timeout: Duration::from_secs(1),
1350        };
1351        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1352
1353        // Build layer with custom circuit breaker
1354        let storage = Arc::new(ShardedStorage::new());
1355        let clock = Arc::new(SystemClock::new());
1356        let policy = Policy::count_based(2).unwrap();
1357        let registry = SuppressionRegistry::new(storage, clock, policy);
1358        let metrics = Metrics::new();
1359        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1360
1361        let layer = TracingRateLimitLayer {
1362            limiter,
1363            span_context_fields: Arc::new(Vec::new()),
1364            excluded_fields: Arc::new(BTreeSet::new()),
1365            exempt_targets: Arc::new(BTreeSet::new()),
1366            #[cfg(feature = "async")]
1367            emitter_handle: Arc::new(Mutex::new(None)),
1368            #[cfg(not(feature = "async"))]
1369            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1370                30,
1371            ))
1372            .unwrap(),
1373        };
1374
1375        let sig = EventSignature::simple("INFO", "test");
1376
1377        // Normal operation - first 2 events allowed, third suppressed
1378        assert!(layer.should_allow(sig));
1379        assert!(layer.should_allow(sig));
1380        assert!(!layer.should_allow(sig));
1381
1382        // Circuit should still be closed
1383        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1384
1385        // Manually trigger circuit breaker failures to test fail-open
1386        circuit_breaker.record_failure();
1387        circuit_breaker.record_failure();
1388
1389        // Circuit should now be open
1390        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1391
1392        // With circuit open, rate limiter should fail open (allow all events)
1393        // even though we've already hit the rate limit
1394        assert!(layer.should_allow(sig));
1395        assert!(layer.should_allow(sig));
1396        assert!(layer.should_allow(sig));
1397
1398        // Metrics should show these as allowed (fail-open behavior)
1399        let snapshot = layer.metrics().snapshot();
1400        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1401    }
1402
1403    #[cfg(feature = "async")]
1404    #[tokio::test]
1405    async fn test_active_emission_integration() {
1406        use std::sync::atomic::{AtomicUsize, Ordering};
1407        use std::time::Duration;
1408
1409        // Use an atomic counter to track emissions
1410        let emission_count = Arc::new(AtomicUsize::new(0));
1411        let count_clone = Arc::clone(&emission_count);
1412
1413        // Create a layer with a custom emitter that increments our counter
1414        let storage = Arc::new(ShardedStorage::new());
1415        let clock = Arc::new(SystemClock::new());
1416        let policy = Policy::count_based(2).unwrap();
1417        let registry = SuppressionRegistry::new(storage, clock, policy);
1418
1419        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1420        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1421
1422        // Start emitter with custom callback
1423        let handle = emitter.start(
1424            move |summaries| {
1425                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1426            },
1427            false,
1428        );
1429
1430        // Emit events that will be suppressed
1431        let sig = EventSignature::simple("INFO", "test_message");
1432        for _ in 0..10 {
1433            registry.with_event_state(sig, |state, now| {
1434                state.counter.record_suppression(now);
1435            });
1436        }
1437
1438        // Wait for at least two emission intervals
1439        tokio::time::sleep(Duration::from_millis(250)).await;
1440
1441        // Check that summaries were emitted
1442        let count = emission_count.load(Ordering::SeqCst);
1443        assert!(
1444            count > 0,
1445            "Expected at least one suppression summary to be emitted, got {}",
1446            count
1447        );
1448
1449        // Graceful shutdown
1450        handle.shutdown().await.expect("shutdown failed");
1451    }
1452
1453    #[cfg(feature = "async")]
1454    #[tokio::test]
1455    async fn test_active_emission_disabled() {
1456        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1457        use std::time::Duration;
1458
1459        // Create layer with active emission disabled (default)
1460        let layer = TracingRateLimitLayer::builder()
1461            .with_policy(Policy::count_based(2).unwrap())
1462            .with_summary_interval(Duration::from_millis(100))
1463            .build()
1464            .unwrap();
1465
1466        let mock = MockCaptureLayer::new();
1467        let mock_clone = mock.clone();
1468
1469        let subscriber = tracing_subscriber::registry()
1470            .with(mock)
1471            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1472
1473        tracing::subscriber::with_default(subscriber, || {
1474            let sig = EventSignature::simple("INFO", "test_message");
1475            for _ in 0..10 {
1476                layer.should_allow(sig);
1477            }
1478        });
1479
1480        // Wait to ensure no emissions occur
1481        tokio::time::sleep(Duration::from_millis(250)).await;
1482
1483        // Should not have emitted any summaries
1484        let events = mock_clone.get_captured();
1485        let summary_count = events
1486            .iter()
1487            .filter(|e| e.message.contains("suppressed"))
1488            .count();
1489
1490        assert_eq!(
1491            summary_count, 0,
1492            "Should not emit summaries when active emission is disabled"
1493        );
1494
1495        // Shutdown should succeed even when emitter was never started
1496        layer.shutdown().await.expect("shutdown failed");
1497    }
1498
1499    #[cfg(feature = "async")]
1500    #[tokio::test]
1501    async fn test_shutdown_without_emission() {
1502        // Test that shutdown works when emission was never enabled
1503        let layer = TracingRateLimitLayer::new();
1504
1505        // Should not error
1506        layer
1507            .shutdown()
1508            .await
1509            .expect("shutdown should succeed when emitter not running");
1510    }
1511
1512    #[cfg(feature = "async")]
1513    #[tokio::test]
1514    async fn test_custom_summary_formatter() {
1515        use std::sync::atomic::{AtomicUsize, Ordering};
1516        use std::time::Duration;
1517
1518        // Track formatter invocations
1519        let call_count = Arc::new(AtomicUsize::new(0));
1520        let count_clone = Arc::clone(&call_count);
1521
1522        // Track data passed to formatter
1523        let last_count = Arc::new(AtomicUsize::new(0));
1524        let last_count_clone = Arc::clone(&last_count);
1525
1526        // Create layer with custom formatter
1527        let layer = TracingRateLimitLayer::builder()
1528            .with_policy(Policy::count_based(2).unwrap())
1529            .with_active_emission(true)
1530            .with_summary_interval(Duration::from_millis(100))
1531            .with_summary_formatter(Arc::new(move |summary| {
1532                count_clone.fetch_add(1, Ordering::SeqCst);
1533                last_count_clone.store(summary.count, Ordering::SeqCst);
1534                // Custom format: emit at INFO level instead of WARN
1535                tracing::info!(
1536                    sig = %summary.signature,
1537                    suppressed = summary.count,
1538                    "Custom format"
1539                );
1540            }))
1541            .build()
1542            .unwrap();
1543
1544        // Emit events that will be suppressed
1545        let sig = EventSignature::simple("INFO", "test_message");
1546        for _ in 0..10 {
1547            layer.should_allow(sig);
1548        }
1549
1550        // Wait for emission
1551        tokio::time::sleep(Duration::from_millis(250)).await;
1552
1553        // Verify custom formatter was called
1554        let calls = call_count.load(Ordering::SeqCst);
1555        assert!(calls > 0, "Custom formatter should have been called");
1556
1557        // Verify formatter received correct data
1558        let count = last_count.load(Ordering::SeqCst);
1559        assert!(
1560            count >= 8,
1561            "Expected at least 8 suppressions, got {}",
1562            count
1563        );
1564
1565        layer.shutdown().await.expect("shutdown failed");
1566    }
1567
1568    #[cfg(feature = "async")]
1569    #[tokio::test]
1570    async fn test_default_formatter_used() {
1571        use std::sync::atomic::{AtomicUsize, Ordering};
1572        use std::time::Duration;
1573
1574        let emission_count = Arc::new(AtomicUsize::new(0));
1575        let count_clone = Arc::clone(&emission_count);
1576
1577        let storage = Arc::new(ShardedStorage::new());
1578        let clock = Arc::new(SystemClock::new());
1579        let policy = Policy::count_based(2).unwrap();
1580        let registry = SuppressionRegistry::new(storage, clock, policy);
1581
1582        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1583        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1584
1585        // Start without custom formatter - should use default
1586        let handle = emitter.start(
1587            move |summaries| {
1588                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1589            },
1590            false,
1591        );
1592
1593        let sig = EventSignature::simple("INFO", "test_message");
1594        for _ in 0..10 {
1595            registry.with_event_state(sig, |state, now| {
1596                state.counter.record_suppression(now);
1597            });
1598        }
1599
1600        tokio::time::sleep(Duration::from_millis(250)).await;
1601
1602        let count = emission_count.load(Ordering::SeqCst);
1603        assert!(count > 0, "Default formatter should have emitted summaries");
1604
1605        handle.shutdown().await.expect("shutdown failed");
1606    }
1607
1608    #[cfg(feature = "async")]
1609    #[tokio::test]
1610    async fn test_summary_emission_not_recursively_throttled() {
1611        use std::time::Duration;
1612
1613        // Build a layer with active emission and a very restrictive policy
1614        let layer = TracingRateLimitLayer::builder()
1615            .with_policy(Policy::count_based(1).unwrap())
1616            .with_active_emission(true)
1617            .with_summary_interval(Duration::from_millis(100))
1618            .build()
1619            .unwrap();
1620
1621        let layer_clone = layer.clone();
1622
1623        let subscriber = tracing_subscriber::registry()
1624            .with(tracing_subscriber::fmt::layer().with_filter(layer));
1625
1626        // Emit events to trigger suppressions, then wait for multiple summary intervals.
1627        // If summaries were recursively throttled, the second interval would produce
1628        // nested "Suppressed 1 times: ... Suppressed N times: ..." messages and the
1629        // signature count would keep growing.
1630        tracing::subscriber::with_default(subscriber, || {
1631            // Trigger suppressions
1632            for _ in 0..5 {
1633                tracing::info!(target: "myapp", "repetitive event");
1634            }
1635        });
1636
1637        // Wait for two summary intervals
1638        tokio::time::sleep(Duration::from_millis(250)).await;
1639
1640        // Signature count should be exactly 1 (the "myapp" event).
1641        // If summary events were being throttled, additional signatures would appear.
1642        assert_eq!(
1643            layer_clone.signature_count(),
1644            1,
1645            "Summary emissions should not create additional throttle signatures"
1646        );
1647
1648        layer_clone.shutdown().await.expect("shutdown failed");
1649    }
1650}