use crate::application::{
ports::Storage,
registry::{EventState, SuppressionRegistry},
};
use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
use std::time::Duration;
#[cfg(feature = "async")]
use tokio::{sync::watch, time::interval};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EmitterConfigError {
ZeroSummaryInterval,
}
impl std::fmt::Display for EmitterConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EmitterConfigError::ZeroSummaryInterval => {
write!(f, "summary interval must be greater than 0")
}
}
}
}
impl std::error::Error for EmitterConfigError {}
#[cfg(feature = "async")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownError {
TaskPanicked,
TaskCancelled,
Timeout,
SignalFailed,
}
#[cfg(feature = "async")]
impl std::fmt::Display for ShutdownError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ShutdownError::TaskPanicked => write!(f, "emitter task panicked during shutdown"),
ShutdownError::TaskCancelled => write!(f, "emitter task was cancelled"),
ShutdownError::Timeout => write!(f, "shutdown exceeded timeout"),
ShutdownError::SignalFailed => write!(f, "failed to send shutdown signal"),
}
}
}
#[cfg(feature = "async")]
impl std::error::Error for ShutdownError {}
#[derive(Debug, Clone)]
pub struct EmitterConfig {
pub interval: Duration,
pub min_count: usize,
}
impl Default for EmitterConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(30),
min_count: 1,
}
}
}
impl EmitterConfig {
pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
if interval.is_zero() {
return Err(EmitterConfigError::ZeroSummaryInterval);
}
Ok(Self {
interval,
min_count: 1,
})
}
pub fn with_min_count(mut self, min_count: usize) -> Self {
self.min_count = min_count;
self
}
}
#[cfg(feature = "async")]
pub struct EmitterHandle {
shutdown_tx: watch::Sender<bool>,
join_handle: Option<tokio::task::JoinHandle<()>>,
}
#[cfg(feature = "async")]
impl EmitterHandle {
pub async fn shutdown(self) -> Result<(), ShutdownError> {
self.shutdown_with_timeout(Duration::from_secs(10)).await
}
pub async fn shutdown_with_timeout(
mut self,
timeout_duration: Duration,
) -> Result<(), ShutdownError> {
use tokio::time::timeout;
if self.shutdown_tx.send(true).is_err() {
return Err(ShutdownError::SignalFailed);
}
if let Some(handle) = self.join_handle.take() {
match timeout(timeout_duration, handle).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) if e.is_panic() => Err(ShutdownError::TaskPanicked),
Ok(Err(e)) if e.is_cancelled() => Err(ShutdownError::TaskCancelled),
Ok(Err(_)) => Err(ShutdownError::TaskPanicked), Err(_) => Err(ShutdownError::Timeout),
}
} else {
Ok(())
}
}
pub fn is_running(&self) -> bool {
self.join_handle.as_ref().is_some_and(|h| !h.is_finished())
}
}
pub struct SummaryEmitter<S>
where
S: Storage<EventSignature, EventState> + Clone,
{
registry: SuppressionRegistry<S>,
config: EmitterConfig,
}
impl<S> SummaryEmitter<S>
where
S: Storage<EventSignature, EventState> + Clone,
{
pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
Self { registry, config }
}
pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
let mut summaries = Vec::new();
let min_count = self.config.min_count;
self.registry.for_each(|signature, state| {
let count = state.counter.count();
if count >= min_count {
#[cfg(feature = "human-readable")]
let summary = SuppressionSummary::from_counter_with_metadata(
*signature,
&state.counter,
state.metadata.clone(),
);
#[cfg(not(feature = "human-readable"))]
let summary = SuppressionSummary::from_counter(*signature, &state.counter);
summaries.push(summary);
}
});
summaries
}
#[cfg(feature = "async")]
pub fn start<F>(self, mut emit_fn: F, emit_final: bool) -> EmitterHandle
where
F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
S: Send + 'static,
{
let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
let handle = tokio::spawn(async move {
let mut ticker = interval(self.config.interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow_and_update() {
if emit_final {
let summaries = self.collect_summaries();
if !summaries.is_empty() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
emit_fn(summaries);
}));
if result.is_err() {
#[cfg(debug_assertions)]
eprintln!("Warning: emit_fn panicked during final emission");
}
}
}
break;
}
}
_ = ticker.tick() => {
let summaries = self.collect_summaries();
if !summaries.is_empty() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
emit_fn(summaries);
}));
if result.is_err() {
#[cfg(debug_assertions)]
eprintln!("Warning: emit_fn panicked during emission");
}
}
}
}
}
});
EmitterHandle {
shutdown_tx,
join_handle: Some(handle),
}
}
pub fn config(&self) -> &EmitterConfig {
&self.config
}
pub fn registry(&self) -> &SuppressionRegistry<S> {
&self.registry
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::{policy::Policy, signature::EventSignature};
use crate::infrastructure::clock::SystemClock;
use crate::infrastructure::storage::ShardedStorage;
use std::sync::Arc;
#[test]
fn test_collect_summaries_empty() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::default();
let emitter = SummaryEmitter::new(registry, config);
let summaries = emitter.collect_summaries();
assert!(summaries.is_empty());
}
#[test]
fn test_collect_summaries_with_suppressions() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::default();
for i in 0..3 {
let sig = EventSignature::simple("INFO", &format!("Message {}", i));
registry.with_event_state(sig, |state, now| {
for _ in 0..(i + 1) * 5 {
state.counter.record_suppression(now);
}
});
}
let emitter = SummaryEmitter::new(registry, config);
let summaries = emitter.collect_summaries();
assert_eq!(summaries.len(), 3);
let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
assert!(counts.contains(&5));
assert!(counts.contains(&10));
assert!(counts.contains(&15));
}
#[test]
fn test_min_count_filtering() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::default().with_min_count(10);
let sig1 = EventSignature::simple("INFO", "Low count");
registry.with_event_state(sig1, |state, now| {
for _ in 0..4 {
state.counter.record_suppression(now);
}
});
let sig2 = EventSignature::simple("INFO", "High count");
registry.with_event_state(sig2, |state, now| {
for _ in 0..14 {
state.counter.record_suppression(now);
}
});
let emitter = SummaryEmitter::new(registry, config);
let summaries = emitter.collect_summaries();
assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].count, 14);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_async_emission() {
use std::sync::Mutex;
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
let emitter = SummaryEmitter::new(registry, config);
let emissions = Arc::new(Mutex::new(Vec::new()));
let emissions_clone = Arc::clone(&emissions);
let handle = emitter.start(
move |summaries| {
emissions_clone.lock().unwrap().push(summaries.len());
},
false,
);
tokio::time::sleep(Duration::from_millis(250)).await;
handle.shutdown().await.expect("shutdown failed");
let emission_count = emissions.lock().unwrap().len();
assert!(emission_count >= 2);
}
#[test]
fn test_emitter_config_zero_interval() {
let result = EmitterConfig::new(Duration::from_secs(0));
assert!(matches!(
result,
Err(EmitterConfigError::ZeroSummaryInterval)
));
}
#[test]
fn test_emitter_config_valid_interval() {
let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
assert_eq!(config.interval, Duration::from_secs(30));
assert_eq!(config.min_count, 1);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_graceful_shutdown() {
use std::sync::Mutex;
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
let emitter = SummaryEmitter::new(registry, config);
let emissions = Arc::new(Mutex::new(0));
let emissions_clone = Arc::clone(&emissions);
let handle = emitter.start(
move |_| {
*emissions_clone.lock().unwrap() += 1;
},
false,
);
tokio::time::sleep(Duration::from_millis(250)).await;
handle.shutdown().await.expect("shutdown failed");
let final_count = *emissions.lock().unwrap();
assert!(final_count >= 1);
tokio::time::sleep(Duration::from_millis(150)).await;
let count_after_shutdown = *emissions.lock().unwrap();
assert_eq!(count_after_shutdown, final_count);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_with_final_emission() {
use std::sync::Mutex;
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();
let emitter = SummaryEmitter::new(registry.clone(), config);
let emissions = Arc::new(Mutex::new(Vec::new()));
let emissions_clone = Arc::clone(&emissions);
let handle = emitter.start(
move |summaries| {
emissions_clone.lock().unwrap().push(summaries.len());
},
true, );
tokio::time::sleep(Duration::from_millis(50)).await;
let sig = EventSignature::simple("INFO", "Test event");
registry.with_event_state(sig, |state, now| {
for _ in 0..10 {
state.counter.record_suppression(now);
}
});
tokio::time::sleep(Duration::from_millis(50)).await;
handle.shutdown().await.expect("shutdown failed");
let emission_list = emissions.lock().unwrap();
assert_eq!(emission_list.len(), 1);
assert_eq!(emission_list[0], 1); }
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_without_final_emission() {
use std::sync::Mutex;
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();
let emitter = SummaryEmitter::new(registry.clone(), config);
let emissions = Arc::new(Mutex::new(0));
let emissions_clone = Arc::clone(&emissions);
let handle = emitter.start(
move |_| {
*emissions_clone.lock().unwrap() += 1;
},
false, );
tokio::time::sleep(Duration::from_millis(50)).await;
let sig = EventSignature::simple("INFO", "Test event");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
tokio::time::sleep(Duration::from_millis(50)).await;
handle.shutdown().await.expect("shutdown failed");
assert_eq!(*emissions.lock().unwrap(), 0);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_is_running() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(|_| {}, false);
assert!(handle.is_running());
handle.shutdown().await.expect("shutdown failed");
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_during_emission() {
use std::sync::{Arc, Mutex};
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
let emitter = SummaryEmitter::new(registry, config);
let emissions = Arc::new(Mutex::new(0));
let emissions_clone = Arc::clone(&emissions);
let handle = emitter.start(
move |_| {
std::thread::sleep(Duration::from_millis(30));
*emissions_clone.lock().unwrap() += 1;
},
false,
);
tokio::time::sleep(Duration::from_millis(60)).await;
handle.shutdown().await.expect("shutdown failed");
assert!(*emissions.lock().unwrap() >= 1);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_with_custom_timeout() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(|_| {}, false);
tokio::time::sleep(Duration::from_millis(150)).await;
let result = handle.shutdown_with_timeout(Duration::from_secs(5)).await;
assert!(result.is_ok());
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_panic_in_emit_fn() {
use std::sync::atomic::{AtomicUsize, Ordering};
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
for _ in 0..5 {
state.counter.record_suppression(now);
}
});
let emitter = SummaryEmitter::new(registry, config);
let call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = Arc::clone(&call_count);
let handle = emitter.start(
move |_summaries| {
let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
if count == 0 {
panic!("intentional panic for testing");
}
},
false,
);
tokio::time::sleep(Duration::from_millis(200)).await;
handle.shutdown().await.expect("shutdown failed");
let final_count = call_count.load(Ordering::SeqCst);
assert!(
final_count > 1,
"Task should continue after panic in emit_fn"
);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_repeated_panic_in_emit_fn() {
use std::sync::atomic::{AtomicUsize, Ordering};
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
let emitter = SummaryEmitter::new(registry, config);
let call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = Arc::clone(&call_count);
let handle = emitter.start(
move |_summaries| {
call_count_clone.fetch_add(1, Ordering::SeqCst);
panic!("always panic");
},
false,
);
tokio::time::sleep(Duration::from_millis(150)).await;
handle.shutdown().await.expect("shutdown failed");
let final_count = call_count.load(Ordering::SeqCst);
assert!(
final_count >= 3,
"Task should continue despite repeated panics, got {} calls",
final_count
);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_panic_during_final_emission() {
use std::sync::atomic::{AtomicBool, Ordering};
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
let sig = EventSignature::simple("INFO", "Test");
registry.with_event_state(sig, |state, now| {
state.counter.record_suppression(now);
});
let emitter = SummaryEmitter::new(registry, config);
let panicked = Arc::new(AtomicBool::new(false));
let panicked_clone = Arc::clone(&panicked);
let handle = emitter.start(
move |_summaries| {
panicked_clone.store(true, Ordering::SeqCst);
panic!("panic during final emission");
},
true, );
handle
.shutdown()
.await
.expect("shutdown should succeed even if final emission panics");
assert!(
panicked.load(Ordering::SeqCst),
"Final emission should have been attempted"
);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_timeout_with_slow_emit_fn() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(
move |_summaries| {
std::thread::sleep(Duration::from_millis(500));
},
true,
);
let result = handle
.shutdown_with_timeout(Duration::from_millis(10))
.await;
let _ = result; }
#[cfg(feature = "async")]
#[tokio::test]
async fn test_handle_dropped_without_shutdown() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(|_summaries| {}, false);
drop(handle);
tokio::time::sleep(Duration::from_millis(100)).await;
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_concurrent_shutdown_calls() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(|_summaries| {}, false);
let sender = handle.shutdown_tx.clone();
let mut handles_vec = vec![];
for _ in 0..5 {
let sender_clone = sender.clone();
handles_vec.push(tokio::spawn(async move {
let _ = sender_clone.send(true);
}));
}
for h in handles_vec {
let _ = h.await;
}
let _ = handle.shutdown().await;
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_shutdown_signal_failure() {
let storage = Arc::new(ShardedStorage::new());
let clock = Arc::new(SystemClock::new());
let policy = Policy::count_based(100).unwrap();
let registry = SuppressionRegistry::new(storage, clock, policy);
let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
let emitter = SummaryEmitter::new(registry, config);
let handle = emitter.start(|_summaries| {}, false);
handle.shutdown().await.expect("shutdown should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
}
}