1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::borrow::Cow;
20use std::collections::{BTreeMap, BTreeSet};
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{Metadata, Subscriber};
24use tracing_subscriber::layer::Filter;
25use tracing_subscriber::registry::LookupSpan;
26use tracing_subscriber::{layer::Context, Layer};
27
28#[cfg(feature = "async")]
29use crate::application::emitter::{EmitterHandle, SummaryEmitter};
30
31#[cfg(feature = "async")]
32use crate::domain::summary::SuppressionSummary;
33
34#[cfg(feature = "async")]
35use std::sync::Mutex;
36
37#[cfg(feature = "async")]
42pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub enum BuildError {
47 ZeroMaxSignatures,
49 EmitterConfig(crate::application::emitter::EmitterConfigError),
51}
52
53impl std::fmt::Display for BuildError {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 BuildError::ZeroMaxSignatures => {
57 write!(f, "max_signatures must be greater than 0")
58 }
59 BuildError::EmitterConfig(e) => {
60 write!(f, "emitter configuration error: {}", e)
61 }
62 }
63 }
64}
65
66impl std::error::Error for BuildError {}
67
68impl From<crate::application::emitter::EmitterConfigError> for BuildError {
69 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
70 BuildError::EmitterConfig(e)
71 }
72}
73
74pub struct TracingRateLimitLayerBuilder {
76 policy: Policy,
77 summary_interval: Duration,
78 clock: Option<Arc<dyn Clock>>,
79 max_signatures: Option<usize>,
80 enable_active_emission: bool,
81 #[cfg(feature = "async")]
82 summary_formatter: Option<SummaryFormatter>,
83 span_context_fields: Vec<String>,
84 excluded_fields: BTreeSet<String>,
85 eviction_strategy: Option<EvictionStrategy>,
86 exempt_targets: BTreeSet<String>,
87}
88
89#[derive(Clone)]
94pub enum EvictionStrategy {
95 Lru {
97 max_entries: usize,
99 },
100 Priority {
102 max_entries: usize,
104 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
106 },
107 Memory {
109 max_bytes: usize,
111 },
112 PriorityWithMemory {
114 max_entries: usize,
116 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
118 max_bytes: usize,
120 },
121}
122
123impl EvictionStrategy {
124 pub fn tracks_memory(&self) -> bool {
126 matches!(
127 self,
128 EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
129 )
130 }
131
132 pub fn memory_limit(&self) -> Option<usize> {
134 match self {
135 EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
136 EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
137 _ => None,
138 }
139 }
140
141 pub fn uses_priority(&self) -> bool {
143 matches!(
144 self,
145 EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
146 )
147 }
148}
149
150impl std::fmt::Debug for EvictionStrategy {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 match self {
153 EvictionStrategy::Lru { max_entries } => f
154 .debug_struct("Lru")
155 .field("max_entries", max_entries)
156 .finish(),
157 EvictionStrategy::Priority {
158 max_entries,
159 priority_fn: _,
160 } => f
161 .debug_struct("Priority")
162 .field("max_entries", max_entries)
163 .field("priority_fn", &"<fn>")
164 .finish(),
165 EvictionStrategy::Memory { max_bytes } => f
166 .debug_struct("Memory")
167 .field("max_bytes", max_bytes)
168 .finish(),
169 EvictionStrategy::PriorityWithMemory {
170 max_entries,
171 priority_fn: _,
172 max_bytes,
173 } => f
174 .debug_struct("PriorityWithMemory")
175 .field("max_entries", max_entries)
176 .field("priority_fn", &"<fn>")
177 .field("max_bytes", max_bytes)
178 .finish(),
179 }
180 }
181}
182
183impl TracingRateLimitLayerBuilder {
184 pub fn with_policy(mut self, policy: Policy) -> Self {
186 self.policy = policy;
187 self
188 }
189
190 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
194 self.summary_interval = interval;
195 self
196 }
197
198 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
200 self.clock = Some(clock);
201 self
202 }
203
204 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
213 self.max_signatures = Some(max_signatures);
214 self
215 }
216
217 pub fn with_unlimited_signatures(mut self) -> Self {
223 self.max_signatures = None;
224 self
225 }
226
227 pub fn with_active_emission(mut self, enabled: bool) -> Self {
248 self.enable_active_emission = enabled;
249 self
250 }
251
252 #[cfg(feature = "async")]
282 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
283 self.summary_formatter = Some(formatter);
284 self
285 }
286
287 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
324 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
326 self.span_context_fields = unique_fields.into_iter().collect();
327 self
328 }
329
330 pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
372 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
374 self.excluded_fields = unique_fields;
375 self
376 }
377
378 pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
418 let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
420 self.exempt_targets = unique_targets;
421 self
422 }
423
424 pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
464 self.eviction_strategy = Some(strategy);
465 self
466 }
467
468 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
473 if let Some(max) = self.max_signatures {
475 if max == 0 {
476 return Err(BuildError::ZeroMaxSignatures);
477 }
478 }
479
480 let metrics = Metrics::new();
482 let circuit_breaker = Arc::new(CircuitBreaker::new());
483
484 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
485 let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
486
487 let eviction_policy: Option<
489 Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
490 > = match self.eviction_strategy {
491 Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
492 crate::infrastructure::eviction::LruEviction::new(max_entries),
493 )),
494 Some(EvictionStrategy::Priority {
495 max_entries,
496 priority_fn,
497 }) => Some(Arc::new(
498 crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
499 )),
500 Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
501 crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
502 )),
503 Some(EvictionStrategy::PriorityWithMemory {
504 max_entries,
505 priority_fn,
506 max_bytes,
507 }) => Some(Arc::new(
508 crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
509 max_entries,
510 priority_fn,
511 max_bytes,
512 ),
513 )),
514 None => {
515 self.max_signatures.map(|max| {
517 Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
518 as Arc<
519 dyn crate::application::ports::EvictionPolicy<
520 EventSignature,
521 EventState,
522 >,
523 >
524 })
525 }
526 };
527
528 if let Some(policy) = eviction_policy {
529 storage = storage.with_eviction_policy(policy);
530 }
531
532 let storage = Arc::new(storage);
533 let registry = SuppressionRegistry::new(storage, clock, self.policy);
534 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
535
536 let emitter_config = EmitterConfig::new(self.summary_interval)?;
538
539 #[cfg(feature = "async")]
540 let emitter_handle = if self.enable_active_emission {
541 let emitter = SummaryEmitter::new(registry, emitter_config);
542
543 let formatter = self.summary_formatter.unwrap_or_else(|| {
545 Arc::new(|summary: &SuppressionSummary| {
546 tracing::warn!(
547 signature = %summary.signature,
548 count = summary.count,
549 "{}",
550 summary.format_message()
551 );
552 })
553 });
554
555 let handle = emitter.start(
556 move |summaries| {
557 for summary in summaries {
558 formatter(&summary);
559 }
560 },
561 false, );
563 Arc::new(Mutex::new(Some(handle)))
564 } else {
565 Arc::new(Mutex::new(None))
566 };
567
568 Ok(TracingRateLimitLayer {
569 limiter,
570 span_context_fields: Arc::new(self.span_context_fields),
571 excluded_fields: Arc::new(self.excluded_fields),
572 exempt_targets: Arc::new(self.exempt_targets),
573 #[cfg(feature = "async")]
574 emitter_handle,
575 #[cfg(not(feature = "async"))]
576 _emitter_config: emitter_config,
577 })
578 }
579}
580
581#[derive(Clone)]
589pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
590where
591 S: Storage<EventSignature, EventState> + Clone,
592{
593 limiter: RateLimiter<S>,
594 span_context_fields: Arc<Vec<String>>,
595 excluded_fields: Arc<BTreeSet<String>>,
596 exempt_targets: Arc<BTreeSet<String>>,
597 #[cfg(feature = "async")]
598 emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
599 #[cfg(not(feature = "async"))]
600 _emitter_config: EmitterConfig,
601}
602
603impl<S> TracingRateLimitLayer<S>
604where
605 S: Storage<EventSignature, EventState> + Clone,
606{
607 fn extract_span_context<Sub>(
609 &self,
610 cx: &Context<'_, Sub>,
611 ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>>
612 where
613 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
614 {
615 if self.span_context_fields.is_empty() {
616 return BTreeMap::new();
617 }
618
619 let mut context_fields = BTreeMap::new();
620
621 if let Some(span) = cx.lookup_current() {
622 for span_ref in span.scope() {
623 let extensions = span_ref.extensions();
624
625 if let Some(stored_fields) =
626 extensions.get::<BTreeMap<Cow<'static, str>, Cow<'static, str>>>()
627 {
628 for field_name in self.span_context_fields.as_ref() {
629 let field_key: Cow<'static, str> = Cow::Owned(field_name.clone());
631 if let std::collections::btree_map::Entry::Vacant(e) =
632 context_fields.entry(field_key.clone())
633 {
634 if let Some(value) = stored_fields.get(&field_key) {
635 e.insert(value.clone());
636 }
637 }
638 }
639 }
640
641 if context_fields.len() == self.span_context_fields.len() {
642 break;
643 }
644 }
645 }
646
647 context_fields
648 }
649
650 fn extract_event_fields(
657 &self,
658 event: &tracing::Event<'_>,
659 ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>> {
660 let mut visitor = FieldVisitor::new();
661 event.record(&mut visitor);
662 let all_fields = visitor.into_fields();
663
664 if self.excluded_fields.is_empty() {
666 all_fields
667 } else {
668 all_fields
669 .into_iter()
670 .filter(|(field_name, _)| !self.excluded_fields.contains(field_name.as_ref()))
671 .collect()
672 }
673 }
674
675 fn compute_signature(
684 &self,
685 metadata: &Metadata,
686 combined_fields: &BTreeMap<Cow<'static, str>, Cow<'static, str>>,
687 ) -> EventSignature {
688 let level = metadata.level().as_str();
689 let message = metadata.name();
690 let target = Some(metadata.target());
691
692 EventSignature::new(level, message, combined_fields, target)
694 }
695
696 pub fn should_allow(&self, signature: EventSignature) -> bool {
698 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
699 }
700
701 #[cfg(feature = "human-readable")]
708 pub fn should_allow_with_metadata(
709 &self,
710 signature: EventSignature,
711 metadata: crate::domain::metadata::EventMetadata,
712 ) -> bool {
713 matches!(
714 self.limiter.check_event_with_metadata(signature, metadata),
715 LimitDecision::Allow
716 )
717 }
718
719 pub fn limiter(&self) -> &RateLimiter<S> {
721 &self.limiter
722 }
723
724 pub fn metrics(&self) -> &Metrics {
731 self.limiter.metrics()
732 }
733
734 pub fn signature_count(&self) -> usize {
736 self.limiter.registry().len()
737 }
738
739 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
745 self.limiter.circuit_breaker()
746 }
747
748 #[cfg(feature = "async")]
776 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
777 let handle = {
779 let mut handle_guard = self.emitter_handle.lock().unwrap();
780 handle_guard.take()
781 };
782
783 if let Some(handle) = handle {
784 handle.shutdown().await?;
785 }
786 Ok(())
787 }
788}
789
790impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
791 pub fn builder() -> TracingRateLimitLayerBuilder {
800 TracingRateLimitLayerBuilder {
801 policy: Policy::token_bucket(50.0, 1.0)
802 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
803 summary_interval: Duration::from_secs(30),
804 clock: None,
805 max_signatures: Some(10_000),
806 enable_active_emission: false,
807 #[cfg(feature = "async")]
808 summary_formatter: None,
809 span_context_fields: Vec::new(),
810 excluded_fields: BTreeSet::new(),
811 eviction_strategy: None,
812 exempt_targets: BTreeSet::new(),
813 }
814 }
815
816 pub fn new() -> Self {
828 Self::builder()
829 .build()
830 .expect("default configuration is always valid")
831 }
832
833 pub fn with_storage<ST>(
864 storage: ST,
865 policy: Policy,
866 clock: Arc<dyn Clock>,
867 ) -> TracingRateLimitLayer<ST>
868 where
869 ST: Storage<EventSignature, EventState> + Clone,
870 {
871 let metrics = Metrics::new();
872 let circuit_breaker = Arc::new(CircuitBreaker::new());
873 let registry = SuppressionRegistry::new(storage, clock, policy);
874 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
875
876 TracingRateLimitLayer {
877 limiter,
878 span_context_fields: Arc::new(Vec::new()),
879 excluded_fields: Arc::new(BTreeSet::new()),
880 exempt_targets: Arc::new(BTreeSet::new()),
881 #[cfg(feature = "async")]
882 emitter_handle: Arc::new(Mutex::new(None)),
883 #[cfg(not(feature = "async"))]
884 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
885 .expect("30 seconds is valid"),
886 }
887 }
888}
889
890impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
891 fn default() -> Self {
892 Self::new()
893 }
894}
895
896impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
898where
899 S: Storage<EventSignature, EventState> + Clone,
900 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
901{
902 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
903 true
906 }
907
908 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
909 let metadata_obj = event.metadata();
910
911 if !self.exempt_targets.is_empty() && self.exempt_targets.contains(metadata_obj.target()) {
914 self.limiter.metrics().record_allowed();
916 return true;
917 }
918
919 let mut combined_fields = self.extract_span_context(cx);
921 let event_fields = self.extract_event_fields(event);
922 combined_fields.extend(event_fields);
923
924 let signature = self.compute_signature(metadata_obj, &combined_fields);
925
926 #[cfg(feature = "human-readable")]
927 {
928 let mut visitor = FieldVisitor::new();
930 event.record(&mut visitor);
931 let all_fields = visitor.into_fields();
932 let message = all_fields
933 .get(&Cow::Borrowed("message"))
934 .map(|v| v.to_string())
935 .unwrap_or_else(|| event.metadata().name().to_string());
936
937 let event_metadata = crate::domain::metadata::EventMetadata::new(
939 metadata_obj.level().as_str().to_string(),
940 message,
941 metadata_obj.target().to_string(),
942 combined_fields,
943 );
944
945 self.should_allow_with_metadata(signature, event_metadata)
946 }
947
948 #[cfg(not(feature = "human-readable"))]
949 {
950 self.should_allow(signature)
951 }
952 }
953}
954
955impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
956where
957 S: Storage<EventSignature, EventState> + Clone + 'static,
958 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
959{
960 fn on_new_span(
961 &self,
962 attrs: &tracing::span::Attributes<'_>,
963 id: &tracing::span::Id,
964 ctx: Context<'_, Sub>,
965 ) {
966 if self.span_context_fields.is_empty() {
967 return;
968 }
969
970 let mut visitor = FieldVisitor::new();
971 attrs.record(&mut visitor);
972 let fields = visitor.into_fields();
973
974 if let Some(span) = ctx.span(id) {
975 let mut extensions = span.extensions_mut();
976 extensions.insert(fields);
977 }
978 }
979}
980
981#[cfg(test)]
982mod tests {
983 use super::*;
984 use tracing::info;
985 use tracing_subscriber::layer::SubscriberExt;
986
987 #[test]
988 fn test_layer_builder() {
989 let layer = TracingRateLimitLayer::builder()
990 .with_policy(Policy::count_based(50).unwrap())
991 .with_summary_interval(Duration::from_secs(60))
992 .build()
993 .unwrap();
994
995 assert!(layer.limiter().registry().is_empty());
996 }
997
998 #[test]
999 fn test_span_context_fields_deduplication() {
1000 let layer = TracingRateLimitLayer::builder()
1001 .with_span_context_fields(vec![
1002 "user_id".to_string(),
1003 "user_id".to_string(), "tenant_id".to_string(),
1005 "".to_string(), "user_id".to_string(), ])
1008 .build()
1009 .unwrap();
1010
1011 assert_eq!(layer.span_context_fields.len(), 2);
1013 assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
1014 assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
1015 }
1016
1017 #[test]
1018 fn test_excluded_fields_deduplication() {
1019 let layer = TracingRateLimitLayer::builder()
1020 .with_excluded_fields(vec![
1021 "request_id".to_string(),
1022 "request_id".to_string(), "trace_id".to_string(),
1024 "".to_string(), "request_id".to_string(), ])
1027 .build()
1028 .unwrap();
1029
1030 assert_eq!(layer.excluded_fields.len(), 2);
1032 assert!(layer.excluded_fields.contains("request_id"));
1033 assert!(layer.excluded_fields.contains("trace_id"));
1034 }
1035
1036 #[test]
1037 fn test_exempt_targets_deduplication() {
1038 let layer = TracingRateLimitLayer::builder()
1039 .with_exempt_targets(vec![
1040 "myapp::security".to_string(),
1041 "myapp::security".to_string(), "myapp::audit".to_string(),
1043 "".to_string(), "myapp::security".to_string(), ])
1046 .build()
1047 .unwrap();
1048
1049 assert_eq!(layer.exempt_targets.len(), 2);
1051 assert!(layer.exempt_targets.contains("myapp::security"));
1052 assert!(layer.exempt_targets.contains("myapp::audit"));
1053 }
1054
1055 #[test]
1056 fn test_exempt_targets_bypass_rate_limiting() {
1057 let rate_limit = TracingRateLimitLayer::builder()
1058 .with_policy(Policy::count_based(2).unwrap())
1059 .with_exempt_targets(vec!["myapp::security".to_string()])
1060 .build()
1061 .unwrap();
1062
1063 let subscriber = tracing_subscriber::registry()
1064 .with(tracing_subscriber::fmt::layer().with_filter(rate_limit.clone()));
1065
1066 tracing::subscriber::with_default(subscriber, || {
1067 for _ in 0..3 {
1069 info!("Regular log"); }
1071
1072 for _ in 0..4 {
1074 info!(target: "myapp::security", "Security event"); }
1076 });
1077
1078 let metrics = rate_limit.metrics();
1080 assert_eq!(metrics.events_allowed(), 6); assert_eq!(metrics.events_suppressed(), 1); }
1083
1084 #[test]
1085 fn test_layer_default() {
1086 let layer = TracingRateLimitLayer::default();
1087 assert!(layer.limiter().registry().is_empty());
1088 }
1089
1090 #[test]
1091 fn test_signature_computation() {
1092 let _layer = TracingRateLimitLayer::new();
1093
1094 let sig1 = EventSignature::simple("INFO", "test_event");
1096 let sig2 = EventSignature::simple("INFO", "test_event");
1097
1098 assert_eq!(sig1, sig2);
1100 }
1101
1102 #[test]
1103 fn test_basic_rate_limiting() {
1104 let layer = TracingRateLimitLayer::builder()
1105 .with_policy(Policy::count_based(2).unwrap())
1106 .build()
1107 .unwrap();
1108
1109 let sig = EventSignature::simple("INFO", "test_message");
1110
1111 assert!(layer.should_allow(sig));
1113 assert!(layer.should_allow(sig));
1114
1115 assert!(!layer.should_allow(sig));
1117 }
1118
1119 #[test]
1120 fn test_layer_integration() {
1121 let layer = TracingRateLimitLayer::builder()
1122 .with_policy(Policy::count_based(3).unwrap())
1123 .build()
1124 .unwrap();
1125
1126 let layer_for_check = layer.clone();
1128
1129 let subscriber = tracing_subscriber::registry()
1130 .with(tracing_subscriber::fmt::layer().with_filter(layer));
1131
1132 tracing::subscriber::with_default(subscriber, || {
1134 for _ in 0..10 {
1136 info!("test event");
1137 }
1138 });
1139
1140 assert_eq!(layer_for_check.limiter().registry().len(), 1);
1144 }
1145
1146 #[test]
1147 fn test_layer_suppression_logic() {
1148 let layer = TracingRateLimitLayer::builder()
1149 .with_policy(Policy::count_based(3).unwrap())
1150 .build()
1151 .unwrap();
1152
1153 let sig = EventSignature::simple("INFO", "test");
1154
1155 let mut allowed_count = 0;
1157 for _ in 0..10 {
1158 if layer.should_allow(sig) {
1159 allowed_count += 1;
1160 }
1161 }
1162
1163 assert_eq!(allowed_count, 3);
1164 }
1165
1166 #[test]
1167 fn test_builder_zero_summary_interval() {
1168 let result = TracingRateLimitLayer::builder()
1169 .with_summary_interval(Duration::from_secs(0))
1170 .build();
1171
1172 assert!(matches!(
1173 result,
1174 Err(BuildError::EmitterConfig(
1175 crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1176 ))
1177 ));
1178 }
1179
1180 #[test]
1181 fn test_builder_zero_max_signatures() {
1182 let result = TracingRateLimitLayer::builder()
1183 .with_max_signatures(0)
1184 .build();
1185
1186 assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1187 }
1188
1189 #[test]
1190 fn test_builder_valid_max_signatures() {
1191 let layer = TracingRateLimitLayer::builder()
1192 .with_max_signatures(100)
1193 .build()
1194 .unwrap();
1195
1196 assert!(layer.limiter().registry().is_empty());
1197 }
1198
1199 #[test]
1200 fn test_metrics_tracking() {
1201 let layer = TracingRateLimitLayer::builder()
1202 .with_policy(Policy::count_based(2).unwrap())
1203 .build()
1204 .unwrap();
1205
1206 let sig = EventSignature::simple("INFO", "test");
1207
1208 assert_eq!(layer.metrics().events_allowed(), 0);
1210 assert_eq!(layer.metrics().events_suppressed(), 0);
1211
1212 assert!(layer.should_allow(sig));
1214 assert!(layer.should_allow(sig));
1215
1216 assert_eq!(layer.metrics().events_allowed(), 2);
1218 assert_eq!(layer.metrics().events_suppressed(), 0);
1219
1220 assert!(!layer.should_allow(sig));
1222
1223 assert_eq!(layer.metrics().events_allowed(), 2);
1225 assert_eq!(layer.metrics().events_suppressed(), 1);
1226 }
1227
1228 #[test]
1229 fn test_metrics_snapshot() {
1230 let layer = TracingRateLimitLayer::builder()
1231 .with_policy(Policy::count_based(3).unwrap())
1232 .build()
1233 .unwrap();
1234
1235 let sig = EventSignature::simple("INFO", "test");
1236
1237 for _ in 0..5 {
1239 layer.should_allow(sig);
1240 }
1241
1242 let snapshot = layer.metrics().snapshot();
1244 assert_eq!(snapshot.events_allowed, 3);
1245 assert_eq!(snapshot.events_suppressed, 2);
1246 assert_eq!(snapshot.total_events(), 5);
1247 assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1248 }
1249
1250 #[test]
1251 fn test_signature_count() {
1252 let layer = TracingRateLimitLayer::builder()
1253 .with_policy(Policy::count_based(2).unwrap())
1254 .build()
1255 .unwrap();
1256
1257 assert_eq!(layer.signature_count(), 0);
1258
1259 let sig1 = EventSignature::simple("INFO", "test1");
1260 let sig2 = EventSignature::simple("INFO", "test2");
1261
1262 layer.should_allow(sig1);
1263 assert_eq!(layer.signature_count(), 1);
1264
1265 layer.should_allow(sig2);
1266 assert_eq!(layer.signature_count(), 2);
1267
1268 layer.should_allow(sig1);
1270 assert_eq!(layer.signature_count(), 2);
1271 }
1272
1273 #[test]
1274 fn test_metrics_with_eviction() {
1275 let layer = TracingRateLimitLayer::builder()
1276 .with_policy(Policy::count_based(1).unwrap())
1277 .with_max_signatures(3)
1278 .build()
1279 .unwrap();
1280
1281 for i in 0..3 {
1283 let sig = EventSignature::simple("INFO", &format!("test{}", i));
1284 layer.should_allow(sig);
1285 }
1286
1287 assert_eq!(layer.signature_count(), 3);
1288 assert_eq!(layer.metrics().signatures_evicted(), 0);
1289
1290 let sig = EventSignature::simple("INFO", "test3");
1292 layer.should_allow(sig);
1293
1294 assert_eq!(layer.signature_count(), 3);
1295 assert_eq!(layer.metrics().signatures_evicted(), 1);
1296 }
1297
1298 #[test]
1299 fn test_circuit_breaker_observability() {
1300 use crate::application::circuit_breaker::CircuitState;
1301
1302 let layer = TracingRateLimitLayer::builder()
1303 .with_policy(Policy::count_based(2).unwrap())
1304 .build()
1305 .unwrap();
1306
1307 let cb = layer.circuit_breaker();
1309 assert_eq!(cb.state(), CircuitState::Closed);
1310 assert_eq!(cb.consecutive_failures(), 0);
1311
1312 let sig = EventSignature::simple("INFO", "test");
1314 layer.should_allow(sig);
1315 layer.should_allow(sig);
1316 layer.should_allow(sig);
1317
1318 assert_eq!(cb.state(), CircuitState::Closed);
1319 }
1320
1321 #[test]
1322 fn test_circuit_breaker_fail_open_integration() {
1323 use crate::application::circuit_breaker::{
1324 CircuitBreaker, CircuitBreakerConfig, CircuitState,
1325 };
1326 use std::time::Duration;
1327
1328 let cb_config = CircuitBreakerConfig {
1330 failure_threshold: 2,
1331 recovery_timeout: Duration::from_secs(1),
1332 };
1333 let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1334
1335 let storage = Arc::new(ShardedStorage::new());
1337 let clock = Arc::new(SystemClock::new());
1338 let policy = Policy::count_based(2).unwrap();
1339 let registry = SuppressionRegistry::new(storage, clock, policy);
1340 let metrics = Metrics::new();
1341 let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1342
1343 let layer = TracingRateLimitLayer {
1344 limiter,
1345 span_context_fields: Arc::new(Vec::new()),
1346 excluded_fields: Arc::new(BTreeSet::new()),
1347 exempt_targets: Arc::new(BTreeSet::new()),
1348 #[cfg(feature = "async")]
1349 emitter_handle: Arc::new(Mutex::new(None)),
1350 #[cfg(not(feature = "async"))]
1351 _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1352 30,
1353 ))
1354 .unwrap(),
1355 };
1356
1357 let sig = EventSignature::simple("INFO", "test");
1358
1359 assert!(layer.should_allow(sig));
1361 assert!(layer.should_allow(sig));
1362 assert!(!layer.should_allow(sig));
1363
1364 assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1366
1367 circuit_breaker.record_failure();
1369 circuit_breaker.record_failure();
1370
1371 assert_eq!(circuit_breaker.state(), CircuitState::Open);
1373
1374 assert!(layer.should_allow(sig));
1377 assert!(layer.should_allow(sig));
1378 assert!(layer.should_allow(sig));
1379
1380 let snapshot = layer.metrics().snapshot();
1382 assert!(snapshot.events_allowed >= 5); }
1384
1385 #[cfg(feature = "async")]
1386 #[tokio::test]
1387 async fn test_active_emission_integration() {
1388 use std::sync::atomic::{AtomicUsize, Ordering};
1389 use std::time::Duration;
1390
1391 let emission_count = Arc::new(AtomicUsize::new(0));
1393 let count_clone = Arc::clone(&emission_count);
1394
1395 let storage = Arc::new(ShardedStorage::new());
1397 let clock = Arc::new(SystemClock::new());
1398 let policy = Policy::count_based(2).unwrap();
1399 let registry = SuppressionRegistry::new(storage, clock, policy);
1400
1401 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1402 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1403
1404 let handle = emitter.start(
1406 move |summaries| {
1407 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1408 },
1409 false,
1410 );
1411
1412 let sig = EventSignature::simple("INFO", "test_message");
1414 for _ in 0..10 {
1415 registry.with_event_state(sig, |state, now| {
1416 state.counter.record_suppression(now);
1417 });
1418 }
1419
1420 tokio::time::sleep(Duration::from_millis(250)).await;
1422
1423 let count = emission_count.load(Ordering::SeqCst);
1425 assert!(
1426 count > 0,
1427 "Expected at least one suppression summary to be emitted, got {}",
1428 count
1429 );
1430
1431 handle.shutdown().await.expect("shutdown failed");
1433 }
1434
1435 #[cfg(feature = "async")]
1436 #[tokio::test]
1437 async fn test_active_emission_disabled() {
1438 use crate::infrastructure::mocks::layer::MockCaptureLayer;
1439 use std::time::Duration;
1440
1441 let layer = TracingRateLimitLayer::builder()
1443 .with_policy(Policy::count_based(2).unwrap())
1444 .with_summary_interval(Duration::from_millis(100))
1445 .build()
1446 .unwrap();
1447
1448 let mock = MockCaptureLayer::new();
1449 let mock_clone = mock.clone();
1450
1451 let subscriber = tracing_subscriber::registry()
1452 .with(mock)
1453 .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1454
1455 tracing::subscriber::with_default(subscriber, || {
1456 let sig = EventSignature::simple("INFO", "test_message");
1457 for _ in 0..10 {
1458 layer.should_allow(sig);
1459 }
1460 });
1461
1462 tokio::time::sleep(Duration::from_millis(250)).await;
1464
1465 let events = mock_clone.get_captured();
1467 let summary_count = events
1468 .iter()
1469 .filter(|e| e.message.contains("suppressed"))
1470 .count();
1471
1472 assert_eq!(
1473 summary_count, 0,
1474 "Should not emit summaries when active emission is disabled"
1475 );
1476
1477 layer.shutdown().await.expect("shutdown failed");
1479 }
1480
1481 #[cfg(feature = "async")]
1482 #[tokio::test]
1483 async fn test_shutdown_without_emission() {
1484 let layer = TracingRateLimitLayer::new();
1486
1487 layer
1489 .shutdown()
1490 .await
1491 .expect("shutdown should succeed when emitter not running");
1492 }
1493
1494 #[cfg(feature = "async")]
1495 #[tokio::test]
1496 async fn test_custom_summary_formatter() {
1497 use std::sync::atomic::{AtomicUsize, Ordering};
1498 use std::time::Duration;
1499
1500 let call_count = Arc::new(AtomicUsize::new(0));
1502 let count_clone = Arc::clone(&call_count);
1503
1504 let last_count = Arc::new(AtomicUsize::new(0));
1506 let last_count_clone = Arc::clone(&last_count);
1507
1508 let layer = TracingRateLimitLayer::builder()
1510 .with_policy(Policy::count_based(2).unwrap())
1511 .with_active_emission(true)
1512 .with_summary_interval(Duration::from_millis(100))
1513 .with_summary_formatter(Arc::new(move |summary| {
1514 count_clone.fetch_add(1, Ordering::SeqCst);
1515 last_count_clone.store(summary.count, Ordering::SeqCst);
1516 tracing::info!(
1518 sig = %summary.signature,
1519 suppressed = summary.count,
1520 "Custom format"
1521 );
1522 }))
1523 .build()
1524 .unwrap();
1525
1526 let sig = EventSignature::simple("INFO", "test_message");
1528 for _ in 0..10 {
1529 layer.should_allow(sig);
1530 }
1531
1532 tokio::time::sleep(Duration::from_millis(250)).await;
1534
1535 let calls = call_count.load(Ordering::SeqCst);
1537 assert!(calls > 0, "Custom formatter should have been called");
1538
1539 let count = last_count.load(Ordering::SeqCst);
1541 assert!(
1542 count >= 8,
1543 "Expected at least 8 suppressions, got {}",
1544 count
1545 );
1546
1547 layer.shutdown().await.expect("shutdown failed");
1548 }
1549
1550 #[cfg(feature = "async")]
1551 #[tokio::test]
1552 async fn test_default_formatter_used() {
1553 use std::sync::atomic::{AtomicUsize, Ordering};
1554 use std::time::Duration;
1555
1556 let emission_count = Arc::new(AtomicUsize::new(0));
1557 let count_clone = Arc::clone(&emission_count);
1558
1559 let storage = Arc::new(ShardedStorage::new());
1560 let clock = Arc::new(SystemClock::new());
1561 let policy = Policy::count_based(2).unwrap();
1562 let registry = SuppressionRegistry::new(storage, clock, policy);
1563
1564 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1565 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1566
1567 let handle = emitter.start(
1569 move |summaries| {
1570 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1571 },
1572 false,
1573 );
1574
1575 let sig = EventSignature::simple("INFO", "test_message");
1576 for _ in 0..10 {
1577 registry.with_event_state(sig, |state, now| {
1578 state.counter.record_suppression(now);
1579 });
1580 }
1581
1582 tokio::time::sleep(Duration::from_millis(250)).await;
1583
1584 let count = emission_count.load(Ordering::SeqCst);
1585 assert!(count > 0, "Default formatter should have emitted summaries");
1586
1587 handle.shutdown().await.expect("shutdown failed");
1588 }
1589}