1use crate::application::{
7 ports::Storage,
8 registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::{sync::watch, time::interval};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum EmitterConfigError {
19 ZeroSummaryInterval,
21}
22
23impl std::fmt::Display for EmitterConfigError {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 EmitterConfigError::ZeroSummaryInterval => {
27 write!(f, "summary interval must be greater than 0")
28 }
29 }
30 }
31}
32
33impl std::error::Error for EmitterConfigError {}
34
35#[cfg(feature = "async")]
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub enum ShutdownError {
39 TaskPanicked,
41 TaskCancelled,
43 Timeout,
45 SignalFailed,
47}
48
49#[cfg(feature = "async")]
50impl std::fmt::Display for ShutdownError {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 ShutdownError::TaskPanicked => write!(f, "emitter task panicked during shutdown"),
54 ShutdownError::TaskCancelled => write!(f, "emitter task was cancelled"),
55 ShutdownError::Timeout => write!(f, "shutdown exceeded timeout"),
56 ShutdownError::SignalFailed => write!(f, "failed to send shutdown signal"),
57 }
58 }
59}
60
61#[cfg(feature = "async")]
62impl std::error::Error for ShutdownError {}
63
64#[derive(Debug, Clone)]
66pub struct EmitterConfig {
67 pub interval: Duration,
69 pub min_count: usize,
71}
72
73impl Default for EmitterConfig {
74 fn default() -> Self {
75 Self {
76 interval: Duration::from_secs(30),
77 min_count: 1,
78 }
79 }
80}
81
82impl EmitterConfig {
83 pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
88 if interval.is_zero() {
89 return Err(EmitterConfigError::ZeroSummaryInterval);
90 }
91 Ok(Self {
92 interval,
93 min_count: 1,
94 })
95 }
96
97 pub fn with_min_count(mut self, min_count: usize) -> Self {
99 self.min_count = min_count;
100 self
101 }
102}
103
104#[cfg(feature = "async")]
140pub struct EmitterHandle {
141 shutdown_tx: watch::Sender<bool>,
142 join_handle: Option<tokio::task::JoinHandle<()>>,
143}
144
145#[cfg(feature = "async")]
146impl EmitterHandle {
147 pub async fn shutdown(self) -> Result<(), ShutdownError> {
186 self.shutdown_with_timeout(Duration::from_secs(10)).await
187 }
188
189 pub async fn shutdown_with_timeout(
231 mut self,
232 timeout_duration: Duration,
233 ) -> Result<(), ShutdownError> {
234 use tokio::time::timeout;
235
236 if self.shutdown_tx.send(true).is_err() {
238 return Err(ShutdownError::SignalFailed);
239 }
240
241 if let Some(handle) = self.join_handle.take() {
243 match timeout(timeout_duration, handle).await {
244 Ok(Ok(())) => Ok(()),
245 Ok(Err(e)) if e.is_panic() => Err(ShutdownError::TaskPanicked),
246 Ok(Err(e)) if e.is_cancelled() => Err(ShutdownError::TaskCancelled),
247 Ok(Err(_)) => Err(ShutdownError::TaskPanicked), Err(_) => Err(ShutdownError::Timeout),
249 }
250 } else {
251 Ok(())
252 }
253 }
254
255 pub fn is_running(&self) -> bool {
257 self.join_handle.as_ref().is_some_and(|h| !h.is_finished())
258 }
259}
260
261pub struct SummaryEmitter<S>
263where
264 S: Storage<EventSignature, EventState> + Clone,
265{
266 registry: SuppressionRegistry<S>,
267 config: EmitterConfig,
268}
269
270impl<S> SummaryEmitter<S>
271where
272 S: Storage<EventSignature, EventState> + Clone,
273{
274 pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
276 Self { registry, config }
277 }
278
279 pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
284 let mut summaries = Vec::new();
285 let min_count = self.config.min_count;
286
287 self.registry.for_each(|signature, state| {
288 let count = state.counter.count();
289
290 if count >= min_count {
291 #[cfg(feature = "human-readable")]
292 let summary = SuppressionSummary::from_counter_with_metadata(
293 *signature,
294 &state.counter,
295 state.metadata.clone(),
296 );
297
298 #[cfg(not(feature = "human-readable"))]
299 let summary = SuppressionSummary::from_counter(*signature, &state.counter);
300
301 summaries.push(summary);
302 }
303 });
304
305 summaries
306 }
307
308 #[cfg(feature = "async")]
363 pub fn start<F>(self, mut emit_fn: F, emit_final: bool) -> EmitterHandle
364 where
365 F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
366 S: Send + 'static,
367 {
368 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
369
370 let handle = tokio::spawn(async move {
371 let mut ticker = interval(self.config.interval);
372 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
374
375 loop {
376 tokio::select! {
377 biased;
379
380 _ = shutdown_rx.changed() => {
381 if *shutdown_rx.borrow_and_update() {
382 if emit_final {
384 let summaries = self.collect_summaries();
385 if !summaries.is_empty() {
386 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
389 emit_fn(summaries);
390 }));
391
392 if result.is_err() {
393 #[cfg(debug_assertions)]
394 eprintln!("Warning: emit_fn panicked during final emission");
395 }
396 }
397 }
398 break;
399 }
400 }
401 _ = ticker.tick() => {
402 let summaries = self.collect_summaries();
403 if !summaries.is_empty() {
404 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
407 emit_fn(summaries);
408 }));
409
410 if result.is_err() {
411 #[cfg(debug_assertions)]
413 eprintln!("Warning: emit_fn panicked during emission");
414 }
415 }
416 }
417 }
418 }
419 });
420
421 EmitterHandle {
422 shutdown_tx,
423 join_handle: Some(handle),
424 }
425 }
426
427 pub fn config(&self) -> &EmitterConfig {
429 &self.config
430 }
431
432 pub fn registry(&self) -> &SuppressionRegistry<S> {
434 &self.registry
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441 use crate::domain::{policy::Policy, signature::EventSignature};
442 use crate::infrastructure::clock::SystemClock;
443 use crate::infrastructure::storage::ShardedStorage;
444 use std::sync::Arc;
445
446 #[test]
447 fn test_collect_summaries_empty() {
448 let storage = Arc::new(ShardedStorage::new());
449 let clock = Arc::new(SystemClock::new());
450 let policy = Policy::count_based(100).unwrap();
451 let registry = SuppressionRegistry::new(storage, clock, policy);
452 let config = EmitterConfig::default();
453 let emitter = SummaryEmitter::new(registry, config);
454
455 let summaries = emitter.collect_summaries();
456 assert!(summaries.is_empty());
457 }
458
459 #[test]
460 fn test_collect_summaries_with_suppressions() {
461 let storage = Arc::new(ShardedStorage::new());
462 let clock = Arc::new(SystemClock::new());
463 let policy = Policy::count_based(100).unwrap();
464 let registry = SuppressionRegistry::new(storage, clock, policy);
465 let config = EmitterConfig::default();
466
467 for i in 0..3 {
469 let sig = EventSignature::simple("INFO", &format!("Message {}", i));
470 registry.with_event_state(sig, |state, now| {
471 for _ in 0..(i + 1) * 5 {
473 state.counter.record_suppression(now);
474 }
475 });
476 }
477
478 let emitter = SummaryEmitter::new(registry, config);
479 let summaries = emitter.collect_summaries();
480
481 assert_eq!(summaries.len(), 3);
482
483 let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
485 assert!(counts.contains(&6)); assert!(counts.contains(&11)); assert!(counts.contains(&16)); }
489
490 #[test]
491 fn test_min_count_filtering() {
492 let storage = Arc::new(ShardedStorage::new());
493 let clock = Arc::new(SystemClock::new());
494 let policy = Policy::count_based(100).unwrap();
495 let registry = SuppressionRegistry::new(storage, clock, policy);
496 let config = EmitterConfig::default().with_min_count(10);
497
498 let sig1 = EventSignature::simple("INFO", "Low count");
500 registry.with_event_state(sig1, |state, now| {
501 for _ in 0..4 {
502 state.counter.record_suppression(now);
503 }
504 });
505
506 let sig2 = EventSignature::simple("INFO", "High count");
508 registry.with_event_state(sig2, |state, now| {
509 for _ in 0..14 {
510 state.counter.record_suppression(now);
511 }
512 });
513
514 let emitter = SummaryEmitter::new(registry, config);
515 let summaries = emitter.collect_summaries();
516
517 assert_eq!(summaries.len(), 1);
519 assert_eq!(summaries[0].count, 15); }
521
522 #[cfg(feature = "async")]
523 #[tokio::test]
524 async fn test_async_emission() {
525 use std::sync::Mutex;
526
527 let storage = Arc::new(ShardedStorage::new());
528 let clock = Arc::new(SystemClock::new());
529 let policy = Policy::count_based(100).unwrap();
530 let registry = SuppressionRegistry::new(storage, clock, policy);
531 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
532
533 let sig = EventSignature::simple("INFO", "Test");
535 registry.with_event_state(sig, |state, now| {
536 state.counter.record_suppression(now);
537 });
538
539 let emitter = SummaryEmitter::new(registry, config);
540
541 let emissions = Arc::new(Mutex::new(Vec::new()));
543 let emissions_clone = Arc::clone(&emissions);
544
545 let handle = emitter.start(
546 move |summaries| {
547 emissions_clone.lock().unwrap().push(summaries.len());
548 },
549 false,
550 );
551
552 tokio::time::sleep(Duration::from_millis(250)).await;
554
555 handle.shutdown().await.expect("shutdown failed");
556
557 let emission_count = emissions.lock().unwrap().len();
559 assert!(emission_count >= 2);
560 }
561
562 #[test]
563 fn test_emitter_config_zero_interval() {
564 let result = EmitterConfig::new(Duration::from_secs(0));
565 assert!(matches!(
566 result,
567 Err(EmitterConfigError::ZeroSummaryInterval)
568 ));
569 }
570
571 #[test]
572 fn test_emitter_config_valid_interval() {
573 let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
574 assert_eq!(config.interval, Duration::from_secs(30));
575 assert_eq!(config.min_count, 1);
576 }
577
578 #[cfg(feature = "async")]
579 #[tokio::test]
580 async fn test_graceful_shutdown() {
581 use std::sync::Mutex;
582
583 let storage = Arc::new(ShardedStorage::new());
584 let clock = Arc::new(SystemClock::new());
585 let policy = Policy::count_based(100).unwrap();
586 let registry = SuppressionRegistry::new(storage, clock, policy);
587 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
588
589 let sig = EventSignature::simple("INFO", "Test");
591 registry.with_event_state(sig, |state, now| {
592 state.counter.record_suppression(now);
593 });
594
595 let emitter = SummaryEmitter::new(registry, config);
596
597 let emissions = Arc::new(Mutex::new(0));
598 let emissions_clone = Arc::clone(&emissions);
599
600 let handle = emitter.start(
601 move |_| {
602 *emissions_clone.lock().unwrap() += 1;
603 },
604 false,
605 );
606
607 tokio::time::sleep(Duration::from_millis(250)).await;
609
610 handle.shutdown().await.expect("shutdown failed");
612
613 let final_count = *emissions.lock().unwrap();
615 assert!(final_count >= 1);
616
617 tokio::time::sleep(Duration::from_millis(150)).await;
619 let count_after_shutdown = *emissions.lock().unwrap();
620 assert_eq!(count_after_shutdown, final_count);
621 }
622
623 #[cfg(feature = "async")]
624 #[tokio::test]
625 async fn test_shutdown_with_final_emission() {
626 use std::sync::Mutex;
627
628 let storage = Arc::new(ShardedStorage::new());
629 let clock = Arc::new(SystemClock::new());
630 let policy = Policy::count_based(100).unwrap();
631 let registry = SuppressionRegistry::new(storage, clock, policy);
632 let config = EmitterConfig::new(Duration::from_secs(60)).unwrap(); let emitter = SummaryEmitter::new(registry.clone(), config);
635
636 let emissions = Arc::new(Mutex::new(Vec::new()));
637 let emissions_clone = Arc::clone(&emissions);
638
639 let handle = emitter.start(
640 move |summaries| {
641 emissions_clone.lock().unwrap().push(summaries.len());
642 },
643 true, );
645
646 tokio::time::sleep(Duration::from_millis(50)).await;
648
649 let sig = EventSignature::simple("INFO", "Test event");
651 registry.with_event_state(sig, |state, now| {
652 for _ in 0..10 {
653 state.counter.record_suppression(now);
654 }
655 });
656
657 tokio::time::sleep(Duration::from_millis(50)).await;
659 handle.shutdown().await.expect("shutdown failed");
660
661 let emission_list = emissions.lock().unwrap();
663 assert_eq!(emission_list.len(), 1);
664 assert_eq!(emission_list[0], 1); }
666
667 #[cfg(feature = "async")]
668 #[tokio::test]
669 async fn test_shutdown_without_final_emission() {
670 use std::sync::Mutex;
671
672 let storage = Arc::new(ShardedStorage::new());
673 let clock = Arc::new(SystemClock::new());
674 let policy = Policy::count_based(100).unwrap();
675 let registry = SuppressionRegistry::new(storage, clock, policy);
676 let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();
677
678 let emitter = SummaryEmitter::new(registry.clone(), config);
679
680 let emissions = Arc::new(Mutex::new(0));
681 let emissions_clone = Arc::clone(&emissions);
682
683 let handle = emitter.start(
684 move |_| {
685 *emissions_clone.lock().unwrap() += 1;
686 },
687 false, );
689
690 tokio::time::sleep(Duration::from_millis(50)).await;
692
693 let sig = EventSignature::simple("INFO", "Test event");
695 registry.with_event_state(sig, |state, now| {
696 state.counter.record_suppression(now);
697 });
698
699 tokio::time::sleep(Duration::from_millis(50)).await;
701 handle.shutdown().await.expect("shutdown failed");
702
703 assert_eq!(*emissions.lock().unwrap(), 0);
705 }
706
707 #[cfg(feature = "async")]
708 #[tokio::test]
709 async fn test_is_running() {
710 let storage = Arc::new(ShardedStorage::new());
711 let clock = Arc::new(SystemClock::new());
712 let policy = Policy::count_based(100).unwrap();
713 let registry = SuppressionRegistry::new(storage, clock, policy);
714 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
715
716 let emitter = SummaryEmitter::new(registry, config);
717 let handle = emitter.start(|_| {}, false);
718
719 assert!(handle.is_running());
721
722 handle.shutdown().await.expect("shutdown failed");
724
725 tokio::time::sleep(Duration::from_millis(50)).await;
727 }
729
730 #[cfg(feature = "async")]
731 #[tokio::test]
732 async fn test_shutdown_during_emission() {
733 use std::sync::{Arc, Mutex};
734
735 let storage = Arc::new(ShardedStorage::new());
736 let clock = Arc::new(SystemClock::new());
737 let policy = Policy::count_based(100).unwrap();
738 let registry = SuppressionRegistry::new(storage, clock, policy);
739 let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
740
741 let sig = EventSignature::simple("INFO", "Test");
743 registry.with_event_state(sig, |state, now| {
744 state.counter.record_suppression(now);
745 });
746
747 let emitter = SummaryEmitter::new(registry, config);
748
749 let emissions = Arc::new(Mutex::new(0));
750 let emissions_clone = Arc::clone(&emissions);
751
752 let handle = emitter.start(
753 move |_| {
754 std::thread::sleep(Duration::from_millis(30));
756 *emissions_clone.lock().unwrap() += 1;
757 },
758 false,
759 );
760
761 tokio::time::sleep(Duration::from_millis(60)).await;
763
764 handle.shutdown().await.expect("shutdown failed");
766
767 assert!(*emissions.lock().unwrap() >= 1);
769 }
770
771 #[cfg(feature = "async")]
772 #[tokio::test]
773 async fn test_shutdown_with_custom_timeout() {
774 let storage = Arc::new(ShardedStorage::new());
775 let clock = Arc::new(SystemClock::new());
776 let policy = Policy::count_based(100).unwrap();
777 let registry = SuppressionRegistry::new(storage, clock, policy);
778 let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
779
780 let emitter = SummaryEmitter::new(registry, config);
781 let handle = emitter.start(|_| {}, false);
782
783 tokio::time::sleep(Duration::from_millis(150)).await;
785
786 let result = handle.shutdown_with_timeout(Duration::from_secs(5)).await;
788 assert!(result.is_ok());
789 }
790
791 #[cfg(feature = "async")]
792 #[tokio::test]
793 async fn test_panic_in_emit_fn() {
794 use std::sync::atomic::{AtomicUsize, Ordering};
795
796 let storage = Arc::new(ShardedStorage::new());
797 let clock = Arc::new(SystemClock::new());
798 let policy = Policy::count_based(100).unwrap();
799 let registry = SuppressionRegistry::new(storage, clock, policy);
800 let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
801
802 let sig = EventSignature::simple("INFO", "Test");
804 registry.with_event_state(sig, |state, now| {
805 for _ in 0..5 {
806 state.counter.record_suppression(now);
807 }
808 });
809
810 let emitter = SummaryEmitter::new(registry, config);
811
812 let call_count = Arc::new(AtomicUsize::new(0));
813 let call_count_clone = Arc::clone(&call_count);
814
815 let handle = emitter.start(
816 move |_summaries| {
817 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
818
819 if count == 0 {
821 panic!("intentional panic for testing");
822 }
823 },
825 false,
826 );
827
828 tokio::time::sleep(Duration::from_millis(200)).await;
830
831 handle.shutdown().await.expect("shutdown failed");
832
833 let final_count = call_count.load(Ordering::SeqCst);
835 assert!(
836 final_count > 1,
837 "Task should continue after panic in emit_fn"
838 );
839 }
840
841 #[cfg(feature = "async")]
842 #[tokio::test]
843 async fn test_repeated_panic_in_emit_fn() {
844 use std::sync::atomic::{AtomicUsize, Ordering};
845
846 let storage = Arc::new(ShardedStorage::new());
847 let clock = Arc::new(SystemClock::new());
848 let policy = Policy::count_based(100).unwrap();
849 let registry = SuppressionRegistry::new(storage, clock, policy);
850 let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
851
852 let sig = EventSignature::simple("INFO", "Test");
853 registry.with_event_state(sig, |state, now| {
854 state.counter.record_suppression(now);
855 });
856
857 let emitter = SummaryEmitter::new(registry, config);
858
859 let call_count = Arc::new(AtomicUsize::new(0));
860 let call_count_clone = Arc::clone(&call_count);
861
862 let handle = emitter.start(
863 move |_summaries| {
864 call_count_clone.fetch_add(1, Ordering::SeqCst);
865 panic!("always panic");
866 },
867 false,
868 );
869
870 tokio::time::sleep(Duration::from_millis(150)).await;
872
873 handle.shutdown().await.expect("shutdown failed");
874
875 let final_count = call_count.load(Ordering::SeqCst);
877 assert!(
878 final_count >= 3,
879 "Task should continue despite repeated panics, got {} calls",
880 final_count
881 );
882 }
883
884 #[cfg(feature = "async")]
885 #[tokio::test]
886 async fn test_panic_during_final_emission() {
887 use std::sync::atomic::{AtomicBool, Ordering};
888
889 let storage = Arc::new(ShardedStorage::new());
890 let clock = Arc::new(SystemClock::new());
891 let policy = Policy::count_based(100).unwrap();
892 let registry = SuppressionRegistry::new(storage, clock, policy);
893 let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap(); let sig = EventSignature::simple("INFO", "Test");
896 registry.with_event_state(sig, |state, now| {
897 state.counter.record_suppression(now);
898 });
899
900 let emitter = SummaryEmitter::new(registry, config);
901
902 let panicked = Arc::new(AtomicBool::new(false));
903 let panicked_clone = Arc::clone(&panicked);
904
905 let handle = emitter.start(
906 move |_summaries| {
907 panicked_clone.store(true, Ordering::SeqCst);
908 panic!("panic during final emission");
909 },
910 true, );
912
913 handle
915 .shutdown()
916 .await
917 .expect("shutdown should succeed even if final emission panics");
918
919 assert!(
921 panicked.load(Ordering::SeqCst),
922 "Final emission should have been attempted"
923 );
924 }
925
926 #[cfg(feature = "async")]
927 #[tokio::test]
928 async fn test_shutdown_timeout_with_slow_emit_fn() {
929 let storage = Arc::new(ShardedStorage::new());
930 let clock = Arc::new(SystemClock::new());
931 let policy = Policy::count_based(100).unwrap();
932 let registry = SuppressionRegistry::new(storage, clock, policy);
933 let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
934
935 let emitter = SummaryEmitter::new(registry, config);
936
937 let handle = emitter.start(
938 move |_summaries| {
939 std::thread::sleep(Duration::from_millis(500));
941 },
942 true,
943 );
944
945 let result = handle
947 .shutdown_with_timeout(Duration::from_millis(10))
948 .await;
949
950 let _ = result; }
954
955 #[cfg(feature = "async")]
956 #[tokio::test]
957 async fn test_handle_dropped_without_shutdown() {
958 let storage = Arc::new(ShardedStorage::new());
959 let clock = Arc::new(SystemClock::new());
960 let policy = Policy::count_based(100).unwrap();
961 let registry = SuppressionRegistry::new(storage, clock, policy);
962 let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
963
964 let emitter = SummaryEmitter::new(registry, config);
965
966 let handle = emitter.start(|_summaries| {}, false);
967
968 drop(handle);
970
971 tokio::time::sleep(Duration::from_millis(100)).await;
974
975 }
977
978 #[cfg(feature = "async")]
979 #[tokio::test]
980 async fn test_concurrent_shutdown_calls() {
981 let storage = Arc::new(ShardedStorage::new());
982 let clock = Arc::new(SystemClock::new());
983 let policy = Policy::count_based(100).unwrap();
984 let registry = SuppressionRegistry::new(storage, clock, policy);
985 let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
986
987 let emitter = SummaryEmitter::new(registry, config);
988 let handle = emitter.start(|_summaries| {}, false);
989
990 let sender = handle.shutdown_tx.clone();
992 let mut handles_vec = vec![];
993
994 for _ in 0..5 {
996 let sender_clone = sender.clone();
997 handles_vec.push(tokio::spawn(async move {
998 let _ = sender_clone.send(true);
999 }));
1000 }
1001
1002 for h in handles_vec {
1004 let _ = h.await;
1005 }
1006
1007 let _ = handle.shutdown().await;
1009 }
1010
1011 #[cfg(feature = "async")]
1012 #[tokio::test]
1013 async fn test_shutdown_signal_failure() {
1014 let storage = Arc::new(ShardedStorage::new());
1016 let clock = Arc::new(SystemClock::new());
1017 let policy = Policy::count_based(100).unwrap();
1018 let registry = SuppressionRegistry::new(storage, clock, policy);
1019 let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
1020
1021 let emitter = SummaryEmitter::new(registry, config);
1022 let handle = emitter.start(|_summaries| {}, false);
1023
1024 handle.shutdown().await.expect("shutdown should succeed");
1026
1027 tokio::time::sleep(Duration::from_millis(100)).await;
1029 }
1030}