1use crate::application::{
7 circuit_breaker::CircuitBreaker,
8 emitter::EmitterConfig,
9 limiter::{LimitDecision, RateLimiter},
10 metrics::Metrics,
11 ports::{Clock, Storage},
12 registry::{EventState, SuppressionRegistry},
13};
14use crate::domain::{policy::Policy, signature::EventSignature};
15use crate::infrastructure::clock::SystemClock;
16use crate::infrastructure::storage::ShardedStorage;
17use crate::infrastructure::visitor::FieldVisitor;
18
19use std::collections::{BTreeMap, BTreeSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{Metadata, Subscriber};
23use tracing_subscriber::layer::Filter;
24use tracing_subscriber::registry::LookupSpan;
25use tracing_subscriber::{layer::Context, Layer};
26
27#[cfg(feature = "async")]
28use crate::application::emitter::{EmitterHandle, SummaryEmitter};
29
30#[cfg(feature = "async")]
31use crate::domain::summary::SuppressionSummary;
32
33#[cfg(feature = "async")]
34use std::sync::Mutex;
35
36#[cfg(feature = "async")]
41pub type SummaryFormatter = Arc<dyn Fn(&SuppressionSummary) + Send + Sync + 'static>;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum BuildError {
46 ZeroMaxSignatures,
48 EmitterConfig(crate::application::emitter::EmitterConfigError),
50}
51
52impl std::fmt::Display for BuildError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 BuildError::ZeroMaxSignatures => {
56 write!(f, "max_signatures must be greater than 0")
57 }
58 BuildError::EmitterConfig(e) => {
59 write!(f, "emitter configuration error: {}", e)
60 }
61 }
62 }
63}
64
65impl std::error::Error for BuildError {}
66
67impl From<crate::application::emitter::EmitterConfigError> for BuildError {
68 fn from(e: crate::application::emitter::EmitterConfigError) -> Self {
69 BuildError::EmitterConfig(e)
70 }
71}
72
73pub struct TracingRateLimitLayerBuilder {
75 policy: Policy,
76 summary_interval: Duration,
77 clock: Option<Arc<dyn Clock>>,
78 max_signatures: Option<usize>,
79 enable_active_emission: bool,
80 #[cfg(feature = "async")]
81 summary_formatter: Option<SummaryFormatter>,
82 span_context_fields: Vec<String>,
83 event_fields: Vec<String>,
84}
85
86impl TracingRateLimitLayerBuilder {
87 pub fn with_policy(mut self, policy: Policy) -> Self {
89 self.policy = policy;
90 self
91 }
92
93 pub fn with_summary_interval(mut self, interval: Duration) -> Self {
97 self.summary_interval = interval;
98 self
99 }
100
101 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
103 self.clock = Some(clock);
104 self
105 }
106
107 pub fn with_max_signatures(mut self, max_signatures: usize) -> Self {
116 self.max_signatures = Some(max_signatures);
117 self
118 }
119
120 pub fn with_unlimited_signatures(mut self) -> Self {
126 self.max_signatures = None;
127 self
128 }
129
130 pub fn with_active_emission(mut self, enabled: bool) -> Self {
151 self.enable_active_emission = enabled;
152 self
153 }
154
155 #[cfg(feature = "async")]
185 pub fn with_summary_formatter(mut self, formatter: SummaryFormatter) -> Self {
186 self.summary_formatter = Some(formatter);
187 self
188 }
189
190 pub fn with_span_context_fields(mut self, fields: Vec<String>) -> Self {
227 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
229 self.span_context_fields = unique_fields.into_iter().collect();
230 self
231 }
232
233 pub fn with_event_fields(mut self, fields: Vec<String>) -> Self {
267 let unique_fields: BTreeSet<_> = fields.into_iter().filter(|f| !f.is_empty()).collect();
269 self.event_fields = unique_fields.into_iter().collect();
270 self
271 }
272
273 pub fn build(self) -> Result<TracingRateLimitLayer, BuildError> {
278 if let Some(max) = self.max_signatures {
280 if max == 0 {
281 return Err(BuildError::ZeroMaxSignatures);
282 }
283 }
284
285 let metrics = Metrics::new();
287 let circuit_breaker = Arc::new(CircuitBreaker::new());
288
289 let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock::new()));
290 let storage = if let Some(max) = self.max_signatures {
291 Arc::new(ShardedStorage::with_max_entries(max).with_metrics(metrics.clone()))
292 } else {
293 Arc::new(ShardedStorage::new().with_metrics(metrics.clone()))
294 };
295 let registry = SuppressionRegistry::new(storage, clock, self.policy);
296 let limiter = RateLimiter::new(registry.clone(), metrics.clone(), circuit_breaker);
297
298 let emitter_config = EmitterConfig::new(self.summary_interval)?;
300
301 #[cfg(feature = "async")]
302 let emitter_handle = if self.enable_active_emission {
303 let emitter = SummaryEmitter::new(registry, emitter_config);
304
305 let formatter = self.summary_formatter.unwrap_or_else(|| {
307 Arc::new(|summary: &SuppressionSummary| {
308 tracing::warn!(
309 signature = %summary.signature,
310 count = summary.count,
311 "{}",
312 summary.format_message()
313 );
314 })
315 });
316
317 let handle = emitter.start(
318 move |summaries| {
319 for summary in summaries {
320 formatter(&summary);
321 }
322 },
323 false, );
325 Arc::new(Mutex::new(Some(handle)))
326 } else {
327 Arc::new(Mutex::new(None))
328 };
329
330 Ok(TracingRateLimitLayer {
331 limiter,
332 span_context_fields: Arc::new(self.span_context_fields),
333 event_fields: Arc::new(self.event_fields),
334 #[cfg(feature = "async")]
335 emitter_handle,
336 #[cfg(not(feature = "async"))]
337 _emitter_config: emitter_config,
338 })
339 }
340}
341
342#[derive(Clone)]
350pub struct TracingRateLimitLayer<S = Arc<ShardedStorage<EventSignature, EventState>>>
351where
352 S: Storage<EventSignature, EventState> + Clone,
353{
354 limiter: RateLimiter<S>,
355 span_context_fields: Arc<Vec<String>>,
356 event_fields: Arc<Vec<String>>,
357 #[cfg(feature = "async")]
358 emitter_handle: Arc<Mutex<Option<EmitterHandle>>>,
359 #[cfg(not(feature = "async"))]
360 _emitter_config: EmitterConfig,
361}
362
363impl<S> TracingRateLimitLayer<S>
364where
365 S: Storage<EventSignature, EventState> + Clone,
366{
367 fn extract_span_context<Sub>(&self, cx: &Context<'_, Sub>) -> BTreeMap<String, String>
369 where
370 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
371 {
372 if self.span_context_fields.is_empty() {
373 return BTreeMap::new();
374 }
375
376 let mut context_fields = BTreeMap::new();
377
378 if let Some(span) = cx.lookup_current() {
379 for span_ref in span.scope() {
380 let extensions = span_ref.extensions();
381
382 if let Some(stored_fields) = extensions.get::<BTreeMap<String, String>>() {
383 for field_name in self.span_context_fields.as_ref() {
384 if !context_fields.contains_key(field_name) {
385 if let Some(value) = stored_fields.get(field_name) {
386 context_fields.insert(field_name.clone(), value.clone());
387 }
388 }
389 }
390 }
391
392 if context_fields.len() == self.span_context_fields.len() {
393 break;
394 }
395 }
396 }
397
398 context_fields
399 }
400
401 fn extract_event_fields(&self, event: &tracing::Event<'_>) -> BTreeMap<String, String> {
403 if self.event_fields.is_empty() {
404 return BTreeMap::new();
405 }
406
407 let mut visitor = FieldVisitor::new();
408 event.record(&mut visitor);
409 let all_fields = visitor.into_fields();
410
411 self.event_fields
413 .iter()
414 .filter_map(|field_name| {
415 all_fields
416 .get(field_name)
417 .map(|value| (field_name.clone(), value.clone()))
418 })
419 .collect()
420 }
421
422 fn compute_signature(
431 &self,
432 metadata: &Metadata,
433 combined_fields: &BTreeMap<String, String>,
434 ) -> EventSignature {
435 let level = metadata.level().as_str();
436 let message = metadata.name();
437 let target = Some(metadata.target());
438
439 EventSignature::new(level, message, combined_fields, target)
441 }
442
443 pub fn should_allow(&self, signature: EventSignature) -> bool {
445 matches!(self.limiter.check_event(signature), LimitDecision::Allow)
446 }
447
448 pub fn limiter(&self) -> &RateLimiter<S> {
450 &self.limiter
451 }
452
453 pub fn metrics(&self) -> &Metrics {
460 self.limiter.metrics()
461 }
462
463 pub fn signature_count(&self) -> usize {
465 self.limiter.registry().len()
466 }
467
468 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
474 self.limiter.circuit_breaker()
475 }
476
477 #[cfg(feature = "async")]
505 pub async fn shutdown(&self) -> Result<(), crate::application::emitter::ShutdownError> {
506 let handle = {
508 let mut handle_guard = self.emitter_handle.lock().unwrap();
509 handle_guard.take()
510 };
511
512 if let Some(handle) = handle {
513 handle.shutdown().await?;
514 }
515 Ok(())
516 }
517}
518
519impl TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
520 pub fn builder() -> TracingRateLimitLayerBuilder {
529 TracingRateLimitLayerBuilder {
530 policy: Policy::token_bucket(50.0, 1.0)
531 .expect("default policy with 50 capacity and 1/sec refill is always valid"),
532 summary_interval: Duration::from_secs(30),
533 clock: None,
534 max_signatures: Some(10_000),
535 enable_active_emission: false,
536 #[cfg(feature = "async")]
537 summary_formatter: None,
538 span_context_fields: Vec::new(),
539 event_fields: Vec::new(),
540 }
541 }
542
543 pub fn new() -> Self {
555 Self::builder()
556 .build()
557 .expect("default configuration is always valid")
558 }
559
560 pub fn with_storage<ST>(
591 storage: ST,
592 policy: Policy,
593 clock: Arc<dyn Clock>,
594 ) -> TracingRateLimitLayer<ST>
595 where
596 ST: Storage<EventSignature, EventState> + Clone,
597 {
598 let metrics = Metrics::new();
599 let circuit_breaker = Arc::new(CircuitBreaker::new());
600 let registry = SuppressionRegistry::new(storage, clock, policy);
601 let limiter = RateLimiter::new(registry, metrics, circuit_breaker);
602
603 TracingRateLimitLayer {
604 limiter,
605 span_context_fields: Arc::new(Vec::new()),
606 event_fields: Arc::new(Vec::new()),
607 #[cfg(feature = "async")]
608 emitter_handle: Arc::new(Mutex::new(None)),
609 #[cfg(not(feature = "async"))]
610 _emitter_config: EmitterConfig::new(Duration::from_secs(30))
611 .expect("30 seconds is valid"),
612 }
613 }
614}
615
616impl Default for TracingRateLimitLayer<Arc<ShardedStorage<EventSignature, EventState>>> {
617 fn default() -> Self {
618 Self::new()
619 }
620}
621
622impl<S, Sub> Filter<Sub> for TracingRateLimitLayer<S>
624where
625 S: Storage<EventSignature, EventState> + Clone,
626 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
627{
628 fn enabled(&self, _meta: &Metadata<'_>, _cx: &Context<'_, Sub>) -> bool {
629 true
632 }
633
634 fn event_enabled(&self, event: &tracing::Event<'_>, cx: &Context<'_, Sub>) -> bool {
635 let mut combined_fields = self.extract_span_context(cx);
637 let event_fields = self.extract_event_fields(event);
638 combined_fields.extend(event_fields);
639
640 let signature = self.compute_signature(event.metadata(), &combined_fields);
641 self.should_allow(signature)
642 }
643}
644
645impl<S, Sub> Layer<Sub> for TracingRateLimitLayer<S>
646where
647 S: Storage<EventSignature, EventState> + Clone + 'static,
648 Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
649{
650 fn on_new_span(
651 &self,
652 attrs: &tracing::span::Attributes<'_>,
653 id: &tracing::span::Id,
654 ctx: Context<'_, Sub>,
655 ) {
656 if self.span_context_fields.is_empty() {
657 return;
658 }
659
660 let mut visitor = FieldVisitor::new();
661 attrs.record(&mut visitor);
662 let fields = visitor.into_fields();
663
664 if let Some(span) = ctx.span(id) {
665 let mut extensions = span.extensions_mut();
666 extensions.insert(fields);
667 }
668 }
669}
670
671#[cfg(test)]
672mod tests {
673 use super::*;
674 use tracing::info;
675 use tracing_subscriber::layer::SubscriberExt;
676
677 #[test]
678 fn test_layer_builder() {
679 let layer = TracingRateLimitLayer::builder()
680 .with_policy(Policy::count_based(50).unwrap())
681 .with_summary_interval(Duration::from_secs(60))
682 .build()
683 .unwrap();
684
685 assert!(layer.limiter().registry().is_empty());
686 }
687
688 #[test]
689 fn test_span_context_fields_deduplication() {
690 let layer = TracingRateLimitLayer::builder()
691 .with_span_context_fields(vec![
692 "user_id".to_string(),
693 "user_id".to_string(), "tenant_id".to_string(),
695 "".to_string(), "user_id".to_string(), ])
698 .build()
699 .unwrap();
700
701 assert_eq!(layer.span_context_fields.len(), 2);
703 assert!(layer.span_context_fields.iter().any(|f| f == "user_id"));
704 assert!(layer.span_context_fields.iter().any(|f| f == "tenant_id"));
705 }
706
707 #[test]
708 fn test_event_fields_deduplication() {
709 let layer = TracingRateLimitLayer::builder()
710 .with_event_fields(vec![
711 "error_code".to_string(),
712 "error_code".to_string(), "status".to_string(),
714 "".to_string(), "error_code".to_string(), ])
717 .build()
718 .unwrap();
719
720 assert_eq!(layer.event_fields.len(), 2);
722 assert!(layer.event_fields.iter().any(|f| f == "error_code"));
723 assert!(layer.event_fields.iter().any(|f| f == "status"));
724 }
725
726 #[test]
727 fn test_layer_default() {
728 let layer = TracingRateLimitLayer::default();
729 assert!(layer.limiter().registry().is_empty());
730 }
731
732 #[test]
733 fn test_signature_computation() {
734 let _layer = TracingRateLimitLayer::new();
735
736 let sig1 = EventSignature::simple("INFO", "test_event");
738 let sig2 = EventSignature::simple("INFO", "test_event");
739
740 assert_eq!(sig1, sig2);
742 }
743
744 #[test]
745 fn test_basic_rate_limiting() {
746 let layer = TracingRateLimitLayer::builder()
747 .with_policy(Policy::count_based(2).unwrap())
748 .build()
749 .unwrap();
750
751 let sig = EventSignature::simple("INFO", "test_message");
752
753 assert!(layer.should_allow(sig));
755 assert!(layer.should_allow(sig));
756
757 assert!(!layer.should_allow(sig));
759 }
760
761 #[test]
762 fn test_layer_integration() {
763 let layer = TracingRateLimitLayer::builder()
764 .with_policy(Policy::count_based(3).unwrap())
765 .build()
766 .unwrap();
767
768 let layer_for_check = layer.clone();
770
771 let subscriber = tracing_subscriber::registry()
772 .with(tracing_subscriber::fmt::layer().with_filter(layer));
773
774 tracing::subscriber::with_default(subscriber, || {
776 for _ in 0..10 {
778 info!("test event");
779 }
780 });
781
782 assert_eq!(layer_for_check.limiter().registry().len(), 1);
786 }
787
788 #[test]
789 fn test_layer_suppression_logic() {
790 let layer = TracingRateLimitLayer::builder()
791 .with_policy(Policy::count_based(3).unwrap())
792 .build()
793 .unwrap();
794
795 let sig = EventSignature::simple("INFO", "test");
796
797 let mut allowed_count = 0;
799 for _ in 0..10 {
800 if layer.should_allow(sig) {
801 allowed_count += 1;
802 }
803 }
804
805 assert_eq!(allowed_count, 3);
806 }
807
808 #[test]
809 fn test_builder_zero_summary_interval() {
810 let result = TracingRateLimitLayer::builder()
811 .with_summary_interval(Duration::from_secs(0))
812 .build();
813
814 assert!(matches!(
815 result,
816 Err(BuildError::EmitterConfig(
817 crate::application::emitter::EmitterConfigError::ZeroSummaryInterval
818 ))
819 ));
820 }
821
822 #[test]
823 fn test_builder_zero_max_signatures() {
824 let result = TracingRateLimitLayer::builder()
825 .with_max_signatures(0)
826 .build();
827
828 assert!(matches!(result, Err(BuildError::ZeroMaxSignatures)));
829 }
830
831 #[test]
832 fn test_builder_valid_max_signatures() {
833 let layer = TracingRateLimitLayer::builder()
834 .with_max_signatures(100)
835 .build()
836 .unwrap();
837
838 assert!(layer.limiter().registry().is_empty());
839 }
840
841 #[test]
842 fn test_metrics_tracking() {
843 let layer = TracingRateLimitLayer::builder()
844 .with_policy(Policy::count_based(2).unwrap())
845 .build()
846 .unwrap();
847
848 let sig = EventSignature::simple("INFO", "test");
849
850 assert_eq!(layer.metrics().events_allowed(), 0);
852 assert_eq!(layer.metrics().events_suppressed(), 0);
853
854 assert!(layer.should_allow(sig));
856 assert!(layer.should_allow(sig));
857
858 assert_eq!(layer.metrics().events_allowed(), 2);
860 assert_eq!(layer.metrics().events_suppressed(), 0);
861
862 assert!(!layer.should_allow(sig));
864
865 assert_eq!(layer.metrics().events_allowed(), 2);
867 assert_eq!(layer.metrics().events_suppressed(), 1);
868 }
869
870 #[test]
871 fn test_metrics_snapshot() {
872 let layer = TracingRateLimitLayer::builder()
873 .with_policy(Policy::count_based(3).unwrap())
874 .build()
875 .unwrap();
876
877 let sig = EventSignature::simple("INFO", "test");
878
879 for _ in 0..5 {
881 layer.should_allow(sig);
882 }
883
884 let snapshot = layer.metrics().snapshot();
886 assert_eq!(snapshot.events_allowed, 3);
887 assert_eq!(snapshot.events_suppressed, 2);
888 assert_eq!(snapshot.total_events(), 5);
889 assert!((snapshot.suppression_rate() - 0.4).abs() < f64::EPSILON);
890 }
891
892 #[test]
893 fn test_signature_count() {
894 let layer = TracingRateLimitLayer::builder()
895 .with_policy(Policy::count_based(2).unwrap())
896 .build()
897 .unwrap();
898
899 assert_eq!(layer.signature_count(), 0);
900
901 let sig1 = EventSignature::simple("INFO", "test1");
902 let sig2 = EventSignature::simple("INFO", "test2");
903
904 layer.should_allow(sig1);
905 assert_eq!(layer.signature_count(), 1);
906
907 layer.should_allow(sig2);
908 assert_eq!(layer.signature_count(), 2);
909
910 layer.should_allow(sig1);
912 assert_eq!(layer.signature_count(), 2);
913 }
914
915 #[test]
916 fn test_metrics_with_eviction() {
917 let layer = TracingRateLimitLayer::builder()
918 .with_policy(Policy::count_based(1).unwrap())
919 .with_max_signatures(3)
920 .build()
921 .unwrap();
922
923 for i in 0..3 {
925 let sig = EventSignature::simple("INFO", &format!("test{}", i));
926 layer.should_allow(sig);
927 }
928
929 assert_eq!(layer.signature_count(), 3);
930 assert_eq!(layer.metrics().signatures_evicted(), 0);
931
932 let sig = EventSignature::simple("INFO", "test3");
934 layer.should_allow(sig);
935
936 assert_eq!(layer.signature_count(), 3);
937 assert_eq!(layer.metrics().signatures_evicted(), 1);
938 }
939
940 #[test]
941 fn test_circuit_breaker_observability() {
942 use crate::application::circuit_breaker::CircuitState;
943
944 let layer = TracingRateLimitLayer::builder()
945 .with_policy(Policy::count_based(2).unwrap())
946 .build()
947 .unwrap();
948
949 let cb = layer.circuit_breaker();
951 assert_eq!(cb.state(), CircuitState::Closed);
952 assert_eq!(cb.consecutive_failures(), 0);
953
954 let sig = EventSignature::simple("INFO", "test");
956 layer.should_allow(sig);
957 layer.should_allow(sig);
958 layer.should_allow(sig);
959
960 assert_eq!(cb.state(), CircuitState::Closed);
961 }
962
963 #[test]
964 fn test_circuit_breaker_fail_open_integration() {
965 use crate::application::circuit_breaker::{
966 CircuitBreaker, CircuitBreakerConfig, CircuitState,
967 };
968 use std::time::Duration;
969
970 let cb_config = CircuitBreakerConfig {
972 failure_threshold: 2,
973 recovery_timeout: Duration::from_secs(1),
974 };
975 let circuit_breaker = Arc::new(CircuitBreaker::with_config(cb_config));
976
977 let storage = Arc::new(ShardedStorage::new());
979 let clock = Arc::new(SystemClock::new());
980 let policy = Policy::count_based(2).unwrap();
981 let registry = SuppressionRegistry::new(storage, clock, policy);
982 let metrics = Metrics::new();
983 let limiter = RateLimiter::new(registry, metrics, circuit_breaker.clone());
984
985 let layer = TracingRateLimitLayer {
986 limiter,
987 span_context_fields: Arc::new(Vec::new()),
988 event_fields: Arc::new(Vec::new()),
989 #[cfg(feature = "async")]
990 emitter_handle: Arc::new(Mutex::new(None)),
991 #[cfg(not(feature = "async"))]
992 _emitter_config: crate::application::emitter::EmitterConfig::new(Duration::from_secs(
993 30,
994 ))
995 .unwrap(),
996 };
997
998 let sig = EventSignature::simple("INFO", "test");
999
1000 assert!(layer.should_allow(sig));
1002 assert!(layer.should_allow(sig));
1003 assert!(!layer.should_allow(sig));
1004
1005 assert_eq!(circuit_breaker.state(), CircuitState::Closed);
1007
1008 circuit_breaker.record_failure();
1010 circuit_breaker.record_failure();
1011
1012 assert_eq!(circuit_breaker.state(), CircuitState::Open);
1014
1015 assert!(layer.should_allow(sig));
1018 assert!(layer.should_allow(sig));
1019 assert!(layer.should_allow(sig));
1020
1021 let snapshot = layer.metrics().snapshot();
1023 assert!(snapshot.events_allowed >= 5); }
1025
1026 #[cfg(feature = "async")]
1027 #[tokio::test]
1028 async fn test_active_emission_integration() {
1029 use std::sync::atomic::{AtomicUsize, Ordering};
1030 use std::time::Duration;
1031
1032 let emission_count = Arc::new(AtomicUsize::new(0));
1034 let count_clone = Arc::clone(&emission_count);
1035
1036 let storage = Arc::new(ShardedStorage::new());
1038 let clock = Arc::new(SystemClock::new());
1039 let policy = Policy::count_based(2).unwrap();
1040 let registry = SuppressionRegistry::new(storage, clock, policy);
1041
1042 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1043 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1044
1045 let handle = emitter.start(
1047 move |summaries| {
1048 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1049 },
1050 false,
1051 );
1052
1053 let sig = EventSignature::simple("INFO", "test_message");
1055 for _ in 0..10 {
1056 registry.with_event_state(sig, |state, now| {
1057 state.counter.record_suppression(now);
1058 });
1059 }
1060
1061 tokio::time::sleep(Duration::from_millis(250)).await;
1063
1064 let count = emission_count.load(Ordering::SeqCst);
1066 assert!(
1067 count > 0,
1068 "Expected at least one suppression summary to be emitted, got {}",
1069 count
1070 );
1071
1072 handle.shutdown().await.expect("shutdown failed");
1074 }
1075
1076 #[cfg(feature = "async")]
1077 #[tokio::test]
1078 async fn test_active_emission_disabled() {
1079 use crate::infrastructure::mocks::layer::MockCaptureLayer;
1080 use std::time::Duration;
1081
1082 let layer = TracingRateLimitLayer::builder()
1084 .with_policy(Policy::count_based(2).unwrap())
1085 .with_summary_interval(Duration::from_millis(100))
1086 .build()
1087 .unwrap();
1088
1089 let mock = MockCaptureLayer::new();
1090 let mock_clone = mock.clone();
1091
1092 let subscriber = tracing_subscriber::registry()
1093 .with(mock)
1094 .with(tracing_subscriber::fmt::layer().with_filter(layer.clone()));
1095
1096 tracing::subscriber::with_default(subscriber, || {
1097 let sig = EventSignature::simple("INFO", "test_message");
1098 for _ in 0..10 {
1099 layer.should_allow(sig);
1100 }
1101 });
1102
1103 tokio::time::sleep(Duration::from_millis(250)).await;
1105
1106 let events = mock_clone.get_captured();
1108 let summary_count = events
1109 .iter()
1110 .filter(|e| e.message.contains("suppressed"))
1111 .count();
1112
1113 assert_eq!(
1114 summary_count, 0,
1115 "Should not emit summaries when active emission is disabled"
1116 );
1117
1118 layer.shutdown().await.expect("shutdown failed");
1120 }
1121
1122 #[cfg(feature = "async")]
1123 #[tokio::test]
1124 async fn test_shutdown_without_emission() {
1125 let layer = TracingRateLimitLayer::new();
1127
1128 layer
1130 .shutdown()
1131 .await
1132 .expect("shutdown should succeed when emitter not running");
1133 }
1134
1135 #[cfg(feature = "async")]
1136 #[tokio::test]
1137 async fn test_custom_summary_formatter() {
1138 use std::sync::atomic::{AtomicUsize, Ordering};
1139 use std::time::Duration;
1140
1141 let call_count = Arc::new(AtomicUsize::new(0));
1143 let count_clone = Arc::clone(&call_count);
1144
1145 let last_count = Arc::new(AtomicUsize::new(0));
1147 let last_count_clone = Arc::clone(&last_count);
1148
1149 let layer = TracingRateLimitLayer::builder()
1151 .with_policy(Policy::count_based(2).unwrap())
1152 .with_active_emission(true)
1153 .with_summary_interval(Duration::from_millis(100))
1154 .with_summary_formatter(Arc::new(move |summary| {
1155 count_clone.fetch_add(1, Ordering::SeqCst);
1156 last_count_clone.store(summary.count, Ordering::SeqCst);
1157 tracing::info!(
1159 sig = %summary.signature,
1160 suppressed = summary.count,
1161 "Custom format"
1162 );
1163 }))
1164 .build()
1165 .unwrap();
1166
1167 let sig = EventSignature::simple("INFO", "test_message");
1169 for _ in 0..10 {
1170 layer.should_allow(sig);
1171 }
1172
1173 tokio::time::sleep(Duration::from_millis(250)).await;
1175
1176 let calls = call_count.load(Ordering::SeqCst);
1178 assert!(calls > 0, "Custom formatter should have been called");
1179
1180 let count = last_count.load(Ordering::SeqCst);
1182 assert!(
1183 count >= 8,
1184 "Expected at least 8 suppressions, got {}",
1185 count
1186 );
1187
1188 layer.shutdown().await.expect("shutdown failed");
1189 }
1190
1191 #[cfg(feature = "async")]
1192 #[tokio::test]
1193 async fn test_default_formatter_used() {
1194 use std::sync::atomic::{AtomicUsize, Ordering};
1195 use std::time::Duration;
1196
1197 let emission_count = Arc::new(AtomicUsize::new(0));
1198 let count_clone = Arc::clone(&emission_count);
1199
1200 let storage = Arc::new(ShardedStorage::new());
1201 let clock = Arc::new(SystemClock::new());
1202 let policy = Policy::count_based(2).unwrap();
1203 let registry = SuppressionRegistry::new(storage, clock, policy);
1204
1205 let emitter_config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
1206 let emitter = SummaryEmitter::new(registry.clone(), emitter_config);
1207
1208 let handle = emitter.start(
1210 move |summaries| {
1211 count_clone.fetch_add(summaries.len(), Ordering::SeqCst);
1212 },
1213 false,
1214 );
1215
1216 let sig = EventSignature::simple("INFO", "test_message");
1217 for _ in 0..10 {
1218 registry.with_event_state(sig, |state, now| {
1219 state.counter.record_suppression(now);
1220 });
1221 }
1222
1223 tokio::time::sleep(Duration::from_millis(250)).await;
1224
1225 let count = emission_count.load(Ordering::SeqCst);
1226 assert!(count > 0, "Default formatter should have emitted summaries");
1227
1228 handle.shutdown().await.expect("shutdown failed");
1229 }
1230}