tracing_throttle/application/
emitter.rs1use crate::application::{
7 ports::Storage,
8 registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::{sync::watch, time::interval};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum EmitterConfigError {
19 ZeroSummaryInterval,
21}
22
23impl std::fmt::Display for EmitterConfigError {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 EmitterConfigError::ZeroSummaryInterval => {
27 write!(f, "summary interval must be greater than 0")
28 }
29 }
30 }
31}
32
33impl std::error::Error for EmitterConfigError {}
34
35#[cfg(feature = "async")]
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub enum ShutdownError {
39 TaskPanicked,
41 TaskCancelled,
43 Timeout,
45 SignalFailed,
47}
48
49#[cfg(feature = "async")]
50impl std::fmt::Display for ShutdownError {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 ShutdownError::TaskPanicked => write!(f, "emitter task panicked during shutdown"),
54 ShutdownError::TaskCancelled => write!(f, "emitter task was cancelled"),
55 ShutdownError::Timeout => write!(f, "shutdown exceeded timeout"),
56 ShutdownError::SignalFailed => write!(f, "failed to send shutdown signal"),
57 }
58 }
59}
60
61#[cfg(feature = "async")]
62impl std::error::Error for ShutdownError {}
63
64#[derive(Debug, Clone)]
66pub struct EmitterConfig {
67 pub interval: Duration,
69 pub min_count: usize,
71}
72
73impl Default for EmitterConfig {
74 fn default() -> Self {
75 Self {
76 interval: Duration::from_secs(30),
77 min_count: 1,
78 }
79 }
80}
81
82impl EmitterConfig {
83 pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
88 if interval.is_zero() {
89 return Err(EmitterConfigError::ZeroSummaryInterval);
90 }
91 Ok(Self {
92 interval,
93 min_count: 1,
94 })
95 }
96
97 pub fn with_min_count(mut self, min_count: usize) -> Self {
99 self.min_count = min_count;
100 self
101 }
102}
103
104#[cfg(feature = "async")]
140pub struct EmitterHandle {
141 shutdown_tx: watch::Sender<bool>,
142 join_handle: Option<tokio::task::JoinHandle<()>>,
143}
144
145#[cfg(feature = "async")]
146impl EmitterHandle {
147 pub async fn shutdown(self) -> Result<(), ShutdownError> {
186 self.shutdown_with_timeout(Duration::from_secs(10)).await
187 }
188
189 pub async fn shutdown_with_timeout(
231 mut self,
232 timeout_duration: Duration,
233 ) -> Result<(), ShutdownError> {
234 use tokio::time::timeout;
235
236 if self.shutdown_tx.send(true).is_err() {
238 return Err(ShutdownError::SignalFailed);
239 }
240
241 if let Some(handle) = self.join_handle.take() {
243 match timeout(timeout_duration, handle).await {
244 Ok(Ok(())) => Ok(()),
245 Ok(Err(e)) if e.is_panic() => Err(ShutdownError::TaskPanicked),
246 Ok(Err(e)) if e.is_cancelled() => Err(ShutdownError::TaskCancelled),
247 Ok(Err(_)) => Err(ShutdownError::TaskPanicked), Err(_) => Err(ShutdownError::Timeout),
249 }
250 } else {
251 Ok(())
252 }
253 }
254
255 pub fn is_running(&self) -> bool {
257 self.join_handle.as_ref().is_some_and(|h| !h.is_finished())
258 }
259}
260
261pub struct SummaryEmitter<S>
263where
264 S: Storage<EventSignature, EventState> + Clone,
265{
266 registry: SuppressionRegistry<S>,
267 config: EmitterConfig,
268}
269
270impl<S> SummaryEmitter<S>
271where
272 S: Storage<EventSignature, EventState> + Clone,
273{
274 pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
276 Self { registry, config }
277 }
278
279 pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
284 let mut summaries = Vec::new();
285 let min_count = self.config.min_count;
286
287 self.registry.for_each(|signature, state| {
288 let count = state.counter.count();
289
290 if count >= min_count {
291 let summary = SuppressionSummary::from_counter(*signature, &state.counter);
292 summaries.push(summary);
293 }
294 });
295
296 summaries
297 }
298
299 #[cfg(feature = "async")]
354 pub fn start<F>(self, mut emit_fn: F, emit_final: bool) -> EmitterHandle
355 where
356 F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
357 S: Send + 'static,
358 {
359 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
360
361 let handle = tokio::spawn(async move {
362 let mut ticker = interval(self.config.interval);
363 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
365
366 loop {
367 tokio::select! {
368 biased;
370
371 _ = shutdown_rx.changed() => {
372 if *shutdown_rx.borrow_and_update() {
373 if emit_final {
375 let summaries = self.collect_summaries();
376 if !summaries.is_empty() {
377 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
380 emit_fn(summaries);
381 }));
382
383 if result.is_err() {
384 #[cfg(debug_assertions)]
385 eprintln!("Warning: emit_fn panicked during final emission");
386 }
387 }
388 }
389 break;
390 }
391 }
392 _ = ticker.tick() => {
393 let summaries = self.collect_summaries();
394 if !summaries.is_empty() {
395 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
398 emit_fn(summaries);
399 }));
400
401 if result.is_err() {
402 #[cfg(debug_assertions)]
404 eprintln!("Warning: emit_fn panicked during emission");
405 }
406 }
407 }
408 }
409 }
410 });
411
412 EmitterHandle {
413 shutdown_tx,
414 join_handle: Some(handle),
415 }
416 }
417
418 pub fn config(&self) -> &EmitterConfig {
420 &self.config
421 }
422
423 pub fn registry(&self) -> &SuppressionRegistry<S> {
425 &self.registry
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use crate::domain::{policy::Policy, signature::EventSignature};
433 use crate::infrastructure::clock::SystemClock;
434 use crate::infrastructure::storage::ShardedStorage;
435 use std::sync::Arc;
436
437 #[test]
438 fn test_collect_summaries_empty() {
439 let storage = Arc::new(ShardedStorage::new());
440 let clock = Arc::new(SystemClock::new());
441 let policy = Policy::count_based(100).unwrap();
442 let registry = SuppressionRegistry::new(storage, clock, policy);
443 let config = EmitterConfig::default();
444 let emitter = SummaryEmitter::new(registry, config);
445
446 let summaries = emitter.collect_summaries();
447 assert!(summaries.is_empty());
448 }
449
450 #[test]
451 fn test_collect_summaries_with_suppressions() {
452 let storage = Arc::new(ShardedStorage::new());
453 let clock = Arc::new(SystemClock::new());
454 let policy = Policy::count_based(100).unwrap();
455 let registry = SuppressionRegistry::new(storage, clock, policy);
456 let config = EmitterConfig::default();
457
458 for i in 0..3 {
460 let sig = EventSignature::simple("INFO", &format!("Message {}", i));
461 registry.with_event_state(sig, |state, now| {
462 for _ in 0..(i + 1) * 5 {
464 state.counter.record_suppression(now);
465 }
466 });
467 }
468
469 let emitter = SummaryEmitter::new(registry, config);
470 let summaries = emitter.collect_summaries();
471
472 assert_eq!(summaries.len(), 3);
473
474 let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
476 assert!(counts.contains(&6)); assert!(counts.contains(&11)); assert!(counts.contains(&16)); }
480
481 #[test]
482 fn test_min_count_filtering() {
483 let storage = Arc::new(ShardedStorage::new());
484 let clock = Arc::new(SystemClock::new());
485 let policy = Policy::count_based(100).unwrap();
486 let registry = SuppressionRegistry::new(storage, clock, policy);
487 let config = EmitterConfig::default().with_min_count(10);
488
489 let sig1 = EventSignature::simple("INFO", "Low count");
491 registry.with_event_state(sig1, |state, now| {
492 for _ in 0..4 {
493 state.counter.record_suppression(now);
494 }
495 });
496
497 let sig2 = EventSignature::simple("INFO", "High count");
499 registry.with_event_state(sig2, |state, now| {
500 for _ in 0..14 {
501 state.counter.record_suppression(now);
502 }
503 });
504
505 let emitter = SummaryEmitter::new(registry, config);
506 let summaries = emitter.collect_summaries();
507
508 assert_eq!(summaries.len(), 1);
510 assert_eq!(summaries[0].count, 15); }
512
513 #[cfg(feature = "async")]
514 #[tokio::test]
515 async fn test_async_emission() {
516 use std::sync::Mutex;
517
518 let storage = Arc::new(ShardedStorage::new());
519 let clock = Arc::new(SystemClock::new());
520 let policy = Policy::count_based(100).unwrap();
521 let registry = SuppressionRegistry::new(storage, clock, policy);
522 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
523
524 let sig = EventSignature::simple("INFO", "Test");
526 registry.with_event_state(sig, |state, now| {
527 state.counter.record_suppression(now);
528 });
529
530 let emitter = SummaryEmitter::new(registry, config);
531
532 let emissions = Arc::new(Mutex::new(Vec::new()));
534 let emissions_clone = Arc::clone(&emissions);
535
536 let handle = emitter.start(
537 move |summaries| {
538 emissions_clone.lock().unwrap().push(summaries.len());
539 },
540 false,
541 );
542
543 tokio::time::sleep(Duration::from_millis(250)).await;
545
546 handle.shutdown().await.expect("shutdown failed");
547
548 let emission_count = emissions.lock().unwrap().len();
550 assert!(emission_count >= 2);
551 }
552
553 #[test]
554 fn test_emitter_config_zero_interval() {
555 let result = EmitterConfig::new(Duration::from_secs(0));
556 assert!(matches!(
557 result,
558 Err(EmitterConfigError::ZeroSummaryInterval)
559 ));
560 }
561
562 #[test]
563 fn test_emitter_config_valid_interval() {
564 let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
565 assert_eq!(config.interval, Duration::from_secs(30));
566 assert_eq!(config.min_count, 1);
567 }
568
569 #[cfg(feature = "async")]
570 #[tokio::test]
571 async fn test_graceful_shutdown() {
572 use std::sync::Mutex;
573
574 let storage = Arc::new(ShardedStorage::new());
575 let clock = Arc::new(SystemClock::new());
576 let policy = Policy::count_based(100).unwrap();
577 let registry = SuppressionRegistry::new(storage, clock, policy);
578 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
579
580 let sig = EventSignature::simple("INFO", "Test");
582 registry.with_event_state(sig, |state, now| {
583 state.counter.record_suppression(now);
584 });
585
586 let emitter = SummaryEmitter::new(registry, config);
587
588 let emissions = Arc::new(Mutex::new(0));
589 let emissions_clone = Arc::clone(&emissions);
590
591 let handle = emitter.start(
592 move |_| {
593 *emissions_clone.lock().unwrap() += 1;
594 },
595 false,
596 );
597
598 tokio::time::sleep(Duration::from_millis(250)).await;
600
601 handle.shutdown().await.expect("shutdown failed");
603
604 let final_count = *emissions.lock().unwrap();
606 assert!(final_count >= 1);
607
608 tokio::time::sleep(Duration::from_millis(150)).await;
610 let count_after_shutdown = *emissions.lock().unwrap();
611 assert_eq!(count_after_shutdown, final_count);
612 }
613
614 #[cfg(feature = "async")]
615 #[tokio::test]
616 async fn test_shutdown_with_final_emission() {
617 use std::sync::Mutex;
618
619 let storage = Arc::new(ShardedStorage::new());
620 let clock = Arc::new(SystemClock::new());
621 let policy = Policy::count_based(100).unwrap();
622 let registry = SuppressionRegistry::new(storage, clock, policy);
623 let config = EmitterConfig::new(Duration::from_secs(60)).unwrap(); let emitter = SummaryEmitter::new(registry.clone(), config);
626
627 let emissions = Arc::new(Mutex::new(Vec::new()));
628 let emissions_clone = Arc::clone(&emissions);
629
630 let handle = emitter.start(
631 move |summaries| {
632 emissions_clone.lock().unwrap().push(summaries.len());
633 },
634 true, );
636
637 tokio::time::sleep(Duration::from_millis(50)).await;
639
640 let sig = EventSignature::simple("INFO", "Test event");
642 registry.with_event_state(sig, |state, now| {
643 for _ in 0..10 {
644 state.counter.record_suppression(now);
645 }
646 });
647
648 tokio::time::sleep(Duration::from_millis(50)).await;
650 handle.shutdown().await.expect("shutdown failed");
651
652 let emission_list = emissions.lock().unwrap();
654 assert_eq!(emission_list.len(), 1);
655 assert_eq!(emission_list[0], 1); }
657
658 #[cfg(feature = "async")]
659 #[tokio::test]
660 async fn test_shutdown_without_final_emission() {
661 use std::sync::Mutex;
662
663 let storage = Arc::new(ShardedStorage::new());
664 let clock = Arc::new(SystemClock::new());
665 let policy = Policy::count_based(100).unwrap();
666 let registry = SuppressionRegistry::new(storage, clock, policy);
667 let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();
668
669 let emitter = SummaryEmitter::new(registry.clone(), config);
670
671 let emissions = Arc::new(Mutex::new(0));
672 let emissions_clone = Arc::clone(&emissions);
673
674 let handle = emitter.start(
675 move |_| {
676 *emissions_clone.lock().unwrap() += 1;
677 },
678 false, );
680
681 tokio::time::sleep(Duration::from_millis(50)).await;
683
684 let sig = EventSignature::simple("INFO", "Test event");
686 registry.with_event_state(sig, |state, now| {
687 state.counter.record_suppression(now);
688 });
689
690 tokio::time::sleep(Duration::from_millis(50)).await;
692 handle.shutdown().await.expect("shutdown failed");
693
694 assert_eq!(*emissions.lock().unwrap(), 0);
696 }
697
698 #[cfg(feature = "async")]
699 #[tokio::test]
700 async fn test_is_running() {
701 let storage = Arc::new(ShardedStorage::new());
702 let clock = Arc::new(SystemClock::new());
703 let policy = Policy::count_based(100).unwrap();
704 let registry = SuppressionRegistry::new(storage, clock, policy);
705 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
706
707 let emitter = SummaryEmitter::new(registry, config);
708 let handle = emitter.start(|_| {}, false);
709
710 assert!(handle.is_running());
712
713 handle.shutdown().await.expect("shutdown failed");
715
716 tokio::time::sleep(Duration::from_millis(50)).await;
718 }
720
721 #[cfg(feature = "async")]
722 #[tokio::test]
723 async fn test_shutdown_during_emission() {
724 use std::sync::{Arc, Mutex};
725
726 let storage = Arc::new(ShardedStorage::new());
727 let clock = Arc::new(SystemClock::new());
728 let policy = Policy::count_based(100).unwrap();
729 let registry = SuppressionRegistry::new(storage, clock, policy);
730 let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
731
732 let sig = EventSignature::simple("INFO", "Test");
734 registry.with_event_state(sig, |state, now| {
735 state.counter.record_suppression(now);
736 });
737
738 let emitter = SummaryEmitter::new(registry, config);
739
740 let emissions = Arc::new(Mutex::new(0));
741 let emissions_clone = Arc::clone(&emissions);
742
743 let handle = emitter.start(
744 move |_| {
745 std::thread::sleep(Duration::from_millis(30));
747 *emissions_clone.lock().unwrap() += 1;
748 },
749 false,
750 );
751
752 tokio::time::sleep(Duration::from_millis(60)).await;
754
755 handle.shutdown().await.expect("shutdown failed");
757
758 assert!(*emissions.lock().unwrap() >= 1);
760 }
761
762 #[cfg(feature = "async")]
763 #[tokio::test]
764 async fn test_shutdown_with_custom_timeout() {
765 let storage = Arc::new(ShardedStorage::new());
766 let clock = Arc::new(SystemClock::new());
767 let policy = Policy::count_based(100).unwrap();
768 let registry = SuppressionRegistry::new(storage, clock, policy);
769 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
770
771 let emitter = SummaryEmitter::new(registry, config);
772 let handle = emitter.start(|_| {}, false);
773
774 tokio::time::sleep(Duration::from_millis(150)).await;
776
777 let result = handle.shutdown_with_timeout(Duration::from_secs(5)).await;
779 assert!(result.is_ok());
780 }
781
782 #[cfg(feature = "async")]
783 #[tokio::test]
784 async fn test_panic_in_emit_fn() {
785 use std::sync::atomic::{AtomicUsize, Ordering};
786
787 let storage = Arc::new(ShardedStorage::new());
788 let clock = Arc::new(SystemClock::new());
789 let policy = Policy::count_based(100).unwrap();
790 let registry = SuppressionRegistry::new(storage, clock, policy);
791 let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
792
793 let sig = EventSignature::simple("INFO", "Test");
795 registry.with_event_state(sig, |state, now| {
796 for _ in 0..5 {
797 state.counter.record_suppression(now);
798 }
799 });
800
801 let emitter = SummaryEmitter::new(registry, config);
802
803 let call_count = Arc::new(AtomicUsize::new(0));
804 let call_count_clone = Arc::clone(&call_count);
805
806 let handle = emitter.start(
807 move |_summaries| {
808 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
809
810 if count == 0 {
812 panic!("intentional panic for testing");
813 }
814 },
816 false,
817 );
818
819 tokio::time::sleep(Duration::from_millis(200)).await;
821
822 handle.shutdown().await.expect("shutdown failed");
823
824 let final_count = call_count.load(Ordering::SeqCst);
826 assert!(
827 final_count > 1,
828 "Task should continue after panic in emit_fn"
829 );
830 }
831}