tracing_throttle/infrastructure/
layer.rs

1//! Tracing integration layer.
2//!
3//! Provides a `tracing::Layer` implementation that applies rate limiting
4//! to log events.
5
6use crate::application::{
7    circuit_breaker::CircuitBreaker,
8    emitter::EmitterConfig,
9    limiter::{LimitDecision, RateLimiter},
10    metrics::Metrics,
11    ports::{Clock, Storage},
12    registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
/// Callback that turns a [`SuppressionSummary`] into a tracing event.
///
/// The callback receives each summary by reference and decides on its own
/// which log level, message text, and structured fields to emit.
///
/// It is reference-counted and `Send + Sync` so that the builder can hand it
/// to the closure passed to the summary emitter.
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync>;
42
43/// Error returned when building a TracingRateLimitLayer fails.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum BuildError {
46    /// Maximum signatures must be greater than zero
47    ZeroMaxSignatures,
48    /// Emitter configuration validation failed
49    EmitterConfig(crate::application::emitter::EmitterConfigError),
50}
51
52impl std::fmt::Display for BuildError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            BuildError::ZeroMaxSignatures => {
56                write!(f, "max_signatures must be greater than 0")
57            }
58            BuildError::EmitterConfig(e) => {
59                write!(f, "emitter configuration error: {}", e)
60            }
61        }
62    }
63}
64
65impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68    fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69        BuildError::EmitterConfig(e)
70    }
71}
72
73/// Builder for constructing a `TracingRateLimitLayer`.
74pub struct TracingRateLimitLayerBuilder {
75    policy: Policy,
76    summary_interval: Duration,
77    clock: Option<Arc<dyn Clock>>,
78    max_signatures: Option<usize>,
79    enable_active_emission: bool,
80    #[cfg(feature = "async")]
81    summary_formatter: Option<SummaryFormatter>,
82    span_context_fields: Vec<String>,
83    event_fields: Vec<String>,
84}
85
86impl TracingRateLimitLayerBuilder {
87    /// Set the rate limiting policy.
88    pub fn with_policy(mut self, policy: Policy) -> Self {
89        self.policy = policy;
90        self
91    }
92
93    /// Set the summary emission interval.
94    ///
95    /// The interval will be validated when `build()` is called.
96    pub fn with_summary_interval(mut self, interval: Duration) -> Self {
97        self.summary_interval = interval;
98        self
99    }
100
101    /// Set a custom clock (mainly for testing).
102    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
103        self.clock = Some(clock);
104        self
105    }
106
107    /// Set the maximum number of unique event signatures to track.
108    ///
109    /// When this limit is reached, the least recently used signatures will be evicted.
110    /// This prevents unbounded memory growth in applications with high signature cardinality.
111    ///
112    /// Default: 10,000 signatures
113    ///
114    /// The value will be validated when `build()` is called.
115    pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
116        self.max_signatures = Some(max_signatures);
117        self
118    }
119
120    /// Disable the signature limit, allowing unbounded growth.
121    ///
122    /// **Warning**: This can lead to unbounded memory usage in applications that generate
123    /// many unique event signatures. Only use this if you're certain your application has
124    /// bounded signature cardinality or you have external memory monitoring.
125    pub fn with_unlimited_signatures(mut self) -> Self {
126        self.max_signatures = None;
127        self
128    }
129
130    /// Enable active emission of suppression summaries.
131    ///
132    /// When enabled, the layer will automatically emit `WARN`-level tracing events
133    /// containing summaries of suppressed log events at the configured interval.
134    ///
135    /// **Requires the `async` feature** - this method has no effect without it.
136    ///
137    /// Default: disabled
138    ///
139    /// # Example
140    ///
141    /// ```no_run
142    /// # use tracing_throttle::TracingRateLimitLayer;
143    /// # use std::time::Duration;
144    /// let layer = TracingRateLimitLayer::builder()
145    ///     .with_active_emission(true)
146    ///     .with_summary_interval(Duration::from_secs(60))
147    ///     .build()
148    ///     .unwrap();
149    /// ```
150    pub fn with_active_emission(mut self, enabled: bool) -> Self {
151        self.enable_active_emission = enabled;
152        self
153    }
154
155    /// Set a custom formatter for suppression summaries.
156    ///
157    /// The formatter is responsible for emitting summaries as tracing events.
158    /// This allows full control over log level, message format, and structured fields.
159    ///
160    /// **Requires the `async` feature.**
161    ///
162    /// If not set, a default formatter is used that emits at WARN level with
163    /// `signature` and `count` fields.
164    ///
165    /// # Example
166    ///
167    /// ```no_run
168    /// # use tracing_throttle::TracingRateLimitLayer;
169    /// # use std::sync::Arc;
170    /// # use std::time::Duration;
171    /// let layer = TracingRateLimitLayer::builder()
172    ///     .with_active_emission(true)
173    ///     .with_summary_formatter(Arc::new(|summary| {
174    ///         tracing::info!(
175    ///             signature = %summary.signature,
176    ///             count = summary.count,
177    ///             duration_secs = summary.duration.as_secs(),
178    ///             "Suppression summary"
179    ///         );
180    ///     }))
181    ///     .build()
182    ///     .unwrap();
183    /// ```
184    #[cfg(feature = "async")]
185    pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
186        self.summary_formatter = Some(formatter);
187        self
188    }
189
190    /// Include span context fields in event signatures.
191    ///
192    /// When specified, the layer will extract these fields from the current span
193    /// context and include them in the event signature. This enables rate limiting
194    /// per-user, per-tenant, per-request, or any other span-level context.
195    ///
196    /// Duplicate field names are automatically removed, and empty field names are filtered out.
197    ///
198    /// # Example
199    ///
200    /// ```no_run
201    /// # use tracing_throttle::TracingRateLimitLayer;
202    /// // Rate limit separately per user
203    /// let layer = TracingRateLimitLayer::builder()
204    ///     .with_span_context_fields(vec!["user_id".to_string()])
205    ///     .build()
206    ///     .unwrap();
207    ///
208    /// // Rate limit per user and tenant
209    /// let layer = TracingRateLimitLayer::builder()
210    ///     .with_span_context_fields(vec!["user_id".to_string(), "tenant_id".to_string()])
211    ///     .build()
212    ///     .unwrap();
213    /// ```
214    ///
215    /// # Usage with Spans
216    ///
217    /// ```no_run
218    /// # use tracing::{info, info_span};
219    /// // Create a span with user context
220    /// let span = info_span!("request", user_id = "alice");
221    /// let _enter = span.enter();
222    ///
223    /// // These events will be rate limited separately per user
224    /// info!("Processing request");  // Limited for user "alice"
225    /// ```
226    pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
227        // Deduplicate and filter out empty field names
228        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
229        self.span_context_fields = unique_fields.into_iter().collect();
230        self
231    }
232
233    /// Include event fields in event signatures.
234    ///
235    /// When specified, the layer will extract these fields from events themselves
236    /// and include them in the event signature. This enables rate limiting per
237    /// error code, status, endpoint, or any other event-level field.
238    ///
239    /// Duplicate field names are automatically removed, and empty field names are filtered out.
240    ///
241    /// # Example
242    ///
243    /// ```no_run
244    /// # use tracing_throttle::TracingRateLimitLayer;
245    /// // Rate limit separately per error_code
246    /// let layer = TracingRateLimitLayer::builder()
247    ///     .with_event_fields(vec!["error_code".to_string()])
248    ///     .build()
249    ///     .unwrap();
250    ///
251    /// // Rate limit per status and endpoint
252    /// let layer = TracingRateLimitLayer::builder()
253    ///     .with_event_fields(vec!["status".to_string(), "endpoint".to_string()])
254    ///     .build()
255    ///     .unwrap();
256    /// ```
257    ///
258    /// # Usage with Events
259    ///
260    /// ```no_run
261    /// # use tracing::error;
262    /// // Events with different error codes are rate limited independently
263    /// error!(error_code = "AUTH_FAILED", "Authentication failed");
264    /// error!(error_code = "TIMEOUT", "Request timeout");
265    /// ```
266    pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
267        // Deduplicate and filter out empty field names
268        let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
269        self.event_fields = unique_fields.into_iter().collect();
270        self
271    }
272
273    /// Build the layer.
274    ///
275    /// # Errors
276    /// Returns `BuildError` if the configuration is invalid.
277    pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
278        // Validate max_signatures if set
279        if let Some(max) = self.max_signatures {
280            if max == 0 {
281                return Err(BuildError::ZeroMaxSignatures);
282            }
283        }
284
285        // Create shared metrics and circuit breaker
286        let metrics = Metrics::new();
287        let circuit_breaker = Arc::new(CircuitBreaker::new());
288
289        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
290        let storage = if let Some(max) = self.max_signatures {
291            Arc::new(ShardedStorage::with_max_entries(max).with_metrics(metrics.clone()))
292        } else {
293            Arc::new(ShardedStorage::new().with_metrics(metrics.clone()))
294        };
295        let registry = SuppressionRegistry::new(storage, clock, self.policy);
296        let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
297
298        // Let EmitterConfig validate the interval
299        let emitter_config = EmitterConfig::new(self.summary_interval)?;
300
301        #[cfg(feature = "async")]
302        let emitter_handle = if self.enable_active_emission {
303            let emitter = SummaryEmitter::new(registry, emitter_config);
304
305            // Use custom formatter or default
306            let formatter = self.summary_formatter.unwrap_or_else(|| {
307                Arc::new(|summary: &SuppressionSummary| {
308                    tracing::warn!(
309                        signature = %summary.signature,
310                        count = summary.count,
311                        "{}",
312                        summary.format_message()
313                    );
314                })
315            });
316
317            let handle = emitter.start(
318                move |summaries| {
319                    for summary in summaries {
320                        formatter(&summary);
321                    }
322                },
323                false, // Don't emit final summaries on shutdown
324            );
325            Arc::new(Mutex::new(Some(handle)))
326        } else {
327            Arc::new(Mutex::new(None))
328        };
329
330        Ok(TracingRateLimitLayer {
331            limiter,
332            span_context_fields: Arc::new(self.span_context_fields),
333            event_fields: Arc::new(self.event_fields),
334            #[cfg(feature = "async")]
335            emitter_handle,
336            #[cfg(not(feature = "async"))]
337            _emitter_config: emitter_config,
338        })
339    }
340}
341
342/// A `tracing::Layer` that applies rate limiting to events.
343///
344/// This layer intercepts events, computes their signature, and decides
345/// whether to allow or suppress them based on the configured policy.
346///
347/// Optionally emits periodic summaries of suppressed events when active
348/// emission is enabled (requires `async` feature).
349#[derive(Clone)]
350pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
351where
352    S: Storage<EventSignature, EventState> + Clone,
353{
354    limiter: RateLimiter<S>,
355    span_context_fields: Arc<Vec<String>>,
356    event_fields: Arc<Vec<String>>,
357    #[cfg(feature = "async")]
358    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
359    #[cfg(not(feature = "async"))]
360    _emitter_config: EmitterConfig,
361}
362
363impl<S> TracingRateLimitLayer<S>
364where
365    S: Storage<EventSignature, EventState> + Clone,
366{
367    /// Extract span context fields from the current span.
368    fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
369    where
370        Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
371    {
372        if self.span_context_fields.is_empty() {
373            return BTreeMap::new();
374        }
375
376        let mut context_fields = BTreeMap::new();
377
378        if let Some(span) = cx.lookup_current() {
379            for span_ref in span.scope() {
380                let extensions = span_ref.extensions();
381
382                if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
383                    for field_name in self.span_context_fields.as_ref() {
384                        if !context_fields.contains_key(field_name) {
385                            if let Some(value) = stored_fields.get(field_name) {
386                                context_fields.insert(field_name.clone(), value.clone());
387                            }
388                        }
389                    }
390                }
391
392                if context_fields.len() == self.span_context_fields.len() {
393                    break;
394                }
395            }
396        }
397
398        context_fields
399    }
400
401    /// Extract event fields from an event.
402    fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
403        if self.event_fields.is_empty() {
404            return BTreeMap::new();
405        }
406
407        let mut visitor = FieldVisitor::new();
408        event.record(&mut visitor);
409        let all_fields = visitor.into_fields();
410
411        // Filter to only the configured event fields
412        self.event_fields
413            .iter()
414            .filter_map(|field_name| {
415                all_fields
416                    .get(field_name)
417                    .map(|value| (field_name.clone(), value.clone()))
418            })
419            .collect()
420    }
421
422    /// Compute event signature from tracing metadata, span context, and event fields.
423    ///
424    /// The signature includes:
425    /// - Log level (INFO, WARN, ERROR, etc.)
426    /// - Message template
427    /// - Target module path
428    /// - Span context fields (if configured)
429    /// - Event fields (if configured)
430    fn compute_signature(
431        &self,
432        metadata: &Metadata,
433        combined_fields: &BTreeMap<String, String>,
434    ) -> EventSignature {
435        let level = metadata.level().as_str();
436        let message = metadata.name();
437        let target = Some(metadata.target());
438
439        // Use combined fields (span context + event fields) in signature
440        EventSignature::new(level, message, combined_fields, target)
441    }
442
443    /// Check if an event should be allowed through.
444    pub fn should_allow(&self, signature: EventSignature) -> bool {
445        matches!(self.limiter.check_event(signature), LimitDecision::Allow)
446    }
447
448    /// Get a reference to the underlying limiter.
449    pub fn limiter(&self) -> &RateLimiter<S> {
450        &self.limiter
451    }
452
453    /// Get a reference to the metrics.
454    ///
455    /// Returns metrics about rate limiting behavior including:
456    /// - Events allowed
457    /// - Events suppressed
458    /// - Signatures evicted
459    pub fn metrics(&self) -> &Metrics {
460        self.limiter.metrics()
461    }
462
463    /// Get the current number of tracked signatures.
464    pub fn signature_count(&self) -> usize {
465        self.limiter.registry().len()
466    }
467
468    /// Get a reference to the circuit breaker.
469    ///
470    /// Use this to check the circuit breaker state and health:
471    /// - `circuit_breaker().state()` - Current circuit state
472    /// - `circuit_breaker().consecutive_failures()` - Failure count
473    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
474        self.limiter.circuit_breaker()
475    }
476
477    /// Shutdown the active suppression summary emitter, if running.
478    ///
479    /// This method gracefully stops the background emission task.  If active emission
480    /// is not enabled, this method does nothing.
481    ///
482    /// **Requires the `async` feature.**
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if the emitter task fails to shut down gracefully.
487    ///
488    /// # Example
489    ///
490    /// ```no_run
491    /// # use tracing_throttle::TracingRateLimitLayer;
492    /// # async fn example() {
493    /// let layer = TracingRateLimitLayer::builder()
494    ///     .with_active_emission(true)
495    ///     .build()
496    ///     .unwrap();
497    ///
498    /// // Use the layer...
499    ///
500    /// // Shutdown before dropping
501    /// layer.shutdown().await.expect("shutdown failed");
502    /// # }
503    /// ```
504    #[cfg(feature = "async")]
505    pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
506        // Take the handle while holding the lock, then release the lock before awaiting
507        let handle = {
508            let mut handle_guard = self.emitter_handle.lock().unwrap();
509            handle_guard.take()
510        };
511
512        if let Some(handle) = handle {
513            handle.shutdown().await?;
514        }
515        Ok(())
516    }
517}
518
519impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
520    /// Create a builder for configuring the layer.
521    ///
522    /// Defaults:
523    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate)
524    /// - Max signatures: 10,000 (with LRU eviction)
525    /// - Summary interval: 30 seconds
526    /// - Active emission: disabled
527    /// - Summary formatter: default (WARN level with signature and count)
528    pub fn builder() -> TracingRateLimitLayerBuilder {
529        TracingRateLimitLayerBuilder {
530            policy: Policy::token_bucket(50.0, 1.0)
531                .expect("default policy with 50 capacity and 1/sec refill is always valid"),
532            summary_interval: Duration::from_secs(30),
533            clock: None,
534            max_signatures: Some(10_000),
535            enable_active_emission: false,
536            #[cfg(feature = "async")]
537            summary_formatter: None,
538            span_context_fields: Vec::new(),
539            event_fields: Vec::new(),
540        }
541    }
542
543    /// Create a layer with default settings.
544    ///
545    /// Equivalent to `TracingRateLimitLayer::builder().build().unwrap()`.
546    ///
547    /// Defaults:
548    /// - Policy: token bucket (50 burst capacity, 1 token/sec refill rate = 60/min)
549    /// - Max signatures: 10,000 (with LRU eviction)
550    /// - Summary interval: 30 seconds
551    ///
552    /// # Panics
553    /// This method cannot panic because all default values are valid.
554    pub fn new() -> Self {
555        Self::builder()
556            .build()
557            .expect("default configuration is always valid")
558    }
559
560    /// Create a layer with custom storage backend.
561    ///
562    /// This allows using alternative storage implementations like Redis for distributed
563    /// rate limiting across multiple application instances.
564    ///
565    /// # Arguments
566    ///
567    /// * `storage` - Custom storage implementation (must implement `Storage<EventSignature, EventState>`)
568    /// * `policy` - Rate limiting policy to apply
569    /// * `clock` - Clock implementation (use `SystemClock::new()` for production)
570    ///
571    /// # Example with Redis
572    ///
573    /// ```rust,ignore
574    /// use tracing_throttle::{TracingRateLimitLayer, RedisStorage, Policy, SystemClock};
575    /// use std::sync::Arc;
576    ///
577    /// #[tokio::main]
578    /// async fn main() {
579    ///     let storage = Arc::new(
580    ///         RedisStorage::connect("redis://127.0.0.1/")
581    ///             .await
582    ///             .expect("Failed to connect")
583    ///     );
584    ///     let policy = Policy::token_bucket(100.0, 10.0).unwrap();
585    ///     let clock = Arc::new(SystemClock::new());
586    ///
587    ///     let layer = TracingRateLimitLayer::with_storage(storage, policy, clock);
588    /// }
589    /// ```
590    pub fn with_storage<ST>(
591        storage: ST,
592        policy: Policy,
593        clock: Arc<dyn Clock>,
594    ) -> TracingRateLimitLayer<ST>
595    where
596        ST: Storage<EventSignature, EventState> + Clone,
597    {
598        let metrics = Metrics::new();
599        let circuit_breaker = Arc::new(CircuitBreaker::new());
600        let registry = SuppressionRegistry::new(storage, clock, policy);
601        let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
602
603        TracingRateLimitLayer {
604            limiter,
605            span_context_fields: Arc::new(Vec::new()),
606            event_fields: Arc::new(Vec::new()),
607            #[cfg(feature = "async")]
608            emitter_handle: Arc::new(Mutex::new(None)),
609            #[cfg(not(feature = "async"))]
610            _emitter_config: EmitterConfig::new(Duration::from_secs(30))
611                .expect("30 seconds is valid"),
612        }
613    }
614}
615
616impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
617    fn default() -> Self {
618        Self::new()
619    }
620}
621
622// Implement the Filter trait for rate limiting
623impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
624where
625    S: Storage<EventSignature, EventState> + Clone,
626    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
627{
628    fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
629        // Always return true - actual filtering happens in event_enabled
630        // This prevents double-checking in dual-layer setups
631        true
632    }
633
634    fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
635        // Combine span context and event fields
636        let mut combined_fields = self.extract_span_context(cx);
637        let event_fields = self.extract_event_fields(event);
638        combined_fields.extend(event_fields);
639
640        let signature = self.compute_signature(event.metadata(), &combined_fields);
641        self.should_allow(signature)
642    }
643}
644
645impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
646where
647    S: Storage<EventSignature, EventState> + Clone + 'static,
648    Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
649{
650    fn on_new_span(
651        &self,
652        attrs: &tracing::span::Attributes<'_>,
653        id: &tracing::span::Id,
654        ctx: Context<'_, Sub>,
655    ) {
656        if self.span_context_fields.is_empty() {
657            return;
658        }
659
660        let mut visitor = FieldVisitor::new();
661        attrs.record(&mut visitor);
662        let fields = visitor.into_fields();
663
664        if let Some(span) = ctx.span(id) {
665            let mut extensions = span.extensions_mut();
666            extensions.insert(fields);
667        }
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674    use tracing::info;
675    use tracing_subscriber::layer::SubscriberExt;
676
677    #[test]
678    fn test_layer_builder() {
679        let layer = TracingRateLimitLayer::builder()
680            .with_policy(Policy::count_based(50).unwrap())
681            .with_summary_interval(Duration::from_secs(60))
682            .build()
683            .unwrap();
684
685        assert!(layer.limiter().registry().is_empty());
686    }
687
688    #[test]
689    fn test_span_context_fields_deduplication() {
690        let layer = TracingRateLimitLayer::builder()
691            .with_span_context_fields(vec![
692                "user_id".to_string(),
693                "user_id".to_string(), // duplicate
694                "tenant_id".to_string(),
695                "".to_string(),        // empty, should be filtered
696                "user_id".to_string(), // another duplicate
697            ])
698            .build()
699            .unwrap();
700
701        // Should only have 2 unique fields: user_id and tenant_id
702        assert_eq!(layer.span_context_fields.len(), 2);
703        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
704        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
705    }
706
707    #[test]
708    fn test_event_fields_deduplication() {
709        let layer = TracingRateLimitLayer::builder()
710            .with_event_fields(vec![
711                "error_code".to_string(),
712                "error_code".to_string(), // duplicate
713                "status".to_string(),
714                "".to_string(),           // empty, should be filtered
715                "error_code".to_string(), // another duplicate
716            ])
717            .build()
718            .unwrap();
719
720        // Should only have 2 unique fields: error_code and status
721        assert_eq!(layer.event_fields.len(), 2);
722        assert!(layer.event_fields.iter().any(|f| f == "error_code"));
723        assert!(layer.event_fields.iter().any(|f| f == "status"));
724    }
725
726    #[test]
727    fn test_layer_default() {
728        let layer = TracingRateLimitLayer::default();
729        assert!(layer.limiter().registry().is_empty());
730    }
731
732    #[test]
733    fn test_signature_computation() {
734        let _layer = TracingRateLimitLayer::new();
735
736        // Use a simple signature test without metadata construction
737        let sig1 = EventSignature::simple("INFO", "test_event");
738        let sig2 = EventSignature::simple("INFO", "test_event");
739
740        // Same inputs should produce same signature
741        assert_eq!(sig1, sig2);
742    }
743
744    #[test]
745    fn test_basic_rate_limiting() {
746        let layer = TracingRateLimitLayer::builder()
747            .with_policy(Policy::count_based(2).unwrap())
748            .build()
749            .unwrap();
750
751        let sig = EventSignature::simple("INFO", "test_message");
752
753        // First two should be allowed
754        assert!(layer.should_allow(sig));
755        assert!(layer.should_allow(sig));
756
757        // Third should be suppressed
758        assert!(!layer.should_allow(sig));
759    }
760
761    #[test]
762    fn test_layer_integration() {
763        let layer = TracingRateLimitLayer::builder()
764            .with_policy(Policy::count_based(3).unwrap())
765            .build()
766            .unwrap();
767
768        // Clone for use in subscriber, keep original for checking state
769        let layer_for_check = layer.clone();
770
771        let subscriber = tracing_subscriber::registry()
772            .with(tracing_subscriber::fmt::layer().with_filter(layer));
773
774        // Test that the layer correctly tracks event signatures
775        tracing::subscriber::with_default(subscriber, || {
776            // Emit 10 identical events
777            for _ in 0..10 {
778                info!("test event");
779            }
780        });
781
782        // After emitting 10 events with the same signature, the layer should have
783        // tracked them and only the first 3 should have been marked as allowed
784        // The registry should contain one entry for this signature
785        assert_eq!(layer_for_check.limiter().registry().len(), 1);
786    }
787
788    #[test]
789    fn test_layer_suppression_logic() {
790        let layer = TracingRateLimitLayer::builder()
791            .with_policy(Policy::count_based(3).unwrap())
792            .build()
793            .unwrap();
794
795        let sig = EventSignature::simple("INFO", "test");
796
797        // Verify the suppression logic works correctly
798        let mut allowed_count = 0;
799        for _ in 0..10 {
800            if layer.should_allow(sig) {
801                allowed_count += 1;
802            }
803        }
804
805        assert_eq!(allowed_count, 3);
806    }
807
808    #[test]
809    fn test_builder_zero_summary_interval() {
810        let result = TracingRateLimitLayer::builder()
811            .with_summary_interval(Duration::from_secs(0))
812            .build();
813
814        assert!(matches!(
815            result,
816            Err(BuildError::EmitterConfig(
817                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
818            ))
819        ));
820    }
821
822    #[test]
823    fn test_builder_zero_max_signatures() {
824        let result = TracingRateLimitLayer::builder()
825            .with_max_signatures(0)
826            .build();
827
828        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
829    }
830
831    #[test]
832    fn test_builder_valid_max_signatures() {
833        let layer = TracingRateLimitLayer::builder()
834            .with_max_signatures(100)
835            .build()
836            .unwrap();
837
838        assert!(layer.limiter().registry().is_empty());
839    }
840
841    #[test]
842    fn test_metrics_tracking() {
843        let layer = TracingRateLimitLayer::builder()
844            .with_policy(Policy::count_based(2).unwrap())
845            .build()
846            .unwrap();
847
848        let sig = EventSignature::simple("INFO", "test");
849
850        // Check initial metrics
851        assert_eq!(layer.metrics().events_allowed(), 0);
852        assert_eq!(layer.metrics().events_suppressed(), 0);
853
854        // Allow first two events
855        assert!(layer.should_allow(sig));
856        assert!(layer.should_allow(sig));
857
858        // Check metrics after allowed events
859        assert_eq!(layer.metrics().events_allowed(), 2);
860        assert_eq!(layer.metrics().events_suppressed(), 0);
861
862        // Suppress third event
863        assert!(!layer.should_allow(sig));
864
865        // Check metrics after suppressed event
866        assert_eq!(layer.metrics().events_allowed(), 2);
867        assert_eq!(layer.metrics().events_suppressed(), 1);
868    }
869
870    #[test]
871    fn test_metrics_snapshot() {
872        let layer = TracingRateLimitLayer::builder()
873            .with_policy(Policy::count_based(3).unwrap())
874            .build()
875            .unwrap();
876
877        let sig = EventSignature::simple("INFO", "test");
878
879        // Generate some events
880        for _ in 0..5 {
881            layer.should_allow(sig);
882        }
883
884        // Get snapshot
885        let snapshot = layer.metrics().snapshot();
886        assert_eq!(snapshot.events_allowed, 3);
887        assert_eq!(snapshot.events_suppressed, 2);
888        assert_eq!(snapshot.total_events(), 5);
889        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
890    }
891
892    #[test]
893    fn test_signature_count() {
894        let layer = TracingRateLimitLayer::builder()
895            .with_policy(Policy::count_based(2).unwrap())
896            .build()
897            .unwrap();
898
899        assert_eq!(layer.signature_count(), 0);
900
901        let sig1 = EventSignature::simple("INFO", "test1");
902        let sig2 = EventSignature::simple("INFO", "test2");
903
904        layer.should_allow(sig1);
905        assert_eq!(layer.signature_count(), 1);
906
907        layer.should_allow(sig2);
908        assert_eq!(layer.signature_count(), 2);
909
910        // Same signature shouldn't increase count
911        layer.should_allow(sig1);
912        assert_eq!(layer.signature_count(), 2);
913    }
914
915    #[test]
916    fn test_metrics_with_eviction() {
917        let layer = TracingRateLimitLayer::builder()
918            .with_policy(Policy::count_based(1).unwrap())
919            .with_max_signatures(3)
920            .build()
921            .unwrap();
922
923        // Fill up to capacity
924        for i in 0..3 {
925            let sig = EventSignature::simple("INFO", &format!("test{}", i));
926            layer.should_allow(sig);
927        }
928
929        assert_eq!(layer.signature_count(), 3);
930        assert_eq!(layer.metrics().signatures_evicted(), 0);
931
932        // Add one more, which should trigger eviction
933        let sig = EventSignature::simple("INFO", "test3");
934        layer.should_allow(sig);
935
936        assert_eq!(layer.signature_count(), 3);
937        assert_eq!(layer.metrics().signatures_evicted(), 1);
938    }
939
940    #[test]
941    fn test_circuit_breaker_observability() {
942        use crate::application::circuit_breaker::CircuitState;
943
944        let layer = TracingRateLimitLayer::builder()
945            .with_policy(Policy::count_based(2).unwrap())
946            .build()
947            .unwrap();
948
949        // Check initial circuit breaker state
950        let cb = layer.circuit_breaker();
951        assert_eq!(cb.state(), CircuitState::Closed);
952        assert_eq!(cb.consecutive_failures(), 0);
953
954        // Circuit breaker should remain closed during normal operation
955        let sig = EventSignature::simple("INFO", "test");
956        layer.should_allow(sig);
957        layer.should_allow(sig);
958        layer.should_allow(sig);
959
960        assert_eq!(cb.state(), CircuitState::Closed);
961    }
962
963    #[test]
964    fn test_circuit_breaker_fail_open_integration() {
965        use crate::application::circuit_breaker::{
966            CircuitBreaker, CircuitBreakerConfig, CircuitState,
967        };
968        use std::time::Duration;
969
970        // Create a circuit breaker with low threshold for testing
971        let cb_config = CircuitBreakerConfig {
972            failure_threshold: 2,
973            recovery_timeout: Duration::from_secs(1),
974        };
975        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
976
977        // Build layer with custom circuit breaker
978        let storage = Arc::new(ShardedStorage::new());
979        let clock = Arc::new(SystemClock::new());
980        let policy = Policy::count_based(2).unwrap();
981        let registry = SuppressionRegistry::new(storage, clock, policy);
982        let metrics = Metrics::new();
983        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
984
985        let layer = TracingRateLimitLayer {
986            limiter,
987            span_context_fields: Arc::new(Vec::new()),
988            event_fields: Arc::new(Vec::new()),
989            #[cfg(feature = "async")]
990            emitter_handle: Arc::new(Mutex::new(None)),
991            #[cfg(not(feature = "async"))]
992            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
993                30,
994            ))
995            .unwrap(),
996        };
997
998        let sig = EventSignature::simple("INFO", "test");
999
1000        // Normal operation - first 2 events allowed, third suppressed
1001        assert!(layer.should_allow(sig));
1002        assert!(layer.should_allow(sig));
1003        assert!(!layer.should_allow(sig));
1004
1005        // Circuit should still be closed
1006        assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1007
1008        // Manually trigger circuit breaker failures to test fail-open
1009        circuit_breaker.record_failure();
1010        circuit_breaker.record_failure();
1011
1012        // Circuit should now be open
1013        assert_eq!(circuit_breaker.state(), CircuitState::Open);
1014
1015        // With circuit open, rate limiter should fail open (allow all events)
1016        // even though we've already hit the rate limit
1017        assert!(layer.should_allow(sig));
1018        assert!(layer.should_allow(sig));
1019        assert!(layer.should_allow(sig));
1020
1021        // Metrics should show these as allowed (fail-open behavior)
1022        let snapshot = layer.metrics().snapshot();
1023        assert!(snapshot.events_allowed >= 5); // 2 normal + 3 fail-open
1024    }
1025
1026    #[cfg(feature = "async")]
1027    #[tokio::test]
1028    async fn test_active_emission_integration() {
1029        use std::sync::atomic::{AtomicUsize, Ordering};
1030        use std::time::Duration;
1031
1032        // Use an atomic counter to track emissions
1033        let emission_count = Arc::new(AtomicUsize::new(0));
1034        let count_clone = Arc::clone(&emission_count);
1035
1036        // Create a layer with a custom emitter that increments our counter
1037        let storage = Arc::new(ShardedStorage::new());
1038        let clock = Arc::new(SystemClock::new());
1039        let policy = Policy::count_based(2).unwrap();
1040        let registry = SuppressionRegistry::new(storage, clock, policy);
1041
1042        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1043        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1044
1045        // Start emitter with custom callback
1046        let handle = emitter.start(
1047            move |summaries| {
1048                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1049            },
1050            false,
1051        );
1052
1053        // Emit events that will be suppressed
1054        let sig = EventSignature::simple("INFO", "test_message");
1055        for _ in 0..10 {
1056            registry.with_event_state(sig, |state, now| {
1057                state.counter.record_suppression(now);
1058            });
1059        }
1060
1061        // Wait for at least two emission intervals
1062        tokio::time::sleep(Duration::from_millis(250)).await;
1063
1064        // Check that summaries were emitted
1065        let count = emission_count.load(Ordering::SeqCst);
1066        assert!(
1067            count > 0,
1068            "Expected at least one suppression summary to be emitted, got {}",
1069            count
1070        );
1071
1072        // Graceful shutdown
1073        handle.shutdown().await.expect("shutdown failed");
1074    }
1075
1076    #[cfg(feature = "async")]
1077    #[tokio::test]
1078    async fn test_active_emission_disabled() {
1079        use crate::infrastructure::mocks::layer::MockCaptureLayer;
1080        use std::time::Duration;
1081
1082        // Create layer with active emission disabled (default)
1083        let layer = TracingRateLimitLayer::builder()
1084            .with_policy(Policy::count_based(2).unwrap())
1085            .with_summary_interval(Duration::from_millis(100))
1086            .build()
1087            .unwrap();
1088
1089        let mock = MockCaptureLayer::new();
1090        let mock_clone = mock.clone();
1091
1092        let subscriber = tracing_subscriber::registry()
1093            .with(mock)
1094            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1095
1096        tracing::subscriber::with_default(subscriber, || {
1097            let sig = EventSignature::simple("INFO", "test_message");
1098            for _ in 0..10 {
1099                layer.should_allow(sig);
1100            }
1101        });
1102
1103        // Wait to ensure no emissions occur
1104        tokio::time::sleep(Duration::from_millis(250)).await;
1105
1106        // Should not have emitted any summaries
1107        let events = mock_clone.get_captured();
1108        let summary_count = events
1109            .iter()
1110            .filter(|e| e.message.contains("suppressed"))
1111            .count();
1112
1113        assert_eq!(
1114            summary_count, 0,
1115            "Should not emit summaries when active emission is disabled"
1116        );
1117
1118        // Shutdown should succeed even when emitter was never started
1119        layer.shutdown().await.expect("shutdown failed");
1120    }
1121
1122    #[cfg(feature = "async")]
1123    #[tokio::test]
1124    async fn test_shutdown_without_emission() {
1125        // Test that shutdown works when emission was never enabled
1126        let layer = TracingRateLimitLayer::new();
1127
1128        // Should not error
1129        layer
1130            .shutdown()
1131            .await
1132            .expect("shutdown should succeed when emitter not running");
1133    }
1134
1135    #[cfg(feature = "async")]
1136    #[tokio::test]
1137    async fn test_custom_summary_formatter() {
1138        use std::sync::atomic::{AtomicUsize, Ordering};
1139        use std::time::Duration;
1140
1141        // Track formatter invocations
1142        let call_count = Arc::new(AtomicUsize::new(0));
1143        let count_clone = Arc::clone(&call_count);
1144
1145        // Track data passed to formatter
1146        let last_count = Arc::new(AtomicUsize::new(0));
1147        let last_count_clone = Arc::clone(&last_count);
1148
1149        // Create layer with custom formatter
1150        let layer = TracingRateLimitLayer::builder()
1151            .with_policy(Policy::count_based(2).unwrap())
1152            .with_active_emission(true)
1153            .with_summary_interval(Duration::from_millis(100))
1154            .with_summary_formatter(Arc::new(move |summary| {
1155                count_clone.fetch_add(1, Ordering::SeqCst);
1156                last_count_clone.store(summary.count, Ordering::SeqCst);
1157                // Custom format: emit at INFO level instead of WARN
1158                tracing::info!(
1159                    sig = %summary.signature,
1160                    suppressed = summary.count,
1161                    "Custom format"
1162                );
1163            }))
1164            .build()
1165            .unwrap();
1166
1167        // Emit events that will be suppressed
1168        let sig = EventSignature::simple("INFO", "test_message");
1169        for _ in 0..10 {
1170            layer.should_allow(sig);
1171        }
1172
1173        // Wait for emission
1174        tokio::time::sleep(Duration::from_millis(250)).await;
1175
1176        // Verify custom formatter was called
1177        let calls = call_count.load(Ordering::SeqCst);
1178        assert!(calls > 0, "Custom formatter should have been called");
1179
1180        // Verify formatter received correct data
1181        let count = last_count.load(Ordering::SeqCst);
1182        assert!(
1183            count >= 8,
1184            "Expected at least 8 suppressions, got {}",
1185            count
1186        );
1187
1188        layer.shutdown().await.expect("shutdown failed");
1189    }
1190
1191    #[cfg(feature = "async")]
1192    #[tokio::test]
1193    async fn test_default_formatter_used() {
1194        use std::sync::atomic::{AtomicUsize, Ordering};
1195        use std::time::Duration;
1196
1197        let emission_count = Arc::new(AtomicUsize::new(0));
1198        let count_clone = Arc::clone(&emission_count);
1199
1200        let storage = Arc::new(ShardedStorage::new());
1201        let clock = Arc::new(SystemClock::new());
1202        let policy = Policy::count_based(2).unwrap();
1203        let registry = SuppressionRegistry::new(storage, clock, policy);
1204
1205        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1206        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1207
1208        // Start without custom formatter - should use default
1209        let handle = emitter.start(
1210            move |summaries| {
1211                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1212            },
1213            false,
1214        );
1215
1216        let sig = EventSignature::simple("INFO", "test_message");
1217        for _ in 0..10 {
1218            registry.with_event_state(sig, |state, now| {
1219                state.counter.record_suppression(now);
1220            });
1221        }
1222
1223        tokio::time::sleep(Duration::from_millis(250)).await;
1224
1225        let count = emission_count.load(Ordering::SeqCst);
1226        assert!(count > 0, "Default formatter should have emitted summaries");
1227
1228        handle.shutdown().await.expect("shutdown failed");
1229    }
1230}