// tracing_throttle/application/registry.rs
use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8use std::sync::Arc;
9use std::time::Instant;
10
11#[derive(Debug)]
13pub struct EventState {
14 pub policy: Policy,
16 pub counter: SuppressionCounter,
18}
19
20impl EventState {
21 pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
23 Self {
24 policy,
25 counter: SuppressionCounter::new(initial_timestamp),
26 }
27 }
28
29 #[cfg(feature = "redis-storage")]
33 pub fn from_snapshot(
34 policy: Policy,
35 suppressed_count: usize,
36 first_suppressed: Instant,
37 last_suppressed: Instant,
38 ) -> Self {
39 Self {
40 policy,
41 counter: SuppressionCounter::from_snapshot(
42 suppressed_count,
43 first_suppressed,
44 last_suppressed,
45 ),
46 }
47 }
48}
49
50#[derive(Clone)]
57pub struct SuppressionRegistry<S>
58where
59 S: Storage<EventSignature, EventState> + Clone,
60{
61 storage: S,
62 clock: Arc<dyn Clock>,
63 default_policy: Policy,
64}
65
66impl<S> SuppressionRegistry<S>
67where
68 S: Storage<EventSignature, EventState> + Clone,
69{
70 pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
74 Self {
75 storage,
76 clock,
77 default_policy,
78 }
79 }
80
81 pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
87 where
88 F: FnOnce(&mut EventState, Instant) -> R,
89 {
90 let now = self.clock.now();
91 let default_policy = self.default_policy.clone();
92 self.storage.with_entry_mut(
93 signature,
94 || EventState::new(default_policy, now),
95 |state| f(state, now),
96 )
97 }
98
99 pub fn default_policy(&self) -> &Policy {
101 &self.default_policy
102 }
103
104 pub fn clone_default_policy(&self) -> Policy {
106 self.default_policy.clone()
107 }
108
109 pub fn len(&self) -> usize {
111 self.storage.len()
112 }
113
114 pub fn is_empty(&self) -> bool {
116 self.storage.is_empty()
117 }
118
119 pub fn clear(&self) {
121 self.storage.clear();
122 }
123
124 pub fn for_each<F>(&self, f: F)
126 where
127 F: FnMut(&EventSignature, &EventState),
128 {
129 self.storage.for_each(f);
130 }
131
132 pub fn cleanup<F>(&self, f: F)
134 where
135 F: FnMut(&EventSignature, &mut EventState) -> bool,
136 {
137 self.storage.retain(f);
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// A freshly constructed registry reports no entries.
    #[test]
    fn test_registry_creation() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// The first access creates an entry; later accesses reuse it.
    #[test]
    fn test_with_event_state() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let sig = EventSignature::simple("INFO", "Test message");

        // First access: the state is created on demand.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 1);
        });

        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        // Second access with the same signature mutates the existing entry.
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        // Still a single entry: no duplicate was inserted.
        assert_eq!(registry.len(), 1);
    }

    /// `clear` removes every entry that was previously created.
    #[test]
    fn test_clear() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        for i in 0..10 {
            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
            // The closure is intentionally empty: the call only needs to
            // create the entry.
            registry.with_event_state(sig, |_state, _now| {});
        }

        assert_eq!(registry.len(), 10);

        registry.clear();
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Ten threads each touch one hundred distinct signatures; every one of
    /// them must end up as its own entry.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = Arc::new(SuppressionRegistry::new(storage, clock, policy));
        let mut handles = vec![];

        for i in 0..10 {
            let registry_clone = Arc::clone(&registry);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let sig = EventSignature::simple("INFO", &format!("Msg_{}_{}", i, j));
                    // Only entry creation matters here.
                    registry_clone.with_event_state(sig, |_state, _now| {});
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("worker thread panicked");
        }

        assert_eq!(registry.len(), 1000);
    }
}