tracing_throttle/application/
emitter.rs1use crate::application::{
7 ports::Storage,
8 registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::time::interval;
15
/// Reasons an [`EmitterConfig`] can be rejected at construction time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EmitterConfigError {
    /// The summary interval was zero; `EmitterConfig::new` returns this
    /// when `interval.is_zero()` holds.
    ZeroSummaryInterval,
}

impl std::fmt::Display for EmitterConfigError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a message here.
        let description = match self {
            Self::ZeroSummaryInterval => "summary interval must be greater than 0",
        };
        f.write_str(description)
    }
}

/// No underlying cause is wrapped, so the trait's default methods suffice.
impl std::error::Error for EmitterConfigError {}
34
35#[derive(Debug, Clone)]
37pub struct EmitterConfig {
38 pub interval: Duration,
40 pub min_count: usize,
42}
43
44impl Default for EmitterConfig {
45 fn default() -> Self {
46 Self {
47 interval: Duration::from_secs(30),
48 min_count: 1,
49 }
50 }
51}
52
53impl EmitterConfig {
54 pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
59 if interval.is_zero() {
60 return Err(EmitterConfigError::ZeroSummaryInterval);
61 }
62 Ok(Self {
63 interval,
64 min_count: 1,
65 })
66 }
67
68 pub fn with_min_count(mut self, min_count: usize) -> Self {
70 self.min_count = min_count;
71 self
72 }
73}
74
75pub struct SummaryEmitter<S>
77where
78 S: Storage<EventSignature, EventState> + Clone,
79{
80 registry: SuppressionRegistry<S>,
81 config: EmitterConfig,
82}
83
84impl<S> SummaryEmitter<S>
85where
86 S: Storage<EventSignature, EventState> + Clone,
87{
88 pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
90 Self { registry, config }
91 }
92
93 pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
98 let mut summaries = Vec::new();
99 let min_count = self.config.min_count;
100
101 self.registry.for_each(|signature, state| {
102 let count = state.counter.count();
103
104 if count >= min_count {
105 let summary = SuppressionSummary::from_counter(*signature, &state.counter);
106 summaries.push(summary);
107 }
108 });
109
110 summaries
111 }
112
113 #[cfg(feature = "async")]
117 pub fn start<F>(self, mut emit_fn: F) -> tokio::task::JoinHandle<()>
118 where
119 F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
120 S: Send + 'static,
121 {
122 tokio::spawn(async move {
123 let mut ticker = interval(self.config.interval);
124
125 loop {
126 ticker.tick().await;
127 let summaries = self.collect_summaries();
128
129 if !summaries.is_empty() {
130 emit_fn(summaries);
131 }
132 }
133 })
134 }
135
136 pub fn config(&self) -> &EmitterConfig {
138 &self.config
139 }
140
141 pub fn registry(&self) -> &SuppressionRegistry<S> {
143 &self.registry
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::{policy::Policy, signature::EventSignature};
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// A registry with nothing recorded produces no summaries.
    #[test]
    fn test_collect_summaries_empty() {
        let backing_store = Arc::new(ShardedStorage::new());
        let system_clock = Arc::new(SystemClock::new());
        let throttle_policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backing_store, system_clock, throttle_policy);
        let emitter = SummaryEmitter::new(registry, EmitterConfig::default());

        assert!(emitter.collect_summaries().is_empty());
    }

    /// Every signature with recorded suppressions shows up in the output.
    #[test]
    fn test_collect_summaries_with_suppressions() {
        let backing_store = Arc::new(ShardedStorage::new());
        let system_clock = Arc::new(SystemClock::new());
        let throttle_policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backing_store, system_clock, throttle_policy);
        let config = EmitterConfig::default();

        // Three distinct signatures receive 5, 10 and 15 recorded suppressions.
        for index in 0..3 {
            let signature = EventSignature::simple("INFO", &format!("Message {}", index));
            let repeats = (index + 1) * 5;
            registry.with_event_state(signature, |state, now| {
                for _ in 0..repeats {
                    state.counter.record_suppression(now);
                }
            });
        }

        let summaries = SummaryEmitter::new(registry, config).collect_summaries();
        assert_eq!(summaries.len(), 3);

        // Each expected count is one above the number of suppressions recorded
        // here — presumably a fresh state starts at 1; confirm in the registry.
        let counts: Vec<usize> = summaries.iter().map(|summary| summary.count).collect();
        for expected in [6, 11, 16] {
            assert!(counts.contains(&expected));
        }
    }

    /// Entries below `min_count` are left out of the collected summaries.
    #[test]
    fn test_min_count_filtering() {
        let backing_store = Arc::new(ShardedStorage::new());
        let system_clock = Arc::new(SystemClock::new());
        let throttle_policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backing_store, system_clock, throttle_policy);
        let config = EmitterConfig::default().with_min_count(10);

        // One signature stays under the threshold, the other goes over it.
        for (message, suppressions) in [("Low count", 4), ("High count", 14)] {
            let signature = EventSignature::simple("INFO", message);
            registry.with_event_state(signature, |state, now| {
                for _ in 0..suppressions {
                    state.counter.record_suppression(now);
                }
            });
        }

        let summaries = SummaryEmitter::new(registry, config).collect_summaries();

        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].count, 15);
    }

    /// The background task invokes the callback repeatedly while it runs.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_emission() {
        use std::sync::Mutex;

        let backing_store = Arc::new(ShardedStorage::new());
        let system_clock = Arc::new(SystemClock::new());
        let throttle_policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backing_store, system_clock, throttle_policy);
        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();

        let signature = EventSignature::simple("INFO", "Test");
        registry.with_event_state(signature, |state, now| {
            state.counter.record_suppression(now);
        });

        let emitter = SummaryEmitter::new(registry, config);

        // Shared log of how many summaries each callback invocation received.
        let batch_sizes = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&batch_sizes);

        let handle = emitter.start(move |summaries| {
            sink.lock().unwrap().push(summaries.len());
        });

        // Let the 100 ms ticker run for 250 ms, then stop the task.
        tokio::time::sleep(Duration::from_millis(250)).await;
        handle.abort();

        let emission_count = batch_sizes.lock().unwrap().len();
        assert!(emission_count >= 2);
    }

    /// A zero interval is rejected by the validating constructor.
    #[test]
    fn test_emitter_config_zero_interval() {
        let outcome = EmitterConfig::new(Duration::from_secs(0));
        assert_eq!(outcome.err(), Some(EmitterConfigError::ZeroSummaryInterval));
    }

    /// A non-zero interval is stored as given, with the default minimum count.
    #[test]
    fn test_emitter_config_valid_interval() {
        let thirty_seconds = Duration::from_secs(30);
        let config = EmitterConfig::new(thirty_seconds).unwrap();

        assert_eq!(config.interval, thirty_seconds);
        assert_eq!(config.min_count, 1);
    }
}