tracing_throttle/application/
registry.rs1use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8
9#[cfg(feature = "human-readable")]
10use crate::domain::metadata::EventMetadata;
11use std::sync::Arc;
12use std::time::Instant;
13
14#[derive(Debug)]
16pub struct EventState {
17 pub policy: Policy,
19 pub counter: SuppressionCounter,
21 #[cfg(feature = "human-readable")]
23 pub metadata: Option<EventMetadata>,
24}
25
26impl EventState {
27 pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
29 Self {
30 policy,
31 counter: SuppressionCounter::new(initial_timestamp),
32 #[cfg(feature = "human-readable")]
33 metadata: None,
34 }
35 }
36
37 #[cfg(feature = "human-readable")]
39 pub fn new_with_metadata(
40 policy: Policy,
41 initial_timestamp: Instant,
42 metadata: EventMetadata,
43 ) -> Self {
44 Self {
45 policy,
46 counter: SuppressionCounter::new(initial_timestamp),
47 metadata: Some(metadata),
48 }
49 }
50
51 #[cfg(feature = "human-readable")]
55 pub fn set_metadata(&mut self, metadata: EventMetadata) {
56 if self.metadata.is_none() {
57 self.metadata = Some(metadata);
58 }
59 }
60
61 #[cfg(feature = "redis-storage")]
65 pub fn from_snapshot(
66 policy: Policy,
67 suppressed_count: usize,
68 first_suppressed: Instant,
69 last_suppressed: Instant,
70 ) -> Self {
71 Self {
72 policy,
73 counter: SuppressionCounter::from_snapshot(
74 suppressed_count,
75 first_suppressed,
76 last_suppressed,
77 ),
78 metadata: None,
79 }
80 }
81}
82
83#[derive(Clone)]
90pub struct SuppressionRegistry<S>
91where
92 S: Storage<EventSignature, EventState> + Clone,
93{
94 storage: S,
95 clock: Arc<dyn Clock>,
96 default_policy: Policy,
97}
98
99impl<S> SuppressionRegistry<S>
100where
101 S: Storage<EventSignature, EventState> + Clone,
102{
103 pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
107 Self {
108 storage,
109 clock,
110 default_policy,
111 }
112 }
113
114 pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
120 where
121 F: FnOnce(&mut EventState, Instant) -> R,
122 {
123 let now = self.clock.now();
124 let default_policy = self.default_policy.clone();
125 self.storage.with_entry_mut(
126 signature,
127 || EventState::new(default_policy, now),
128 |state| f(state, now),
129 )
130 }
131
132 pub fn default_policy(&self) -> &Policy {
134 &self.default_policy
135 }
136
137 pub fn clone_default_policy(&self) -> Policy {
139 self.default_policy.clone()
140 }
141
142 pub fn len(&self) -> usize {
144 self.storage.len()
145 }
146
147 pub fn is_empty(&self) -> bool {
149 self.storage.is_empty()
150 }
151
152 pub fn clear(&self) {
154 self.storage.clear();
155 }
156
157 pub fn for_each<F>(&self, f: F)
159 where
160 F: FnMut(&EventSignature, &EventState),
161 {
162 self.storage.for_each(f);
163 }
164
165 pub fn cleanup<F>(&self, f: F)
167 where
168 F: FnMut(&EventSignature, &mut EventState) -> bool,
169 {
170 self.storage.retain(f);
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// A freshly built registry holds no entries.
    #[test]
    fn test_registry_creation() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Accessing a signature creates its entry once and reuses it afterwards.
    #[test]
    fn test_with_event_state() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let sig = EventSignature::simple("INFO", "Test message");

        // First access: the newly created state is expected to report a count of 1.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 1);
        });

        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        // Second access with the same signature must not add another entry.
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        assert_eq!(registry.len(), 1);
    }

    /// `clear` removes every entry.
    #[test]
    fn test_clear() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        for i in 0..10 {
            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
            // Accessing the state is enough to create the entry.
            registry.with_event_state(sig, |_state, _now| {});
        }

        assert_eq!(registry.len(), 10);

        registry.clear();
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Ten threads each touch 100 distinct signatures; all 1000 must be stored.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = Arc::new(SuppressionRegistry::new(storage, clock, policy));
        let mut handles = vec![];

        for i in 0..10 {
            let registry_clone = Arc::clone(&registry);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let sig = EventSignature::simple("INFO", &format!("Msg_{}_{}", i, j));
                    // Accessing the state is enough to create the entry.
                    registry_clone.with_event_state(sig, |_state, _now| {});
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(registry.len(), 1000);
    }
}