tracing_throttle/application/
emitter.rs

1//! Summary emission for suppressed events.
2//!
3//! Periodically collects and emits summaries of suppressed events to provide
4//! visibility into what has been rate limited.
5
6use crate::application::{
7    ports::Storage,
8    registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::{sync::watch, time::interval};
15
/// Validation failure reported while building an [`EmitterConfig`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EmitterConfigError {
    /// The requested summary interval was zero; it must be strictly positive.
    ZeroSummaryInterval,
}

impl std::fmt::Display for EmitterConfigError {
    /// Writes the human-readable description of the validation failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the message first, then write it once; the match stays
        // exhaustive so a new variant forces a new message.
        let message = match self {
            Self::ZeroSummaryInterval => "summary interval must be greater than 0",
        };
        f.write_str(message)
    }
}

// No wrapped source error exists, so the default `Error` methods suffice.
impl std::error::Error for EmitterConfigError {}
34
/// Reasons why shutting down a running emitter task can fail.
#[cfg(feature = "async")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownError {
    /// The emitter task ended with a panic.
    TaskPanicked,
    /// The emitter task was cancelled before it could finish.
    TaskCancelled,
    /// The task did not finish within the allowed shutdown time.
    Timeout,
    /// The shutdown signal could not be delivered (the task may already have exited).
    SignalFailed,
}

#[cfg(feature = "async")]
impl std::fmt::Display for ShutdownError {
    /// Writes the human-readable description of the shutdown failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must also add its message.
        let message = match self {
            Self::TaskPanicked => "emitter task panicked during shutdown",
            Self::TaskCancelled => "emitter task was cancelled",
            Self::Timeout => "shutdown exceeded timeout",
            Self::SignalFailed => "failed to send shutdown signal",
        };
        f.write_str(message)
    }
}

// No wrapped source error exists, so the default `Error` methods suffice.
#[cfg(feature = "async")]
impl std::error::Error for ShutdownError {}
63
/// Settings that control how summaries of suppressed events are emitted.
#[derive(Debug, Clone)]
pub struct EmitterConfig {
    /// Time between two consecutive summary emissions.
    pub interval: Duration,
    /// Smallest counter value (inclusive) an event needs to appear in a summary.
    pub min_count: usize,
}
72
73impl Default for EmitterConfig {
74    fn default() -> Self {
75        Self {
76            interval: Duration::from_secs(30),
77            min_count: 1,
78        }
79    }
80}
81
82impl EmitterConfig {
83    /// Create a new emitter config with the specified interval.
84    ///
85    /// # Errors
86    /// Returns `EmitterConfigError::ZeroSummaryInterval` if `interval` is zero.
87    pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
88        if interval.is_zero() {
89            return Err(EmitterConfigError::ZeroSummaryInterval);
90        }
91        Ok(Self {
92            interval,
93            min_count: 1,
94        })
95    }
96
97    /// Set the minimum suppression count threshold.
98    pub fn with_min_count(mut self, min_count: usize) -> Self {
99        self.min_count = min_count;
100        self
101    }
102}
103
/// Control handle for an emitter task that is running in the background.
///
/// # Shutdown Behavior
///
/// Stopping the task requires an explicit `shutdown().await`. The handle
/// deliberately has no `Drop` implementation, to avoid race conditions and
/// resource leaks.
///
/// Dropping the handle without calling `shutdown()` leaves the background task
/// running indefinitely, which can lead to:
/// - Resource leaks (the task holds references to the registry)
/// - Unexpected behavior if the task outlives its expected lifetime
/// - No way to observe task failures or panics
///
/// # Examples
///
/// ```rust,no_run
/// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
/// # use tracing_throttle::application::registry::SuppressionRegistry;
/// # use tracing_throttle::domain::policy::Policy;
/// # use tracing_throttle::infrastructure::storage::ShardedStorage;
/// # use tracing_throttle::infrastructure::clock::SystemClock;
/// # use std::sync::Arc;
/// # async fn example() {
/// # let storage = Arc::new(ShardedStorage::new());
/// # let clock = Arc::new(SystemClock::new());
/// # let policy = Policy::count_based(100).unwrap();
/// # let registry = SuppressionRegistry::new(storage, clock, policy);
/// # let config = EmitterConfig::default();
/// # let emitter = SummaryEmitter::new(registry, config);
/// let handle = emitter.start(|_| {}, false);
///
/// // Always call shutdown explicitly
/// handle.shutdown().await.expect("shutdown failed");
/// # }
/// ```
#[cfg(feature = "async")]
pub struct EmitterHandle {
    /// Sending side of the shutdown channel; `true` asks the task to stop.
    shutdown_tx: watch::Sender<bool>,
    /// Join handle of the spawned task; taken (left as `None`) when shutdown awaits it.
    join_handle: Option<tokio::task::JoinHandle<()>>,
}
144
#[cfg(feature = "async")]
impl EmitterHandle {
    /// Trigger graceful shutdown and wait for the task to complete.
    ///
    /// This method uses a default timeout of 10 seconds. For custom timeout durations,
    /// use [`shutdown_with_timeout`](Self::shutdown_with_timeout).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The task panics during shutdown
    /// - The task is cancelled
    /// - Shutdown exceeds the timeout (10 seconds)
    /// - The shutdown signal fails to send
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
    /// # use tracing_throttle::application::registry::SuppressionRegistry;
    /// # use tracing_throttle::domain::policy::Policy;
    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
    /// # use tracing_throttle::infrastructure::clock::SystemClock;
    /// # use std::sync::Arc;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let storage = Arc::new(ShardedStorage::new());
    /// # let clock = Arc::new(SystemClock::new());
    /// # let policy = Policy::count_based(100).unwrap();
    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
    /// # let config = EmitterConfig::default();
    /// # let emitter = SummaryEmitter::new(registry, config);
    /// let handle = emitter.start(|_| {}, false);
    ///
    /// // Do some work...
    ///
    /// // Clean shutdown with default 10 second timeout
    /// handle.shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn shutdown(self) -> Result<(), ShutdownError> {
        self.shutdown_with_timeout(Duration::from_secs(10)).await
    }

    /// Trigger graceful shutdown with a custom timeout.
    ///
    /// This method:
    /// 1. Sends the shutdown signal
    /// 2. Waits for the background task to finish (up to the timeout)
    /// 3. If the timeout is exceeded, aborts the task
    ///
    /// Aborting is cooperative: tokio cancels the task at its next `.await`
    /// point, so a synchronous `emit_fn` call that is already running is not
    /// interrupted.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The task panics during shutdown
    /// - The task is cancelled
    /// - Shutdown exceeds the specified timeout
    /// - The shutdown signal fails to send
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
    /// # use tracing_throttle::application::registry::SuppressionRegistry;
    /// # use tracing_throttle::domain::policy::Policy;
    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
    /// # use tracing_throttle::infrastructure::clock::SystemClock;
    /// # use std::sync::Arc;
    /// # use std::time::Duration;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let storage = Arc::new(ShardedStorage::new());
    /// # let clock = Arc::new(SystemClock::new());
    /// # let policy = Policy::count_based(100).unwrap();
    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
    /// # let config = EmitterConfig::default();
    /// # let emitter = SummaryEmitter::new(registry, config);
    /// let handle = emitter.start(|_| {}, false);
    ///
    /// // Do some work...
    ///
    /// // Clean shutdown with 5 second timeout
    /// handle.shutdown_with_timeout(Duration::from_secs(5)).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn shutdown_with_timeout(
        mut self,
        timeout_duration: Duration,
    ) -> Result<(), ShutdownError> {
        use tokio::time::timeout;

        // Sending fails only when the receiver is gone, i.e. the task has
        // already exited and there is nobody left to signal.
        if self.shutdown_tx.send(true).is_err() {
            return Err(ShutdownError::SignalFailed);
        }

        let Some(mut handle) = self.join_handle.take() else {
            return Ok(());
        };

        // Poll the handle by reference so it is still available on timeout.
        // Moving it into `timeout` would drop it on expiry, which only
        // detaches the task and leaves it running.
        match timeout(timeout_duration, &mut handle).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(join_error)) if join_error.is_cancelled() => Err(ShutdownError::TaskCancelled),
            // Panics, and any join failure that is not a cancellation.
            Ok(Err(_)) => Err(ShutdownError::TaskPanicked),
            Err(_) => {
                handle.abort();
                Err(ShutdownError::Timeout)
            }
        }
    }

    /// Check if the emitter task is still running.
    ///
    /// Returns `true` while the join handle is present and its task has not finished.
    pub fn is_running(&self) -> bool {
        self.join_handle.as_ref().is_some_and(|h| !h.is_finished())
    }
}
260
261/// Emits periodic summaries of suppressed events.
262pub struct SummaryEmitter<S>
263where
264    S: Storage<EventSignature, EventState> + Clone,
265{
266    registry: SuppressionRegistry<S>,
267    config: EmitterConfig,
268}
269
270impl<S> SummaryEmitter<S>
271where
272    S: Storage<EventSignature, EventState> + Clone,
273{
274    /// Create a new summary emitter.
275    pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
276        Self { registry, config }
277    }
278
279    /// Collect current suppression summaries.
280    ///
281    /// Returns summaries for all events that have been suppressed at least
282    /// `min_count` times.
283    pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
284        let mut summaries = Vec::new();
285        let min_count = self.config.min_count;
286
287        self.registry.for_each(|signature, state| {
288            let count = state.counter.count();
289
290            if count >= min_count {
291                let summary = SuppressionSummary::from_counter(*signature, &state.counter);
292                summaries.push(summary);
293            }
294        });
295
296        summaries
297    }
298
299    /// Start emitting summaries periodically (async version).
300    ///
301    /// This spawns a background task that emits summaries at the configured interval.
302    /// The task will run until `shutdown()` is called on the returned `EmitterHandle`.
303    ///
304    /// # Graceful Shutdown
305    ///
306    /// When `shutdown()` is called on the `EmitterHandle`:
307    /// 1. The shutdown signal is prioritized over tick events
308    /// 2. The current emission completes if in progress
309    /// 3. If `emit_final` is true, one final emission occurs with current summaries
310    /// 4. The background task completes gracefully
311    ///
312    /// # Cancellation Safety
313    ///
314    /// The spawned task is cancellation-safe:
315    /// - `collect_summaries()` reads atomically from storage without mutations
316    /// - If cancelled during emission, the next startup will see correct state
317    /// - Panics in `emit_fn` are caught and don't abort the task
318    /// - The `emit_fn` closure should be cancellation-safe (avoid holding locks across `.await`)
319    ///
320    /// # Type Parameters
321    ///
322    /// * `F` - The emission function. Must be `Send + 'static` because it runs
323    ///   in a spawned task that may execute on any thread in the tokio runtime.
324    ///   The function receives ownership of the summaries vector.
325    ///
326    /// # Examples
327    ///
328    /// ```no_run
329    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
330    /// # use tracing_throttle::application::registry::SuppressionRegistry;
331    /// # use tracing_throttle::domain::policy::Policy;
332    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
333    /// # use tracing_throttle::infrastructure::clock::SystemClock;
334    /// # use std::sync::Arc;
335    /// # use std::time::Duration;
336    /// # async fn example() {
337    /// # let storage = Arc::new(ShardedStorage::new());
338    /// # let clock = Arc::new(SystemClock::new());
339    /// # let policy = Policy::count_based(100).unwrap();
340    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
341    /// # let config = EmitterConfig::default();
342    /// let emitter = SummaryEmitter::new(registry, config);
343    /// let handle = emitter.start(|summaries| {
344    ///     for summary in summaries {
345    ///         tracing::warn!("{}", summary.format_message());
346    ///     }
347    /// }, true);
348    ///
349    /// // Later, trigger graceful shutdown
350    /// handle.shutdown().await.expect("shutdown failed");
351    /// # }
352    /// ```
353    #[cfg(feature = "async")]
354    pub fn start<F>(self, mut emit_fn: F, emit_final: bool) -> EmitterHandle
355    where
356        F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
357        S: Send + 'static,
358    {
359        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
360
361        let handle = tokio::spawn(async move {
362            let mut ticker = interval(self.config.interval);
363            // Skip missed ticks to prevent backpressure if emissions are slow
364            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
365
366            loop {
367                tokio::select! {
368                    // Prioritize shutdown signal to ensure fast shutdown
369                    biased;
370
371                    _ = shutdown_rx.changed() => {
372                        if *shutdown_rx.borrow_and_update() {
373                            // Emit final summaries if requested
374                            if emit_final {
375                                let summaries = self.collect_summaries();
376                                if !summaries.is_empty() {
377                                    // Panic safety for final emission too
378                                    // Note: summaries will be properly dropped even if emit_fn panics
379                                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
380                                        emit_fn(summaries);
381                                    }));
382
383                                    if result.is_err() {
384                                        #[cfg(debug_assertions)]
385                                        eprintln!("Warning: emit_fn panicked during final emission");
386                                    }
387                                }
388                            }
389                            break;
390                        }
391                    }
392                    _ = ticker.tick() => {
393                        let summaries = self.collect_summaries();
394                        if !summaries.is_empty() {
395                            // Panic safety: catch panics in emit_fn to prevent task abort
396                            // Note: summaries will be properly dropped even if emit_fn panics
397                            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
398                                emit_fn(summaries);
399                            }));
400
401                            if result.is_err() {
402                                // emit_fn panicked - summaries were dropped, continue running
403                                #[cfg(debug_assertions)]
404                                eprintln!("Warning: emit_fn panicked during emission");
405                            }
406                        }
407                    }
408                }
409            }
410        });
411
412        EmitterHandle {
413            shutdown_tx,
414            join_handle: Some(handle),
415        }
416    }
417
418    /// Get the emitter configuration.
419    pub fn config(&self) -> &EmitterConfig {
420        &self.config
421    }
422
423    /// Get a reference to the registry.
424    pub fn registry(&self) -> &SuppressionRegistry<S> {
425        &self.registry
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::{policy::Policy, signature::EventSignature};
    use crate::infrastructure::clock::SystemClock;
    use crate::infrastructure::storage::ShardedStorage;
    use std::sync::Arc;

    /// An empty registry yields no summaries.
    #[test]
    fn test_collect_summaries_empty() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::default();
        let emitter = SummaryEmitter::new(registry, config);

        let summaries = emitter.collect_summaries();
        assert!(summaries.is_empty());
    }

    /// Every event with recorded suppressions is summarized with its count.
    #[test]
    fn test_collect_summaries_with_suppressions() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::default();

        // Add some suppressed events
        for i in 0..3 {
            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
            registry.with_event_state(sig, |state, now| {
                // Simulate some suppressions
                for _ in 0..(i + 1) * 5 {
                    state.counter.record_suppression(now);
                }
            });
        }

        let emitter = SummaryEmitter::new(registry, config);
        let summaries = emitter.collect_summaries();

        assert_eq!(summaries.len(), 3);

        // Verify counts
        let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
        assert!(counts.contains(&6)); // 5 + 1 (initial)
        assert!(counts.contains(&11)); // 10 + 1
        assert!(counts.contains(&16)); // 15 + 1
    }

    /// Events whose count is below `min_count` are left out.
    #[test]
    fn test_min_count_filtering() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::default().with_min_count(10);

        // Add event with low count (below threshold)
        let sig1 = EventSignature::simple("INFO", "Low count");
        registry.with_event_state(sig1, |state, now| {
            for _ in 0..4 {
                state.counter.record_suppression(now);
            }
        });

        // Add event with high count (above threshold)
        let sig2 = EventSignature::simple("INFO", "High count");
        registry.with_event_state(sig2, |state, now| {
            for _ in 0..14 {
                state.counter.record_suppression(now);
            }
        });

        let emitter = SummaryEmitter::new(registry, config);
        let summaries = emitter.collect_summaries();

        // Only the high-count event should be included
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].count, 15); // 14 + 1 initial
    }

    /// The background task calls `emit_fn` repeatedly at the configured interval.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_emission() {
        use std::sync::Mutex;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();

        // Add a suppressed event
        let sig = EventSignature::simple("INFO", "Test");
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        let emitter = SummaryEmitter::new(registry, config);

        // Track emissions
        let emissions = Arc::new(Mutex::new(Vec::new()));
        let emissions_clone = Arc::clone(&emissions);

        let handle = emitter.start(
            move |summaries| {
                emissions_clone.lock().unwrap().push(summaries.len());
            },
            false,
        );

        // Wait for a couple of intervals
        tokio::time::sleep(Duration::from_millis(250)).await;

        handle.shutdown().await.expect("shutdown failed");

        // The first tick is immediate and further ticks follow every 100ms,
        // so at least two emissions are expected within 250ms.
        let emission_count = emissions.lock().unwrap().len();
        assert!(emission_count >= 2);
    }

    /// A zero interval is rejected by `EmitterConfig::new`.
    #[test]
    fn test_emitter_config_zero_interval() {
        let result = EmitterConfig::new(Duration::from_secs(0));
        assert!(matches!(
            result,
            Err(EmitterConfigError::ZeroSummaryInterval)
        ));
    }

    /// A non-zero interval is accepted and `min_count` starts at 1.
    #[test]
    fn test_emitter_config_valid_interval() {
        let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
        assert_eq!(config.interval, Duration::from_secs(30));
        assert_eq!(config.min_count, 1);
    }

    /// After `shutdown()` returns, no further emissions happen.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_graceful_shutdown() {
        use std::sync::Mutex;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();

        // Add a suppressed event so there's something to emit
        let sig = EventSignature::simple("INFO", "Test");
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        let emitter = SummaryEmitter::new(registry, config);

        let emissions = Arc::new(Mutex::new(0));
        let emissions_clone = Arc::clone(&emissions);

        let handle = emitter.start(
            move |_| {
                *emissions_clone.lock().unwrap() += 1;
            },
            false,
        );

        // Let it run for a bit
        tokio::time::sleep(Duration::from_millis(250)).await;

        // Trigger graceful shutdown
        handle.shutdown().await.expect("shutdown failed");

        // The task emitted at least once while it was running
        let final_count = *emissions.lock().unwrap();
        assert!(final_count >= 1);

        // Wait longer than one interval: the count must not grow any more
        tokio::time::sleep(Duration::from_millis(150)).await;
        let count_after_shutdown = *emissions.lock().unwrap();
        assert_eq!(count_after_shutdown, final_count);
    }

    /// With `emit_final = true`, shutdown triggers one last emission.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_with_final_emission() {
        use std::sync::Mutex;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_secs(60)).unwrap(); // Long interval

        let emitter = SummaryEmitter::new(registry.clone(), config);

        let emissions = Arc::new(Mutex::new(Vec::new()));
        let emissions_clone = Arc::clone(&emissions);

        let handle = emitter.start(
            move |summaries| {
                emissions_clone.lock().unwrap().push(summaries.len());
            },
            true, // Emit final summaries
        );

        // Wait for first tick to complete (interval's first tick is immediate)
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Now add some suppressions after the first tick
        let sig = EventSignature::simple("INFO", "Test event");
        registry.with_event_state(sig, |state, now| {
            for _ in 0..10 {
                state.counter.record_suppression(now);
            }
        });

        // Shutdown before next interval (which is 60 seconds away)
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.shutdown().await.expect("shutdown failed");

        // Should have emitted final summaries
        let emission_list = emissions.lock().unwrap();
        assert_eq!(emission_list.len(), 1);
        assert_eq!(emission_list[0], 1); // 1 summary
    }

    /// With `emit_final = false`, shutdown emits nothing.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_without_final_emission() {
        use std::sync::Mutex;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();

        let emitter = SummaryEmitter::new(registry.clone(), config);

        let emissions = Arc::new(Mutex::new(0));
        let emissions_clone = Arc::clone(&emissions);

        let handle = emitter.start(
            move |_| {
                *emissions_clone.lock().unwrap() += 1;
            },
            false, // No final emission
        );

        // Wait for first tick (immediate, but no emissions since no suppressions yet)
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Add some suppressions after first tick
        let sig = EventSignature::simple("INFO", "Test event");
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        // Shut down shortly afterwards (well before the next 60-second tick)
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.shutdown().await.expect("shutdown failed");

        // Should not have emitted anything (no final emission)
        assert_eq!(*emissions.lock().unwrap(), 0);
    }

    /// A freshly started emitter reports itself as running.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_is_running() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();

        let emitter = SummaryEmitter::new(registry, config);
        let handle = emitter.start(|_| {}, false);

        // Should be running
        assert!(handle.is_running());

        // `shutdown()` consumes the handle, so `is_running()` cannot be
        // queried afterwards; a successful shutdown means the task was joined.
        handle.shutdown().await.expect("shutdown failed");
    }

    /// Shutdown succeeds with a slow, blocking `emit_fn`, and emissions that
    /// started have completed.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_during_emission() {
        use std::sync::Mutex;

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();

        // Add suppressions
        let sig = EventSignature::simple("INFO", "Test");
        registry.with_event_state(sig, |state, now| {
            state.counter.record_suppression(now);
        });

        let emitter = SummaryEmitter::new(registry, config);

        let emissions = Arc::new(Mutex::new(0));
        let emissions_clone = Arc::clone(&emissions);

        let handle = emitter.start(
            move |_| {
                // Simulate slow emission (deliberately blocking: `emit_fn` is synchronous)
                std::thread::sleep(Duration::from_millis(30));
                *emissions_clone.lock().unwrap() += 1;
            },
            false,
        );

        // Let first emission start
        tokio::time::sleep(Duration::from_millis(60)).await;

        // Shutdown should wait for current emission to complete
        handle.shutdown().await.expect("shutdown failed");

        // Current emission should have completed
        assert!(*emissions.lock().unwrap() >= 1);
    }

    /// `shutdown_with_timeout` succeeds when the task stops within the timeout.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_shutdown_with_custom_timeout() {
        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();

        let emitter = SummaryEmitter::new(registry, config);
        let handle = emitter.start(|_| {}, false);

        // Let it run briefly
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Shutdown with custom timeout should succeed quickly
        let result = handle.shutdown_with_timeout(Duration::from_secs(5)).await;
        assert!(result.is_ok());
    }

    /// A panic inside `emit_fn` is contained and later ticks still emit.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_panic_in_emit_fn() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let storage = Arc::new(ShardedStorage::new());
        let clock = Arc::new(SystemClock::new());
        let policy = Policy::count_based(100).unwrap();
        let registry = SuppressionRegistry::new(storage, clock, policy);
        let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();

        // Add suppressions
        let sig = EventSignature::simple("INFO", "Test");
        registry.with_event_state(sig, |state, now| {
            for _ in 0..5 {
                state.counter.record_suppression(now);
            }
        });

        let emitter = SummaryEmitter::new(registry, config);

        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_clone = Arc::clone(&call_count);

        let handle = emitter.start(
            move |_summaries| {
                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);

                // Panic on first call, succeed on subsequent calls
                if count == 0 {
                    panic!("intentional panic for testing");
                }
                // If we get here, panic was handled and task continued
            },
            false,
        );

        // Let it emit multiple times - first should panic, rest should succeed
        tokio::time::sleep(Duration::from_millis(200)).await;

        handle.shutdown().await.expect("shutdown failed");

        // Should have attempted multiple emissions (first panicked, others succeeded)
        let final_count = call_count.load(Ordering::SeqCst);
        assert!(
            final_count > 1,
            "Task should continue after panic in emit_fn"
        );
    }
}