tracing_throttle/application/
registry.rs

1//! Central registry for tracking event suppression state.
2//!
3//! The registry maintains state for each unique event signature, including
4//! its rate limiting policy and suppression counters.
5
6use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8use std::sync::Arc;
9use std::time::Instant;
10
11/// State tracked for each event signature.
12#[derive(Debug)]
13pub struct EventState {
14    /// Rate limiting policy for this event
15    pub policy: Policy,
16    /// Counter tracking suppressions
17    pub counter: SuppressionCounter,
18}
19
20impl EventState {
21    /// Create new event state with a policy.
22    pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
23        Self {
24            policy,
25            counter: SuppressionCounter::new(initial_timestamp),
26        }
27    }
28
29    /// Create event state from a snapshot (for deserialization).
30    ///
31    /// This is used by storage backends like Redis to reconstruct state.
32    #[cfg(feature = "redis-storage")]
33    pub fn from_snapshot(
34        policy: Policy,
35        suppressed_count: usize,
36        first_suppressed: Instant,
37        last_suppressed: Instant,
38    ) -> Self {
39        Self {
40            policy,
41            counter: SuppressionCounter::from_snapshot(
42                suppressed_count,
43                first_suppressed,
44                last_suppressed,
45            ),
46        }
47    }
48}
49
50/// Registry managing all event suppression state.
51///
52/// Uses the Storage port for high-performance concurrent access.
53///
54/// This type is generic over the storage implementation, allowing different
55/// storage backends to be used. In production, use `Arc<ShardedStorage>`.
56#[derive(Clone)]
57pub struct SuppressionRegistry<S>
58where
59    S: Storage<EventSignature, EventState> + Clone,
60{
61    storage: S,
62    clock: Arc<dyn Clock>,
63    default_policy: Policy,
64}
65
66impl<S> SuppressionRegistry<S>
67where
68    S: Storage<EventSignature, EventState> + Clone,
69{
70    /// Create a new registry with storage, clock, and a default policy.
71    ///
72    /// All events will use the default policy unless overridden.
73    pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
74        Self {
75            storage,
76            clock,
77            default_policy,
78        }
79    }
80
81    /// Access or create event state for a signature with a callback.
82    ///
83    /// If this is the first time seeing this signature, creates new state
84    /// with the default policy. The callback receives the event state and
85    /// the current timestamp.
86    pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
87    where
88        F: FnOnce(&mut EventState, Instant) -> R,
89    {
90        let now = self.clock.now();
91        let default_policy = self.default_policy.clone();
92        self.storage.with_entry_mut(
93            signature,
94            || EventState::new(default_policy, now),
95            |state| f(state, now),
96        )
97    }
98
99    /// Get the default policy.
100    pub fn default_policy(&self) -> &Policy {
101        &self.default_policy
102    }
103
104    /// Create a clone of the default policy.
105    pub fn clone_default_policy(&self) -> Policy {
106        self.default_policy.clone()
107    }
108
109    /// Get the number of tracked signatures.
110    pub fn len(&self) -> usize {
111        self.storage.len()
112    }
113
114    /// Check if the registry is empty.
115    pub fn is_empty(&self) -> bool {
116        self.storage.is_empty()
117    }
118
119    /// Clear all tracked state.
120    pub fn clear(&self) {
121        self.storage.clear();
122    }
123
124    /// Iterate over all event states with a callback.
125    pub fn for_each<F>(&self, f: F)
126    where
127        F: FnMut(&EventSignature, &EventState),
128    {
129        self.storage.for_each(f);
130    }
131
132    /// Remove expired or inactive signatures based on a predicate.
133    pub fn cleanup<F>(&self, f: F)
134    where
135        F: FnMut(&EventSignature, &mut EventState) -> bool,
136    {
137        self.storage.retain(f);
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;
    use std::thread;

    /// A registry that has seen no events reports zero tracked signatures.
    #[test]
    fn test_registry_creation() {
        let store = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(store, time_source, limit);

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// The first access creates state; a later access reuses it.
    #[test]
    fn test_with_event_state() {
        let store = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(store, time_source, limit);
        let sig = EventSignature::simple("INFO", "Test message");

        // Unknown signature: state is created on the fly.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 1);
        });

        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        // Known signature: the existing entry is handed back and mutated.
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        assert_eq!(registry.len(), 1);
    }

    /// `clear` drops every tracked signature.
    #[test]
    fn test_clear() {
        let store = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(store, time_source, limit);

        (0..10)
            .map(|i| EventSignature::simple("INFO", &format!("Message {}", i)))
            .for_each(|sig| {
                // Touching the signature is enough to create its state.
                registry.with_event_state(sig, |_state, _now| {});
            });

        assert_eq!(registry.len(), 10);

        registry.clear();
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Ten threads each register one hundred distinct signatures.
    #[test]
    fn test_concurrent_access() {
        let store = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = Arc::new(SuppressionRegistry::new(store, time_source, limit));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|i| {
                let shared = Arc::clone(&registry);
                thread::spawn(move || {
                    for j in 0..100 {
                        let sig = EventSignature::simple("INFO", &format!("Msg_{}_{}", i, j));
                        // Touching the signature is enough to create its state.
                        shared.with_event_state(sig, |_state, _now| {});
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(registry.len(), 1000);
    }
}
235}