1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
/// Callback invoked with each [`SuppressionSummary`] produced while active
/// emission is running.
///
/// The alias is a shared, thread-safe trait object. The `'static` bound is the
/// default object lifetime inside `Arc<dyn ...>`, so it is not spelled out.
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync>;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum BuildError {
46 ZeroMaxSignatures,
48 EmitterConfig(crate::application::emitter::EmitterConfigError),
50}
51
52impl std::fmt::Display for BuildError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 BuildError::ZeroMaxSignatures => {
56 write!(f, "max_signatures must be greater than 0")
57 }
58 BuildError::EmitterConfig(e) => {
59 write!(f, "emitter configuration error: {}", e)
60 }
61 }
62 }
63}
64
65impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69 BuildError::EmitterConfig(e)
70 }
71}
72
/// Builder for [`TracingRateLimitLayer`].
///
/// Created by `TracingRateLimitLayer::builder()`, configured through the
/// `with_*` methods and consumed by `build`.
pub struct TracingRateLimitLayerBuilder {
    /// Rate-limit policy passed to the suppression registry.
    policy: Policy,
    /// Interval used to construct the emitter configuration in `build`.
    summary_interval: Duration,
    /// Clock override; `build` falls back to a `SystemClock` when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Signature cap. `Some(0)` is rejected by `build`; when no eviction
    /// strategy is set, `Some(n)` installs an LRU eviction policy of size `n`.
    max_signatures: Option<usize>,
    /// Whether `build` starts the summary emitter (only acted upon with the
    /// `async` feature).
    enable_active_emission: bool,
    /// Optional callback applied to every emitted summary; `build` installs a
    /// `tracing::warn!`-based default when `None`.
    #[cfg(feature = "async")]
    summary_formatter: Option<SummaryFormatter>,
    /// Span field names that are looked up in the span scope and merged into
    /// the event fields before the signature is computed.
    span_context_fields: Vec<String>,
    /// Event field names removed before the signature is computed.
    excluded_fields: BTreeSet<String>,
    /// Explicit eviction strategy; takes precedence over `max_signatures`
    /// when choosing the eviction policy.
    eviction_strategy: Option<EvictionStrategy>,
    /// Event targets that are always allowed without consulting the limiter.
    exempt_targets: BTreeSet<String>,
}
87
88#[derive(Clone)]
93pub enum EvictionStrategy {
94 Lru {
96 max_entries: usize,
98 },
99 Priority {
101 max_entries: usize,
103 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
105 },
106 Memory {
108 max_bytes: usize,
110 },
111 PriorityWithMemory {
113 max_entries: usize,
115 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
117 max_bytes: usize,
119 },
120}
121
122impl EvictionStrategy {
123 pub fn tracks_memory(&self) -> bool {
125 matches!(
126 self,
127 EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
128 )
129 }
130
131 pub fn memory_limit(&self) -> Option<usize> {
133 match self {
134 EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
135 EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
136 _ => None,
137 }
138 }
139
140 pub fn uses_priority(&self) -> bool {
142 matches!(
143 self,
144 EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
145 )
146 }
147}
148
149impl std::fmt::Debug for EvictionStrategy {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 match self {
152 EvictionStrategy::Lru { max_entries } => f
153 .debug_struct("Lru")
154 .field("max_entries", max_entries)
155 .finish(),
156 EvictionStrategy::Priority {
157 max_entries,
158 priority_fn: _,
159 } => f
160 .debug_struct("Priority")
161 .field("max_entries", max_entries)
162 .field("priority_fn", &"<fn>")
163 .finish(),
164 EvictionStrategy::Memory { max_bytes } => f
165 .debug_struct("Memory")
166 .field("max_bytes", max_bytes)
167 .finish(),
168 EvictionStrategy::PriorityWithMemory {
169 max_entries,
170 priority_fn: _,
171 max_bytes,
172 } => f
173 .debug_struct("PriorityWithMemory")
174 .field("max_entries", max_entries)
175 .field("priority_fn", &"<fn>")
176 .field("max_bytes", max_bytes)
177 .finish(),
178 }
179 }
180}
181
182impl TracingRateLimitLayerBuilder {
183 pub fn with_policy(mut self, policy: Policy) -> Self {
185 self.policy = policy;
186 self
187 }
188
189 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
193 self.summary_interval = interval;
194 self
195 }
196
197 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
199 self.clock = Some(clock);
200 self
201 }
202
203 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
212 self.max_signatures = Some(max_signatures);
213 self
214 }
215
216 pub fn with_unlimited_signatures(mut self) -> Self {
222 self.max_signatures = None;
223 self
224 }
225
226 pub fn with_active_emission(mut self, enabled: bool) -> Self {
247 self.enable_active_emission = enabled;
248 self
249 }
250
251 #[cfg(feature = "async")]
281 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
282 self.summary_formatter = Some(formatter);
283 self
284 }
285
286 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
323 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
325 self.span_context_fields = unique_fields.into_iter().collect();
326 self
327 }
328
329 pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
371 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
373 self.excluded_fields = unique_fields;
374 self
375 }
376
377 pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
417 let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
419 self.exempt_targets = unique_targets;
420 self
421 }
422
423 pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
463 self.eviction_strategy = Some(strategy);
464 self
465 }
466
467 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
472 if let Some(max) = self.max_signatures {
474 if max == 0 {
475 return Err(BuildError::ZeroMaxSignatures);
476 }
477 }
478
479 let metrics = Metrics::new();
481 let circuit_breaker = Arc::new(CircuitBreaker::new());
482
483 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
484 let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
485
486 let eviction_policy: Option<
488 Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
489 > = match self.eviction_strategy {
490 Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
491 crate::infrastructure::eviction::LruEviction::new(max_entries),
492 )),
493 Some(EvictionStrategy::Priority {
494 max_entries,
495 priority_fn,
496 }) => Some(Arc::new(
497 crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
498 )),
499 Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
500 crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
501 )),
502 Some(EvictionStrategy::PriorityWithMemory {
503 max_entries,
504 priority_fn,
505 max_bytes,
506 }) => Some(Arc::new(
507 crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
508 max_entries,
509 priority_fn,
510 max_bytes,
511 ),
512 )),
513 None => {
514 self.max_signatures.map(|max| {
516 Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
517 as Arc<
518 dyn crate::application::ports::EvictionPolicy<
519 EventSignature,
520 EventState,
521 >,
522 >
523 })
524 }
525 };
526
527 if let Some(policy) = eviction_policy {
528 storage = storage.with_eviction_policy(policy);
529 }
530
531 let storage = Arc::new(storage);
532 let registry = SuppressionRegistry::new(storage, clock, self.policy);
533 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
534
535 let emitter_config = EmitterConfig::new(self.summary_interval)?;
537
538 #[cfg(feature = "async")]
539 let emitter_handle = if self.enable_active_emission {
540 let emitter = SummaryEmitter::new(registry, emitter_config);
541
542 let formatter = self.summary_formatter.unwrap_or_else(|| {
544 Arc::new(|summary: &SuppressionSummary| {
545 tracing::warn!(
546 signature = %summary.signature,
547 count = summary.count,
548 "{}",
549 summary.format_message()
550 );
551 })
552 });
553
554 let handle = emitter.start(
555 move |summaries| {
556 for summary in summaries {
557 formatter(&summary);
558 }
559 },
560 false, );
562 Arc::new(Mutex::new(Some(handle)))
563 } else {
564 Arc::new(Mutex::new(None))
565 };
566
567 Ok(TracingRateLimitLayer {
568 limiter,
569 span_context_fields: Arc::new(self.span_context_fields),
570 excluded_fields: Arc::new(self.excluded_fields),
571 exempt_targets: Arc::new(self.exempt_targets),
572 #[cfg(feature = "async")]
573 emitter_handle,
574 #[cfg(not(feature = "async"))]
575 _emitter_config: emitter_config,
576 })
577 }
578}
579
/// Rate-limiting component for `tracing` that implements both
/// `tracing_subscriber::layer::Filter` and `tracing_subscriber::Layer`.
///
/// `S` is the storage backend for per-signature state; it defaults to a
/// shared `ShardedStorage`. Cloning is cheap for the fields declared here
/// that are `Arc`-wrapped; the cost of cloning `limiter` depends on
/// `RateLimiter<S>`.
#[derive(Clone)]
pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    /// Decides whether an event signature is allowed or suppressed.
    limiter: RateLimiter<S>,
    /// Span field names merged into the event fields before hashing.
    span_context_fields: Arc<Vec<String>>,
    /// Event field names removed before the signature is computed.
    excluded_fields: Arc<BTreeSet<String>>,
    /// Targets whose events are always allowed.
    exempt_targets: Arc<BTreeSet<String>>,
    /// Handle of the running summary emitter, if one was started; taken out
    /// by `shutdown`.
    #[cfg(feature = "async")]
    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
    /// Emitter configuration kept (unused) when the `async` feature is off.
    #[cfg(not(feature = "async"))]
    _emitter_config: EmitterConfig,
}
601
602impl<S> TracingRateLimitLayer<S>
603where
604 S: Storage<EventSignature, EventState> + Clone,
605{
606 fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
608 where
609 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
610 {
611 if self.span_context_fields.is_empty() {
612 return BTreeMap::new();
613 }
614
615 let mut context_fields = BTreeMap::new();
616
617 if let Some(span) = cx.lookup_current() {
618 for span_ref in span.scope() {
619 let extensions = span_ref.extensions();
620
621 if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
622 for field_name in self.span_context_fields.as_ref() {
623 if !context_fields.contains_key(field_name) {
624 if let Some(value) = stored_fields.get(field_name) {
625 context_fields.insert(field_name.clone(), value.clone());
626 }
627 }
628 }
629 }
630
631 if context_fields.len() == self.span_context_fields.len() {
632 break;
633 }
634 }
635 }
636
637 context_fields
638 }
639
640 fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
647 let mut visitor = FieldVisitor::new();
648 event.record(&mut visitor);
649 let all_fields = visitor.into_fields();
650
651 if self.excluded_fields.is_empty() {
653 all_fields
654 } else {
655 all_fields
656 .into_iter()
657 .filter(|(field_name, _)| !self.excluded_fields.contains(field_name))
658 .collect()
659 }
660 }
661
662 fn compute_signature(
671 &self,
672 metadata: &Metadata,
673 combined_fields: &BTreeMap<String, String>,
674 ) -> EventSignature {
675 let level = metadata.level().as_str();
676 let message = metadata.name();
677 let target = Some(metadata.target());
678
679 EventSignature::new(level, message, combined_fields, target)
681 }
682
683 pub fn should_allow(&self, signature: EventSignature) -> bool {
685 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
686 }
687
688 #[cfg(feature = "human-readable")]
695 pub fn should_allow_with_metadata(
696 &self,
697 signature: EventSignature,
698 metadata: crate::domain::metadata::EventMetadata,
699 ) -> bool {
700 matches!(
701 self.limiter.check_event_with_metadata(signature, metadata),
702 LimitDecision::Allow
703 )
704 }
705
706 pub fn limiter(&self) -> &RateLimiter<S> {
708 &self.limiter
709 }
710
711 pub fn metrics(&self) -> &Metrics {
718 self.limiter.metrics()
719 }
720
721 pub fn signature_count(&self) -> usize {
723 self.limiter.registry().len()
724 }
725
726 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
732 self.limiter.circuit_breaker()
733 }
734
735 #[cfg(feature = "async")]
763 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
764 let handle = {
766 let mut handle_guard = self.emitter_handle.lock().unwrap();
767 handle_guard.take()
768 };
769
770 if let Some(handle) = handle {
771 handle.shutdown().await?;
772 }
773 Ok(())
774 }
775}
776
777impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
778 pub fn builder() -> TracingRateLimitLayerBuilder {
787 TracingRateLimitLayerBuilder {
788 policy: Policy::token_bucket(50.0, 1.0)
789 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
790 summary_interval: Duration::from_secs(30),
791 clock: None,
792 max_signatures: Some(10_000),
793 enable_active_emission: false,
794 #[cfg(feature = "async")]
795 summary_formatter: None,
796 span_context_fields: Vec::new(),
797 excluded_fields: BTreeSet::new(),
798 eviction_strategy: None,
799 exempt_targets: BTreeSet::new(),
800 }
801 }
802
803 pub fn new() -> Self {
815 Self::builder()
816 .build()
817 .expect("default configuration is always valid")
818 }
819
820 pub fn with_storage<ST>(
851 storage: ST,
852 policy: Policy,
853 clock: Arc<dyn Clock>,
854 ) -> TracingRateLimitLayer<ST>
855 where
856 ST: Storage<EventSignature, EventState> + Clone,
857 {
858 let metrics = Metrics::new();
859 let circuit_breaker = Arc::new(CircuitBreaker::new());
860 let registry = SuppressionRegistry::new(storage, clock, policy);
861 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
862
863 TracingRateLimitLayer {
864 limiter,
865 span_context_fields: Arc::new(Vec::new()),
866 excluded_fields: Arc::new(BTreeSet::new()),
867 exempt_targets: Arc::new(BTreeSet::new()),
868 #[cfg(feature = "async")]
869 emitter_handle: Arc::new(Mutex::new(None)),
870 #[cfg(not(feature = "async"))]
871 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
872 .expect("30 seconds is valid"),
873 }
874 }
875}
876
877impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
878 fn default() -> Self {
879 Self::new()
880 }
881}
882
883impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
885where
886 S: Storage<EventSignature, EventState> + Clone,
887 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
888{
889 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
890 true
893 }
894
895 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
896 let metadata_obj = event.metadata();
897
898 if !self.exempt_targets.is_empty() && self.exempt_targets.contains(metadata_obj.target()) {
901 self.limiter.metrics().record_allowed();
903 return true;
904 }
905
906 let mut combined_fields = self.extract_span_context(cx);
908 let event_fields = self.extract_event_fields(event);
909 combined_fields.extend(event_fields);
910
911 let signature = self.compute_signature(metadata_obj, &combined_fields);
912
913 #[cfg(feature = "human-readable")]
914 {
915 let mut visitor = FieldVisitor::new();
917 event.record(&mut visitor);
918 let all_fields = visitor.into_fields();
919 let message = all_fields
920 .get("message")
921 .cloned()
922 .unwrap_or_else(|| event.metadata().name().to_string());
923
924 let event_metadata = crate::domain::metadata::EventMetadata::new(
926 metadata_obj.level().as_str().to_string(),
927 message,
928 metadata_obj.target().to_string(),
929 combined_fields,
930 );
931
932 self.should_allow_with_metadata(signature, event_metadata)
933 }
934
935 #[cfg(not(feature = "human-readable"))]
936 {
937 self.should_allow(signature)
938 }
939 }
940}
941
942impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
943where
944 S: Storage<EventSignature, EventState> + Clone + 'static,
945 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
946{
947 fn on_new_span(
948 &self,
949 attrs: &tracing::span::Attributes<'_>,
950 id: &tracing::span::Id,
951 ctx: Context<'_, Sub>,
952 ) {
953 if self.span_context_fields.is_empty() {
954 return;
955 }
956
957 let mut visitor = FieldVisitor::new();
958 attrs.record(&mut visitor);
959 let fields = visitor.into_fields();
960
961 if let Some(span) = ctx.span(id) {
962 let mut extensions = span.extensions_mut();
963 extensions.insert(fields);
964 }
965 }
966}
967
#[cfg(test)]
mod tests {
    use super::*;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    /// Builds a layer that uses the builder defaults apart from `policy`.
    fn layer_with_policy(policy: Policy) -> TracingRateLimitLayer {
        TracingRateLimitLayer::builder()
            .with_policy(policy)
            .build()
            .unwrap()
    }

    /// Converts string literals into the owned `Vec<String>` the builder setters take.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| (*item).to_owned()).collect()
    }

    /// A freshly built layer has not tracked any signature yet.
    #[test]
    fn test_layer_builder() {
        let built = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(50).unwrap())
            .with_summary_interval(Duration::from_secs(60))
            .build();

        let layer = built.unwrap();
        assert!(layer.limiter().registry().is_empty());
    }

    /// Duplicate and empty span context field names are dropped.
    #[test]
    fn test_span_context_fields_deduplication() {
        let requested = owned(&["user_id", "user_id", "tenant_id", "", "user_id"]);

        let layer = TracingRateLimitLayer::builder()
            .with_span_context_fields(requested)
            .build()
            .unwrap();

        let stored = &layer.span_context_fields;
        assert_eq!(stored.len(), 2);
        assert!(stored.contains(&"user_id".to_string()));
        assert!(stored.contains(&"tenant_id".to_string()));
    }

    /// Duplicate and empty excluded field names are dropped.
    #[test]
    fn test_excluded_fields_deduplication() {
        let requested = owned(&["request_id", "request_id", "trace_id", "", "request_id"]);

        let layer = TracingRateLimitLayer::builder()
            .with_excluded_fields(requested)
            .build()
            .unwrap();

        let stored = &layer.excluded_fields;
        assert_eq!(stored.len(), 2);
        assert!(stored.contains("request_id"));
        assert!(stored.contains("trace_id"));
    }

    /// Duplicate and empty exempt targets are dropped.
    #[test]
    fn test_exempt_targets_deduplication() {
        let requested = owned(&[
            "myapp::security",
            "myapp::security",
            "myapp::audit",
            "",
            "myapp::security",
        ]);

        let layer = TracingRateLimitLayer::builder()
            .with_exempt_targets(requested)
            .build()
            .unwrap();

        let stored = &layer.exempt_targets;
        assert_eq!(stored.len(), 2);
        assert!(stored.contains("myapp::security"));
        assert!(stored.contains("myapp::audit"));
    }

    /// Events from an exempt target are always allowed, while other events
    /// remain subject to the policy.
    #[test]
    fn test_exempt_targets_bypass_rate_limiting() {
        let rate_limit = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .with_exempt_targets(owned(&["myapp::security"]))
            .build()
            .unwrap();

        let filter = rate_limit.clone();
        let subscriber = tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::layer().with_filter(filter));

        tracing::subscriber::with_default(subscriber, || {
            // Three events against a limit of two: the third is suppressed.
            for _attempt in 0..3 {
                info!("Regular log");
            }

            // Four exempt events: none of them is rate limited.
            for _attempt in 0..4 {
                info!(target: "myapp::security", "Security event");
            }
        });

        // 2 regular + 4 exempt events allowed, 1 regular event suppressed.
        let metrics = rate_limit.metrics();
        assert_eq!(metrics.events_allowed(), 6);
        assert_eq!(metrics.events_suppressed(), 1);
    }

    /// `Default` produces a layer with an empty registry.
    #[test]
    fn test_layer_default() {
        let layer: TracingRateLimitLayer = Default::default();
        assert!(layer.limiter().registry().is_empty());
    }

    /// Two signatures built from the same inputs compare equal.
    ///
    /// NOTE(review): the layer is constructed but `compute_signature` is never
    /// called here; only `EventSignature::simple` is exercised.
    #[test]
    fn test_signature_computation() {
        let _layer = TracingRateLimitLayer::new();

        let first = EventSignature::simple("INFO", "test_event");
        let second = EventSignature::simple("INFO", "test_event");

        assert_eq!(first, second);
    }

    /// With a limit of two, the third identical event is rejected.
    #[test]
    fn test_basic_rate_limiting() {
        let layer = layer_with_policy(Policy::count_based(2).unwrap());
        let sig = EventSignature::simple("INFO", "test_message");

        let decisions: Vec<bool> = (0..3).map(|_| layer.should_allow(sig)).collect();

        assert_eq!(decisions, [true, true, false]);
    }

    /// Repeated events from one callsite end up as a single registry entry.
    #[test]
    fn test_layer_integration() {
        let layer = layer_with_policy(Policy::count_based(3).unwrap());

        // Keep a clone so the registry can be inspected after the subscriber
        // has taken ownership of the filter.
        let observer = layer.clone();

        let subscriber = tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::layer().with_filter(layer));

        tracing::subscriber::with_default(subscriber, || {
            for _attempt in 0..10 {
                info!("test event");
            }
        });

        assert_eq!(observer.limiter().registry().len(), 1);
    }

    /// Out of ten identical events only the first three pass a limit of three.
    #[test]
    fn test_layer_suppression_logic() {
        let layer = layer_with_policy(Policy::count_based(3).unwrap());
        let sig = EventSignature::simple("INFO", "test");

        let allowed = (0..10).filter(|_| layer.should_allow(sig)).count();

        assert_eq!(allowed, 3);
    }

    /// A zero summary interval is rejected at build time.
    #[test]
    fn test_builder_zero_summary_interval() {
        use crate::application::emitter::EmitterConfigError;

        let outcome = TracingRateLimitLayer::builder()
            .with_summary_interval(Duration::from_secs(0))
            .build();

        assert_eq!(
            outcome.err(),
            Some(BuildError::EmitterConfig(
                EmitterConfigError::ZeroSummaryInterval
            ))
        );
    }

    /// A signature cap of zero is rejected at build time.
    #[test]
    fn test_builder_zero_max_signatures() {
        let outcome = TracingRateLimitLayer::builder()
            .with_max_signatures(0)
            .build();

        assert_eq!(outcome.err(), Some(BuildError::ZeroMaxSignatures));
    }

    /// A positive signature cap builds successfully.
    #[test]
    fn test_builder_valid_max_signatures() {
        let built = TracingRateLimitLayer::builder()
            .with_max_signatures(100)
            .build();

        let layer = built.unwrap();
        assert!(layer.limiter().registry().is_empty());
    }

    /// Allowed and suppressed counters follow the limiter's decisions.
    #[test]
    fn test_metrics_tracking() {
        let layer = layer_with_policy(Policy::count_based(2).unwrap());
        let sig = EventSignature::simple("INFO", "test");

        // (allowed, suppressed) as currently reported by the metrics.
        let counters = || {
            (
                layer.metrics().events_allowed(),
                layer.metrics().events_suppressed(),
            )
        };

        assert_eq!(counters(), (0, 0));

        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert_eq!(counters(), (2, 0));

        assert!(!layer.should_allow(sig));
        assert_eq!(counters(), (2, 1));
    }

    /// The snapshot reports totals and the suppression rate.
    #[test]
    fn test_metrics_snapshot() {
        let layer = layer_with_policy(Policy::count_based(3).unwrap());
        let sig = EventSignature::simple("INFO", "test");

        // Five attempts against a limit of three.
        (0..5).for_each(|_| {
            layer.should_allow(sig);
        });

        let snapshot = layer.metrics().snapshot();
        assert_eq!(snapshot.events_allowed, 3);
        assert_eq!(snapshot.events_suppressed, 2);
        assert_eq!(snapshot.total_events(), 5);
        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
    }

    /// The signature count grows only when a new signature is seen.
    #[test]
    fn test_signature_count() {
        let layer = layer_with_policy(Policy::count_based(2).unwrap());
        assert_eq!(layer.signature_count(), 0);

        let first = EventSignature::simple("INFO", "test1");
        let second = EventSignature::simple("INFO", "test2");

        layer.should_allow(first);
        assert_eq!(layer.signature_count(), 1);

        layer.should_allow(second);
        assert_eq!(layer.signature_count(), 2);

        // Seeing a known signature again does not add an entry.
        layer.should_allow(first);
        assert_eq!(layer.signature_count(), 2);
    }

    /// Exceeding the signature cap evicts an entry and records the eviction.
    #[test]
    fn test_metrics_with_eviction() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(1).unwrap())
            .with_max_signatures(3)
            .build()
            .unwrap();

        // Fill the storage up to its cap.
        for index in 0..3 {
            let name = format!("test{}", index);
            layer.should_allow(EventSignature::simple("INFO", &name));
        }

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 0);

        // A fourth distinct signature forces one eviction.
        layer.should_allow(EventSignature::simple("INFO", "test3"));

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 1);
    }

    /// The circuit breaker is reachable and stays closed during normal use.
    #[test]
    fn test_circuit_breaker_observability() {
        use crate::application::circuit_breaker::CircuitState;

        let layer = layer_with_policy(Policy::count_based(2).unwrap());
        let breaker = layer.circuit_breaker();

        assert_eq!(breaker.state(), CircuitState::Closed);
        assert_eq!(breaker.consecutive_failures(), 0);

        let sig = EventSignature::simple("INFO", "test");
        for _attempt in 0..3 {
            layer.should_allow(sig);
        }

        assert_eq!(breaker.state(), CircuitState::Closed);
    }

    /// Once the circuit breaker is open, events are allowed even though the
    /// policy limit has been exhausted.
    #[test]
    fn test_circuit_breaker_fail_open_integration() {
        use crate::application::circuit_breaker::{CircuitBreakerConfig, CircuitState};

        let breaker = Arc::new(CircuitBreaker::with_config(CircuitBreakerConfig {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(1),
        }));

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let metrics = Metrics::new();
        let limiter = RateLimiter::new(registry, metrics, breaker.clone());

        let layer = TracingRateLimitLayer {
            limiter,
            span_context_fields: Arc::new(Vec::new()),
            excluded_fields: Arc::new(BTreeSet::new()),
            exempt_targets: Arc::new(BTreeSet::new()),
            #[cfg(feature = "async")]
            emitter_handle: Arc::new(Mutex::new(None)),
            #[cfg(not(feature = "async"))]
            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
                30,
            ))
            .unwrap(),
        };

        let sig = EventSignature::simple("INFO", "test");

        // Closed circuit: the policy applies as usual.
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(!layer.should_allow(sig));
        assert_eq!(breaker.state(), CircuitState::Closed);

        // Reach the failure threshold to open the circuit.
        breaker.record_failure();
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);

        // Open circuit: every event is let through.
        for _attempt in 0..3 {
            assert!(layer.should_allow(sig));
        }

        let snapshot = layer.metrics().snapshot();
        assert!(snapshot.events_allowed >= 5);
    }

    /// A running emitter reports recorded suppressions to its callback.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_active_emission_integration() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let emitted = Arc::new(AtomicUsize::new(0));
        let emitted_in_callback = Arc::clone(&emitted);

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
        let emitter = SummaryEmitter::new(registry.clone(), config);

        let handle = emitter.start(
            move |summaries| {
                emitted_in_callback.fetch_add(summaries.len(), Ordering::SeqCst);
            },
            false,
        );

        let sig = EventSignature::simple("INFO", "test_message");
        for _attempt in 0..10 {
            registry.with_event_state(sig, |state, now| {
                state.counter.record_suppression(now);
            });
        }

        // Wait longer than the 100 ms emission interval.
        tokio::time::sleep(Duration::from_millis(250)).await;

        let count = emitted.load(Ordering::SeqCst);
        assert!(
            count > 0,
            "Expected at least one suppression summary to be emitted, got {}",
            count
        );

        handle.shutdown().await.expect("shutdown failed");
    }

    /// Without active emission no summary events are produced.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_active_emission_disabled() {
        use crate::infrastructure::mocks::layer::MockCaptureLayer;

        // Active emission is left at its default (disabled).
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .with_summary_interval(Duration::from_millis(100))
            .build()
            .unwrap();

        let capture = MockCaptureLayer::new();
        let capture_reader = capture.clone();

        let subscriber = tracing_subscriber::registry()
            .with(capture)
            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));

        tracing::subscriber::with_default(subscriber, || {
            let sig = EventSignature::simple("INFO", "test_message");
            for _attempt in 0..10 {
                layer.should_allow(sig);
            }
        });

        // Wait longer than the summary interval.
        tokio::time::sleep(Duration::from_millis(250)).await;

        let captured = capture_reader.get_captured();
        let summary_count = captured
            .iter()
            .filter(|event| event.message.contains("suppressed"))
            .count();

        assert_eq!(
            summary_count, 0,
            "Should not emit summaries when active emission is disabled"
        );

        layer.shutdown().await.expect("shutdown failed");
    }

    /// `shutdown` succeeds when no emitter was ever started.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_without_emission() {
        let layer = TracingRateLimitLayer::new();

        let outcome = layer.shutdown().await;

        outcome.expect("shutdown should succeed when emitter not running");
    }

    /// A custom formatter is called with the suppression summaries.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_custom_summary_formatter() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let invocations = Arc::new(AtomicUsize::new(0));
        let invocations_in_formatter = Arc::clone(&invocations);

        let last_reported = Arc::new(AtomicUsize::new(0));
        let last_reported_in_formatter = Arc::clone(&last_reported);

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .with_active_emission(true)
            .with_summary_interval(Duration::from_millis(100))
            .with_summary_formatter(Arc::new(move |summary| {
                invocations_in_formatter.fetch_add(1, Ordering::SeqCst);
                last_reported_in_formatter.store(summary.count, Ordering::SeqCst);
                tracing::info!(
                    sig = %summary.signature,
                    suppressed = summary.count,
                    "Custom format"
                );
            }))
            .build()
            .unwrap();

        // Ten attempts against a limit of two.
        let sig = EventSignature::simple("INFO", "test_message");
        for _attempt in 0..10 {
            layer.should_allow(sig);
        }

        // Wait longer than the 100 ms emission interval.
        tokio::time::sleep(Duration::from_millis(250)).await;

        let calls = invocations.load(Ordering::SeqCst);
        assert!(calls > 0, "Custom formatter should have been called");

        let count = last_reported.load(Ordering::SeqCst);
        assert!(
            count >= 8,
            "Expected at least 8 suppressions, got {}",
            count
        );

        layer.shutdown().await.expect("shutdown failed");
    }

    /// Summaries are emitted for recorded suppressions.
    ///
    /// NOTE(review): despite its name, this test drives `SummaryEmitter`
    /// directly with a counting callback and never goes through the builder's
    /// default formatter — consider reworking it.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_default_formatter_used() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let emitted = Arc::new(AtomicUsize::new(0));
        let emitted_in_callback = Arc::clone(&emitted);

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
        let emitter = SummaryEmitter::new(registry.clone(), config);

        let handle = emitter.start(
            move |summaries| {
                emitted_in_callback.fetch_add(summaries.len(), Ordering::SeqCst);
            },
            false,
        );

        let sig = EventSignature::simple("INFO", "test_message");
        for _attempt in 0..10 {
            registry.with_event_state(sig, |state, now| {
                state.counter.record_suppression(now);
            });
        }

        // Wait longer than the 100 ms emission interval.
        tokio::time::sleep(Duration::from_millis(250)).await;

        let count = emitted.load(Ordering::SeqCst);
        assert!(count > 0, "Default formatter should have emitted summaries");

        handle.shutdown().await.expect("shutdown failed");
    }
}