tracing_throttle/application/
emitter.rs

1//! Summary emission for suppressed events.
2//!
3//! Periodically collects and emits summaries of suppressed events to provide
4//! visibility into what has been rate limited.
5
6use crate::application::{
7    ports::Storage,
8    registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::time::interval;
15
16/// Error returned when emitter configuration validation fails.
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum EmitterConfigError {
19    /// Summary interval duration must be greater than zero
20    ZeroSummaryInterval,
21}
22
23impl std::fmt::Display for EmitterConfigError {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            EmitterConfigError::ZeroSummaryInterval => {
27                write!(f, "summary interval must be greater than 0")
28            }
29        }
30    }
31}
32
33impl std::error::Error for EmitterConfigError {}
34
35/// Configuration for summary emission.
36#[derive(Debug, Clone)]
37pub struct EmitterConfig {
38    /// How often to emit summaries
39    pub interval: Duration,
40    /// Minimum suppression count to include in summary
41    pub min_count: usize,
42}
43
44impl Default for EmitterConfig {
45    fn default() -> Self {
46        Self {
47            interval: Duration::from_secs(30),
48            min_count: 1,
49        }
50    }
51}
52
53impl EmitterConfig {
54    /// Create a new emitter config with the specified interval.
55    ///
56    /// # Errors
57    /// Returns `EmitterConfigError::ZeroSummaryInterval` if `interval` is zero.
58    pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
59        if interval.is_zero() {
60            return Err(EmitterConfigError::ZeroSummaryInterval);
61        }
62        Ok(Self {
63            interval,
64            min_count: 1,
65        })
66    }
67
68    /// Set the minimum suppression count threshold.
69    pub fn with_min_count(mut self, min_count: usize) -> Self {
70        self.min_count = min_count;
71        self
72    }
73}
74
75/// Emits periodic summaries of suppressed events.
76pub struct SummaryEmitter<S>
77where
78    S: Storage<EventSignature, EventState> + Clone,
79{
80    registry: SuppressionRegistry<S>,
81    config: EmitterConfig,
82}
83
84impl<S> SummaryEmitter<S>
85where
86    S: Storage<EventSignature, EventState> + Clone,
87{
88    /// Create a new summary emitter.
89    pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
90        Self { registry, config }
91    }
92
93    /// Collect current suppression summaries.
94    ///
95    /// Returns summaries for all events that have been suppressed at least
96    /// `min_count` times.
97    pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
98        let mut summaries = Vec::new();
99        let min_count = self.config.min_count;
100
101        self.registry.for_each(|signature, state| {
102            let count = state.counter.count();
103
104            if count >= min_count {
105                let summary = SuppressionSummary::from_counter(*signature, &state.counter);
106                summaries.push(summary);
107            }
108        });
109
110        summaries
111    }
112
113    /// Start emitting summaries periodically (async version).
114    ///
115    /// This spawns a background task that emits summaries at the configured interval.
116    #[cfg(feature = "async")]
117    pub fn start<F>(self, mut emit_fn: F) -> tokio::task::JoinHandle<()>
118    where
119        F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
120        S: Send + 'static,
121    {
122        tokio::spawn(async move {
123            let mut ticker = interval(self.config.interval);
124
125            loop {
126                ticker.tick().await;
127                let summaries = self.collect_summaries();
128
129                if !summaries.is_empty() {
130                    emit_fn(summaries);
131                }
132            }
133        })
134    }
135
136    /// Get the emitter configuration.
137    pub fn config(&self) -> &EmitterConfig {
138        &self.config
139    }
140
141    /// Get a reference to the registry.
142    pub fn registry(&self) -> &SuppressionRegistry<S> {
143        &self.registry
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use crate::domain::{policy::Policy, signature::EventSignature};
151    use crate::infrastructure::clock::SystemClock;
152    use crate::infrastructure::storage::ShardedStorage;
153    use std::sync::Arc;
154
155    #[test]
156    fn test_collect_summaries_empty() {
157        let storage = Arc::new(ShardedStorage::new());
158        let clock = Arc::new(SystemClock::new());
159        let policy = Policy::count_based(100).unwrap();
160        let registry = SuppressionRegistry::new(storage, clock, policy);
161        let config = EmitterConfig::default();
162        let emitter = SummaryEmitter::new(registry, config);
163
164        let summaries = emitter.collect_summaries();
165        assert!(summaries.is_empty());
166    }
167
168    #[test]
169    fn test_collect_summaries_with_suppressions() {
170        let storage = Arc::new(ShardedStorage::new());
171        let clock = Arc::new(SystemClock::new());
172        let policy = Policy::count_based(100).unwrap();
173        let registry = SuppressionRegistry::new(storage, clock, policy);
174        let config = EmitterConfig::default();
175
176        // Add some suppressed events
177        for i in 0..3 {
178            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
179            registry.with_event_state(sig, |state, now| {
180                // Simulate some suppressions
181                for _ in 0..(i + 1) * 5 {
182                    state.counter.record_suppression(now);
183                }
184            });
185        }
186
187        let emitter = SummaryEmitter::new(registry, config);
188        let summaries = emitter.collect_summaries();
189
190        assert_eq!(summaries.len(), 3);
191
192        // Verify counts
193        let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
194        assert!(counts.contains(&6)); // 5 + 1 (initial)
195        assert!(counts.contains(&11)); // 10 + 1
196        assert!(counts.contains(&16)); // 15 + 1
197    }
198
199    #[test]
200    fn test_min_count_filtering() {
201        let storage = Arc::new(ShardedStorage::new());
202        let clock = Arc::new(SystemClock::new());
203        let policy = Policy::count_based(100).unwrap();
204        let registry = SuppressionRegistry::new(storage, clock, policy);
205        let config = EmitterConfig::default().with_min_count(10);
206
207        // Add event with low count (below threshold)
208        let sig1 = EventSignature::simple("INFO", "Low count");
209        registry.with_event_state(sig1, |state, now| {
210            for _ in 0..4 {
211                state.counter.record_suppression(now);
212            }
213        });
214
215        // Add event with high count (above threshold)
216        let sig2 = EventSignature::simple("INFO", "High count");
217        registry.with_event_state(sig2, |state, now| {
218            for _ in 0..14 {
219                state.counter.record_suppression(now);
220            }
221        });
222
223        let emitter = SummaryEmitter::new(registry, config);
224        let summaries = emitter.collect_summaries();
225
226        // Only the high-count event should be included
227        assert_eq!(summaries.len(), 1);
228        assert_eq!(summaries[0].count, 15); // 14 + 1 initial
229    }
230
231    #[cfg(feature = "async")]
232    #[tokio::test]
233    async fn test_async_emission() {
234        use std::sync::Mutex;
235
236        let storage = Arc::new(ShardedStorage::new());
237        let clock = Arc::new(SystemClock::new());
238        let policy = Policy::count_based(100).unwrap();
239        let registry = SuppressionRegistry::new(storage, clock, policy);
240        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
241
242        // Add a suppressed event
243        let sig = EventSignature::simple("INFO", "Test");
244        registry.with_event_state(sig, |state, now| {
245            state.counter.record_suppression(now);
246        });
247
248        let emitter = SummaryEmitter::new(registry, config);
249
250        // Track emissions
251        let emissions = Arc::new(Mutex::new(Vec::new()));
252        let emissions_clone = Arc::clone(&emissions);
253
254        let handle = emitter.start(move |summaries| {
255            emissions_clone.lock().unwrap().push(summaries.len());
256        });
257
258        // Wait for a couple of intervals
259        tokio::time::sleep(Duration::from_millis(250)).await;
260
261        handle.abort();
262
263        // Should have emitted at least once
264        let emission_count = emissions.lock().unwrap().len();
265        assert!(emission_count >= 2);
266    }
267
268    #[test]
269    fn test_emitter_config_zero_interval() {
270        let result = EmitterConfig::new(Duration::from_secs(0));
271        assert!(matches!(
272            result,
273            Err(EmitterConfigError::ZeroSummaryInterval)
274        ));
275    }
276
277    #[test]
278    fn test_emitter_config_valid_interval() {
279        let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
280        assert_eq!(config.interval, Duration::from_secs(30));
281        assert_eq!(config.min_count, 1);
282    }
283}