tracing_throttle/application/
registry.rs1use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8use std::sync::Arc;
9use std::time::Instant;
10
11#[derive(Debug)]
13pub struct EventState {
14 pub policy: Policy,
16 pub counter: SuppressionCounter,
18}
19
20impl EventState {
21 pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
23 Self {
24 policy,
25 counter: SuppressionCounter::new(initial_timestamp),
26 }
27 }
28}
29
30#[derive(Clone)]
37pub struct SuppressionRegistry<S>
38where
39 S: Storage<EventSignature, EventState> + Clone,
40{
41 storage: S,
42 clock: Arc<dyn Clock>,
43 default_policy: Policy,
44}
45
46impl<S> SuppressionRegistry<S>
47where
48 S: Storage<EventSignature, EventState> + Clone,
49{
50 pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
54 Self {
55 storage,
56 clock,
57 default_policy,
58 }
59 }
60
61 pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
67 where
68 F: FnOnce(&mut EventState, Instant) -> R,
69 {
70 let now = self.clock.now();
71 let default_policy = self.default_policy.clone();
72 self.storage.with_entry_mut(
73 signature,
74 || EventState::new(default_policy, now),
75 |state| f(state, now),
76 )
77 }
78
79 pub fn default_policy(&self) -> &Policy {
81 &self.default_policy
82 }
83
84 pub fn clone_default_policy(&self) -> Policy {
86 self.default_policy.clone()
87 }
88
89 pub fn len(&self) -> usize {
91 self.storage.len()
92 }
93
94 pub fn is_empty(&self) -> bool {
96 self.storage.is_empty()
97 }
98
99 pub fn clear(&self) {
101 self.storage.clear();
102 }
103
104 pub fn for_each<F>(&self, f: F)
106 where
107 F: FnMut(&EventSignature, &EventState),
108 {
109 self.storage.for_each(f);
110 }
111
112 pub fn cleanup<F>(&self, f: F)
114 where
115 F: FnMut(&EventSignature, &mut EventState) -> bool,
116 {
117 self.storage.retain(f);
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use super::*;
124 use crate::domain::policy::Policy;
125 use crate::infrastructure::clock::SystemClock;
126 use crate::infrastructure::storage::ShardedStorage;
127 use std::sync::Arc;
128
129 #[test]
130 fn test_registry_creation() {
131 let storage = Arc::new(ShardedStorage::new());
132 let clock = Arc::new(SystemClock::new());
133 let policy = Policy::count_based(100).unwrap();
134 let registry = SuppressionRegistry::new(storage, clock, policy);
135
136 assert_eq!(registry.len(), 0);
137 assert!(registry.is_empty());
138 }
139
140 #[test]
141 fn test_with_event_state() {
142 let storage = Arc::new(ShardedStorage::new());
143 let clock = Arc::new(SystemClock::new());
144 let policy = Policy::count_based(100).unwrap();
145 let registry = SuppressionRegistry::new(storage, clock, policy);
146 let sig = EventSignature::simple("INFO", "Test message");
147
148 registry.with_event_state(sig, |state, _now| {
150 assert_eq!(state.counter.count(), 1);
151 });
152
153 assert_eq!(registry.len(), 1);
154 assert!(!registry.is_empty());
155
156 registry.with_event_state(sig, |state, now| {
158 state.counter.record_suppression(now);
159 });
160
161 assert_eq!(registry.len(), 1);
162 }
163
164 #[test]
165 fn test_clear() {
166 let storage = Arc::new(ShardedStorage::new());
167 let clock = Arc::new(SystemClock::new());
168 let policy = Policy::count_based(100).unwrap();
169 let registry = SuppressionRegistry::new(storage, clock, policy);
170
171 for i in 0..10 {
172 let sig = EventSignature::simple("INFO", &format!("Message {}", i));
173 registry.with_event_state(sig, |_state, _now| {
174 });
176 }
177
178 assert_eq!(registry.len(), 10);
179
180 registry.clear();
181 assert_eq!(registry.len(), 0);
182 assert!(registry.is_empty());
183 }
184
185 #[test]
186 fn test_concurrent_access() {
187 use std::sync::Arc;
188 use std::thread;
189
190 let storage = Arc::new(ShardedStorage::new());
191 let clock = Arc::new(SystemClock::new());
192 let policy = Policy::count_based(100).unwrap();
193 let registry = Arc::new(SuppressionRegistry::new(storage, clock, policy));
194 let mut handles = vec![];
195
196 for i in 0..10 {
197 let registry_clone = Arc::clone(®istry);
198 let handle = thread::spawn(move || {
199 for j in 0..100 {
200 let sig = EventSignature::simple("INFO", &format!("Msg_{}_{}", i, j));
201 registry_clone.with_event_state(sig, |_state, _now| {
202 });
204 }
205 });
206 handles.push(handle);
207 }
208
209 for handle in handles {
210 handle.join().unwrap();
211 }
212
213 assert_eq!(registry.len(), 1000);
214 }
215}