1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
36#[cfg(feature = "async")]
41pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum BuildError {
46 ZeroMaxSignatures,
48 EmitterConfig(crate::application::emitter::EmitterConfigError),
50}
51
52impl std::fmt::Display for BuildError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 BuildError::ZeroMaxSignatures => {
56 write!(f, "max_signatures must be greater than 0")
57 }
58 BuildError::EmitterConfig(e) => {
59 write!(f, "emitter configuration error: {}", e)
60 }
61 }
62 }
63}
64
65impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69 BuildError::EmitterConfig(e)
70 }
71}
72
73pub struct TracingRateLimitLayerBuilder {
75 policy: Policy,
76 summary_interval: Duration,
77 clock: Option<Arc<dyn Clock>>,
78 max_signatures: Option<usize>,
79 enable_active_emission: bool,
80 #[cfg(feature = "async")]
81 summary_formatter: Option<SummaryFormatter>,
82 span_context_fields: Vec<String>,
83 event_fields: Vec<String>,
84 eviction_strategy:
85 Option<crate::infrastructure::eviction::EvictionStrategy<EventSignature, EventState>>,
86}
87
88impl TracingRateLimitLayerBuilder {
89 pub fn with_policy(mut self, policy: Policy) -> Self {
91 self.policy = policy;
92 self
93 }
94
95 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
99 self.summary_interval = interval;
100 self
101 }
102
103 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
105 self.clock = Some(clock);
106 self
107 }
108
109 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
118 self.max_signatures = Some(max_signatures);
119 self
120 }
121
122 pub fn with_unlimited_signatures(mut self) -> Self {
128 self.max_signatures = None;
129 self
130 }
131
132 pub fn with_active_emission(mut self, enabled: bool) -> Self {
153 self.enable_active_emission = enabled;
154 self
155 }
156
157 #[cfg(feature = "async")]
187 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
188 self.summary_formatter = Some(formatter);
189 self
190 }
191
192 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
229 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
231 self.span_context_fields = unique_fields.into_iter().collect();
232 self
233 }
234
235 pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
269 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
271 self.event_fields = unique_fields.into_iter().collect();
272 self
273 }
274
275 pub fn with_eviction_strategy(
312 mut self,
313 strategy: crate::infrastructure::eviction::EvictionStrategy<EventSignature, EventState>,
314 ) -> Self {
315 self.eviction_strategy = Some(strategy);
316 self
317 }
318
319 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
324 if let Some(max) = self.max_signatures {
326 if max == 0 {
327 return Err(BuildError::ZeroMaxSignatures);
328 }
329 }
330
331 let metrics = Metrics::new();
333 let circuit_breaker = Arc::new(CircuitBreaker::new());
334
335 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
336 let mut storage = if let Some(max) = self.max_signatures {
337 ShardedStorage::with_max_entries(max).with_metrics(metrics.clone())
338 } else {
339 ShardedStorage::new().with_metrics(metrics.clone())
340 };
341
342 if let Some(strategy) = self.eviction_strategy {
344 storage = storage.with_eviction_strategy(strategy);
345 }
346
347 let storage = Arc::new(storage);
348 let registry = SuppressionRegistry::new(storage, clock, self.policy);
349 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
350
351 let emitter_config = EmitterConfig::new(self.summary_interval)?;
353
354 #[cfg(feature = "async")]
355 let emitter_handle = if self.enable_active_emission {
356 let emitter = SummaryEmitter::new(registry, emitter_config);
357
358 let formatter = self.summary_formatter.unwrap_or_else(|| {
360 Arc::new(|summary: &SuppressionSummary| {
361 tracing::warn!(
362 signature = %summary.signature,
363 count = summary.count,
364 "{}",
365 summary.format_message()
366 );
367 })
368 });
369
370 let handle = emitter.start(
371 move |summaries| {
372 for summary in summaries {
373 formatter(&summary);
374 }
375 },
376 false, );
378 Arc::new(Mutex::new(Some(handle)))
379 } else {
380 Arc::new(Mutex::new(None))
381 };
382
383 Ok(TracingRateLimitLayer {
384 limiter,
385 span_context_fields: Arc::new(self.span_context_fields),
386 event_fields: Arc::new(self.event_fields),
387 #[cfg(feature = "async")]
388 emitter_handle,
389 #[cfg(not(feature = "async"))]
390 _emitter_config: emitter_config,
391 })
392 }
393}
394
395#[derive(Clone)]
403pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
404where
405 S: Storage<EventSignature, EventState> + Clone,
406{
407 limiter: RateLimiter<S>,
408 span_context_fields: Arc<Vec<String>>,
409 event_fields: Arc<Vec<String>>,
410 #[cfg(feature = "async")]
411 emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
412 #[cfg(not(feature = "async"))]
413 _emitter_config: EmitterConfig,
414}
415
416impl<S> TracingRateLimitLayer<S>
417where
418 S: Storage<EventSignature, EventState> + Clone,
419{
420 fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
422 where
423 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
424 {
425 if self.span_context_fields.is_empty() {
426 return BTreeMap::new();
427 }
428
429 let mut context_fields = BTreeMap::new();
430
431 if let Some(span) = cx.lookup_current() {
432 for span_ref in span.scope() {
433 let extensions = span_ref.extensions();
434
435 if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
436 for field_name in self.span_context_fields.as_ref() {
437 if !context_fields.contains_key(field_name) {
438 if let Some(value) = stored_fields.get(field_name) {
439 context_fields.insert(field_name.clone(), value.clone());
440 }
441 }
442 }
443 }
444
445 if context_fields.len() == self.span_context_fields.len() {
446 break;
447 }
448 }
449 }
450
451 context_fields
452 }
453
454 fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
456 if self.event_fields.is_empty() {
457 return BTreeMap::new();
458 }
459
460 let mut visitor = FieldVisitor::new();
461 event.record(&mut visitor);
462 let all_fields = visitor.into_fields();
463
464 self.event_fields
466 .iter()
467 .filter_map(|field_name| {
468 all_fields
469 .get(field_name)
470 .map(|value| (field_name.clone(), value.clone()))
471 })
472 .collect()
473 }
474
475 fn compute_signature(
484 &self,
485 metadata: &Metadata,
486 combined_fields: &BTreeMap<String, String>,
487 ) -> EventSignature {
488 let level = metadata.level().as_str();
489 let message = metadata.name();
490 let target = Some(metadata.target());
491
492 EventSignature::new(level, message, combined_fields, target)
494 }
495
496 pub fn should_allow(&self, signature: EventSignature) -> bool {
498 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
499 }
500
501 #[cfg(feature = "human-readable")]
508 pub fn should_allow_with_metadata(
509 &self,
510 signature: EventSignature,
511 metadata: crate::domain::metadata::EventMetadata,
512 ) -> bool {
513 matches!(
514 self.limiter.check_event_with_metadata(signature, metadata),
515 LimitDecision::Allow
516 )
517 }
518
519 pub fn limiter(&self) -> &RateLimiter<S> {
521 &self.limiter
522 }
523
524 pub fn metrics(&self) -> &Metrics {
531 self.limiter.metrics()
532 }
533
534 pub fn signature_count(&self) -> usize {
536 self.limiter.registry().len()
537 }
538
539 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
545 self.limiter.circuit_breaker()
546 }
547
548 #[cfg(feature = "async")]
576 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
577 let handle = {
579 let mut handle_guard = self.emitter_handle.lock().unwrap();
580 handle_guard.take()
581 };
582
583 if let Some(handle) = handle {
584 handle.shutdown().await?;
585 }
586 Ok(())
587 }
588}
589
590impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
591 pub fn builder() -> TracingRateLimitLayerBuilder {
600 TracingRateLimitLayerBuilder {
601 policy: Policy::token_bucket(50.0, 1.0)
602 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
603 summary_interval: Duration::from_secs(30),
604 clock: None,
605 max_signatures: Some(10_000),
606 enable_active_emission: false,
607 #[cfg(feature = "async")]
608 summary_formatter: None,
609 span_context_fields: Vec::new(),
610 event_fields: Vec::new(),
611 eviction_strategy: None,
612 }
613 }
614
615 pub fn new() -> Self {
627 Self::builder()
628 .build()
629 .expect("default configuration is always valid")
630 }
631
632 pub fn with_storage<ST>(
663 storage: ST,
664 policy: Policy,
665 clock: Arc<dyn Clock>,
666 ) -> TracingRateLimitLayer<ST>
667 where
668 ST: Storage<EventSignature, EventState> + Clone,
669 {
670 let metrics = Metrics::new();
671 let circuit_breaker = Arc::new(CircuitBreaker::new());
672 let registry = SuppressionRegistry::new(storage, clock, policy);
673 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
674
675 TracingRateLimitLayer {
676 limiter,
677 span_context_fields: Arc::new(Vec::new()),
678 event_fields: Arc::new(Vec::new()),
679 #[cfg(feature = "async")]
680 emitter_handle: Arc::new(Mutex::new(None)),
681 #[cfg(not(feature = "async"))]
682 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
683 .expect("30 seconds is valid"),
684 }
685 }
686}
687
688impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
689 fn default() -> Self {
690 Self::new()
691 }
692}
693
694impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
696where
697 S: Storage<EventSignature, EventState> + Clone,
698 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
699{
700 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
701 true
704 }
705
706 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
707 let mut combined_fields = self.extract_span_context(cx);
709 let event_fields = self.extract_event_fields(event);
710 combined_fields.extend(event_fields);
711
712 let metadata_obj = event.metadata();
713 let signature = self.compute_signature(metadata_obj, &combined_fields);
714
715 #[cfg(feature = "human-readable")]
716 {
717 let mut visitor = FieldVisitor::new();
719 event.record(&mut visitor);
720 let all_fields = visitor.into_fields();
721 let message = all_fields
722 .get("message")
723 .cloned()
724 .unwrap_or_else(|| event.metadata().name().to_string());
725
726 let event_metadata = crate::domain::metadata::EventMetadata::new(
728 metadata_obj.level().as_str().to_string(),
729 message,
730 metadata_obj.target().to_string(),
731 combined_fields,
732 );
733
734 self.should_allow_with_metadata(signature, event_metadata)
735 }
736
737 #[cfg(not(feature = "human-readable"))]
738 {
739 self.should_allow(signature)
740 }
741 }
742}
743
744impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
745where
746 S: Storage<EventSignature, EventState> + Clone + 'static,
747 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
748{
749 fn on_new_span(
750 &self,
751 attrs: &tracing::span::Attributes<'_>,
752 id: &tracing::span::Id,
753 ctx: Context<'_, Sub>,
754 ) {
755 if self.span_context_fields.is_empty() {
756 return;
757 }
758
759 let mut visitor = FieldVisitor::new();
760 attrs.record(&mut visitor);
761 let fields = visitor.into_fields();
762
763 if let Some(span) = ctx.span(id) {
764 let mut extensions = span.extensions_mut();
765 extensions.insert(fields);
766 }
767 }
768}
769
770#[cfg(test)]
771mod tests {
772 use super::*;
773 use tracing::info;
774 use tracing_subscriber::layer::SubscriberExt;
775
776 #[test]
777 fn test_layer_builder() {
778 let layer = TracingRateLimitLayer::builder()
779 .with_policy(Policy::count_based(50).unwrap())
780 .with_summary_interval(Duration::from_secs(60))
781 .build()
782 .unwrap();
783
784 assert!(layer.limiter().registry().is_empty());
785 }
786
787 #[test]
788 fn test_span_context_fields_deduplication() {
789 let layer = TracingRateLimitLayer::builder()
790 .with_span_context_fields(vec![
791 "user_id".to_string(),
792 "user_id".to_string(), "tenant_id".to_string(),
794 "".to_string(), "user_id".to_string(), ])
797 .build()
798 .unwrap();
799
800 assert_eq!(layer.span_context_fields.len(), 2);
802 assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
803 assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
804 }
805
806 #[test]
807 fn test_event_fields_deduplication() {
808 let layer = TracingRateLimitLayer::builder()
809 .with_event_fields(vec![
810 "error_code".to_string(),
811 "error_code".to_string(), "status".to_string(),
813 "".to_string(), "error_code".to_string(), ])
816 .build()
817 .unwrap();
818
819 assert_eq!(layer.event_fields.len(), 2);
821 assert!(layer.event_fields.iter().any(|f| f == "error_code"));
822 assert!(layer.event_fields.iter().any(|f| f == "status"));
823 }
824
825 #[test]
826 fn test_layer_default() {
827 let layer = TracingRateLimitLayer::default();
828 assert!(layer.limiter().registry().is_empty());
829 }
830
831 #[test]
832 fn test_signature_computation() {
833 let _layer = TracingRateLimitLayer::new();
834
835 let sig1 = EventSignature::simple("INFO", "test_event");
837 let sig2 = EventSignature::simple("INFO", "test_event");
838
839 assert_eq!(sig1, sig2);
841 }
842
843 #[test]
844 fn test_basic_rate_limiting() {
845 let layer = TracingRateLimitLayer::builder()
846 .with_policy(Policy::count_based(2).unwrap())
847 .build()
848 .unwrap();
849
850 let sig = EventSignature::simple("INFO", "test_message");
851
852 assert!(layer.should_allow(sig));
854 assert!(layer.should_allow(sig));
855
856 assert!(!layer.should_allow(sig));
858 }
859
860 #[test]
861 fn test_layer_integration() {
862 let layer = TracingRateLimitLayer::builder()
863 .with_policy(Policy::count_based(3).unwrap())
864 .build()
865 .unwrap();
866
867 let layer_for_check = layer.clone();
869
870 let subscriber = tracing_subscriber::registry()
871 .with(tracing_subscriber::fmt::layer().with_filter(layer));
872
873 tracing::subscriber::with_default(subscriber, || {
875 for _ in 0..10 {
877 info!("test event");
878 }
879 });
880
881 assert_eq!(layer_for_check.limiter().registry().len(), 1);
885 }
886
887 #[test]
888 fn test_layer_suppression_logic() {
889 let layer = TracingRateLimitLayer::builder()
890 .with_policy(Policy::count_based(3).unwrap())
891 .build()
892 .unwrap();
893
894 let sig = EventSignature::simple("INFO", "test");
895
896 let mut allowed_count = 0;
898 for _ in 0..10 {
899 if layer.should_allow(sig) {
900 allowed_count += 1;
901 }
902 }
903
904 assert_eq!(allowed_count, 3);
905 }
906
907 #[test]
908 fn test_builder_zero_summary_interval() {
909 let result = TracingRateLimitLayer::builder()
910 .with_summary_interval(Duration::from_secs(0))
911 .build();
912
913 assert!(matches!(
914 result,
915 Err(BuildError::EmitterConfig(
916 crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
917 ))
918 ));
919 }
920
921 #[test]
922 fn test_builder_zero_max_signatures() {
923 let result = TracingRateLimitLayer::builder()
924 .with_max_signatures(0)
925 .build();
926
927 assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
928 }
929
930 #[test]
931 fn test_builder_valid_max_signatures() {
932 let layer = TracingRateLimitLayer::builder()
933 .with_max_signatures(100)
934 .build()
935 .unwrap();
936
937 assert!(layer.limiter().registry().is_empty());
938 }
939
940 #[test]
941 fn test_metrics_tracking() {
942 let layer = TracingRateLimitLayer::builder()
943 .with_policy(Policy::count_based(2).unwrap())
944 .build()
945 .unwrap();
946
947 let sig = EventSignature::simple("INFO", "test");
948
949 assert_eq!(layer.metrics().events_allowed(), 0);
951 assert_eq!(layer.metrics().events_suppressed(), 0);
952
953 assert!(layer.should_allow(sig));
955 assert!(layer.should_allow(sig));
956
957 assert_eq!(layer.metrics().events_allowed(), 2);
959 assert_eq!(layer.metrics().events_suppressed(), 0);
960
961 assert!(!layer.should_allow(sig));
963
964 assert_eq!(layer.metrics().events_allowed(), 2);
966 assert_eq!(layer.metrics().events_suppressed(), 1);
967 }
968
969 #[test]
970 fn test_metrics_snapshot() {
971 let layer = TracingRateLimitLayer::builder()
972 .with_policy(Policy::count_based(3).unwrap())
973 .build()
974 .unwrap();
975
976 let sig = EventSignature::simple("INFO", "test");
977
978 for _ in 0..5 {
980 layer.should_allow(sig);
981 }
982
983 let snapshot = layer.metrics().snapshot();
985 assert_eq!(snapshot.events_allowed, 3);
986 assert_eq!(snapshot.events_suppressed, 2);
987 assert_eq!(snapshot.total_events(), 5);
988 assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
989 }
990
991 #[test]
992 fn test_signature_count() {
993 let layer = TracingRateLimitLayer::builder()
994 .with_policy(Policy::count_based(2).unwrap())
995 .build()
996 .unwrap();
997
998 assert_eq!(layer.signature_count(), 0);
999
1000 let sig1 = EventSignature::simple("INFO", "test1");
1001 let sig2 = EventSignature::simple("INFO", "test2");
1002
1003 layer.should_allow(sig1);
1004 assert_eq!(layer.signature_count(), 1);
1005
1006 layer.should_allow(sig2);
1007 assert_eq!(layer.signature_count(), 2);
1008
1009 layer.should_allow(sig1);
1011 assert_eq!(layer.signature_count(), 2);
1012 }
1013
1014 #[test]
1015 fn test_metrics_with_eviction() {
1016 let layer = TracingRateLimitLayer::builder()
1017 .with_policy(Policy::count_based(1).unwrap())
1018 .with_max_signatures(3)
1019 .build()
1020 .unwrap();
1021
1022 for i in 0..3 {
1024 let sig = EventSignature::simple("INFO", &format!("test{}", i));
1025 layer.should_allow(sig);
1026 }
1027
1028 assert_eq!(layer.signature_count(), 3);
1029 assert_eq!(layer.metrics().signatures_evicted(), 0);
1030
1031 let sig = EventSignature::simple("INFO", "test3");
1033 layer.should_allow(sig);
1034
1035 assert_eq!(layer.signature_count(), 3);
1036 assert_eq!(layer.metrics().signatures_evicted(), 1);
1037 }
1038
1039 #[test]
1040 fn test_circuit_breaker_observability() {
1041 use crate::application::circuit_breaker::CircuitState;
1042
1043 let layer = TracingRateLimitLayer::builder()
1044 .with_policy(Policy::count_based(2).unwrap())
1045 .build()
1046 .unwrap();
1047
1048 let cb = layer.circuit_breaker();
1050 assert_eq!(cb.state(), CircuitState::Closed);
1051 assert_eq!(cb.consecutive_failures(), 0);
1052
1053 let sig = EventSignature::simple("INFO", "test");
1055 layer.should_allow(sig);
1056 layer.should_allow(sig);
1057 layer.should_allow(sig);
1058
1059 assert_eq!(cb.state(), CircuitState::Closed);
1060 }
1061
1062 #[test]
1063 fn test_circuit_breaker_fail_open_integration() {
1064 use crate::application::circuit_breaker::{
1065 CircuitBreaker, CircuitBreakerConfig, CircuitState,
1066 };
1067 use std::time::Duration;
1068
1069 let cb_config = CircuitBreakerConfig {
1071 failure_threshold: 2,
1072 recovery_timeout: Duration::from_secs(1),
1073 };
1074 let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
1075
1076 let storage = Arc::new(ShardedStorage::new());
1078 let clock = Arc::new(SystemClock::new());
1079 let policy = Policy::count_based(2).unwrap();
1080 let registry = SuppressionRegistry::new(storage, clock, policy);
1081 let metrics = Metrics::new();
1082 let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
1083
1084 let layer = TracingRateLimitLayer {
1085 limiter,
1086 span_context_fields: Arc::new(Vec::new()),
1087 event_fields: Arc::new(Vec::new()),
1088 #[cfg(feature = "async")]
1089 emitter_handle: Arc::new(Mutex::new(None)),
1090 #[cfg(not(feature = "async"))]
1091 _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
1092 30,
1093 ))
1094 .unwrap(),
1095 };
1096
1097 let sig = EventSignature::simple("INFO", "test");
1098
1099 assert!(layer.should_allow(sig));
1101 assert!(layer.should_allow(sig));
1102 assert!(!layer.should_allow(sig));
1103
1104 assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1106
1107 circuit_breaker.record_failure();
1109 circuit_breaker.record_failure();
1110
1111 assert_eq!(circuit_breaker.state(), CircuitState::Open);
1113
1114 assert!(layer.should_allow(sig));
1117 assert!(layer.should_allow(sig));
1118 assert!(layer.should_allow(sig));
1119
1120 let snapshot = layer.metrics().snapshot();
1122 assert!(snapshot.events_allowed >= 5); }
1124
1125 #[cfg(feature = "async")]
1126 #[tokio::test]
1127 async fn test_active_emission_integration() {
1128 use std::sync::atomic::{AtomicUsize, Ordering};
1129 use std::time::Duration;
1130
1131 let emission_count = Arc::new(AtomicUsize::new(0));
1133 let count_clone = Arc::clone(&emission_count);
1134
1135 let storage = Arc::new(ShardedStorage::new());
1137 let clock = Arc::new(SystemClock::new());
1138 let policy = Policy::count_based(2).unwrap();
1139 let registry = SuppressionRegistry::new(storage, clock, policy);
1140
1141 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1142 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1143
1144 let handle = emitter.start(
1146 move |summaries| {
1147 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1148 },
1149 false,
1150 );
1151
1152 let sig = EventSignature::simple("INFO", "test_message");
1154 for _ in 0..10 {
1155 registry.with_event_state(sig, |state, now| {
1156 state.counter.record_suppression(now);
1157 });
1158 }
1159
1160 tokio::time::sleep(Duration::from_millis(250)).await;
1162
1163 let count = emission_count.load(Ordering::SeqCst);
1165 assert!(
1166 count > 0,
1167 "Expected at least one suppression summary to be emitted, got {}",
1168 count
1169 );
1170
1171 handle.shutdown().await.expect("shutdown failed");
1173 }
1174
1175 #[cfg(feature = "async")]
1176 #[tokio::test]
1177 async fn test_active_emission_disabled() {
1178 use crate::infrastructure::mocks::layer::MockCaptureLayer;
1179 use std::time::Duration;
1180
1181 let layer = TracingRateLimitLayer::builder()
1183 .with_policy(Policy::count_based(2).unwrap())
1184 .with_summary_interval(Duration::from_millis(100))
1185 .build()
1186 .unwrap();
1187
1188 let mock = MockCaptureLayer::new();
1189 let mock_clone = mock.clone();
1190
1191 let subscriber = tracing_subscriber::registry()
1192 .with(mock)
1193 .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1194
1195 tracing::subscriber::with_default(subscriber, || {
1196 let sig = EventSignature::simple("INFO", "test_message");
1197 for _ in 0..10 {
1198 layer.should_allow(sig);
1199 }
1200 });
1201
1202 tokio::time::sleep(Duration::from_millis(250)).await;
1204
1205 let events = mock_clone.get_captured();
1207 let summary_count = events
1208 .iter()
1209 .filter(|e| e.message.contains("suppressed"))
1210 .count();
1211
1212 assert_eq!(
1213 summary_count, 0,
1214 "Should not emit summaries when active emission is disabled"
1215 );
1216
1217 layer.shutdown().await.expect("shutdown failed");
1219 }
1220
1221 #[cfg(feature = "async")]
1222 #[tokio::test]
1223 async fn test_shutdown_without_emission() {
1224 let layer = TracingRateLimitLayer::new();
1226
1227 layer
1229 .shutdown()
1230 .await
1231 .expect("shutdown should succeed when emitter not running");
1232 }
1233
1234 #[cfg(feature = "async")]
1235 #[tokio::test]
1236 async fn test_custom_summary_formatter() {
1237 use std::sync::atomic::{AtomicUsize, Ordering};
1238 use std::time::Duration;
1239
1240 let call_count = Arc::new(AtomicUsize::new(0));
1242 let count_clone = Arc::clone(&call_count);
1243
1244 let last_count = Arc::new(AtomicUsize::new(0));
1246 let last_count_clone = Arc::clone(&last_count);
1247
1248 let layer = TracingRateLimitLayer::builder()
1250 .with_policy(Policy::count_based(2).unwrap())
1251 .with_active_emission(true)
1252 .with_summary_interval(Duration::from_millis(100))
1253 .with_summary_formatter(Arc::new(move |summary| {
1254 count_clone.fetch_add(1, Ordering::SeqCst);
1255 last_count_clone.store(summary.count, Ordering::SeqCst);
1256 tracing::info!(
1258 sig = %summary.signature,
1259 suppressed = summary.count,
1260 "Custom format"
1261 );
1262 }))
1263 .build()
1264 .unwrap();
1265
1266 let sig = EventSignature::simple("INFO", "test_message");
1268 for _ in 0..10 {
1269 layer.should_allow(sig);
1270 }
1271
1272 tokio::time::sleep(Duration::from_millis(250)).await;
1274
1275 let calls = call_count.load(Ordering::SeqCst);
1277 assert!(calls > 0, "Custom formatter should have been called");
1278
1279 let count = last_count.load(Ordering::SeqCst);
1281 assert!(
1282 count >= 8,
1283 "Expected at least 8 suppressions, got {}",
1284 count
1285 );
1286
1287 layer.shutdown().await.expect("shutdown failed");
1288 }
1289
1290 #[cfg(feature = "async")]
1291 #[tokio::test]
1292 async fn test_default_formatter_used() {
1293 use std::sync::atomic::{AtomicUsize, Ordering};
1294 use std::time::Duration;
1295
1296 let emission_count = Arc::new(AtomicUsize::new(0));
1297 let count_clone = Arc::clone(&emission_count);
1298
1299 let storage = Arc::new(ShardedStorage::new());
1300 let clock = Arc::new(SystemClock::new());
1301 let policy = Policy::count_based(2).unwrap();
1302 let registry = SuppressionRegistry::new(storage, clock, policy);
1303
1304 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1305 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1306
1307 let handle = emitter.start(
1309 move |summaries| {
1310 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1311 },
1312 false,
1313 );
1314
1315 let sig = EventSignature::simple("INFO", "test_message");
1316 for _ in 0..10 {
1317 registry.with_event_state(sig, |state, now| {
1318 state.counter.record_suppression(now);
1319 });
1320 }
1321
1322 tokio::time::sleep(Duration::from_millis(250)).await;
1323
1324 let count = emission_count.load(Ordering::SeqCst);
1325 assert!(count > 0, "Default formatter should have emitted summaries");
1326
1327 handle.shutdown().await.expect("shutdown failed");
1328 }
1329}