tracing_throttle/application/
registry.rs

1//! Central registry for tracking event suppression state.
2//!
3//! The registry maintains state for each unique event signature, including
4//! its rate limiting policy and suppression counters.
5
6use crate::application::ports::{Clock, Storage};
7use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
8
9#[cfg(feature = "human-readable")]
10use crate::domain::metadata::EventMetadata;
11use std::sync::Arc;
12use std::time::Instant;
13
/// State tracked for each event signature.
///
/// Bundles the rate limiting policy with the suppression counter for a
/// single signature. When the `human-readable` feature is enabled, it can
/// additionally carry metadata describing the event.
#[derive(Debug)]
pub struct EventState {
    /// Rate limiting policy for this event
    pub policy: Policy,
    /// Counter tracking suppressions
    pub counter: SuppressionCounter,
    /// Metadata about the event (for human-readable summaries).
    ///
    /// This field only exists when the `human-readable` feature is enabled.
    /// It is `None` until metadata has been attached.
    #[cfg(feature = "human-readable")]
    pub metadata: Option<EventMetadata>,
}
25
26impl EventState {
27    /// Create new event state with a policy.
28    pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
29        Self {
30            policy,
31            counter: SuppressionCounter::new(initial_timestamp),
32            #[cfg(feature = "human-readable")]
33            metadata: None,
34        }
35    }
36
37    /// Create new event state with metadata.
38    #[cfg(feature = "human-readable")]
39    pub fn new_with_metadata(
40        policy: Policy,
41        initial_timestamp: Instant,
42        metadata: EventMetadata,
43    ) -> Self {
44        Self {
45            policy,
46            counter: SuppressionCounter::new(initial_timestamp),
47            metadata: Some(metadata),
48        }
49    }
50
51    /// Update or set the event metadata.
52    ///
53    /// This is called on first occurrence to capture event details.
54    #[cfg(feature = "human-readable")]
55    pub fn set_metadata(&mut self, metadata: EventMetadata) {
56        if self.metadata.is_none() {
57            self.metadata = Some(metadata);
58        }
59    }
60
61    /// Create event state from a snapshot (for deserialization).
62    ///
63    /// This is used by storage backends like Redis to reconstruct state.
64    #[cfg(feature = "redis-storage")]
65    pub fn from_snapshot(
66        policy: Policy,
67        suppressed_count: usize,
68        first_suppressed: Instant,
69        last_suppressed: Instant,
70    ) -> Self {
71        Self {
72            policy,
73            counter: SuppressionCounter::from_snapshot(
74                suppressed_count,
75                first_suppressed,
76                last_suppressed,
77            ),
78            metadata: None,
79        }
80    }
81}
82
83/// Registry managing all event suppression state.
84///
85/// Uses the Storage port for high-performance concurrent access.
86///
87/// This type is generic over the storage implementation, allowing different
88/// storage backends to be used. In production, use `Arc<ShardedStorage>`.
89#[derive(Clone)]
90pub struct SuppressionRegistry<S>
91where
92    S: Storage<EventSignature, EventState> + Clone,
93{
94    storage: S,
95    clock: Arc<dyn Clock>,
96    default_policy: Policy,
97}
98
99impl<S> SuppressionRegistry<S>
100where
101    S: Storage<EventSignature, EventState> + Clone,
102{
103    /// Create a new registry with storage, clock, and a default policy.
104    ///
105    /// All events will use the default policy unless overridden.
106    pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
107        Self {
108            storage,
109            clock,
110            default_policy,
111        }
112    }
113
114    /// Access or create event state for a signature with a callback.
115    ///
116    /// If this is the first time seeing this signature, creates new state
117    /// with the default policy. The callback receives the event state and
118    /// the current timestamp.
119    pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
120    where
121        F: FnOnce(&mut EventState, Instant) -> R,
122    {
123        let now = self.clock.now();
124        let default_policy = self.default_policy.clone();
125        self.storage.with_entry_mut(
126            signature,
127            || EventState::new(default_policy, now),
128            |state| f(state, now),
129        )
130    }
131
132    /// Get the default policy.
133    pub fn default_policy(&self) -> &Policy {
134        &self.default_policy
135    }
136
137    /// Create a clone of the default policy.
138    pub fn clone_default_policy(&self) -> Policy {
139        self.default_policy.clone()
140    }
141
142    /// Get the number of tracked signatures.
143    pub fn len(&self) -> usize {
144        self.storage.len()
145    }
146
147    /// Check if the registry is empty.
148    pub fn is_empty(&self) -> bool {
149        self.storage.is_empty()
150    }
151
152    /// Clear all tracked state.
153    pub fn clear(&self) {
154        self.storage.clear();
155    }
156
157    /// Iterate over all event states with a callback.
158    pub fn for_each<F>(&self, f: F)
159    where
160        F: FnMut(&EventSignature, &EventState),
161    {
162        self.storage.for_each(f);
163    }
164
165    /// Remove expired or inactive signatures based on a predicate.
166    pub fn cleanup<F>(&self, f: F)
167    where
168        F: FnMut(&EventSignature, &mut EventState) -> bool,
169    {
170        self.storage.retain(f);
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// A freshly built registry tracks no signatures.
    #[test]
    fn test_registry_creation() {
        let backend = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backend, time_source, limit);

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    /// Accessing the same signature twice creates a single entry.
    #[test]
    fn test_with_event_state() {
        let backend = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backend, time_source, limit);
        let sig = EventSignature::simple("INFO", "Test message");

        // The initial access has to create the entry.
        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 1);
        });

        assert!(!registry.is_empty());
        assert_eq!(registry.len(), 1);

        // A repeat access must reuse the entry created above.
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        assert_eq!(registry.len(), 1);
    }

    /// `clear` drops every tracked signature.
    #[test]
    fn test_clear() {
        let backend = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(backend, time_source, limit);

        (0..10).for_each(|n| {
            let sig = EventSignature::simple("INFO", &format!("Message {}", n));
            // Touching the signature is enough to create its state.
            registry.with_event_state(sig, |_state, _now| {});
        });

        assert_eq!(registry.len(), 10);

        registry.clear();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    /// Ten threads each register one hundred distinct signatures.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let backend = Arc::new(ShardedStorage::new());
        let time_source = Arc::new(SystemClock::new());
        let limit = Policy::count_based(100).unwrap();
        let registry = Arc::new(SuppressionRegistry::new(backend, time_source, limit));

        let workers: Vec<_> = (0..10)
            .map(|worker| {
                let shared = Arc::clone(&registry);
                thread::spawn(move || {
                    for seq in 0..100 {
                        let sig =
                            EventSignature::simple("INFO", &format!("Msg_{}_{}", worker, seq));
                        // Touching the signature is enough to create its state.
                        shared.with_event_state(sig, |_state, _now| {});
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(registry.len(), 1000);
    }
}