tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
/// Function type for formatting suppression summaries.
///
/// Takes a reference to a `SuppressionSummary` and emits it as a tracing event.
/// The function is responsible for choosing the log level and format.
///
/// The builder stores an optional value of this type; when active emission is
/// enabled, `build()` calls the formatter once for every summary delivered by
/// the emitter, falling back to a `WARN`-level default if none was supplied.
/// The `Send + Sync + 'static` bounds let the formatter be moved into the
/// emitter callback.
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
/// Error returned when building a TracingRateLimitLayer fails.
///
/// Produced only by `TracingRateLimitLayerBuilder::build()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildError {
    /// Maximum signatures must be greater than zero.
    ///
    /// Returned when `max_signatures` was explicitly set to `0`.
    ZeroMaxSignatures,
    /// Emitter configuration validation failed.
    ///
    /// Wraps the error reported by `EmitterConfig::new` for the configured
    /// summary interval.
    EmitterConfig(crate::application::emitter::EmitterConfigError),
}
51
52impl std::fmt::Display for BuildError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            BuildError::ZeroMaxSignatures => {
56                write!(f, "max_signatures must be greater than 0")
57            }
58            BuildError::EmitterConfig(e) => {
59                write!(f, "emitter configuration error: {}", e)
60            }
61        }
62    }
63}
64
// Marker impl only: no `source()` override is provided. The `Display` output
// for `BuildError::EmitterConfig` already embeds the inner error's message.
impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69        BuildError::EmitterConfig(e)
70    }
71}
72
/// Builder for constructing a `TracingRateLimitLayer`.
///
/// Obtained from `TracingRateLimitLayer::builder()`; every `with_*` method
/// consumes and returns the builder, and `build()` performs validation.
pub struct TracingRateLimitLayerBuilder {
    /// Rate limiting policy handed to the suppression registry in `build()`.
    policy: Policy,
    /// Summary emission interval; validated by `EmitterConfig::new` in `build()`.
    summary_interval: Duration,
    /// Clock override; `None` makes `build()` fall back to `SystemClock`.
    clock: Option<Arc<dyn Clock>>,
    /// Signature cap used for the default LRU eviction; `None` disables that
    /// default. Only consulted for eviction when `eviction_strategy` is `None`.
    max_signatures: Option<usize>,
    /// Whether `build()` starts the summary emitter (read only with the
    /// `async` feature).
    enable_active_emission: bool,
    /// Custom summary formatter; `None` selects the default `WARN` formatter.
    #[cfg(feature = "async")]
    summary_formatter: Option<SummaryFormatter>,
    /// Names of span fields that participate in the event signature.
    span_context_fields: Vec<String>,
    /// Names of event fields that participate in the event signature.
    event_fields: Vec<String>,
    /// Explicit eviction strategy; takes precedence over `max_signatures`.
    eviction_strategy: Option<EvictionStrategy>,
}
86
/// Eviction strategy configuration for the rate limit layer.
///
/// This enum provides a user-friendly API that internally creates
/// the appropriate EvictionPolicy adapter.
///
/// `TracingRateLimitLayerBuilder::build()` maps each variant to the matching
/// adapter type in `crate::infrastructure::eviction`. `Debug` is implemented
/// by hand because the priority function cannot be printed.
#[derive(Clone)]
pub enum EvictionStrategy {
    /// LRU (Least Recently Used) eviction with entry count limit.
    Lru {
        /// Maximum number of entries
        max_entries: usize,
    },
    /// Priority-based eviction using a custom function.
    Priority {
        /// Maximum number of entries
        max_entries: usize,
        /// Priority calculation function, called with a signature and its state
        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
    },
    /// Memory-based eviction with byte limit.
    Memory {
        /// Maximum memory usage in bytes
        max_bytes: usize,
    },
    /// Combined priority and memory limits.
    PriorityWithMemory {
        /// Maximum number of entries
        max_entries: usize,
        /// Priority calculation function, called with a signature and its state
        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
        /// Maximum memory usage in bytes
        max_bytes: usize,
    },
}
120
121impl EvictionStrategy {
122    /// Check if this strategy tracks memory usage.
123    pub fn tracks_memory(&self) -> bool {
124        matches!(
125            self,
126            EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
127        )
128    }
129
130    /// Get the memory limit if this strategy uses one.
131    pub fn memory_limit(&self) -> Option<usize> {
132        match self {
133            EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
134            EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
135            _ => None,
136        }
137    }
138
139    /// Check if this strategy uses priority-based eviction.
140    pub fn uses_priority(&self) -> bool {
141        matches!(
142            self,
143            EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
144        )
145    }
146}
147
148impl std::fmt::Debug for EvictionStrategy {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        match self {
151            EvictionStrategy::Lru { max_entries } => f
152                .debug_struct("Lru")
153                .field("max_entries", max_entries)
154                .finish(),
155            EvictionStrategy::Priority {
156                max_entries,
157                priority_fn: _,
158            } => f
159                .debug_struct("Priority")
160                .field("max_entries", max_entries)
161                .field("priority_fn", &"<fn>")
162                .finish(),
163            EvictionStrategy::Memory { max_bytes } => f
164                .debug_struct("Memory")
165                .field("max_bytes", max_bytes)
166                .finish(),
167            EvictionStrategy::PriorityWithMemory {
168                max_entries,
169                priority_fn: _,
170                max_bytes,
171            } => f
172                .debug_struct("PriorityWithMemory")
173                .field("max_entries", max_entries)
174                .field("priority_fn", &"<fn>")
175                .field("max_bytes", max_bytes)
176                .finish(),
177        }
178    }
179}
180
181impl TracingRateLimitLayerBuilder {
182    /// Set the rate limiting policy.
183    pub fn with_policy(mut self, policy: Policy) -> Self {
184        self.policy = policy;
185        self
186    }
187
188    /// Set the summary emission interval.
189    ///
190    /// The interval will be validated when `build()` is called.
191    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
192        self.summary_interval = interval;
193        self
194    }
195
196    /// Set a custom clock (mainly for testing).
197    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
198        self.clock = Some(clock);
199        self
200    }
201
202    /// Set the maximum number of unique event signatures to track.
203    ///
204    /// When this limit is reached, the least recently used signatures will be evicted.
205    /// This prevents unbounded memory growth in applications with high signature cardinality.
206    ///
207    /// Default: 10,000 signatures
208    ///
209    /// The value will be validated when `build()` is called.
210    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
211        self.max_signatures = Some(max_signatures);
212        self
213    }
214
215    /// Disable the signature limit, allowing unbounded growth.
216    ///
217    /// **Warning**: This can lead to unbounded memory usage in applications that generate
218    /// many unique event signatures. Only use this if you're certain your application has
219    /// bounded signature cardinality or you have external memory monitoring.
220    pub fn with_unlimited_signatures(mut self) -> Self {
221        self.max_signatures = None;
222        self
223    }
224
225    /// Enable active emission of suppression summaries.
226    ///
227    /// When enabled, the layer will automatically emit `WARN`-level tracing events
228    /// containing summaries of suppressed log events at the configured interval.
229    ///
230    /// **Requires the `async` feature** - this method has no effect without it.
231    ///
232    /// Default: disabled
233    ///
234    /// # Example
235    ///
236    /// ```no_run
237    /// # use tracing_throttle::TracingRateLimitLayer;
238    /// # use std::time::Duration;
239    /// let layer = TracingRateLimitLayer::builder()
240    ///     .with_active_emission(true)
241    ///     .with_summary_interval(Duration::from_secs(60))
242    ///     .build()
243    ///     .unwrap();
244    /// ```
245    pub fn with_active_emission(mut self, enabled: bool) -> Self {
246        self.enable_active_emission = enabled;
247        self
248    }
249
250    /// Set a custom formatter for suppression summaries.
251    ///
252    /// The formatter is responsible for emitting summaries as tracing events.
253    /// This allows full control over log level, message format, and structured fields.
254    ///
255    /// **Requires the `async` feature.**
256    ///
257    /// If not set, a default formatter is used that emits at WARN level with
258    /// `signature` and `count` fields.
259    ///
260    /// # Example
261    ///
262    /// ```no_run
263    /// # use tracing_throttle::TracingRateLimitLayer;
264    /// # use std::sync::Arc;
265    /// # use std::time::Duration;
266    /// let layer = TracingRateLimitLayer::builder()
267    ///     .with_active_emission(true)
268    ///     .with_summary_formatter(Arc::new(|summary| {
269    ///         tracing::info!(
270    ///             signature = %summary.signature,
271    ///             count = summary.count,
272    ///             duration_secs = summary.duration.as_secs(),
273    ///             "Suppression summary"
274    ///         );
275    ///     }))
276    ///     .build()
277    ///     .unwrap();
278    /// ```
279    #[cfg(feature = "async")]
280    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
281        self.summary_formatter = Some(formatter);
282        self
283    }
284
285    /// Include span context fields in event signatures.
286    ///
287    /// When specified, the layer will extract these fields from the current span
288    /// context and include them in the event signature. This enables rate limiting
289    /// per-user, per-tenant, per-request, or any other span-level context.
290    ///
291    /// Duplicate field names are automatically removed, and empty field names are filtered out.
292    ///
293    /// # Example
294    ///
295    /// ```no_run
296    /// # use tracing_throttle::TracingRateLimitLayer;
297    /// // Rate limit separately per user
298    /// let layer = TracingRateLimitLayer::builder()
299    ///     .with_span_context_fields(vec!["user_id".to_string()])
300    ///     .build()
301    ///     .unwrap();
302    ///
303    /// // Rate limit per user and tenant
304    /// let layer = TracingRateLimitLayer::builder()
305    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
306    ///     .build()
307    ///     .unwrap();
308    /// ```
309    ///
310    /// # Usage with Spans
311    ///
312    /// ```no_run
313    /// # use tracing::{info, info_span};
314    /// // Create a span with user context
315    /// let span = info_span!("request", user_id = "alice");
316    /// let _enter = span.enter();
317    ///
318    /// // These events will be rate limited separately per user
319    /// info!("Processing request");  // Limited for user "alice"
320    /// ```
321    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
322        // Deduplicate and filter out empty field names
323        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
324        self.span_context_fields = unique_fields.into_iter().collect();
325        self
326    }
327
328    /// Include event fields in event signatures.
329    ///
330    /// When specified, the layer will extract these fields from events themselves
331    /// and include them in the event signature. This enables rate limiting per
332    /// error code, status, endpoint, or any other event-level field.
333    ///
334    /// Duplicate field names are automatically removed, and empty field names are filtered out.
335    ///
336    /// # Example
337    ///
338    /// ```no_run
339    /// # use tracing_throttle::TracingRateLimitLayer;
340    /// // Rate limit separately per error_code
341    /// let layer = TracingRateLimitLayer::builder()
342    ///     .with_event_fields(vec!["error_code".to_string()])
343    ///     .build()
344    ///     .unwrap();
345    ///
346    /// // Rate limit per status and endpoint
347    /// let layer = TracingRateLimitLayer::builder()
348    ///     .with_event_fields(vec!["status".to_string(), "endpoint".to_string()])
349    ///     .build()
350    ///     .unwrap();
351    /// ```
352    ///
353    /// # Usage with Events
354    ///
355    /// ```no_run
356    /// # use tracing::error;
357    /// // Events with different error codes are rate limited independently
358    /// error!(error_code = "AUTH_FAILED", "Authentication failed");
359    /// error!(error_code = "TIMEOUT", "Request timeout");
360    /// ```
361    pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
362        // Deduplicate and filter out empty field names
363        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
364        self.event_fields = unique_fields.into_iter().collect();
365        self
366    }
367
368    /// Set a custom eviction strategy for signature management.
369    ///
370    /// Controls which signatures are evicted when storage limits are reached.
371    /// If not set, uses LRU eviction with the configured max_signatures limit.
372    ///
373    /// # Example: Priority-based eviction
374    ///
375    /// ```no_run
376    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
377    /// # use std::sync::Arc;
378    /// let layer = TracingRateLimitLayer::builder()
379    ///     .with_eviction_strategy(EvictionStrategy::Priority {
380    ///         max_entries: 5_000,
381    ///         priority_fn: Arc::new(|_sig, state| {
382    ///             // Keep ERROR events longer than INFO events
383    ///             match state.metadata.as_ref().map(|m| m.level.as_str()) {
384    ///                 Some("ERROR") => 100,
385    ///                 Some("WARN") => 50,
386    ///                 Some("INFO") => 10,
387    ///                 _ => 5,
388    ///             }
389    ///         })
390    ///     })
391    ///     .build()
392    ///     .unwrap();
393    /// ```
394    ///
395    /// # Example: Memory-based eviction
396    ///
397    /// ```no_run
398    /// # use tracing_throttle::{TracingRateLimitLayer, EvictionStrategy};
399    /// // Evict when total memory exceeds 5MB
400    /// let layer = TracingRateLimitLayer::builder()
401    ///     .with_eviction_strategy(EvictionStrategy::Memory {
402    ///         max_bytes: 5 * 1024 * 1024,
403    ///     })
404    ///     .build()
405    ///     .unwrap();
406    /// ```
407    pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
408        self.eviction_strategy = Some(strategy);
409        self
410    }
411
412    /// Build the layer.
413    ///
414    /// # Errors
415    /// Returns `BuildError` if the configuration is invalid.
416    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
417        // Validate max_signatures if set
418        if let Some(max) = self.max_signatures {
419            if max == 0 {
420                return Err(BuildError::ZeroMaxSignatures);
421            }
422        }
423
424        // Create shared metrics and circuit breaker
425        let metrics = Metrics::new();
426        let circuit_breaker = Arc::new(CircuitBreaker::new());
427
428        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
429        let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
430
431        // Convert eviction strategy to adapter, or use default LRU with max_signatures
432        let eviction_policy: Option<
433            Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
434        > = match self.eviction_strategy {
435            Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
436                crate::infrastructure::eviction::LruEviction::new(max_entries),
437            )),
438            Some(EvictionStrategy::Priority {
439                max_entries,
440                priority_fn,
441            }) => Some(Arc::new(
442                crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
443            )),
444            Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
445                crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
446            )),
447            Some(EvictionStrategy::PriorityWithMemory {
448                max_entries,
449                priority_fn,
450                max_bytes,
451            }) => Some(Arc::new(
452                crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
453                    max_entries,
454                    priority_fn,
455                    max_bytes,
456                ),
457            )),
458            None => {
459                // Use default LRU with max_signatures if configured
460                self.max_signatures.map(|max| {
461                    Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
462                        as Arc<
463                            dyn crate::application::ports::EvictionPolicy<
464                                EventSignature,
465                                EventState,
466                            >,
467                        >
468                })
469            }
470        };
471
472        if let Some(policy) = eviction_policy {
473            storage = storage.with_eviction_policy(policy);
474        }
475
476        let storage = Arc::new(storage);
477        let registry = SuppressionRegistry::new(storage, clock, self.policy);
478        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
479
480        // Let EmitterConfig validate the interval
481        let emitter_config = EmitterConfig::new(self.summary_interval)?;
482
483        #[cfg(feature = "async")]
484        let emitter_handle = if self.enable_active_emission {
485            let emitter = SummaryEmitter::new(registry, emitter_config);
486
487            // Use custom formatter or default
488            let formatter = self.summary_formatter.unwrap_or_else(|| {
489                Arc::new(|summary: &SuppressionSummary| {
490                    tracing::warn!(
491                        signature = %summary.signature,
492                        count = summary.count,
493                        "{}",
494                        summary.format_message()
495                    );
496                })
497            });
498
499            let handle = emitter.start(
500                move |summaries| {
501                    for summary in summaries {
502                        formatter(&summary);
503                    }
504                },
505                false, // Don't emit final summaries on shutdown
506            );
507            Arc::new(Mutex::new(Some(handle)))
508        } else {
509            Arc::new(Mutex::new(None))
510        };
511
512        Ok(TracingRateLimitLayer {
513            limiter,
514            span_context_fields: Arc::new(self.span_context_fields),
515            event_fields: Arc::new(self.event_fields),
516            #[cfg(feature = "async")]
517            emitter_handle,
518            #[cfg(not(feature = "async"))]
519            _emitter_config: emitter_config,
520        })
521    }
522}
523
/// A `tracing::Layer` that applies rate limiting to events.
///
/// This layer intercepts events, computes their signature, and decides
/// whether to allow or suppress them based on the configured policy.
///
/// Optionally emits periodic summaries of suppressed events when active
/// emission is enabled (requires `async` feature).
///
/// The type parameter `S` is the storage backend; it defaults to the in-process
/// sharded storage. The field lists and the emitter handle are held behind
/// `Arc`, so clones of the layer share them.
#[derive(Clone)]
pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    /// Makes the allow/suppress decision for each computed signature.
    limiter: RateLimiter<S>,
    /// Span field names folded into the signature (empty = feature unused).
    span_context_fields: Arc<Vec<String>>,
    /// Event field names folded into the signature (empty = feature unused).
    event_fields: Arc<Vec<String>>,
    /// Handle of the running summary emitter. `None` when active emission is
    /// disabled or after `shutdown()` has taken the handle.
    #[cfg(feature = "async")]
    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
    /// Validated emitter configuration retained when the `async` feature is
    /// off; it is not read by any code visible in this part of the file.
    #[cfg(not(feature = "async"))]
    _emitter_config: EmitterConfig,
}
544
545impl<S> TracingRateLimitLayer<S>
546where
547    S: Storage<EventSignature, EventState> + Clone,
548{
549    /// Extract span context fields from the current span.
550    fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
551    where
552        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
553    {
554        if self.span_context_fields.is_empty() {
555            return BTreeMap::new();
556        }
557
558        let mut context_fields = BTreeMap::new();
559
560        if let Some(span) = cx.lookup_current() {
561            for span_ref in span.scope() {
562                let extensions = span_ref.extensions();
563
564                if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
565                    for field_name in self.span_context_fields.as_ref() {
566                        if !context_fields.contains_key(field_name) {
567                            if let Some(value) = stored_fields.get(field_name) {
568                                context_fields.insert(field_name.clone(), value.clone());
569                            }
570                        }
571                    }
572                }
573
574                if context_fields.len() == self.span_context_fields.len() {
575                    break;
576                }
577            }
578        }
579
580        context_fields
581    }
582
583    /// Extract event fields from an event.
584    fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
585        if self.event_fields.is_empty() {
586            return BTreeMap::new();
587        }
588
589        let mut visitor = FieldVisitor::new();
590        event.record(&mut visitor);
591        let all_fields = visitor.into_fields();
592
593        // Filter to only the configured event fields
594        self.event_fields
595            .iter()
596            .filter_map(|field_name| {
597                all_fields
598                    .get(field_name)
599                    .map(|value| (field_name.clone(), value.clone()))
600            })
601            .collect()
602    }
603
604    /// Compute event signature from tracing metadata, span context, and event fields.
605    ///
606    /// The signature includes:
607    /// - Log level (INFO, WARN, ERROR, etc.)
608    /// - Message template
609    /// - Target module path
610    /// - Span context fields (if configured)
611    /// - Event fields (if configured)
612    fn compute_signature(
613        &self,
614        metadata: &Metadata,
615        combined_fields: &BTreeMap<String, String>,
616    ) -> EventSignature {
617        let level = metadata.level().as_str();
618        let message = metadata.name();
619        let target = Some(metadata.target());
620
621        // Use combined fields (span context + event fields) in signature
622        EventSignature::new(level, message, combined_fields, target)
623    }
624
625    /// Check if an event should be allowed through.
626    pub fn should_allow(&self, signature: EventSignature) -> bool {
627        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
628    }
629
630    /// Check if an event should be allowed through and capture metadata.
631    ///
632    /// This method stores event metadata on first occurrence so summaries
633    /// can show human-readable event details instead of just signature hashes.
634    ///
635    /// **Note:** Only available with the `human-readable` feature flag.
636    #[cfg(feature = "human-readable")]
637    pub fn should_allow_with_metadata(
638        &self,
639        signature: EventSignature,
640        metadata: crate::domain::metadata::EventMetadata,
641    ) -> bool {
642        matches!(
643            self.limiter.check_event_with_metadata(signature, metadata),
644            LimitDecision::Allow
645        )
646    }
647
648    /// Get a reference to the underlying limiter.
649    pub fn limiter(&self) -> &RateLimiter<S> {
650        &self.limiter
651    }
652
653    /// Get a reference to the metrics.
654    ///
655    /// Returns metrics about rate limiting behavior including:
656    /// - Events allowed
657    /// - Events suppressed
658    /// - Signatures evicted
659    pub fn metrics(&self) -> &Metrics {
660        self.limiter.metrics()
661    }
662
663    /// Get the current number of tracked signatures.
664    pub fn signature_count(&self) -> usize {
665        self.limiter.registry().len()
666    }
667
668    /// Get a reference to the circuit breaker.
669    ///
670    /// Use this to check the circuit breaker state and health:
671    /// - `circuit_breaker().state()` - Current circuit state
672    /// - `circuit_breaker().consecutive_failures()` - Failure count
673    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
674        self.limiter.circuit_breaker()
675    }
676
677    /// Shutdown the active suppression summary emitter, if running.
678    ///
679    /// This method gracefully stops the background emission task.  If active emission
680    /// is not enabled, this method does nothing.
681    ///
682    /// **Requires the `async` feature.**
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the emitter task fails to shut down gracefully.
687    ///
688    /// # Example
689    ///
690    /// ```no_run
691    /// # use tracing_throttle::TracingRateLimitLayer;
692    /// # async fn example() {
693    /// let layer = TracingRateLimitLayer::builder()
694    ///     .with_active_emission(true)
695    ///     .build()
696    ///     .unwrap();
697    ///
698    /// // Use the layer...
699    ///
700    /// // Shutdown before dropping
701    /// layer.shutdown().await.expect("shutdown failed");
702    /// # }
703    /// ```
704    #[cfg(feature = "async")]
705    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
706        // Take the handle while holding the lock, then release the lock before awaiting
707        let handle = {
708            let mut handle_guard = self.emitter_handle.lock().unwrap();
709            handle_guard.take()
710        };
711
712        if let Some(handle) = handle {
713            handle.shutdown().await?;
714        }
715        Ok(())
716    }
717}
718
719impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
720    /// Create a builder for configuring the layer.
721    ///
722    /// Defaults:
723    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
724    /// - Max signatures: 10,000 (with LRU eviction)
725    /// - Summary interval: 30 seconds
726    /// - Active emission: disabled
727    /// - Summary formatter: default (WARN level with signature and count)
728    pub fn builder() -> TracingRateLimitLayerBuilder {
729        TracingRateLimitLayerBuilder {
730            policy: Policy::token_bucket(50.0, 1.0)
731                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
732            summary_interval: Duration::from_secs(30),
733            clock: None,
734            max_signatures: Some(10_000),
735            enable_active_emission: false,
736            #[cfg(feature = "async")]
737            summary_formatter: None,
738            span_context_fields: Vec::new(),
739            event_fields: Vec::new(),
740            eviction_strategy: None,
741        }
742    }
743
744    /// Create a layer with default settings.
745    ///
746    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
747    ///
748    /// Defaults:
749    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
750    /// - Max signatures: 10,000 (with LRU eviction)
751    /// - Summary interval: 30 seconds
752    ///
753    /// # Panics
754    /// This method cannot panic because all default values are valid.
755    pub fn new() -> Self {
756        Self::builder()
757            .build()
758            .expect("default configuration is always valid")
759    }
760
761    /// Create a layer with custom storage backend.
762    ///
763    /// This allows using alternative storage implementations like Redis for distributed
764    /// rate limiting across multiple application instances.
765    ///
766    /// # Arguments
767    ///
768    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
769    /// * `policy` - Rate limiting policy to apply
770    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
771    ///
772    /// # Example with Redis
773    ///
774    /// ```rust,ignore
775    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
776    /// use std::sync::Arc;
777    ///
778    /// #[tokio::main]
779    /// async fn main() {
780    ///     let storage = Arc::new(
781    ///         RedisStorage::connect("redis://127.0.0.1/")
782    ///             .await
783    ///             .expect("Failed to connect")
784    ///     );
785    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
786    ///     let clock = Arc::new(SystemClock::new());
787    ///
788    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
789    /// }
790    /// ```
791    pub fn with_storage<ST>(
792        storage: ST,
793        policy: Policy,
794        clock: Arc<dyn Clock>,
795    ) -> TracingRateLimitLayer<ST>
796    where
797        ST: Storage<EventSignature, EventState> + Clone,
798    {
799        let metrics = Metrics::new();
800        let circuit_breaker = Arc::new(CircuitBreaker::new());
801        let registry = SuppressionRegistry::new(storage, clock, policy);
802        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
803
804        TracingRateLimitLayer {
805            limiter,
806            span_context_fields: Arc::new(Vec::new()),
807            event_fields: Arc::new(Vec::new()),
808            #[cfg(feature = "async")]
809            emitter_handle: Arc::new(Mutex::new(None)),
810            #[cfg(not(feature = "async"))]
811            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
812                .expect("30 seconds is valid"),
813        }
814    }
815}
816
817impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
818    fn default() -> Self {
819        Self::new()
820    }
821}
822
823// Implement the Filter trait for rate limiting
824impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
825where
826    S: Storage<EventSignature, EventState> + Clone,
827    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
828{
829    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
830        // Always return true - actual filtering happens in event_enabled
831        // This prevents double-checking in dual-layer setups
832        true
833    }
834
835    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
836        // Combine span context and event fields
837        let mut combined_fields = self.extract_span_context(cx);
838        let event_fields = self.extract_event_fields(event);
839        combined_fields.extend(event_fields);
840
841        let metadata_obj = event.metadata();
842        let signature = self.compute_signature(metadata_obj, &combined_fields);
843
844        #[cfg(feature = "human-readable")]
845        {
846            // Extract message from event for metadata
847            let mut visitor = FieldVisitor::new();
848            event.record(&mut visitor);
849            let all_fields = visitor.into_fields();
850            let message = all_fields
851                .get("message")
852                .cloned()
853                .unwrap_or_else(|| event.metadata().name().to_string());
854
855            // Create EventMetadata for this event
856            let event_metadata = crate::domain::metadata::EventMetadata::new(
857                metadata_obj.level().as_str().to_string(),
858                message,
859                metadata_obj.target().to_string(),
860                combined_fields,
861            );
862
863            self.should_allow_with_metadata(signature, event_metadata)
864        }
865
866        #[cfg(not(feature = "human-readable"))]
867        {
868            self.should_allow(signature)
869        }
870    }
871}
872
873impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
874where
875    S: Storage<EventSignature, EventState> + Clone + 'static,
876    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
877{
878    fn on_new_span(
879        &self,
880        attrs: &tracing::span::Attributes<'_>,
881        id: &tracing::span::Id,
882        ctx: Context<'_, Sub>,
883    ) {
884        if self.span_context_fields.is_empty() {
885            return;
886        }
887
888        let mut visitor = FieldVisitor::new();
889        attrs.record(&mut visitor);
890        let fields = visitor.into_fields();
891
892        if let Some(span) = ctx.span(id) {
893            let mut extensions = span.extensions_mut();
894            extensions.insert(fields);
895        }
896    }
897}
898
899#[cfg(test)]
900mod tests {
901    use super::*;
902    use tracing::info;
903    use tracing_subscriber::layer::SubscriberExt;
904
905    #[test]
906    fn test_layer_builder() {
907        let layer = TracingRateLimitLayer::builder()
908            .with_policy(Policy::count_based(50).unwrap())
909            .with_summary_interval(Duration::from_secs(60))
910            .build()
911            .unwrap();
912
913        assert!(layer.limiter().registry().is_empty());
914    }
915
916    #[test]
917    fn test_span_context_fields_deduplication() {
918        let layer = TracingRateLimitLayer::builder()
919            .with_span_context_fields(vec![
920                "user_id".to_string(),
921                "user_id".to_string(), // duplicate
922                "tenant_id".to_string(),
923                "".to_string(),        // empty, should be filtered
924                "user_id".to_string(), // another duplicate
925            ])
926            .build()
927            .unwrap();
928
929        // Should only have 2 unique fields: user_id and tenant_id
930        assert_eq!(layer.span_context_fields.len(), 2);
931        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
932        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
933    }
934
935    #[test]
936    fn test_event_fields_deduplication() {
937        let layer = TracingRateLimitLayer::builder()
938            .with_event_fields(vec![
939                "error_code".to_string(),
940                "error_code".to_string(), // duplicate
941                "status".to_string(),
942                "".to_string(),           // empty, should be filtered
943                "error_code".to_string(), // another duplicate
944            ])
945            .build()
946            .unwrap();
947
948        // Should only have 2 unique fields: error_code and status
949        assert_eq!(layer.event_fields.len(), 2);
950        assert!(layer.event_fields.iter().any(|f| f == "error_code"));
951        assert!(layer.event_fields.iter().any(|f| f == "status"));
952    }
953
954    #[test]
955    fn test_layer_default() {
956        let layer = TracingRateLimitLayer::default();
957        assert!(layer.limiter().registry().is_empty());
958    }
959
960    #[test]
961    fn test_signature_computation() {
962        let _layer = TracingRateLimitLayer::new();
963
964        // Use a simple signature test without metadata construction
965        let sig1 = EventSignature::simple("INFO", "test_event");
966        let sig2 = EventSignature::simple("INFO", "test_event");
967
968        // Same inputs should produce same signature
969        assert_eq!(sig1, sig2);
970    }
971
972    #[test]
973    fn test_basic_rate_limiting() {
974        let layer = TracingRateLimitLayer::builder()
975            .with_policy(Policy::count_based(2).unwrap())
976            .build()
977            .unwrap();
978
979        let sig = EventSignature::simple("INFO", "test_message");
980
981        // First two should be allowed
982        assert!(layer.should_allow(sig));
983        assert!(layer.should_allow(sig));
984
985        // Third should be suppressed
986        assert!(!layer.should_allow(sig));
987    }
988
989    #[test]
990    fn test_layer_integration() {
991        let layer = TracingRateLimitLayer::builder()
992            .with_policy(Policy::count_based(3).unwrap())
993            .build()
994            .unwrap();
995
996        // Clone for use in subscriber, keep original for checking state
997        let layer_for_check = layer.clone();
998
999        let subscriber = tracing_subscriber::registry()
1000            .with(tracing_subscriber::fmt::layer().with_filter(layer));
1001
1002        // Test that the layer correctly tracks event signatures
1003        tracing::subscriber::with_default(subscriber, || {
1004            // Emit 10 identical events
1005            for _ in 0..10 {
1006                info!("test event");
1007            }
1008        });
1009
1010        // After emitting 10 events with the same signature, the layer should have
1011        // tracked them and only the first 3 should have been marked as allowed
1012        // The registry should contain one entry for this signature
1013        assert_eq!(layer_for_check.limiter().registry().len(), 1);
1014    }
1015
1016    #[test]
1017    fn test_layer_suppression_logic() {
1018        let layer = TracingRateLimitLayer::builder()
1019            .with_policy(Policy::count_based(3).unwrap())
1020            .build()
1021            .unwrap();
1022
1023        let sig = EventSignature::simple("INFO", "test");
1024
1025        // Verify the suppression logic works correctly
1026        let mut allowed_count = 0;
1027        for _ in 0..10 {
1028            if layer.should_allow(sig) {
1029                allowed_count += 1;
1030            }
1031        }
1032
1033        assert_eq!(allowed_count, 3);
1034    }
1035
1036    #[test]
1037    fn test_builder_zero_summary_interval() {
1038        let result = TracingRateLimitLayer::builder()
1039            .with_summary_interval(Duration::from_secs(0))
1040            .build();
1041
1042        assert!(matches!(
1043            result,
1044            Err(BuildError::EmitterConfig(
1045                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1046            ))
1047        ));
1048    }
1049
1050    #[test]
1051    fn test_builder_zero_max_signatures() {
1052        let result = TracingRateLimitLayer::builder()
1053            .with_max_signatures(0)
1054            .build();
1055
1056        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1057    }
1058
1059    #[test]
1060    fn test_builder_valid_max_signatures() {
1061        let layer = TracingRateLimitLayer::builder()
1062            .with_max_signatures(100)
1063            .build()
1064            .unwrap();
1065
1066        assert!(layer.limiter().registry().is_empty());
1067    }
1068
1069    #[test]
1070    fn test_metrics_tracking() {
1071        let layer = TracingRateLimitLayer::builder()
1072            .with_policy(Policy::count_based(2).unwrap())
1073            .build()
1074            .unwrap();
1075
1076        let sig = EventSignature::simple("INFO", "test");
1077
1078        // Check initial metrics
1079        assert_eq!(layer.metrics().events_allowed(), 0);
1080        assert_eq!(layer.metrics().events_suppressed(), 0);
1081
1082        // Allow first two events
1083        assert!(layer.should_allow(sig));
1084        assert!(layer.should_allow(sig));
1085
1086        // Check metrics after allowed events
1087        assert_eq!(layer.metrics().events_allowed(), 2);
1088        assert_eq!(layer.metrics().events_suppressed(), 0);
1089
1090        // Suppress third event
1091        assert!(!layer.should_allow(sig));
1092
1093        // Check metrics after suppressed event
1094        assert_eq!(layer.metrics().events_allowed(), 2);
1095        assert_eq!(layer.metrics().events_suppressed(), 1);
1096    }
1097
1098    #[test]
1099    fn test_metrics_snapshot() {
1100        let layer = TracingRateLimitLayer::builder()
1101            .with_policy(Policy::count_based(3).unwrap())
1102            .build()
1103            .unwrap();
1104
1105        let sig = EventSignature::simple("INFO", "test");
1106
1107        // Generate some events
1108        for _ in 0..5 {
1109            layer.should_allow(sig);
1110        }
1111
1112        // Get snapshot
1113        let snapshot = layer.metrics().snapshot();
1114        assert_eq!(snapshot.events_allowed, 3);
1115        assert_eq!(snapshot.events_suppressed, 2);
1116        assert_eq!(snapshot.total_events(), 5);
1117        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1118    }
1119
1120    #[test]
1121    fn test_signature_count() {
1122        let layer = TracingRateLimitLayer::builder()
1123            .with_policy(Policy::count_based(2).unwrap())
1124            .build()
1125            .unwrap();
1126
1127        assert_eq!(layer.signature_count(), 0);
1128
1129        let sig1 = EventSignature::simple("INFO", "test1");
1130        let sig2 = EventSignature::simple("INFO", "test2");
1131
1132        layer.should_allow(sig1);
1133        assert_eq!(layer.signature_count(), 1);
1134
1135        layer.should_allow(sig2);
1136        assert_eq!(layer.signature_count(), 2);
1137
1138        // Same signature shouldn't increase count
1139        layer.should_allow(sig1);
1140        assert_eq!(layer.signature_count(), 2);
1141    }
1142
1143    #[test]
1144    fn test_metrics_with_eviction() {
1145        let layer = TracingRateLimitLayer::builder()
1146            .with_policy(Policy::count_based(1).unwrap())
1147            .with_max_signatures(3)
1148            .build()
1149            .unwrap();
1150
1151        // Fill up to capacity
1152        for i in 0..3 {
1153            let sig = EventSignature::simple("INFO", &format!("test{}", i));
1154            layer.should_allow(sig);
1155        }
1156
1157        assert_eq!(layer.signature_count(), 3);
1158        assert_eq!(layer.metrics().signatures_evicted(), 0);
1159
1160        // Add one more, which should trigger eviction
1161        let sig = EventSignature::simple("INFO", "test3");
1162        layer.should_allow(sig);
1163
1164        assert_eq!(layer.signature_count(), 3);
1165        assert_eq!(layer.metrics().signatures_evicted(), 1);
1166    }
1167
1168    #[test]
1169    fn test_circuit_breaker_observability() {
1170        use crate::application::circuit_breaker::CircuitState;
1171
1172        let layer = TracingRateLimitLayer::builder()
1173            .with_policy(Policy::count_based(2).unwrap())
1174            .build()
1175            .unwrap();
1176
1177        // Check initial circuit breaker state
1178        let cb = layer.circuit_breaker();
1179        assert_eq!(cb.state(), CircuitState::Closed);
1180        assert_eq!(cb.consecutive_failures(), 0);
1181
1182        // Circuit breaker should remain closed during normal operation
1183        let sig = EventSignature::simple("INFO", "test");
1184        layer.should_allow(sig);
1185        layer.should_allow(sig);
1186        layer.should_allow(sig);
1187
1188        assert_eq!(cb.state(), CircuitState::Closed);
1189    }
1190
1191    #[test]
1192    fn test_circuit_breaker_fail_open_integration() {
1193        use crate::application::circuit_breaker::{
1194            CircuitBreaker, CircuitBreakerConfig, CircuitState,
1195        };
1196        use std::time::Duration;
1197
1198        // Create a circuit breaker with low threshold for testing
1199        let cb_config = CircuitBreakerConfig {
1200            failure_threshold: 2,
1201            recovery_timeout: Duration::from_secs(1),
1202        };
1203        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1204
1205        // Build layer with custom circuit breaker
1206        let storage = Arc::new(ShardedStorage::new());
1207        let clock = Arc::new(SystemClock::new());
1208        let policy = Policy::count_based(2).unwrap();
1209        let registry = SuppressionRegistry::new(storage, clock, policy);
1210        let metrics = Metrics::new();
1211        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1212
1213        let layer = TracingRateLimitLayer {
1214            limiter,
1215            span_context_fields: Arc::new(Vec::new()),
1216            event_fields: Arc::new(Vec::new()),
1217            #[cfg(feature = "async")]
1218            emitter_handle: Arc::new(Mutex::new(None)),
1219            #[cfg(not(feature = "async"))]
1220            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1221                30,
1222            ))
1223            .unwrap(),
1224        };
1225
1226        let sig = EventSignature::simple("INFO", "test");
1227
1228        // Normal operation - first 2 events allowed, third suppressed
1229        assert!(layer.should_allow(sig));
1230        assert!(layer.should_allow(sig));
1231        assert!(!layer.should_allow(sig));
1232
1233        // Circuit should still be closed
1234        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1235
1236        // Manually trigger circuit breaker failures to test fail-open
1237        circuit_breaker.record_failure();
1238        circuit_breaker.record_failure();
1239
1240        // Circuit should now be open
1241        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1242
1243        // With circuit open, rate limiter should fail open (allow all events)
1244        // even though we've already hit the rate limit
1245        assert!(layer.should_allow(sig));
1246        assert!(layer.should_allow(sig));
1247        assert!(layer.should_allow(sig));
1248
1249        // Metrics should show these as allowed (fail-open behavior)
1250        let snapshot = layer.metrics().snapshot();
1251        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1252    }
1253
1254    #[cfg(feature = "async")]
1255    #[tokio::test]
1256    async fn test_active_emission_integration() {
1257        use std::sync::atomic::{AtomicUsize, Ordering};
1258        use std::time::Duration;
1259
1260        // Use an atomic counter to track emissions
1261        let emission_count = Arc::new(AtomicUsize::new(0));
1262        let count_clone = Arc::clone(&emission_count);
1263
1264        // Create a layer with a custom emitter that increments our counter
1265        let storage = Arc::new(ShardedStorage::new());
1266        let clock = Arc::new(SystemClock::new());
1267        let policy = Policy::count_based(2).unwrap();
1268        let registry = SuppressionRegistry::new(storage, clock, policy);
1269
1270        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1271        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1272
1273        // Start emitter with custom callback
1274        let handle = emitter.start(
1275            move |summaries| {
1276                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1277            },
1278            false,
1279        );
1280
1281        // Emit events that will be suppressed
1282        let sig = EventSignature::simple("INFO", "test_message");
1283        for _ in 0..10 {
1284            registry.with_event_state(sig, |state, now| {
1285                state.counter.record_suppression(now);
1286            });
1287        }
1288
1289        // Wait for at least two emission intervals
1290        tokio::time::sleep(Duration::from_millis(250)).await;
1291
1292        // Check that summaries were emitted
1293        let count = emission_count.load(Ordering::SeqCst);
1294        assert!(
1295            count > 0,
1296            "Expected at least one suppression summary to be emitted, got {}",
1297            count
1298        );
1299
1300        // Graceful shutdown
1301        handle.shutdown().await.expect("shutdown failed");
1302    }
1303
1304    #[cfg(feature = "async")]
1305    #[tokio::test]
1306    async fn test_active_emission_disabled() {
1307        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1308        use std::time::Duration;
1309
1310        // Create layer with active emission disabled (default)
1311        let layer = TracingRateLimitLayer::builder()
1312            .with_policy(Policy::count_based(2).unwrap())
1313            .with_summary_interval(Duration::from_millis(100))
1314            .build()
1315            .unwrap();
1316
1317        let mock = MockCaptureLayer::new();
1318        let mock_clone = mock.clone();
1319
1320        let subscriber = tracing_subscriber::registry()
1321            .with(mock)
1322            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1323
1324        tracing::subscriber::with_default(subscriber, || {
1325            let sig = EventSignature::simple("INFO", "test_message");
1326            for _ in 0..10 {
1327                layer.should_allow(sig);
1328            }
1329        });
1330
1331        // Wait to ensure no emissions occur
1332        tokio::time::sleep(Duration::from_millis(250)).await;
1333
1334        // Should not have emitted any summaries
1335        let events = mock_clone.get_captured();
1336        let summary_count = events
1337            .iter()
1338            .filter(|e| e.message.contains("suppressed"))
1339            .count();
1340
1341        assert_eq!(
1342            summary_count, 0,
1343            "Should not emit summaries when active emission is disabled"
1344        );
1345
1346        // Shutdown should succeed even when emitter was never started
1347        layer.shutdown().await.expect("shutdown failed");
1348    }
1349
1350    #[cfg(feature = "async")]
1351    #[tokio::test]
1352    async fn test_shutdown_without_emission() {
1353        // Test that shutdown works when emission was never enabled
1354        let layer = TracingRateLimitLayer::new();
1355
1356        // Should not error
1357        layer
1358            .shutdown()
1359            .await
1360            .expect("shutdown should succeed when emitter not running");
1361    }
1362
1363    #[cfg(feature = "async")]
1364    #[tokio::test]
1365    async fn test_custom_summary_formatter() {
1366        use std::sync::atomic::{AtomicUsize, Ordering};
1367        use std::time::Duration;
1368
1369        // Track formatter invocations
1370        let call_count = Arc::new(AtomicUsize::new(0));
1371        let count_clone = Arc::clone(&call_count);
1372
1373        // Track data passed to formatter
1374        let last_count = Arc::new(AtomicUsize::new(0));
1375        let last_count_clone = Arc::clone(&last_count);
1376
1377        // Create layer with custom formatter
1378        let layer = TracingRateLimitLayer::builder()
1379            .with_policy(Policy::count_based(2).unwrap())
1380            .with_active_emission(true)
1381            .with_summary_interval(Duration::from_millis(100))
1382            .with_summary_formatter(Arc::new(move |summary| {
1383                count_clone.fetch_add(1, Ordering::SeqCst);
1384                last_count_clone.store(summary.count, Ordering::SeqCst);
1385                // Custom format: emit at INFO level instead of WARN
1386                tracing::info!(
1387                    sig = %summary.signature,
1388                    suppressed = summary.count,
1389                    "Custom format"
1390                );
1391            }))
1392            .build()
1393            .unwrap();
1394
1395        // Emit events that will be suppressed
1396        let sig = EventSignature::simple("INFO", "test_message");
1397        for _ in 0..10 {
1398            layer.should_allow(sig);
1399        }
1400
1401        // Wait for emission
1402        tokio::time::sleep(Duration::from_millis(250)).await;
1403
1404        // Verify custom formatter was called
1405        let calls = call_count.load(Ordering::SeqCst);
1406        assert!(calls > 0, "Custom formatter should have been called");
1407
1408        // Verify formatter received correct data
1409        let count = last_count.load(Ordering::SeqCst);
1410        assert!(
1411            count >= 8,
1412            "Expected at least 8 suppressions, got {}",
1413            count
1414        );
1415
1416        layer.shutdown().await.expect("shutdown failed");
1417    }
1418
1419    #[cfg(feature = "async")]
1420    #[tokio::test]
1421    async fn test_default_formatter_used() {
1422        use std::sync::atomic::{AtomicUsize, Ordering};
1423        use std::time::Duration;
1424
1425        let emission_count = Arc::new(AtomicUsize::new(0));
1426        let count_clone = Arc::clone(&emission_count);
1427
1428        let storage = Arc::new(ShardedStorage::new());
1429        let clock = Arc::new(SystemClock::new());
1430        let policy = Policy::count_based(2).unwrap();
1431        let registry = SuppressionRegistry::new(storage, clock, policy);
1432
1433        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1434        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1435
1436        // Start without custom formatter - should use default
1437        let handle = emitter.start(
1438            move |summaries| {
1439                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1440            },
1441            false,
1442        );
1443
1444        let sig = EventSignature::simple("INFO", "test_message");
1445        for _ in 0..10 {
1446            registry.with_event_state(sig, |state, now| {
1447                state.counter.record_suppression(now);
1448            });
1449        }
1450
1451        tokio::time::sleep(Duration::from_millis(250)).await;
1452
1453        let count = emission_count.load(Ordering::SeqCst);
1454        assert!(count > 0, "Default formatter should have emitted summaries");
1455
1456        handle.shutdown().await.expect("shutdown failed");
1457    }
1458}