use crate::application::ports::{Clock, Storage};
use crate::domain::{policy::Policy, signature::EventSignature, summary::SuppressionCounter};
#[cfg(feature = "human-readable")]
use crate::domain::metadata::EventMetadata;
use std::sync::Arc;
use std::time::Instant;
/// Per-signature suppression state: the value type stored by the registry's
/// `Storage<EventSignature, EventState>` backend.
#[derive(Debug, Clone)]
pub struct EventState {
/// Policy stored alongside this state.
pub policy: Policy,
/// Suppression counter, created from the timestamp handed to the constructor.
pub counter: SuppressionCounter,
/// Optional event metadata; the field exists only when the `human-readable`
/// feature is enabled.
#[cfg(feature = "human-readable")]
pub metadata: Option<EventMetadata>,
}
impl EventState {
    /// Creates a fresh state for an event signature.
    ///
    /// The counter is built from `initial_timestamp`. When the
    /// `human-readable` feature is enabled, the metadata starts out unset.
    pub fn new(policy: Policy, initial_timestamp: Instant) -> Self {
        Self {
            policy,
            counter: SuppressionCounter::new(initial_timestamp),
            #[cfg(feature = "human-readable")]
            metadata: None,
        }
    }

    /// Creates a fresh state with `metadata` already attached.
    ///
    /// Only available with the `human-readable` feature.
    #[cfg(feature = "human-readable")]
    pub fn new_with_metadata(
        policy: Policy,
        initial_timestamp: Instant,
        metadata: EventMetadata,
    ) -> Self {
        Self {
            policy,
            counter: SuppressionCounter::new(initial_timestamp),
            metadata: Some(metadata),
        }
    }

    /// Records `metadata` if none has been stored yet.
    ///
    /// Metadata that is already present is left untouched, so only the first
    /// call has an effect.
    #[cfg(feature = "human-readable")]
    pub fn set_metadata(&mut self, metadata: EventMetadata) {
        if self.metadata.is_none() {
            self.metadata = Some(metadata);
        }
    }

    /// Rebuilds a state from previously captured counter values.
    ///
    /// Only available with the `redis-storage` feature. Metadata is not part
    /// of the snapshot, so it starts out unset when the field exists.
    #[cfg(feature = "redis-storage")]
    pub fn from_snapshot(
        policy: Policy,
        suppressed_count: usize,
        first_suppressed: Instant,
        last_suppressed: Instant,
    ) -> Self {
        Self {
            policy,
            counter: SuppressionCounter::from_snapshot(
                suppressed_count,
                first_suppressed,
                last_suppressed,
            ),
            // The `metadata` field is only declared under `human-readable`.
            // The initialiser must carry the same gate, otherwise a build with
            // `redis-storage` alone names a field that does not exist.
            #[cfg(feature = "human-readable")]
            metadata: None,
        }
    }
}
/// Registry pairing a storage of `EventSignature` -> `EventState` entries
/// with a clock and a default policy.
#[derive(Clone)]
pub struct SuppressionRegistry<S>
where
S: Storage<EventSignature, EventState> + Clone,
{
/// Backing store keyed by `EventSignature`, holding `EventState` values.
storage: S,
/// Time source read once per `with_event_state` call.
clock: Arc<dyn Clock>,
/// Policy handed to states that `with_event_state` creates.
default_policy: Policy,
}
impl<S> SuppressionRegistry<S>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    /// Builds a registry from its storage backend, time source and the policy
    /// used for newly created event states.
    pub fn new(storage: S, clock: Arc<dyn Clock>, default_policy: Policy) -> Self {
        Self {
            storage,
            clock,
            default_policy,
        }
    }

    /// Runs `f` against the state stored for `signature` and returns its result.
    ///
    /// The clock is read once. The same instant is used by the factory that
    /// builds a new `EventState` and is passed to `f` as its second argument.
    pub fn with_event_state<F, R>(&self, signature: EventSignature, f: F) -> R
    where
        F: FnOnce(&mut EventState, Instant) -> R,
    {
        let now = self.clock.now();
        self.storage.with_entry_mut(
            signature,
            // Clone the default policy inside the factory so the clone is paid
            // only when the storage actually invokes it, not on every call.
            || EventState::new(self.default_policy.clone(), now),
            |state| f(state, now),
        )
    }

    /// Borrows the policy given to newly created event states.
    pub fn default_policy(&self) -> &Policy {
        &self.default_policy
    }

    /// Returns an owned copy of the default policy.
    pub fn clone_default_policy(&self) -> Policy {
        self.default_policy.clone()
    }

    /// Number of entries, as reported by the storage backend.
    pub fn len(&self) -> usize {
        self.storage.len()
    }

    /// Whether the storage backend reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.storage.is_empty()
    }

    /// Clears the storage backend.
    pub fn clear(&self) {
        self.storage.clear();
    }

    /// Passes `f` to the storage backend's `for_each`, which supplies each
    /// signature and its state by shared reference.
    pub fn for_each<F>(&self, f: F)
    where
        F: FnMut(&EventSignature, &EventState),
    {
        self.storage.for_each(f);
    }

    /// Passes `f` to the storage backend's `retain`.
    ///
    /// NOTE(review): presumably entries for which `f` returns `false` are
    /// dropped, mirroring the std collections' `retain` — confirm against the
    /// `Storage` implementation.
    pub fn cleanup<F>(&self, f: F)
    where
        F: FnMut(&EventSignature, &mut EventState) -> bool,
    {
        self.storage.retain(f);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::policy::Policy;
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// A freshly built registry holds no entries.
    #[test]
    fn test_registry_creation() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// The first access creates an entry; a second access with the same
    /// signature reuses it instead of adding another.
    #[test]
    fn test_with_event_state() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let sig = EventSignature::simple("INFO", "Test message");

        registry.with_event_state(sig, |state, _now| {
            assert_eq!(state.counter.count(), 0);
        });
        assert_eq!(registry.len(), 1);
        assert!(!registry.is_empty());

        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });
        assert_eq!(registry.len(), 1);
    }

    /// `clear` removes every entry that was created.
    #[test]
    fn test_clear() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);

        for i in 0..10 {
            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
            registry.with_event_state(sig, |_state, _now| {});
        }
        assert_eq!(registry.len(), 10);

        registry.clear();
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
    }

    /// Ten threads each insert one hundred distinct signatures; every one of
    /// them must end up in the registry.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = Arc::new(SuppressionRegistry::new(storage, clock, policy));

        let mut handles = vec![];
        for i in 0..10 {
            let registry_clone = Arc::clone(&registry);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let sig = EventSignature::simple("INFO", &format!("Msg_{}_{}", i, j));
                    registry_clone.with_event_state(sig, |_state, _now| {});
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(registry.len(), 1000);
    }
}