tracing_throttle/application/
registry.rs

1//! Central registry for tracking event suppression state.
2//!
3//! The registry maintains state for each unique event signature, including
4//! its rate limiting policy and suppression counters.
5
6use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8use std::sync::Arc;
9use std::time::Instant;
10
11/// State tracked for each event signature.
12#[derive(Debug)]
13pub struct EventState {
14    /// Rate limiting policy for this event
15    pub policy: Policy,
16    /// Counter tracking suppressions
17    pub counter: SuppressionCounter,
18}
19
20impl EventState {
21    /// Create new event state with a policy.
22    pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
23        Self {
24            policy,
25            counter: SuppressionCounter::new(initial_timestamp),
26        }
27    }
28}
29
30/// Registry managing all event suppression state.
31///
32/// Uses the Storage port for high-performance concurrent access.
33///
34/// This type is generic over the storage implementation, allowing different
35/// storage backends to be used. In production, use `Arc<ShardedStorage>`.
36#[derive(Clone)]
37pub struct SuppressionRegistry<S>
38where
39    S: Storage<EventSignature, EventState> + Clone,
40{
41    storage: S,
42    clock: Arc<dyn Clock>,
43    default_policy: Policy,
44}
45
46impl<S> SuppressionRegistry<S>
47where
48    S: Storage<EventSignature, EventState> + Clone,
49{
50    /// Create a new registry with storage, clock, and a default policy.
51    ///
52    /// All events will use the default policy unless overridden.
53    pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
54        Self {
55            storage,
56            clock,
57            default_policy,
58        }
59    }
60
61    /// Access or create event state for a signature with a callback.
62    ///
63    /// If this is the first time seeing this signature, creates new state
64    /// with the default policy. The callback receives the event state and
65    /// the current timestamp.
66    pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
67    where
68        F: FnOnce(&mut EventState, Instant) -> R,
69    {
70        let now = self.clock.now();
71        let default_policy = self.default_policy.clone();
72        self.storage.with_entry_mut(
73            signature,
74            || EventState::new(default_policy, now),
75            |state| f(state, now),
76        )
77    }
78
79    /// Get the default policy.
80    pub fn default_policy(&self) -> &Policy {
81        &self.default_policy
82    }
83
84    /// Create a clone of the default policy.
85    pub fn clone_default_policy(&self) -> Policy {
86        self.default_policy.clone()
87    }
88
89    /// Get the number of tracked signatures.
90    pub fn len(&self) -> usize {
91        self.storage.len()
92    }
93
94    /// Check if the registry is empty.
95    pub fn is_empty(&self) -> bool {
96        self.storage.is_empty()
97    }
98
99    /// Clear all tracked state.
100    pub fn clear(&self) {
101        self.storage.clear();
102    }
103
104    /// Iterate over all event states with a callback.
105    pub fn for_each<F>(&self, f: F)
106    where
107        F: FnMut(&EventSignature, &EventState),
108    {
109        self.storage.for_each(f);
110    }
111
112    /// Remove expired or inactive signatures based on a predicate.
113    pub fn cleanup<F>(&self, f: F)
114    where
115        F: FnMut(&EventSignature, &mut EventState) -> bool,
116    {
117        self.storage.retain(f);
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;

    /// A freshly built registry tracks no signatures.
    #[test]
    fn test_registry_creation() {
        let registry = SuppressionRegistry::new(
            Arc::new(ShardedStorage::new()),
            Arc::new(SystemClock::new()),
            Policy::count_based(100).unwrap(),
        );

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    /// The first access creates state; a repeat access reuses it.
    #[test]
    fn test_with_event_state() {
        let registry = SuppressionRegistry::new(
            Arc::new(ShardedStorage::new()),
            Arc::new(SystemClock::new()),
            Policy::count_based(100).unwrap(),
        );
        let sig = EventSignature::simple("INFO", "Test message");

        // Unknown signature: state is created on the spot.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 1);
        });
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        // Known signature: the existing state is handed back, no new entry.
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });
        assert_eq!(registry.len(), 1);
    }

    /// `clear` drops every tracked signature.
    #[test]
    fn test_clear() {
        let registry = SuppressionRegistry::new(
            Arc::new(ShardedStorage::new()),
            Arc::new(SystemClock::new()),
            Policy::count_based(100).unwrap(),
        );

        for index in 0..10 {
            let message = format!("Message {}", index);
            // Touching the signature is enough to create its state.
            registry.with_event_state(EventSignature::simple("INFO", &message), |_state, _now| {});
        }
        assert_eq!(registry.len(), 10);

        registry.clear();

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Ten threads each register one hundred distinct signatures.
    #[test]
    fn test_concurrent_access() {
        let registry = Arc::new(SuppressionRegistry::new(
            Arc::new(ShardedStorage::new()),
            Arc::new(SystemClock::new()),
            Policy::count_based(100).unwrap(),
        ));

        // Collect eagerly so every thread is spawned before any is joined.
        let workers: Vec<_> = (0..10)
            .map(|worker| {
                let shared = Arc::clone(&registry);
                thread::spawn(move || {
                    for item in 0..100 {
                        let message = format!("Msg_{}_{}", worker, item);
                        // Touching the signature is enough to create its state.
                        shared.with_event_state(
                            EventSignature::simple("INFO", &message),
                            |_state, _now| {},
                        );
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(registry.len(), 1000);
    }
}