1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::borrow::Cow;
20use std::collections::{BTreeMap, BTreeSet};
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{Metadata, Subscriber};
24use tracing_subscriber::layer::Filter;
25use tracing_subscriber::registry::LookupSpan;
26use tracing_subscriber::{layer::Context, Layer};
27
28#[cfg(feature = "async")]
34const SUMMARY_TARGET: &str = "tracing_throttle::summary";
35
36#[cfg(feature = "async")]
37use crate::application::emitter::{EmitterHandle, SummaryEmitter};
38
39#[cfg(feature = "async")]
40use crate::domain::summary::SuppressionSummary;
41
42#[cfg(feature = "async")]
43use std::sync::Mutex;
44
45#[cfg(feature = "async")]
50pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
51
52#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum BuildError {
55 ZeroMaxSignatures,
57 EmitterConfig(crate::application::emitter::EmitterConfigError),
59}
60
61impl std::fmt::Display for BuildError {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 BuildError::ZeroMaxSignatures => {
65 write!(f, "max_signatures must be greater than 0")
66 }
67 BuildError::EmitterConfig(e) => {
68 write!(f, "emitter configuration error: {}", e)
69 }
70 }
71 }
72}
73
74impl std::error::Error for BuildError {}
75
76impl From<crate::application::emitter::EmitterConfigError> for BuildError {
77 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
78 BuildError::EmitterConfig(e)
79 }
80}
81
82pub struct TracingRateLimitLayerBuilder {
84 policy: Policy,
85 summary_interval: Duration,
86 clock: Option<Arc<dyn Clock>>,
87 max_signatures: Option<usize>,
88 enable_active_emission: bool,
89 #[cfg(feature = "async")]
90 summary_formatter: Option<SummaryFormatter>,
91 span_context_fields: Vec<String>,
92 excluded_fields: BTreeSet<String>,
93 eviction_strategy: Option<EvictionStrategy>,
94 exempt_targets: BTreeSet<String>,
95}
96
97#[derive(Clone)]
102pub enum EvictionStrategy {
103 Lru {
105 max_entries: usize,
107 },
108 Priority {
110 max_entries: usize,
112 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
114 },
115 Memory {
117 max_bytes: usize,
119 },
120 PriorityWithMemory {
122 max_entries: usize,
124 priority_fn: crate::infrastructure::eviction::PriorityFn<EventSignature, EventState>,
126 max_bytes: usize,
128 },
129}
130
131impl EvictionStrategy {
132 pub fn tracks_memory(&self) -> bool {
134 matches!(
135 self,
136 EvictionStrategy::Memory { .. } | EvictionStrategy::PriorityWithMemory { .. }
137 )
138 }
139
140 pub fn memory_limit(&self) -> Option<usize> {
142 match self {
143 EvictionStrategy::Memory { max_bytes } => Some(*max_bytes),
144 EvictionStrategy::PriorityWithMemory { max_bytes, .. } => Some(*max_bytes),
145 _ => None,
146 }
147 }
148
149 pub fn uses_priority(&self) -> bool {
151 matches!(
152 self,
153 EvictionStrategy::Priority { .. } | EvictionStrategy::PriorityWithMemory { .. }
154 )
155 }
156}
157
158impl std::fmt::Debug for EvictionStrategy {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 match self {
161 EvictionStrategy::Lru { max_entries } => f
162 .debug_struct("Lru")
163 .field("max_entries", max_entries)
164 .finish(),
165 EvictionStrategy::Priority {
166 max_entries,
167 priority_fn: _,
168 } => f
169 .debug_struct("Priority")
170 .field("max_entries", max_entries)
171 .field("priority_fn", &"<fn>")
172 .finish(),
173 EvictionStrategy::Memory { max_bytes } => f
174 .debug_struct("Memory")
175 .field("max_bytes", max_bytes)
176 .finish(),
177 EvictionStrategy::PriorityWithMemory {
178 max_entries,
179 priority_fn: _,
180 max_bytes,
181 } => f
182 .debug_struct("PriorityWithMemory")
183 .field("max_entries", max_entries)
184 .field("priority_fn", &"<fn>")
185 .field("max_bytes", max_bytes)
186 .finish(),
187 }
188 }
189}
190
191impl TracingRateLimitLayerBuilder {
192 pub fn with_policy(mut self, policy: Policy) -> Self {
194 self.policy = policy;
195 self
196 }
197
198 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
202 self.summary_interval = interval;
203 self
204 }
205
206 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
208 self.clock = Some(clock);
209 self
210 }
211
212 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
221 self.max_signatures = Some(max_signatures);
222 self
223 }
224
225 pub fn with_unlimited_signatures(mut self) -> Self {
231 self.max_signatures = None;
232 self
233 }
234
235 pub fn with_active_emission(mut self, enabled: bool) -> Self {
256 self.enable_active_emission = enabled;
257 self
258 }
259
260 #[cfg(feature = "async")]
290 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
291 self.summary_formatter = Some(formatter);
292 self
293 }
294
295 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
332 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
334 self.span_context_fields = unique_fields.into_iter().collect();
335 self
336 }
337
338 pub fn with_excluded_fields(mut self, fields: Vec<String>) -> Self {
380 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
382 self.excluded_fields = unique_fields;
383 self
384 }
385
386 pub fn with_exempt_targets(mut self, targets: Vec<String>) -> Self {
426 let unique_targets: BTreeSet<_> = targets.into_iter().filter(|t| !t.is_empty()).collect();
428 self.exempt_targets = unique_targets;
429 self
430 }
431
432 pub fn with_eviction_strategy(mut self, strategy: EvictionStrategy) -> Self {
472 self.eviction_strategy = Some(strategy);
473 self
474 }
475
476 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
481 if let Some(max) = self.max_signatures {
483 if max == 0 {
484 return Err(BuildError::ZeroMaxSignatures);
485 }
486 }
487
488 let metrics = Metrics::new();
490 let circuit_breaker = Arc::new(CircuitBreaker::new());
491
492 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
493 let mut storage = ShardedStorage::new().with_metrics(metrics.clone());
494
495 let eviction_policy: Option<
497 Arc<dyn crate::application::ports::EvictionPolicy<EventSignature, EventState>>,
498 > = match self.eviction_strategy {
499 Some(EvictionStrategy::Lru { max_entries }) => Some(Arc::new(
500 crate::infrastructure::eviction::LruEviction::new(max_entries),
501 )),
502 Some(EvictionStrategy::Priority {
503 max_entries,
504 priority_fn,
505 }) => Some(Arc::new(
506 crate::infrastructure::eviction::PriorityEviction::new(max_entries, priority_fn),
507 )),
508 Some(EvictionStrategy::Memory { max_bytes }) => Some(Arc::new(
509 crate::infrastructure::eviction::MemoryEviction::new(max_bytes),
510 )),
511 Some(EvictionStrategy::PriorityWithMemory {
512 max_entries,
513 priority_fn,
514 max_bytes,
515 }) => Some(Arc::new(
516 crate::infrastructure::eviction::PriorityWithMemoryEviction::new(
517 max_entries,
518 priority_fn,
519 max_bytes,
520 ),
521 )),
522 None => {
523 self.max_signatures.map(|max| {
525 Arc::new(crate::infrastructure::eviction::LruEviction::new(max))
526 as Arc<
527 dyn crate::application::ports::EvictionPolicy<
528 EventSignature,
529 EventState,
530 >,
531 >
532 })
533 }
534 };
535
536 if let Some(policy) = eviction_policy {
537 storage = storage.with_eviction_policy(policy);
538 }
539
540 let storage = Arc::new(storage);
541 let registry = SuppressionRegistry::new(storage, clock, self.policy);
542 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
543
544 let emitter_config = EmitterConfig::new(self.summary_interval)?;
546
547 #[cfg(feature = "async")]
548 let emitter_handle = if self.enable_active_emission {
549 let emitter = SummaryEmitter::new(registry, emitter_config);
550
551 let formatter = self.summary_formatter.unwrap_or_else(|| {
553 Arc::new(|summary: &SuppressionSummary| {
554 tracing::warn!(
555 target: SUMMARY_TARGET,
556 signature = %summary.signature,
557 count = summary.count,
558 "{}",
559 summary.format_message()
560 );
561 })
562 });
563
564 let handle = emitter.start(
565 move |summaries| {
566 for summary in summaries {
567 formatter(&summary);
568 }
569 },
570 false, );
572 Arc::new(Mutex::new(Some(handle)))
573 } else {
574 Arc::new(Mutex::new(None))
575 };
576
577 Ok(TracingRateLimitLayer {
578 limiter,
579 span_context_fields: Arc::new(self.span_context_fields),
580 excluded_fields: Arc::new(self.excluded_fields),
581 exempt_targets: Arc::new(self.exempt_targets),
582 #[cfg(feature = "async")]
583 emitter_handle,
584 #[cfg(not(feature = "async"))]
585 _emitter_config: emitter_config,
586 })
587 }
588}
589
590#[derive(Clone)]
598pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
599where
600 S: Storage<EventSignature, EventState> + Clone,
601{
602 limiter: RateLimiter<S>,
603 span_context_fields: Arc<Vec<String>>,
604 excluded_fields: Arc<BTreeSet<String>>,
605 exempt_targets: Arc<BTreeSet<String>>,
606 #[cfg(feature = "async")]
607 emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
608 #[cfg(not(feature = "async"))]
609 _emitter_config: EmitterConfig,
610}
611
612impl<S> TracingRateLimitLayer<S>
613where
614 S: Storage<EventSignature, EventState> + Clone,
615{
616 fn extract_span_context<Sub>(
618 &self,
619 cx: &Context<'_, Sub>,
620 ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>>
621 where
622 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
623 {
624 if self.span_context_fields.is_empty() {
625 return BTreeMap::new();
626 }
627
628 let mut context_fields = BTreeMap::new();
629
630 if let Some(span) = cx.lookup_current() {
631 for span_ref in span.scope() {
632 let extensions = span_ref.extensions();
633
634 if let Some(stored_fields) =
635 extensions.get::<BTreeMap<Cow<'static, str>, Cow<'static, str>>>()
636 {
637 for field_name in self.span_context_fields.as_ref() {
638 let field_key: Cow<'static, str> = Cow::Owned(field_name.clone());
640 if let std::collections::btree_map::Entry::Vacant(e) =
641 context_fields.entry(field_key.clone())
642 {
643 if let Some(value) = stored_fields.get(&field_key) {
644 e.insert(value.clone());
645 }
646 }
647 }
648 }
649
650 if context_fields.len() == self.span_context_fields.len() {
651 break;
652 }
653 }
654 }
655
656 context_fields
657 }
658
659 fn extract_event_fields(
666 &self,
667 event: &tracing::Event<'_>,
668 ) -> BTreeMap<Cow<'static, str>, Cow<'static, str>> {
669 let mut visitor = FieldVisitor::new();
670 event.record(&mut visitor);
671 let all_fields = visitor.into_fields();
672
673 if self.excluded_fields.is_empty() {
675 all_fields
676 } else {
677 all_fields
678 .into_iter()
679 .filter(|(field_name, _)| !self.excluded_fields.contains(field_name.as_ref()))
680 .collect()
681 }
682 }
683
684 fn compute_signature(
693 &self,
694 metadata: &Metadata,
695 combined_fields: &BTreeMap<Cow<'static, str>, Cow<'static, str>>,
696 ) -> EventSignature {
697 let level = metadata.level().as_str();
698 let message = metadata.name();
699 let target = Some(metadata.target());
700
701 EventSignature::new(level, message, combined_fields, target)
703 }
704
705 pub fn should_allow(&self, signature: EventSignature) -> bool {
707 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
708 }
709
710 #[cfg(feature = "human-readable")]
717 pub fn should_allow_with_metadata(
718 &self,
719 signature: EventSignature,
720 metadata: crate::domain::metadata::EventMetadata,
721 ) -> bool {
722 matches!(
723 self.limiter.check_event_with_metadata(signature, metadata),
724 LimitDecision::Allow
725 )
726 }
727
728 pub fn limiter(&self) -> &RateLimiter<S> {
730 &self.limiter
731 }
732
733 pub fn metrics(&self) -> &Metrics {
740 self.limiter.metrics()
741 }
742
743 pub fn signature_count(&self) -> usize {
745 self.limiter.registry().len()
746 }
747
748 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
754 self.limiter.circuit_breaker()
755 }
756
757 #[cfg(feature = "async")]
785 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
786 let handle = {
788 let mut handle_guard = self.emitter_handle.lock().unwrap();
789 handle_guard.take()
790 };
791
792 if let Some(handle) = handle {
793 handle.shutdown().await?;
794 }
795 Ok(())
796 }
797}
798
799impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
800 pub fn builder() -> TracingRateLimitLayerBuilder {
809 TracingRateLimitLayerBuilder {
810 policy: Policy::token_bucket(50.0, 1.0)
811 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
812 summary_interval: Duration::from_secs(30),
813 clock: None,
814 max_signatures: Some(10_000),
815 enable_active_emission: false,
816 #[cfg(feature = "async")]
817 summary_formatter: None,
818 span_context_fields: Vec::new(),
819 excluded_fields: BTreeSet::new(),
820 eviction_strategy: None,
821 exempt_targets: BTreeSet::new(),
822 }
823 }
824
825 pub fn new() -> Self {
837 Self::builder()
838 .build()
839 .expect("default configuration is always valid")
840 }
841
842 pub fn with_storage<ST>(
873 storage: ST,
874 policy: Policy,
875 clock: Arc<dyn Clock>,
876 ) -> TracingRateLimitLayer<ST>
877 where
878 ST: Storage<EventSignature, EventState> + Clone,
879 {
880 let metrics = Metrics::new();
881 let circuit_breaker = Arc::new(CircuitBreaker::new());
882 let registry = SuppressionRegistry::new(storage, clock, policy);
883 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
884
885 TracingRateLimitLayer {
886 limiter,
887 span_context_fields: Arc::new(Vec::new()),
888 excluded_fields: Arc::new(BTreeSet::new()),
889 exempt_targets: Arc::new(BTreeSet::new()),
890 #[cfg(feature = "async")]
891 emitter_handle: Arc::new(Mutex::new(None)),
892 #[cfg(not(feature = "async"))]
893 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
894 .expect("30 seconds is valid"),
895 }
896 }
897}
898
899impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
900 fn default() -> Self {
901 Self::new()
902 }
903}
904
905impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
907where
908 S: Storage<EventSignature, EventState> + Clone,
909 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
910{
911 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
912 true
915 }
916
917 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
918 let metadata_obj = event.metadata();
919 let target = metadata_obj.target();
920
921 #[cfg(feature = "async")]
925 if target == SUMMARY_TARGET {
926 return true;
927 }
928
929 if !self.exempt_targets.is_empty() && self.exempt_targets.contains(target) {
932 self.limiter.metrics().record_allowed();
934 return true;
935 }
936
937 let mut combined_fields = self.extract_span_context(cx);
939 let event_fields = self.extract_event_fields(event);
940 combined_fields.extend(event_fields);
941
942 let signature = self.compute_signature(metadata_obj, &combined_fields);
943
944 #[cfg(feature = "human-readable")]
945 {
946 let mut visitor = FieldVisitor::new();
948 event.record(&mut visitor);
949 let all_fields = visitor.into_fields();
950 let message = all_fields
951 .get(&Cow::Borrowed("message"))
952 .map(|v| v.to_string())
953 .unwrap_or_else(|| event.metadata().name().to_string());
954
955 let event_metadata = crate::domain::metadata::EventMetadata::new(
957 metadata_obj.level().as_str().to_string(),
958 message,
959 target.to_string(),
960 combined_fields,
961 );
962
963 self.should_allow_with_metadata(signature, event_metadata)
964 }
965
966 #[cfg(not(feature = "human-readable"))]
967 {
968 self.should_allow(signature)
969 }
970 }
971}
972
973impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
974where
975 S: Storage<EventSignature, EventState> + Clone + 'static,
976 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
977{
978 fn on_new_span(
979 &self,
980 attrs: &tracing::span::Attributes<'_>,
981 id: &tracing::span::Id,
982 ctx: Context<'_, Sub>,
983 ) {
984 if self.span_context_fields.is_empty() {
985 return;
986 }
987
988 let mut visitor = FieldVisitor::new();
989 attrs.record(&mut visitor);
990 let fields = visitor.into_fields();
991
992 if let Some(span) = ctx.span(id) {
993 let mut extensions = span.extensions_mut();
994 extensions.insert(fields);
995 }
996 }
997}
998
999#[cfg(test)]
1000mod tests {
1001 use super::*;
1002 use tracing::info;
1003 use tracing_subscriber::layer::SubscriberExt;
1004
1005 #[test]
1006 fn test_layer_builder() {
1007 let layer = TracingRateLimitLayer::builder()
1008 .with_policy(Policy::count_based(50).unwrap())
1009 .with_summary_interval(Duration::from_secs(60))
1010 .build()
1011 .unwrap();
1012
1013 assert!(layer.limiter().registry().is_empty());
1014 }
1015
1016 #[test]
1017 fn test_span_context_fields_deduplication() {
1018 let layer = TracingRateLimitLayer::builder()
1019 .with_span_context_fields(vec![
1020 "user_id".to_string(),
1021 "user_id".to_string(), "tenant_id".to_string(),
1023 "".to_string(), "user_id".to_string(), ])
1026 .build()
1027 .unwrap();
1028
1029 assert_eq!(layer.span_context_fields.len(), 2);
1031 assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
1032 assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
1033 }
1034
1035 #[test]
1036 fn test_excluded_fields_deduplication() {
1037 let layer = TracingRateLimitLayer::builder()
1038 .with_excluded_fields(vec![
1039 "request_id".to_string(),
1040 "request_id".to_string(), "trace_id".to_string(),
1042 "".to_string(), "request_id".to_string(), ])
1045 .build()
1046 .unwrap();
1047
1048 assert_eq!(layer.excluded_fields.len(), 2);
1050 assert!(layer.excluded_fields.contains("request_id"));
1051 assert!(layer.excluded_fields.contains("trace_id"));
1052 }
1053
1054 #[test]
1055 fn test_exempt_targets_deduplication() {
1056 let layer = TracingRateLimitLayer::builder()
1057 .with_exempt_targets(vec![
1058 "myapp::security".to_string(),
1059 "myapp::security".to_string(), "myapp::audit".to_string(),
1061 "".to_string(), "myapp::security".to_string(), ])
1064 .build()
1065 .unwrap();
1066
1067 assert_eq!(layer.exempt_targets.len(), 2);
1069 assert!(layer.exempt_targets.contains("myapp::security"));
1070 assert!(layer.exempt_targets.contains("myapp::audit"));
1071 }
1072
1073 #[test]
1074 fn test_exempt_targets_bypass_rate_limiting() {
1075 let rate_limit = TracingRateLimitLayer::builder()
1076 .with_policy(Policy::count_based(2).unwrap())
1077 .with_exempt_targets(vec!["myapp::security".to_string()])
1078 .build()
1079 .unwrap();
1080
1081 let subscriber = tracing_subscriber::registry()
1082 .with(tracing_subscriber::fmt::layer().with_filter(rate_limit.clone()));
1083
1084 tracing::subscriber::with_default(subscriber, || {
1085 for _ in 0..3 {
1087 info!("Regular log"); }
1089
1090 for _ in 0..4 {
1092 info!(target: "myapp::security", "Security event"); }
1094 });
1095
1096 let metrics = rate_limit.metrics();
1098 assert_eq!(metrics.events_allowed(), 6); assert_eq!(metrics.events_suppressed(), 1); }
1101
1102 #[test]
1103 fn test_layer_default() {
1104 let layer = TracingRateLimitLayer::default();
1105 assert!(layer.limiter().registry().is_empty());
1106 }
1107
1108 #[test]
1109 fn test_signature_computation() {
1110 let _layer = TracingRateLimitLayer::new();
1111
1112 let sig1 = EventSignature::simple("INFO", "test_event");
1114 let sig2 = EventSignature::simple("INFO", "test_event");
1115
1116 assert_eq!(sig1, sig2);
1118 }
1119
1120 #[test]
1121 fn test_basic_rate_limiting() {
1122 let layer = TracingRateLimitLayer::builder()
1123 .with_policy(Policy::count_based(2).unwrap())
1124 .build()
1125 .unwrap();
1126
1127 let sig = EventSignature::simple("INFO", "test_message");
1128
1129 assert!(layer.should_allow(sig));
1131 assert!(layer.should_allow(sig));
1132
1133 assert!(!layer.should_allow(sig));
1135 }
1136
1137 #[test]
1138 fn test_layer_integration() {
1139 let layer = TracingRateLimitLayer::builder()
1140 .with_policy(Policy::count_based(3).unwrap())
1141 .build()
1142 .unwrap();
1143
1144 let layer_for_check = layer.clone();
1146
1147 let subscriber = tracing_subscriber::registry()
1148 .with(tracing_subscriber::fmt::layer().with_filter(layer));
1149
1150 tracing::subscriber::with_default(subscriber, || {
1152 for _ in 0..10 {
1154 info!("test event");
1155 }
1156 });
1157
1158 assert_eq!(layer_for_check.limiter().registry().len(), 1);
1162 }
1163
1164 #[test]
1165 fn test_layer_suppression_logic() {
1166 let layer = TracingRateLimitLayer::builder()
1167 .with_policy(Policy::count_based(3).unwrap())
1168 .build()
1169 .unwrap();
1170
1171 let sig = EventSignature::simple("INFO", "test");
1172
1173 let mut allowed_count = 0;
1175 for _ in 0..10 {
1176 if layer.should_allow(sig) {
1177 allowed_count += 1;
1178 }
1179 }
1180
1181 assert_eq!(allowed_count, 3);
1182 }
1183
1184 #[test]
1185 fn test_builder_zero_summary_interval() {
1186 let result = TracingRateLimitLayer::builder()
1187 .with_summary_interval(Duration::from_secs(0))
1188 .build();
1189
1190 assert!(matches!(
1191 result,
1192 Err(BuildError::EmitterConfig(
1193 crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
1194 ))
1195 ));
1196 }
1197
1198 #[test]
1199 fn test_builder_zero_max_signatures() {
1200 let result = TracingRateLimitLayer::builder()
1201 .with_max_signatures(0)
1202 .build();
1203
1204 assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
1205 }
1206
1207 #[test]
1208 fn test_builder_valid_max_signatures() {
1209 let layer = TracingRateLimitLayer::builder()
1210 .with_max_signatures(100)
1211 .build()
1212 .unwrap();
1213
1214 assert!(layer.limiter().registry().is_empty());
1215 }
1216
1217 #[test]
1218 fn test_metrics_tracking() {
1219 let layer = TracingRateLimitLayer::builder()
1220 .with_policy(Policy::count_based(2).unwrap())
1221 .build()
1222 .unwrap();
1223
1224 let sig = EventSignature::simple("INFO", "test");
1225
1226 assert_eq!(layer.metrics().events_allowed(), 0);
1228 assert_eq!(layer.metrics().events_suppressed(), 0);
1229
1230 assert!(layer.should_allow(sig));
1232 assert!(layer.should_allow(sig));
1233
1234 assert_eq!(layer.metrics().events_allowed(), 2);
1236 assert_eq!(layer.metrics().events_suppressed(), 0);
1237
1238 assert!(!layer.should_allow(sig));
1240
1241 assert_eq!(layer.metrics().events_allowed(), 2);
1243 assert_eq!(layer.metrics().events_suppressed(), 1);
1244 }
1245
1246 #[test]
1247 fn test_metrics_snapshot() {
1248 let layer = TracingRateLimitLayer::builder()
1249 .with_policy(Policy::count_based(3).unwrap())
1250 .build()
1251 .unwrap();
1252
1253 let sig = EventSignature::simple("INFO", "test");
1254
1255 for _ in 0..5 {
1257 layer.should_allow(sig);
1258 }
1259
1260 let snapshot = layer.metrics().snapshot();
1262 assert_eq!(snapshot.events_allowed, 3);
1263 assert_eq!(snapshot.events_suppressed, 2);
1264 assert_eq!(snapshot.total_events(), 5);
1265 assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
1266 }
1267
1268 #[test]
1269 fn test_signature_count() {
1270 let layer = TracingRateLimitLayer::builder()
1271 .with_policy(Policy::count_based(2).unwrap())
1272 .build()
1273 .unwrap();
1274
1275 assert_eq!(layer.signature_count(), 0);
1276
1277 let sig1 = EventSignature::simple("INFO", "test1");
1278 let sig2 = EventSignature::simple("INFO", "test2");
1279
1280 layer.should_allow(sig1);
1281 assert_eq!(layer.signature_count(), 1);
1282
1283 layer.should_allow(sig2);
1284 assert_eq!(layer.signature_count(), 2);
1285
1286 layer.should_allow(sig1);
1288 assert_eq!(layer.signature_count(), 2);
1289 }
1290
1291 #[test]
1292 fn test_metrics_with_eviction() {
1293 let layer = TracingRateLimitLayer::builder()
1294 .with_policy(Policy::count_based(1).unwrap())
1295 .with_max_signatures(3)
1296 .build()
1297 .unwrap();
1298
1299 for i in 0..3 {
1301 let sig = EventSignature::simple("INFO", &format!("test{}", i));
1302 layer.should_allow(sig);
1303 }
1304
1305 assert_eq!(layer.signature_count(), 3);
1306 assert_eq!(layer.metrics().signatures_evicted(), 0);
1307
1308 let sig = EventSignature::simple("INFO", "test3");
1310 layer.should_allow(sig);
1311
1312 assert_eq!(layer.signature_count(), 3);
1313 assert_eq!(layer.metrics().signatures_evicted(), 1);
1314 }
1315
1316 #[test]
1317 fn test_circuit_breaker_observability() {
1318 use crate::application::circuit_breaker::CircuitState;
1319
1320 let layer = TracingRateLimitLayer::builder()
1321 .with_policy(Policy::count_based(2).unwrap())
1322 .build()
1323 .unwrap();
1324
1325 let cb = layer.circuit_breaker();
1327 assert_eq!(cb.state(), CircuitState::Closed);
1328 assert_eq!(cb.consecutive_failures(), 0);
1329
1330 let sig = EventSignature::simple("INFO", "test");
1332 layer.should_allow(sig);
1333 layer.should_allow(sig);
1334 layer.should_allow(sig);
1335
1336 assert_eq!(cb.state(), CircuitState::Closed);
1337 }
1338
1339 #[test]
1340 fn test_circuit_breaker_fail_open_integration() {
1341 use crate::application::circuit_breaker::{
1342 CircuitBreaker, CircuitBreakerConfig, CircuitState,
1343 };
1344 use std::time::Duration;
1345
1346 let cb_config = CircuitBreakerConfig {
1348 failure_threshold: 2,
1349 recovery_timeout: Duration::from_secs(1),
1350 };
1351 let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1352
1353 let storage = Arc::new(ShardedStorage::new());
1355 let clock = Arc::new(SystemClock::new());
1356 let policy = Policy::count_based(2).unwrap();
1357 let registry = SuppressionRegistry::new(storage, clock, policy);
1358 let metrics = Metrics::new();
1359 let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1360
1361 let layer = TracingRateLimitLayer {
1362 limiter,
1363 span_context_fields: Arc::new(Vec::new()),
1364 excluded_fields: Arc::new(BTreeSet::new()),
1365 exempt_targets: Arc::new(BTreeSet::new()),
1366 #[cfg(feature = "async")]
1367 emitter_handle: Arc::new(Mutex::new(None)),
1368 #[cfg(not(feature = "async"))]
1369 _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1370 30,
1371 ))
1372 .unwrap(),
1373 };
1374
1375 let sig = EventSignature::simple("INFO", "test");
1376
1377 assert!(layer.should_allow(sig));
1379 assert!(layer.should_allow(sig));
1380 assert!(!layer.should_allow(sig));
1381
1382 assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1384
1385 circuit_breaker.record_failure();
1387 circuit_breaker.record_failure();
1388
1389 assert_eq!(circuit_breaker.state(), CircuitState::Open);
1391
1392 assert!(layer.should_allow(sig));
1395 assert!(layer.should_allow(sig));
1396 assert!(layer.should_allow(sig));
1397
1398 let snapshot = layer.metrics().snapshot();
1400 assert!(snapshot.events_allowed >= 5); }
1402
1403 #[cfg(feature = "async")]
1404 #[tokio::test]
1405 async fn test_active_emission_integration() {
1406 use std::sync::atomic::{AtomicUsize, Ordering};
1407 use std::time::Duration;
1408
1409 let emission_count = Arc::new(AtomicUsize::new(0));
1411 let count_clone = Arc::clone(&emission_count);
1412
1413 let storage = Arc::new(ShardedStorage::new());
1415 let clock = Arc::new(SystemClock::new());
1416 let policy = Policy::count_based(2).unwrap();
1417 let registry = SuppressionRegistry::new(storage, clock, policy);
1418
1419 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1420 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1421
1422 let handle = emitter.start(
1424 move |summaries| {
1425 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1426 },
1427 false,
1428 );
1429
1430 let sig = EventSignature::simple("INFO", "test_message");
1432 for _ in 0..10 {
1433 registry.with_event_state(sig, |state, now| {
1434 state.counter.record_suppression(now);
1435 });
1436 }
1437
1438 tokio::time::sleep(Duration::from_millis(250)).await;
1440
1441 let count = emission_count.load(Ordering::SeqCst);
1443 assert!(
1444 count > 0,
1445 "Expected at least one suppression summary to be emitted, got {}",
1446 count
1447 );
1448
1449 handle.shutdown().await.expect("shutdown failed");
1451 }
1452
1453 #[cfg(feature = "async")]
1454 #[tokio::test]
1455 async fn test_active_emission_disabled() {
1456 use crate::infrastructure::mocks::layer::MockCaptureLayer;
1457 use std::time::Duration;
1458
1459 let layer = TracingRateLimitLayer::builder()
1461 .with_policy(Policy::count_based(2).unwrap())
1462 .with_summary_interval(Duration::from_millis(100))
1463 .build()
1464 .unwrap();
1465
1466 let mock = MockCaptureLayer::new();
1467 let mock_clone = mock.clone();
1468
1469 let subscriber = tracing_subscriber::registry()
1470 .with(mock)
1471 .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1472
1473 tracing::subscriber::with_default(subscriber, || {
1474 let sig = EventSignature::simple("INFO", "test_message");
1475 for _ in 0..10 {
1476 layer.should_allow(sig);
1477 }
1478 });
1479
1480 tokio::time::sleep(Duration::from_millis(250)).await;
1482
1483 let events = mock_clone.get_captured();
1485 let summary_count = events
1486 .iter()
1487 .filter(|e| e.message.contains("suppressed"))
1488 .count();
1489
1490 assert_eq!(
1491 summary_count, 0,
1492 "Should not emit summaries when active emission is disabled"
1493 );
1494
1495 layer.shutdown().await.expect("shutdown failed");
1497 }
1498
1499 #[cfg(feature = "async")]
1500 #[tokio::test]
1501 async fn test_shutdown_without_emission() {
1502 let layer = TracingRateLimitLayer::new();
1504
1505 layer
1507 .shutdown()
1508 .await
1509 .expect("shutdown should succeed when emitter not running");
1510 }
1511
1512 #[cfg(feature = "async")]
1513 #[tokio::test]
1514 async fn test_custom_summary_formatter() {
1515 use std::sync::atomic::{AtomicUsize, Ordering};
1516 use std::time::Duration;
1517
1518 let call_count = Arc::new(AtomicUsize::new(0));
1520 let count_clone = Arc::clone(&call_count);
1521
1522 let last_count = Arc::new(AtomicUsize::new(0));
1524 let last_count_clone = Arc::clone(&last_count);
1525
1526 let layer = TracingRateLimitLayer::builder()
1528 .with_policy(Policy::count_based(2).unwrap())
1529 .with_active_emission(true)
1530 .with_summary_interval(Duration::from_millis(100))
1531 .with_summary_formatter(Arc::new(move |summary| {
1532 count_clone.fetch_add(1, Ordering::SeqCst);
1533 last_count_clone.store(summary.count, Ordering::SeqCst);
1534 tracing::info!(
1536 sig = %summary.signature,
1537 suppressed = summary.count,
1538 "Custom format"
1539 );
1540 }))
1541 .build()
1542 .unwrap();
1543
1544 let sig = EventSignature::simple("INFO", "test_message");
1546 for _ in 0..10 {
1547 layer.should_allow(sig);
1548 }
1549
1550 tokio::time::sleep(Duration::from_millis(250)).await;
1552
1553 let calls = call_count.load(Ordering::SeqCst);
1555 assert!(calls > 0, "Custom formatter should have been called");
1556
1557 let count = last_count.load(Ordering::SeqCst);
1559 assert!(
1560 count >= 8,
1561 "Expected at least 8 suppressions, got {}",
1562 count
1563 );
1564
1565 layer.shutdown().await.expect("shutdown failed");
1566 }
1567
1568 #[cfg(feature = "async")]
1569 #[tokio::test]
1570 async fn test_default_formatter_used() {
1571 use std::sync::atomic::{AtomicUsize, Ordering};
1572 use std::time::Duration;
1573
1574 let emission_count = Arc::new(AtomicUsize::new(0));
1575 let count_clone = Arc::clone(&emission_count);
1576
1577 let storage = Arc::new(ShardedStorage::new());
1578 let clock = Arc::new(SystemClock::new());
1579 let policy = Policy::count_based(2).unwrap();
1580 let registry = SuppressionRegistry::new(storage, clock, policy);
1581
1582 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1583 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1584
1585 let handle = emitter.start(
1587 move |summaries| {
1588 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1589 },
1590 false,
1591 );
1592
1593 let sig = EventSignature::simple("INFO", "test_message");
1594 for _ in 0..10 {
1595 registry.with_event_state(sig, |state, now| {
1596 state.counter.record_suppression(now);
1597 });
1598 }
1599
1600 tokio::time::sleep(Duration::from_millis(250)).await;
1601
1602 let count = emission_count.load(Ordering::SeqCst);
1603 assert!(count > 0, "Default formatter should have emitted summaries");
1604
1605 handle.shutdown().await.expect("shutdown failed");
1606 }
1607
1608 #[cfg(feature = "async")]
1609 #[tokio::test]
1610 async fn test_summary_emission_not_recursively_throttled() {
1611 use std::time::Duration;
1612
1613 let layer = TracingRateLimitLayer::builder()
1615 .with_policy(Policy::count_based(1).unwrap())
1616 .with_active_emission(true)
1617 .with_summary_interval(Duration::from_millis(100))
1618 .build()
1619 .unwrap();
1620
1621 let layer_clone = layer.clone();
1622
1623 let subscriber = tracing_subscriber::registry()
1624 .with(tracing_subscriber::fmt::layer().with_filter(layer));
1625
1626 tracing::subscriber::with_default(subscriber, || {
1631 for _ in 0..5 {
1633 tracing::info!(target: "myapp", "repetitive event");
1634 }
1635 });
1636
1637 tokio::time::sleep(Duration::from_millis(250)).await;
1639
1640 assert_eq!(
1643 layer_clone.signature_count(),
1644 1,
1645 "Summary emissions should not create additional throttle signatures"
1646 );
1647
1648 layer_clone.shutdown().await.expect("shutdown failed");
1649 }
1650}