1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
/// Callback invoked once for every [`SuppressionSummary`] handed out by the
/// summary emitter.
///
/// Only compiled with the `async` feature. The callback returns nothing; it is
/// expected to report the summary itself (the builder's default formatter does
/// so with `tracing::warn!`).
#[cfg(feature = "async")]
pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
/// Errors returned by [`TracingRateLimitLayerBuilder::build`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildError {
    /// `max_signatures` was explicitly configured as `0`.
    ZeroMaxSignatures,
    /// Constructing the emitter configuration from the summary interval failed;
    /// carries the underlying error.
    EmitterConfig(crate::application::emitter::EmitterConfigError),
}
51
52impl std::fmt::Display for BuildError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 BuildError::ZeroMaxSignatures => {
56 write!(f, "max_signatures must be greater than 0")
57 }
58 BuildError::EmitterConfig(e) => {
59 write!(f, "emitter configuration error: {}", e)
60 }
61 }
62 }
63}
64
/// Marks `BuildError` as a standard error type; every trait method keeps its
/// default implementation.
impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69 BuildError::EmitterConfig(e)
70 }
71}
72
/// Builder for [`TracingRateLimitLayer`].
///
/// Created by [`TracingRateLimitLayer::builder`]; configure it with the
/// `with_*` methods and finish with `build`.
pub struct TracingRateLimitLayerBuilder {
    /// Rate-limiting policy handed to the suppression registry.
    policy: Policy,
    /// Interval used to construct the emitter configuration; validated in `build`.
    summary_interval: Duration,
    /// Optional clock override; `build` falls back to `SystemClock` when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Signature cap. `Some(0)` is rejected by `build`; when no eviction
    /// strategy is set, `Some(n)` installs an LRU policy with `n` entries and
    /// `None` installs no eviction policy.
    max_signatures: Option<usize>,
    /// Whether `build` starts the summary emitter (only acted upon when the
    /// `async` feature is enabled).
    enable_active_emission: bool,
    /// Custom summary callback; `build` substitutes a `tracing::warn!`-based
    /// formatter when this is `None`.
    #[cfg(feature = "async")]
    summary_formatter: Option<SummaryFormatter>,
    /// Names of span fields that become part of the event signature
    /// (stored sorted, de-duplicated and without empty names).
    span_context_fields: Vec<String>,
    /// Names of event fields that become part of the event signature
    /// (stored sorted, de-duplicated and without empty names).
    event_fields: Vec<String>,
    /// Explicit eviction strategy; when set it decides the eviction policy
    /// instead of the `max_signatures` LRU fallback.
    eviction_strategy: Option<EvictionStrategy>,
}
86
/// Selects which eviction policy the builder installs on the signature storage.
///
/// Each variant is translated in `TracingRateLimitLayerBuilder::build` into the
/// matching type from `crate::infrastructure::eviction`.
#[derive(Clone)]
pub enum EvictionStrategy {
    /// Built as `LruEviction::new(max_entries)`.
    Lru {
        /// Entry limit passed to the eviction policy.
        max_entries: usize,
    },
    /// Built as `PriorityEviction::new(max_entries, priority_fn)`.
    Priority {
        /// Entry limit passed to the eviction policy.
        max_entries: usize,
        /// Priority function passed to the eviction policy.
        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
    },
    /// Built as `MemoryEviction::new(max_bytes)`.
    Memory {
        /// Memory limit passed to the eviction policy (presumably in bytes,
        /// going by the name — confirm against `MemoryEviction`).
        max_bytes: usize,
    },
    /// Built as `PriorityWithMemoryEviction::new(max_entries, priority_fn, max_bytes)`.
    PriorityWithMemory {
        /// Entry limit passed to the eviction policy.
        max_entries: usize,
        /// Priority function passed to the eviction policy.
        priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
        /// Memory limit passed to the eviction policy (presumably in bytes —
        /// confirm against `PriorityWithMemoryEviction`).
        max_bytes: usize,
    },
}
120
121impl EvictionStrategy {
122 pub fn tracks_memory(&self) -> bool {
124 matches!(
125 self,
126 EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
127 )
128 }
129
130 pub fn memory_limit(&self) -> Option<usize> {
132 match self {
133 EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
134 EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
135 _ => None,
136 }
137 }
138
139 pub fn uses_priority(&self) -> bool {
141 matches!(
142 self,
143 EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
144 )
145 }
146}
147
148impl std::fmt::Debug for EvictionStrategy {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 EvictionStrategy::Lru { max_entries } => f
152 .debug_struct("Lru")
153 .field("max_entries", max_entries)
154 .finish(),
155 EvictionStrategy::Priority {
156 max_entries,
157 priority_fn: _,
158 } => f
159 .debug_struct("Priority")
160 .field("max_entries", max_entries)
161 .field("priority_fn", &"<fn>")
162 .finish(),
163 EvictionStrategy::Memory { max_bytes } => f
164 .debug_struct("Memory")
165 .field("max_bytes", max_bytes)
166 .finish(),
167 EvictionStrategy::PriorityWithMemory {
168 max_entries,
169 priority_fn: _,
170 max_bytes,
171 } => f
172 .debug_struct("PriorityWithMemory")
173 .field("max_entries", max_entries)
174 .field("priority_fn", &"<fn>")
175 .field("max_bytes", max_bytes)
176 .finish(),
177 }
178 }
179}
180
181impl TracingRateLimitLayerBuilder {
182 pub fn with_policy(mut self, policy: Policy) -> Self {
184 self.policy = policy;
185 self
186 }
187
188 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
192 self.summary_interval = interval;
193 self
194 }
195
196 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
198 self.clock = Some(clock);
199 self
200 }
201
202 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
211 self.max_signatures = Some(max_signatures);
212 self
213 }
214
215 pub fn with_unlimited_signatures(mut self) -> Self {
221 self.max_signatures = None;
222 self
223 }
224
225 pub fn with_active_emission(mut self, enabled: bool) -> Self {
246 self.enable_active_emission = enabled;
247 self
248 }
249
250 #[cfg(feature = "async")]
280 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
281 self.summary_formatter = Some(formatter);
282 self
283 }
284
285 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
322 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
324 self.span_context_fields = unique_fields.into_iter().collect();
325 self
326 }
327
328 pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
362 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
364 self.event_fields = unique_fields.into_iter().collect();
365 self
366 }
367
368 pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
408 self.eviction_strategy = Some(strategy);
409 self
410 }
411
412 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
417 if let Some(max) = self.max_signatures {
419 if max == 0 {
420 return Err(BuildError::ZeroMaxSignatures);
421 }
422 }
423
424 let metrics = Metrics::new();
426 let circuit_breaker = Arc::new(CircuitBreaker::new());
427
428 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
429 let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
430
431 let eviction_policy: Option<
433 Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
434 > = match self.eviction_strategy {
435 Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
436 crate::infrastructure::eviction::LruEviction::new(max_entries),
437 )),
438 Some(EvictionStrategy::Priority {
439 max_entries,
440 priority_fn,
441 }) => Some(Arc::new(
442 crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
443 )),
444 Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
445 crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
446 )),
447 Some(EvictionStrategy::PriorityWithMemory {
448 max_entries,
449 priority_fn,
450 max_bytes,
451 }) => Some(Arc::new(
452 crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
453 max_entries,
454 priority_fn,
455 max_bytes,
456 ),
457 )),
458 None => {
459 self.max_signatures.map(|max| {
461 Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
462 as Arc<
463 dyn crate::application::ports::EvictionPolicy<
464 EventSignature,
465 EventState,
466 >,
467 >
468 })
469 }
470 };
471
472 if let Some(policy) = eviction_policy {
473 storage = storage.with_eviction_policy(policy);
474 }
475
476 let storage = Arc::new(storage);
477 let registry = SuppressionRegistry::new(storage, clock, self.policy);
478 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
479
480 let emitter_config = EmitterConfig::new(self.summary_interval)?;
482
483 #[cfg(feature = "async")]
484 let emitter_handle = if self.enable_active_emission {
485 let emitter = SummaryEmitter::new(registry, emitter_config);
486
487 let formatter = self.summary_formatter.unwrap_or_else(|| {
489 Arc::new(|summary: &SuppressionSummary| {
490 tracing::warn!(
491 signature = %summary.signature,
492 count = summary.count,
493 "{}",
494 summary.format_message()
495 );
496 })
497 });
498
499 let handle = emitter.start(
500 move |summaries| {
501 for summary in summaries {
502 formatter(&summary);
503 }
504 },
505 false, );
507 Arc::new(Mutex::new(Some(handle)))
508 } else {
509 Arc::new(Mutex::new(None))
510 };
511
512 Ok(TracingRateLimitLayer {
513 limiter,
514 span_context_fields: Arc::new(self.span_context_fields),
515 event_fields: Arc::new(self.event_fields),
516 #[cfg(feature = "async")]
517 emitter_handle,
518 #[cfg(not(feature = "async"))]
519 _emitter_config: emitter_config,
520 })
521 }
522}
523
/// Rate-limiting filter/layer for `tracing` events.
///
/// Implements both `Filter` (deciding per event whether it is allowed) and
/// `Layer` (capturing span fields for later signature computation). Cloning
/// is cheap for the fields declared here that are behind `Arc`; clones share
/// the emitter handle.
#[derive(Clone)]
pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    /// Performs the actual allow/suppress decision.
    limiter: RateLimiter<S>,
    /// Span field names that take part in the event signature.
    span_context_fields: Arc<Vec<String>>,
    /// Event field names that take part in the event signature.
    event_fields: Arc<Vec<String>>,
    /// Handle of the running summary emitter, if one was started; taken out by
    /// `shutdown`.
    #[cfg(feature = "async")]
    emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
    /// Emitter configuration kept when the `async` feature is disabled; not
    /// read anywhere in this file.
    #[cfg(not(feature = "async"))]
    _emitter_config: EmitterConfig,
}
544
545impl<S> TracingRateLimitLayer<S>
546where
547 S: Storage<EventSignature, EventState> + Clone,
548{
549 fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
551 where
552 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
553 {
554 if self.span_context_fields.is_empty() {
555 return BTreeMap::new();
556 }
557
558 let mut context_fields = BTreeMap::new();
559
560 if let Some(span) = cx.lookup_current() {
561 for span_ref in span.scope() {
562 let extensions = span_ref.extensions();
563
564 if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
565 for field_name in self.span_context_fields.as_ref() {
566 if !context_fields.contains_key(field_name) {
567 if let Some(value) = stored_fields.get(field_name) {
568 context_fields.insert(field_name.clone(), value.clone());
569 }
570 }
571 }
572 }
573
574 if context_fields.len() == self.span_context_fields.len() {
575 break;
576 }
577 }
578 }
579
580 context_fields
581 }
582
583 fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
585 if self.event_fields.is_empty() {
586 return BTreeMap::new();
587 }
588
589 let mut visitor = FieldVisitor::new();
590 event.record(&mut visitor);
591 let all_fields = visitor.into_fields();
592
593 self.event_fields
595 .iter()
596 .filter_map(|field_name| {
597 all_fields
598 .get(field_name)
599 .map(|value| (field_name.clone(), value.clone()))
600 })
601 .collect()
602 }
603
604 fn compute_signature(
613 &self,
614 metadata: &Metadata,
615 combined_fields: &BTreeMap<String, String>,
616 ) -> EventSignature {
617 let level = metadata.level().as_str();
618 let message = metadata.name();
619 let target = Some(metadata.target());
620
621 EventSignature::new(level, message, combined_fields, target)
623 }
624
625 pub fn should_allow(&self, signature: EventSignature) -> bool {
627 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
628 }
629
630 #[cfg(feature = "human-readable")]
637 pub fn should_allow_with_metadata(
638 &self,
639 signature: EventSignature,
640 metadata: crate::domain::metadata::EventMetadata,
641 ) -> bool {
642 matches!(
643 self.limiter.check_event_with_metadata(signature, metadata),
644 LimitDecision::Allow
645 )
646 }
647
648 pub fn limiter(&self) -> &RateLimiter<S> {
650 &self.limiter
651 }
652
653 pub fn metrics(&self) -> &Metrics {
660 self.limiter.metrics()
661 }
662
663 pub fn signature_count(&self) -> usize {
665 self.limiter.registry().len()
666 }
667
668 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
674 self.limiter.circuit_breaker()
675 }
676
677 #[cfg(feature = "async")]
705 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
706 let handle = {
708 let mut handle_guard = self.emitter_handle.lock().unwrap();
709 handle_guard.take()
710 };
711
712 if let Some(handle) = handle {
713 handle.shutdown().await?;
714 }
715 Ok(())
716 }
717}
718
719impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
720 pub fn builder() -> TracingRateLimitLayerBuilder {
729 TracingRateLimitLayerBuilder {
730 policy: Policy::token_bucket(50.0, 1.0)
731 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
732 summary_interval: Duration::from_secs(30),
733 clock: None,
734 max_signatures: Some(10_000),
735 enable_active_emission: false,
736 #[cfg(feature = "async")]
737 summary_formatter: None,
738 span_context_fields: Vec::new(),
739 event_fields: Vec::new(),
740 eviction_strategy: None,
741 }
742 }
743
744 pub fn new() -> Self {
756 Self::builder()
757 .build()
758 .expect("default configuration is always valid")
759 }
760
761 pub fn with_storage<ST>(
792 storage: ST,
793 policy: Policy,
794 clock: Arc<dyn Clock>,
795 ) -> TracingRateLimitLayer<ST>
796 where
797 ST: Storage<EventSignature, EventState> + Clone,
798 {
799 let metrics = Metrics::new();
800 let circuit_breaker = Arc::new(CircuitBreaker::new());
801 let registry = SuppressionRegistry::new(storage, clock, policy);
802 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
803
804 TracingRateLimitLayer {
805 limiter,
806 span_context_fields: Arc::new(Vec::new()),
807 event_fields: Arc::new(Vec::new()),
808 #[cfg(feature = "async")]
809 emitter_handle: Arc::new(Mutex::new(None)),
810 #[cfg(not(feature = "async"))]
811 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
812 .expect("30 seconds is valid"),
813 }
814 }
815}
816
impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
    /// Equivalent to [`TracingRateLimitLayer::new`].
    fn default() -> Self {
        Self::new()
    }
}
822
823impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
825where
826 S: Storage<EventSignature, EventState> + Clone,
827 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
828{
829 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
830 true
833 }
834
835 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
836 let mut combined_fields = self.extract_span_context(cx);
838 let event_fields = self.extract_event_fields(event);
839 combined_fields.extend(event_fields);
840
841 let metadata_obj = event.metadata();
842 let signature = self.compute_signature(metadata_obj, &combined_fields);
843
844 #[cfg(feature = "human-readable")]
845 {
846 let mut visitor = FieldVisitor::new();
848 event.record(&mut visitor);
849 let all_fields = visitor.into_fields();
850 let message = all_fields
851 .get("message")
852 .cloned()
853 .unwrap_or_else(|| event.metadata().name().to_string());
854
855 let event_metadata = crate::domain::metadata::EventMetadata::new(
857 metadata_obj.level().as_str().to_string(),
858 message,
859 metadata_obj.target().to_string(),
860 combined_fields,
861 );
862
863 self.should_allow_with_metadata(signature, event_metadata)
864 }
865
866 #[cfg(not(feature = "human-readable"))]
867 {
868 self.should_allow(signature)
869 }
870 }
871}
872
873impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
874where
875 S: Storage<EventSignature, EventState> + Clone + 'static,
876 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
877{
878 fn on_new_span(
879 &self,
880 attrs: &tracing::span::Attributes<'_>,
881 id: &tracing::span::Id,
882 ctx: Context<'_, Sub>,
883 ) {
884 if self.span_context_fields.is_empty() {
885 return;
886 }
887
888 let mut visitor = FieldVisitor::new();
889 attrs.record(&mut visitor);
890 let fields = visitor.into_fields();
891
892 if let Some(span) = ctx.span(id) {
893 let mut extensions = span.extensions_mut();
894 extensions.insert(fields);
895 }
896 }
897}
898
// Unit tests for the builder, the rate-limiting decisions, metrics,
// eviction, the circuit breaker and (with the `async` feature) summary emission.
#[cfg(test)]
mod tests {
    use super::*;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    // A freshly built layer starts with an empty registry.
    #[test]
    fn test_layer_builder() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(50).unwrap())
            .with_summary_interval(Duration::from_secs(60))
            .build()
            .unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    // Duplicate and empty span-context field names are removed by the builder.
    #[test]
    fn test_span_context_fields_deduplication() {
        let layer = TracingRateLimitLayer::builder()
            .with_span_context_fields(vec![
                "user_id".to_string(),
                "user_id".to_string(),
                "tenant_id".to_string(),
                "".to_string(),
                "user_id".to_string(),
            ])
            .build()
            .unwrap();

        assert_eq!(layer.span_context_fields.len(), 2);
        assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
        assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
    }

    // Duplicate and empty event field names are removed by the builder.
    #[test]
    fn test_event_fields_deduplication() {
        let layer = TracingRateLimitLayer::builder()
            .with_event_fields(vec![
                "error_code".to_string(),
                "error_code".to_string(),
                "status".to_string(),
                "".to_string(),
                "error_code".to_string(),
            ])
            .build()
            .unwrap();

        assert_eq!(layer.event_fields.len(), 2);
        assert!(layer.event_fields.iter().any(|f| f == "error_code"));
        assert!(layer.event_fields.iter().any(|f| f == "status"));
    }

    // `Default` produces a layer with an empty registry.
    #[test]
    fn test_layer_default() {
        let layer = TracingRateLimitLayer::default();
        assert!(layer.limiter().registry().is_empty());
    }

    // Two signatures built from the same level and message compare equal.
    // The layer itself is only constructed, not used.
    #[test]
    fn test_signature_computation() {
        let _layer = TracingRateLimitLayer::new();

        let sig1 = EventSignature::simple("INFO", "test_event");
        let sig2 = EventSignature::simple("INFO", "test_event");

        assert_eq!(sig1, sig2);
    }

    // With a count-based policy of 2, the third identical event is suppressed.
    #[test]
    fn test_basic_rate_limiting() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test_message");

        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        assert!(!layer.should_allow(sig));
    }

    // Installed as a per-layer filter, ten identical `info!` events end up as
    // a single signature in the registry shared with the clone.
    #[test]
    fn test_layer_integration() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        // Keep a clone to inspect state after the original is moved into the subscriber.
        let layer_for_check = layer.clone();

        let subscriber = tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::layer().with_filter(layer));

        tracing::subscriber::with_default(subscriber, || {
            for _ in 0..10 {
                info!("test event");
            }
        });

        assert_eq!(layer_for_check.limiter().registry().len(), 1);
    }

    // Out of ten identical events exactly three pass a count-based policy of 3.
    #[test]
    fn test_layer_suppression_logic() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        let mut allowed_count = 0;
        for _ in 0..10 {
            if layer.should_allow(sig) {
                allowed_count += 1;
            }
        }

        assert_eq!(allowed_count, 3);
    }

    // A zero summary interval is reported as an emitter configuration error.
    #[test]
    fn test_builder_zero_summary_interval() {
        let result = TracingRateLimitLayer::builder()
            .with_summary_interval(Duration::from_secs(0))
            .build();

        assert!(matches!(
            result,
            Err(BuildError::EmitterConfig(
                crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
            ))
        ));
    }

    // `max_signatures == 0` is rejected by `build`.
    #[test]
    fn test_builder_zero_max_signatures() {
        let result = TracingRateLimitLayer::builder()
            .with_max_signatures(0)
            .build();

        assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
    }

    // A non-zero `max_signatures` builds successfully.
    #[test]
    fn test_builder_valid_max_signatures() {
        let layer = TracingRateLimitLayer::builder()
            .with_max_signatures(100)
            .build()
            .unwrap();

        assert!(layer.limiter().registry().is_empty());
    }

    // Allowed/suppressed counters follow the decisions made by `should_allow`.
    #[test]
    fn test_metrics_tracking() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        assert_eq!(layer.metrics().events_allowed(), 0);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 0);

        assert!(!layer.should_allow(sig));

        assert_eq!(layer.metrics().events_allowed(), 2);
        assert_eq!(layer.metrics().events_suppressed(), 1);
    }

    // The snapshot reports 3 allowed and 2 suppressed out of 5 events
    // (suppression rate 0.4).
    #[test]
    fn test_metrics_snapshot() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(3).unwrap())
            .build()
            .unwrap();

        let sig = EventSignature::simple("INFO", "test");

        for _ in 0..5 {
            layer.should_allow(sig);
        }

        let snapshot = layer.metrics().snapshot();
        assert_eq!(snapshot.events_allowed, 3);
        assert_eq!(snapshot.events_suppressed, 2);
        assert_eq!(snapshot.total_events(), 5);
        assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
    }

    // `signature_count` grows per distinct signature, not per event.
    #[test]
    fn test_signature_count() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        assert_eq!(layer.signature_count(), 0);

        let sig1 = EventSignature::simple("INFO", "test1");
        let sig2 = EventSignature::simple("INFO", "test2");

        layer.should_allow(sig1);
        assert_eq!(layer.signature_count(), 1);

        layer.should_allow(sig2);
        assert_eq!(layer.signature_count(), 2);

        // A repeated signature does not add a new entry.
        layer.should_allow(sig1);
        assert_eq!(layer.signature_count(), 2);
    }

    // With a cap of 3 signatures, a fourth distinct signature evicts one entry
    // and the eviction is counted in the metrics.
    #[test]
    fn test_metrics_with_eviction() {
        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(1).unwrap())
            .with_max_signatures(3)
            .build()
            .unwrap();

        for i in 0..3 {
            let sig = EventSignature::simple("INFO", &format!("test{}", i));
            layer.should_allow(sig);
        }

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 0);

        let sig = EventSignature::simple("INFO", "test3");
        layer.should_allow(sig);

        assert_eq!(layer.signature_count(), 3);
        assert_eq!(layer.metrics().signatures_evicted(), 1);
    }

    // The circuit breaker is reachable through the layer and stays closed
    // during normal rate limiting.
    #[test]
    fn test_circuit_breaker_observability() {
        use crate::application::circuit_breaker::CircuitState;

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .build()
            .unwrap();

        let cb = layer.circuit_breaker();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert_eq!(cb.consecutive_failures(), 0);

        let sig = EventSignature::simple("INFO", "test");
        layer.should_allow(sig);
        layer.should_allow(sig);
        layer.should_allow(sig);

        assert_eq!(cb.state(), CircuitState::Closed);
    }

    // Once the circuit breaker is open, events are allowed again even though
    // the policy limit has already been reached (fail-open).
    #[test]
    fn test_circuit_breaker_fail_open_integration() {
        use crate::application::circuit_breaker::{
            CircuitBreaker, CircuitBreakerConfig, CircuitState,
        };
        use std::time::Duration;

        let cb_config = CircuitBreakerConfig {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(1),
        };
        let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let metrics = Metrics::new();
        let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());

        // Assemble the layer by hand so the custom circuit breaker is used.
        let layer = TracingRateLimitLayer {
            limiter,
            span_context_fields: Arc::new(Vec::new()),
            event_fields: Arc::new(Vec::new()),
            #[cfg(feature = "async")]
            emitter_handle: Arc::new(Mutex::new(None)),
            #[cfg(not(feature = "async"))]
            _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
                30,
            ))
            .unwrap(),
        };

        let sig = EventSignature::simple("INFO", "test");

        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(!layer.should_allow(sig));

        assert_eq!(circuit_breaker.state(), CircuitState::Closed);

        // Two recorded failures reach the configured threshold of 2.
        circuit_breaker.record_failure();
        circuit_breaker.record_failure();

        assert_eq!(circuit_breaker.state(), CircuitState::Open);

        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));
        assert!(layer.should_allow(sig));

        let snapshot = layer.metrics().snapshot();
        assert!(snapshot.events_allowed >= 5);
    }

    // Drives a `SummaryEmitter` directly: recorded suppressions produce at
    // least one summary within 250 ms at a 100 ms interval.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_active_emission_integration() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::time::Duration;

        let emission_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&emission_count);

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);

        let handle = emitter.start(
            move |summaries| {
                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
            },
            false,
        );

        let sig = EventSignature::simple("INFO", "test_message");
        for _ in 0..10 {
            registry.with_event_state(sig, |state, now| {
                state.counter.record_suppression(now);
            });
        }

        tokio::time::sleep(Duration::from_millis(250)).await;

        let count = emission_count.load(Ordering::SeqCst);
        assert!(
            count > 0,
            "Expected at least one suppression summary to be emitted, got {}",
            count
        );

        handle.shutdown().await.expect("shutdown failed");
    }

    // Without `with_active_emission(true)` no summary events are captured.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_active_emission_disabled() {
        use crate::infrastructure::mocks::layer::MockCaptureLayer;
        use std::time::Duration;

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .with_summary_interval(Duration::from_millis(100))
            .build()
            .unwrap();

        let mock = MockCaptureLayer::new();
        let mock_clone = mock.clone();

        let subscriber = tracing_subscriber::registry()
            .with(mock)
            .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));

        tracing::subscriber::with_default(subscriber, || {
            let sig = EventSignature::simple("INFO", "test_message");
            for _ in 0..10 {
                layer.should_allow(sig);
            }
        });

        tokio::time::sleep(Duration::from_millis(250)).await;

        let events = mock_clone.get_captured();
        let summary_count = events
            .iter()
            .filter(|e| e.message.contains("suppressed"))
            .count();

        assert_eq!(
            summary_count, 0,
            "Should not emit summaries when active emission is disabled"
        );

        layer.shutdown().await.expect("shutdown failed");
    }

    // `shutdown` succeeds when no emitter was ever started.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_without_emission() {
        let layer = TracingRateLimitLayer::new();

        layer
            .shutdown()
            .await
            .expect("shutdown should succeed when emitter not running");
    }

    // A custom summary formatter is invoked and sees the suppression count.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_custom_summary_formatter() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::time::Duration;

        let call_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&call_count);

        let last_count = Arc::new(AtomicUsize::new(0));
        let last_count_clone = Arc::clone(&last_count);

        let layer = TracingRateLimitLayer::builder()
            .with_policy(Policy::count_based(2).unwrap())
            .with_active_emission(true)
            .with_summary_interval(Duration::from_millis(100))
            .with_summary_formatter(Arc::new(move |summary| {
                count_clone.fetch_add(1, Ordering::SeqCst);
                last_count_clone.store(summary.count, Ordering::SeqCst);
                tracing::info!(
                    sig = %summary.signature,
                    suppressed = summary.count,
                    "Custom format"
                );
            }))
            .build()
            .unwrap();

        // 10 events against a limit of 2 leave 8 suppressions to summarise.
        let sig = EventSignature::simple("INFO", "test_message");
        for _ in 0..10 {
            layer.should_allow(sig);
        }

        tokio::time::sleep(Duration::from_millis(250)).await;

        let calls = call_count.load(Ordering::SeqCst);
        assert!(calls > 0, "Custom formatter should have been called");

        let count = last_count.load(Ordering::SeqCst);
        assert!(
            count >= 8,
            "Expected at least 8 suppressions, got {}",
            count
        );

        layer.shutdown().await.expect("shutdown failed");
    }

    // NOTE(review): despite its name this test drives a `SummaryEmitter` with
    // its own counting callback and never builds a layer, so the builder's
    // default formatter does not appear to be exercised here — confirm intent.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_default_formatter_used() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::time::Duration;

        let emission_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&emission_count);

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(2).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
        let emitter = SummaryEmitter::new(registry.clone(), emitter_config);

        let handle = emitter.start(
            move |summaries| {
                count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
            },
            false,
        );

        let sig = EventSignature::simple("INFO", "test_message");
        for _ in 0..10 {
            registry.with_event_state(sig, |state, now| {
                state.counter.record_suppression(now);
            });
        }

        tokio::time::sleep(Duration::from_millis(250)).await;

        let count = emission_count.load(Ordering::SeqCst);
        assert!(count > 0, "Default formatter should have emitted summaries");

        handle.shutdown().await.expect("shutdown failed");
    }
}