tracing_throttle/application/
emitter.rs

1//! Summary emission for suppressed events.
2//!
3//! Periodically collects and emits summaries of suppressed events to provide
4//! visibility into what has been rate limited.
5
6use crate::application::{
7    ports::Storage,
8    registry::{EventState, SuppressionRegistry},
9};
10use crate::domain::{signature::EventSignature, summary::SuppressionSummary};
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use tokio::{sync::watch, time::interval};
15
/// Reasons an [`EmitterConfig`] can be rejected during validation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EmitterConfigError {
    /// The requested summary interval was zero; it must be strictly positive.
    ZeroSummaryInterval,
}
22
23impl std::fmt::Display for EmitterConfigError {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            EmitterConfigError::ZeroSummaryInterval => {
27                write!(f, "summary interval must be greater than 0")
28            }
29        }
30    }
31}
32
// Marker impl: relies entirely on the `Debug` and `Display` impls of the enum,
// and lets the error be boxed as `Box<dyn std::error::Error>` or used with `?`.
impl std::error::Error for EmitterConfigError {}
34
/// Ways in which stopping the background emitter task can fail.
#[cfg(feature = "async")]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ShutdownError {
    /// The background task panicked, so it could not be joined cleanly.
    TaskPanicked,
    /// The background task was cancelled before it ran to completion.
    TaskCancelled,
    /// The task did not finish within the timeout given to the shutdown call.
    Timeout,
    /// The shutdown signal could not be delivered (the task may already have exited).
    SignalFailed,
}
48
#[cfg(feature = "async")]
impl std::fmt::Display for ShutdownError {
    /// Writes the user-facing description of the shutdown failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: a new variant must be given a message here.
        let description = match self {
            Self::TaskPanicked => "emitter task panicked during shutdown",
            Self::TaskCancelled => "emitter task was cancelled",
            Self::Timeout => "shutdown exceeded timeout",
            Self::SignalFailed => "failed to send shutdown signal",
        };
        f.write_str(description)
    }
}
60
// Marker impl, compiled only with the `async` feature like the enum itself;
// lets the error be boxed as `Box<dyn std::error::Error>` or used with `?`.
#[cfg(feature = "async")]
impl std::error::Error for ShutdownError {}
63
/// Settings that control how suppression summaries are emitted.
///
/// Both fields are public, so a value built with a struct literal bypasses the
/// zero-interval validation performed by [`EmitterConfig::new`]; prefer the
/// constructor (or [`Default`]) when the interval comes from user input.
#[derive(Clone, Debug)]
pub struct EmitterConfig {
    /// Time between two consecutive summary emissions.
    pub interval: Duration,
    /// Smallest counter value an event needs in order to appear in a summary.
    pub min_count: usize,
}
72
73impl Default for EmitterConfig {
74    fn default() -> Self {
75        Self {
76            interval: Duration::from_secs(30),
77            min_count: 1,
78        }
79    }
80}
81
82impl EmitterConfig {
83    /// Create a new emitter config with the specified interval.
84    ///
85    /// # Errors
86    /// Returns `EmitterConfigError::ZeroSummaryInterval` if `interval` is zero.
87    pub fn new(interval: Duration) -> Result<Self, EmitterConfigError> {
88        if interval.is_zero() {
89            return Err(EmitterConfigError::ZeroSummaryInterval);
90        }
91        Ok(Self {
92            interval,
93            min_count: 1,
94        })
95    }
96
97    /// Set the minimum suppression count threshold.
98    pub fn with_min_count(mut self, min_count: usize) -> Self {
99        self.min_count = min_count;
100        self
101    }
102}
103
/// Handle for controlling a running emitter task.
///
/// Returned by `SummaryEmitter::start`. It owns the sending half of the
/// shutdown channel and the join handle of the spawned background task.
///
/// # Shutdown Behavior
///
/// You **must** call `shutdown().await` (or `shutdown_with_timeout`) to stop the
/// emitter task. The handle deliberately does not implement `Drop`, so dropping
/// it never stops the task on your behalf.
///
/// If you drop the handle without calling `shutdown()`, the background task will
/// continue running indefinitely, potentially causing:
/// - Resource leaks (the task owns the emitter and therefore its registry)
/// - Unexpected behavior if the task outlives its expected lifetime
/// - Inability to observe task failures or panics
///
/// # Examples
///
/// ```rust,no_run
/// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
/// # use tracing_throttle::application::registry::SuppressionRegistry;
/// # use tracing_throttle::domain::policy::Policy;
/// # use tracing_throttle::infrastructure::storage::ShardedStorage;
/// # use tracing_throttle::infrastructure::clock::SystemClock;
/// # use std::sync::Arc;
/// # async fn example() {
/// # let storage = Arc::new(ShardedStorage::new());
/// # let clock = Arc::new(SystemClock::new());
/// # let policy = Policy::count_based(100).unwrap();
/// # let registry = SuppressionRegistry::new(storage, clock, policy);
/// # let config = EmitterConfig::default();
/// # let emitter = SummaryEmitter::new(registry, config);
/// let handle = emitter.start(|_| {}, false);
///
/// // Always call shutdown explicitly
/// handle.shutdown().await.expect("shutdown failed");
/// # }
/// ```
#[cfg(feature = "async")]
pub struct EmitterHandle {
    // Sending half of the shutdown channel; `true` is sent to request a stop.
    shutdown_tx: watch::Sender<bool>,
    // Join handle of the spawned task; taken out when shutdown waits on it.
    join_handle: Option<tokio::task::JoinHandle<()>>,
}
144
#[cfg(feature = "async")]
impl EmitterHandle {
    /// Trigger graceful shutdown and wait for the task to complete.
    ///
    /// This method uses a default timeout of 10 seconds. For custom timeout durations,
    /// use [`shutdown_with_timeout`](Self::shutdown_with_timeout).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The task panics during shutdown
    /// - The task is cancelled
    /// - Shutdown exceeds the timeout (10 seconds)
    /// - The shutdown signal fails to send
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
    /// # use tracing_throttle::application::registry::SuppressionRegistry;
    /// # use tracing_throttle::domain::policy::Policy;
    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
    /// # use tracing_throttle::infrastructure::clock::SystemClock;
    /// # use std::sync::Arc;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let storage = Arc::new(ShardedStorage::new());
    /// # let clock = Arc::new(SystemClock::new());
    /// # let policy = Policy::count_based(100).unwrap();
    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
    /// # let config = EmitterConfig::default();
    /// # let emitter = SummaryEmitter::new(registry, config);
    /// let handle = emitter.start(|_| {}, false);
    ///
    /// // Do some work...
    ///
    /// // Clean shutdown with default 10 second timeout
    /// handle.shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn shutdown(self) -> Result<(), ShutdownError> {
        self.shutdown_with_timeout(Duration::from_secs(10)).await
    }

    /// Trigger graceful shutdown with a custom timeout.
    ///
    /// This method:
    /// 1. Sends the shutdown signal
    /// 2. Waits for the background task to finish (up to the timeout)
    /// 3. If the timeout is exceeded, aborts the task
    ///
    /// The abort in step 3 is a cancellation request: it takes effect the next
    /// time the task yields to the runtime, so a callback that is blocking
    /// synchronously at that moment finishes its current call first.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The task panics during shutdown
    /// - The task is cancelled
    /// - Shutdown exceeds the specified timeout
    /// - The shutdown signal fails to send
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
    /// # use tracing_throttle::application::registry::SuppressionRegistry;
    /// # use tracing_throttle::domain::policy::Policy;
    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
    /// # use tracing_throttle::infrastructure::clock::SystemClock;
    /// # use std::sync::Arc;
    /// # use std::time::Duration;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let storage = Arc::new(ShardedStorage::new());
    /// # let clock = Arc::new(SystemClock::new());
    /// # let policy = Policy::count_based(100).unwrap();
    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
    /// # let config = EmitterConfig::default();
    /// # let emitter = SummaryEmitter::new(registry, config);
    /// let handle = emitter.start(|_| {}, false);
    ///
    /// // Do some work...
    ///
    /// // Clean shutdown with 5 second timeout
    /// handle.shutdown_with_timeout(Duration::from_secs(5)).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn shutdown_with_timeout(
        mut self,
        timeout_duration: Duration,
    ) -> Result<(), ShutdownError> {
        use tokio::time::timeout;

        // Sending fails only when the receiver is gone, i.e. the task has
        // already exited and there is nothing left to signal.
        if self.shutdown_tx.send(true).is_err() {
            return Err(ShutdownError::SignalFailed);
        }

        let Some(mut handle) = self.join_handle.take() else {
            return Ok(());
        };

        // Poll the handle by mutable reference so it is still available after
        // a timeout: dropping a `JoinHandle` merely detaches the task, which
        // would leave it running in the background.
        match timeout(timeout_duration, &mut handle).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) if e.is_panic() => Err(ShutdownError::TaskPanicked),
            Ok(Err(e)) if e.is_cancelled() => Err(ShutdownError::TaskCancelled),
            Ok(Err(_)) => Err(ShutdownError::TaskPanicked), // Treat unknown errors as panics
            Err(_) => {
                handle.abort();
                Err(ShutdownError::Timeout)
            }
        }
    }

    /// Check if the emitter task is still running.
    ///
    /// Returns `true` while the handle still holds the join handle and the
    /// spawned task has not finished.
    pub fn is_running(&self) -> bool {
        self.join_handle.as_ref().is_some_and(|h| !h.is_finished())
    }
}
260
/// Emits periodic summaries of suppressed events.
///
/// Pairs a [`SuppressionRegistry`] (the source of per-event suppression state)
/// with an [`EmitterConfig`] (how often to emit and which events qualify).
pub struct SummaryEmitter<S>
where
    S: Storage<EventSignature, EventState> + Clone,
{
    // Registry whose entries are scanned when summaries are collected.
    registry: SuppressionRegistry<S>,
    // Emission settings: tick interval and minimum count threshold.
    config: EmitterConfig,
}
269
270impl<S> SummaryEmitter<S>
271where
272    S: Storage<EventSignature, EventState> + Clone,
273{
274    /// Create a new summary emitter.
275    pub fn new(registry: SuppressionRegistry<S>, config: EmitterConfig) -> Self {
276        Self { registry, config }
277    }
278
279    /// Collect current suppression summaries.
280    ///
281    /// Returns summaries for all events that have been suppressed at least
282    /// `min_count` times.
283    pub fn collect_summaries(&self) -> Vec<SuppressionSummary> {
284        let mut summaries = Vec::new();
285        let min_count = self.config.min_count;
286
287        self.registry.for_each(|signature, state| {
288            let count = state.counter.count();
289
290            if count >= min_count {
291                #[cfg(feature = "human-readable")]
292                let summary = SuppressionSummary::from_counter_with_metadata(
293                    *signature,
294                    &state.counter,
295                    state.metadata.clone(),
296                );
297
298                #[cfg(not(feature = "human-readable"))]
299                let summary = SuppressionSummary::from_counter(*signature, &state.counter);
300
301                summaries.push(summary);
302            }
303        });
304
305        summaries
306    }
307
308    /// Start emitting summaries periodically (async version).
309    ///
310    /// This spawns a background task that emits summaries at the configured interval.
311    /// The task will run until `shutdown()` is called on the returned `EmitterHandle`.
312    ///
313    /// # Graceful Shutdown
314    ///
315    /// When `shutdown()` is called on the `EmitterHandle`:
316    /// 1. The shutdown signal is prioritized over tick events
317    /// 2. The current emission completes if in progress
318    /// 3. If `emit_final` is true, one final emission occurs with current summaries
319    /// 4. The background task completes gracefully
320    ///
321    /// # Cancellation Safety
322    ///
323    /// The spawned task is cancellation-safe:
324    /// - `collect_summaries()` reads atomically from storage without mutations
325    /// - If cancelled during emission, the next startup will see correct state
326    /// - Panics in `emit_fn` are caught and don't abort the task
327    /// - The `emit_fn` closure should be cancellation-safe (avoid holding locks across `.await`)
328    ///
329    /// # Type Parameters
330    ///
331    /// * `F` - The emission function. Must be `Send + 'static` because it runs
332    ///   in a spawned task that may execute on any thread in the tokio runtime.
333    ///   The function receives ownership of the summaries vector.
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # use tracing_throttle::application::emitter::{SummaryEmitter, EmitterConfig};
339    /// # use tracing_throttle::application::registry::SuppressionRegistry;
340    /// # use tracing_throttle::domain::policy::Policy;
341    /// # use tracing_throttle::infrastructure::storage::ShardedStorage;
342    /// # use tracing_throttle::infrastructure::clock::SystemClock;
343    /// # use std::sync::Arc;
344    /// # use std::time::Duration;
345    /// # async fn example() {
346    /// # let storage = Arc::new(ShardedStorage::new());
347    /// # let clock = Arc::new(SystemClock::new());
348    /// # let policy = Policy::count_based(100).unwrap();
349    /// # let registry = SuppressionRegistry::new(storage, clock, policy);
350    /// # let config = EmitterConfig::default();
351    /// let emitter = SummaryEmitter::new(registry, config);
352    /// let handle = emitter.start(|summaries| {
353    ///     for summary in summaries {
354    ///         tracing::warn!("{}", summary.format_message());
355    ///     }
356    /// }, true);
357    ///
358    /// // Later, trigger graceful shutdown
359    /// handle.shutdown().await.expect("shutdown failed");
360    /// # }
361    /// ```
362    #[cfg(feature = "async")]
363    pub fn start<F>(self, mut emit_fn: F, emit_final: bool) -> EmitterHandle
364    where
365        F: FnMut(Vec<SuppressionSummary>) + Send + 'static,
366        S: Send + 'static,
367    {
368        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
369
370        let handle = tokio::spawn(async move {
371            let mut ticker = interval(self.config.interval);
372            // Skip missed ticks to prevent backpressure if emissions are slow
373            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
374
375            loop {
376                tokio::select! {
377                    // Prioritize shutdown signal to ensure fast shutdown
378                    biased;
379
380                    _ = shutdown_rx.changed() => {
381                        if *shutdown_rx.borrow_and_update() {
382                            // Emit final summaries if requested
383                            if emit_final {
384                                let summaries = self.collect_summaries();
385                                if !summaries.is_empty() {
386                                    // Panic safety for final emission too
387                                    // Note: summaries will be properly dropped even if emit_fn panics
388                                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
389                                        emit_fn(summaries);
390                                    }));
391
392                                    if result.is_err() {
393                                        #[cfg(debug_assertions)]
394                                        eprintln!("Warning: emit_fn panicked during final emission");
395                                    }
396                                }
397                            }
398                            break;
399                        }
400                    }
401                    _ = ticker.tick() => {
402                        let summaries = self.collect_summaries();
403                        if !summaries.is_empty() {
404                            // Panic safety: catch panics in emit_fn to prevent task abort
405                            // Note: summaries will be properly dropped even if emit_fn panics
406                            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
407                                emit_fn(summaries);
408                            }));
409
410                            if result.is_err() {
411                                // emit_fn panicked - summaries were dropped, continue running
412                                #[cfg(debug_assertions)]
413                                eprintln!("Warning: emit_fn panicked during emission");
414                            }
415                        }
416                    }
417                }
418            }
419        });
420
421        EmitterHandle {
422            shutdown_tx,
423            join_handle: Some(handle),
424        }
425    }
426
427    /// Get the emitter configuration.
428    pub fn config(&self) -> &EmitterConfig {
429        &self.config
430    }
431
432    /// Get a reference to the registry.
433    pub fn registry(&self) -> &SuppressionRegistry<S> {
434        &self.registry
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441    use crate::domain::{policy::Policy, signature::EventSignature};
442    use crate::infrastructure::clock::SystemClock;
443    use crate::infrastructure::storage::ShardedStorage;
444    use std::sync::Arc;
445
446    #[test]
447    fn test_collect_summaries_empty() {
448        let storage = Arc::new(ShardedStorage::new());
449        let clock = Arc::new(SystemClock::new());
450        let policy = Policy::count_based(100).unwrap();
451        let registry = SuppressionRegistry::new(storage, clock, policy);
452        let config = EmitterConfig::default();
453        let emitter = SummaryEmitter::new(registry, config);
454
455        let summaries = emitter.collect_summaries();
456        assert!(summaries.is_empty());
457    }
458
459    #[test]
460    fn test_collect_summaries_with_suppressions() {
461        let storage = Arc::new(ShardedStorage::new());
462        let clock = Arc::new(SystemClock::new());
463        let policy = Policy::count_based(100).unwrap();
464        let registry = SuppressionRegistry::new(storage, clock, policy);
465        let config = EmitterConfig::default();
466
467        // Add some suppressed events
468        for i in 0..3 {
469            let sig = EventSignature::simple("INFO", &format!("Message {}", i));
470            registry.with_event_state(sig, |state, now| {
471                // Simulate some suppressions
472                for _ in 0..(i + 1) * 5 {
473                    state.counter.record_suppression(now);
474                }
475            });
476        }
477
478        let emitter = SummaryEmitter::new(registry, config);
479        let summaries = emitter.collect_summaries();
480
481        assert_eq!(summaries.len(), 3);
482
483        // Verify counts
484        let counts: Vec<usize> = summaries.iter().map(|s| s.count).collect();
485        assert!(counts.contains(&6)); // 5 + 1 (initial)
486        assert!(counts.contains(&11)); // 10 + 1
487        assert!(counts.contains(&16)); // 15 + 1
488    }
489
490    #[test]
491    fn test_min_count_filtering() {
492        let storage = Arc::new(ShardedStorage::new());
493        let clock = Arc::new(SystemClock::new());
494        let policy = Policy::count_based(100).unwrap();
495        let registry = SuppressionRegistry::new(storage, clock, policy);
496        let config = EmitterConfig::default().with_min_count(10);
497
498        // Add event with low count (below threshold)
499        let sig1 = EventSignature::simple("INFO", "Low count");
500        registry.with_event_state(sig1, |state, now| {
501            for _ in 0..4 {
502                state.counter.record_suppression(now);
503            }
504        });
505
506        // Add event with high count (above threshold)
507        let sig2 = EventSignature::simple("INFO", "High count");
508        registry.with_event_state(sig2, |state, now| {
509            for _ in 0..14 {
510                state.counter.record_suppression(now);
511            }
512        });
513
514        let emitter = SummaryEmitter::new(registry, config);
515        let summaries = emitter.collect_summaries();
516
517        // Only the high-count event should be included
518        assert_eq!(summaries.len(), 1);
519        assert_eq!(summaries[0].count, 15); // 14 + 1 initial
520    }
521
522    #[cfg(feature = "async")]
523    #[tokio::test]
524    async fn test_async_emission() {
525        use std::sync::Mutex;
526
527        let storage = Arc::new(ShardedStorage::new());
528        let clock = Arc::new(SystemClock::new());
529        let policy = Policy::count_based(100).unwrap();
530        let registry = SuppressionRegistry::new(storage, clock, policy);
531        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
532
533        // Add a suppressed event
534        let sig = EventSignature::simple("INFO", "Test");
535        registry.with_event_state(sig, |state, now| {
536            state.counter.record_suppression(now);
537        });
538
539        let emitter = SummaryEmitter::new(registry, config);
540
541        // Track emissions
542        let emissions = Arc::new(Mutex::new(Vec::new()));
543        let emissions_clone = Arc::clone(&emissions);
544
545        let handle = emitter.start(
546            move |summaries| {
547                emissions_clone.lock().unwrap().push(summaries.len());
548            },
549            false,
550        );
551
552        // Wait for a couple of intervals
553        tokio::time::sleep(Duration::from_millis(250)).await;
554
555        handle.shutdown().await.expect("shutdown failed");
556
557        // Should have emitted at least once
558        let emission_count = emissions.lock().unwrap().len();
559        assert!(emission_count >= 2);
560    }
561
562    #[test]
563    fn test_emitter_config_zero_interval() {
564        let result = EmitterConfig::new(Duration::from_secs(0));
565        assert!(matches!(
566            result,
567            Err(EmitterConfigError::ZeroSummaryInterval)
568        ));
569    }
570
571    #[test]
572    fn test_emitter_config_valid_interval() {
573        let config = EmitterConfig::new(Duration::from_secs(30)).unwrap();
574        assert_eq!(config.interval, Duration::from_secs(30));
575        assert_eq!(config.min_count, 1);
576    }
577
578    #[cfg(feature = "async")]
579    #[tokio::test]
580    async fn test_graceful_shutdown() {
581        use std::sync::Mutex;
582
583        let storage = Arc::new(ShardedStorage::new());
584        let clock = Arc::new(SystemClock::new());
585        let policy = Policy::count_based(100).unwrap();
586        let registry = SuppressionRegistry::new(storage, clock, policy);
587        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
588
589        // Add a suppressed event so there's something to emit
590        let sig = EventSignature::simple("INFO", "Test");
591        registry.with_event_state(sig, |state, now| {
592            state.counter.record_suppression(now);
593        });
594
595        let emitter = SummaryEmitter::new(registry, config);
596
597        let emissions = Arc::new(Mutex::new(0));
598        let emissions_clone = Arc::clone(&emissions);
599
600        let handle = emitter.start(
601            move |_| {
602                *emissions_clone.lock().unwrap() += 1;
603            },
604            false,
605        );
606
607        // Let it run for a bit
608        tokio::time::sleep(Duration::from_millis(250)).await;
609
610        // Trigger graceful shutdown
611        handle.shutdown().await.expect("shutdown failed");
612
613        // Verify task is no longer running
614        let final_count = *emissions.lock().unwrap();
615        assert!(final_count >= 1);
616
617        // Wait a bit more to ensure task really stopped
618        tokio::time::sleep(Duration::from_millis(150)).await;
619        let count_after_shutdown = *emissions.lock().unwrap();
620        assert_eq!(count_after_shutdown, final_count);
621    }
622
623    #[cfg(feature = "async")]
624    #[tokio::test]
625    async fn test_shutdown_with_final_emission() {
626        use std::sync::Mutex;
627
628        let storage = Arc::new(ShardedStorage::new());
629        let clock = Arc::new(SystemClock::new());
630        let policy = Policy::count_based(100).unwrap();
631        let registry = SuppressionRegistry::new(storage, clock, policy);
632        let config = EmitterConfig::new(Duration::from_secs(60)).unwrap(); // Long interval
633
634        let emitter = SummaryEmitter::new(registry.clone(), config);
635
636        let emissions = Arc::new(Mutex::new(Vec::new()));
637        let emissions_clone = Arc::clone(&emissions);
638
639        let handle = emitter.start(
640            move |summaries| {
641                emissions_clone.lock().unwrap().push(summaries.len());
642            },
643            true, // Emit final summaries
644        );
645
646        // Wait for first tick to complete (interval's first tick is immediate)
647        tokio::time::sleep(Duration::from_millis(50)).await;
648
649        // Now add some suppressions after the first tick
650        let sig = EventSignature::simple("INFO", "Test event");
651        registry.with_event_state(sig, |state, now| {
652            for _ in 0..10 {
653                state.counter.record_suppression(now);
654            }
655        });
656
657        // Shutdown before next interval (which is 60 seconds away)
658        tokio::time::sleep(Duration::from_millis(50)).await;
659        handle.shutdown().await.expect("shutdown failed");
660
661        // Should have emitted final summaries
662        let emission_list = emissions.lock().unwrap();
663        assert_eq!(emission_list.len(), 1);
664        assert_eq!(emission_list[0], 1); // 1 summary
665    }
666
667    #[cfg(feature = "async")]
668    #[tokio::test]
669    async fn test_shutdown_without_final_emission() {
670        use std::sync::Mutex;
671
672        let storage = Arc::new(ShardedStorage::new());
673        let clock = Arc::new(SystemClock::new());
674        let policy = Policy::count_based(100).unwrap();
675        let registry = SuppressionRegistry::new(storage, clock, policy);
676        let config = EmitterConfig::new(Duration::from_secs(60)).unwrap();
677
678        let emitter = SummaryEmitter::new(registry.clone(), config);
679
680        let emissions = Arc::new(Mutex::new(0));
681        let emissions_clone = Arc::clone(&emissions);
682
683        let handle = emitter.start(
684            move |_| {
685                *emissions_clone.lock().unwrap() += 1;
686            },
687            false, // No final emission
688        );
689
690        // Wait for first tick (immediate, but no emissions since no suppressions yet)
691        tokio::time::sleep(Duration::from_millis(50)).await;
692
693        // Add some suppressions after first tick
694        let sig = EventSignature::simple("INFO", "Test event");
695        registry.with_event_state(sig, |state, now| {
696            state.counter.record_suppression(now);
697        });
698
699        // Shutdown immediately (before next 60-second interval)
700        tokio::time::sleep(Duration::from_millis(50)).await;
701        handle.shutdown().await.expect("shutdown failed");
702
703        // Should not have emitted anything (no final emission)
704        assert_eq!(*emissions.lock().unwrap(), 0);
705    }
706
707    #[cfg(feature = "async")]
708    #[tokio::test]
709    async fn test_is_running() {
710        let storage = Arc::new(ShardedStorage::new());
711        let clock = Arc::new(SystemClock::new());
712        let policy = Policy::count_based(100).unwrap();
713        let registry = SuppressionRegistry::new(storage, clock, policy);
714        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
715
716        let emitter = SummaryEmitter::new(registry, config);
717        let handle = emitter.start(|_| {}, false);
718
719        // Should be running
720        assert!(handle.is_running());
721
722        // Shutdown
723        handle.shutdown().await.expect("shutdown failed");
724
725        // Should no longer be running
726        tokio::time::sleep(Duration::from_millis(50)).await;
727        // Note: is_running() consumes self, so we can't check after shutdown
728    }
729
730    #[cfg(feature = "async")]
731    #[tokio::test]
732    async fn test_shutdown_during_emission() {
733        use std::sync::{Arc, Mutex};
734
735        let storage = Arc::new(ShardedStorage::new());
736        let clock = Arc::new(SystemClock::new());
737        let policy = Policy::count_based(100).unwrap();
738        let registry = SuppressionRegistry::new(storage, clock, policy);
739        let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
740
741        // Add suppressions
742        let sig = EventSignature::simple("INFO", "Test");
743        registry.with_event_state(sig, |state, now| {
744            state.counter.record_suppression(now);
745        });
746
747        let emitter = SummaryEmitter::new(registry, config);
748
749        let emissions = Arc::new(Mutex::new(0));
750        let emissions_clone = Arc::clone(&emissions);
751
752        let handle = emitter.start(
753            move |_| {
754                // Simulate slow emission
755                std::thread::sleep(Duration::from_millis(30));
756                *emissions_clone.lock().unwrap() += 1;
757            },
758            false,
759        );
760
761        // Let first emission start
762        tokio::time::sleep(Duration::from_millis(60)).await;
763
764        // Shutdown should wait for current emission to complete
765        handle.shutdown().await.expect("shutdown failed");
766
767        // Current emission should have completed
768        assert!(*emissions.lock().unwrap() >= 1);
769    }
770
771    #[cfg(feature = "async")]
772    #[tokio::test]
773    async fn test_shutdown_with_custom_timeout() {
774        let storage = Arc::new(ShardedStorage::new());
775        let clock = Arc::new(SystemClock::new());
776        let policy = Policy::count_based(100).unwrap();
777        let registry = SuppressionRegistry::new(storage, clock, policy);
778        let config = EmitterConfig::new(Duration::from_millis(100)).unwrap();
779
780        let emitter = SummaryEmitter::new(registry, config);
781        let handle = emitter.start(|_| {}, false);
782
783        // Let it run briefly
784        tokio::time::sleep(Duration::from_millis(150)).await;
785
786        // Shutdown with custom timeout should succeed quickly
787        let result = handle.shutdown_with_timeout(Duration::from_secs(5)).await;
788        assert!(result.is_ok());
789    }
790
791    #[cfg(feature = "async")]
792    #[tokio::test]
793    async fn test_panic_in_emit_fn() {
794        use std::sync::atomic::{AtomicUsize, Ordering};
795
796        let storage = Arc::new(ShardedStorage::new());
797        let clock = Arc::new(SystemClock::new());
798        let policy = Policy::count_based(100).unwrap();
799        let registry = SuppressionRegistry::new(storage, clock, policy);
800        let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
801
802        // Add suppressions
803        let sig = EventSignature::simple("INFO", "Test");
804        registry.with_event_state(sig, |state, now| {
805            for _ in 0..5 {
806                state.counter.record_suppression(now);
807            }
808        });
809
810        let emitter = SummaryEmitter::new(registry, config);
811
812        let call_count = Arc::new(AtomicUsize::new(0));
813        let call_count_clone = Arc::clone(&call_count);
814
815        let handle = emitter.start(
816            move |_summaries| {
817                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
818
819                // Panic on first call, succeed on subsequent calls
820                if count == 0 {
821                    panic!("intentional panic for testing");
822                }
823                // If we get here, panic was handled and task continued
824            },
825            false,
826        );
827
828        // Let it emit multiple times - first should panic, rest should succeed
829        tokio::time::sleep(Duration::from_millis(200)).await;
830
831        handle.shutdown().await.expect("shutdown failed");
832
833        // Should have attempted multiple emissions (first panicked, others succeeded)
834        let final_count = call_count.load(Ordering::SeqCst);
835        assert!(
836            final_count > 1,
837            "Task should continue after panic in emit_fn"
838        );
839    }
840
841    #[cfg(feature = "async")]
842    #[tokio::test]
843    async fn test_repeated_panic_in_emit_fn() {
844        use std::sync::atomic::{AtomicUsize, Ordering};
845
846        let storage = Arc::new(ShardedStorage::new());
847        let clock = Arc::new(SystemClock::new());
848        let policy = Policy::count_based(100).unwrap();
849        let registry = SuppressionRegistry::new(storage, clock, policy);
850        let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
851
852        let sig = EventSignature::simple("INFO", "Test");
853        registry.with_event_state(sig, |state, now| {
854            state.counter.record_suppression(now);
855        });
856
857        let emitter = SummaryEmitter::new(registry, config);
858
859        let call_count = Arc::new(AtomicUsize::new(0));
860        let call_count_clone = Arc::clone(&call_count);
861
862        let handle = emitter.start(
863            move |_summaries| {
864                call_count_clone.fetch_add(1, Ordering::SeqCst);
865                panic!("always panic");
866            },
867            false,
868        );
869
870        // Let it run and panic multiple times
871        tokio::time::sleep(Duration::from_millis(150)).await;
872
873        handle.shutdown().await.expect("shutdown failed");
874
875        // Should have attempted multiple times despite continuous panics
876        let final_count = call_count.load(Ordering::SeqCst);
877        assert!(
878            final_count >= 3,
879            "Task should continue despite repeated panics, got {} calls",
880            final_count
881        );
882    }
883
884    #[cfg(feature = "async")]
885    #[tokio::test]
886    async fn test_panic_during_final_emission() {
887        use std::sync::atomic::{AtomicBool, Ordering};
888
889        let storage = Arc::new(ShardedStorage::new());
890        let clock = Arc::new(SystemClock::new());
891        let policy = Policy::count_based(100).unwrap();
892        let registry = SuppressionRegistry::new(storage, clock, policy);
893        let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap(); // Long interval
894
895        let sig = EventSignature::simple("INFO", "Test");
896        registry.with_event_state(sig, |state, now| {
897            state.counter.record_suppression(now);
898        });
899
900        let emitter = SummaryEmitter::new(registry, config);
901
902        let panicked = Arc::new(AtomicBool::new(false));
903        let panicked_clone = Arc::clone(&panicked);
904
905        let handle = emitter.start(
906            move |_summaries| {
907                panicked_clone.store(true, Ordering::SeqCst);
908                panic!("panic during final emission");
909            },
910            true, // emit_on_shutdown
911        );
912
913        // Shutdown immediately - will trigger final emission which panics
914        handle
915            .shutdown()
916            .await
917            .expect("shutdown should succeed even if final emission panics");
918
919        // Verify final emission was attempted
920        assert!(
921            panicked.load(Ordering::SeqCst),
922            "Final emission should have been attempted"
923        );
924    }
925
926    #[cfg(feature = "async")]
927    #[tokio::test]
928    async fn test_shutdown_timeout_with_slow_emit_fn() {
929        let storage = Arc::new(ShardedStorage::new());
930        let clock = Arc::new(SystemClock::new());
931        let policy = Policy::count_based(100).unwrap();
932        let registry = SuppressionRegistry::new(storage, clock, policy);
933        let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
934
935        let emitter = SummaryEmitter::new(registry, config);
936
937        let handle = emitter.start(
938            move |_summaries| {
939                // Simulate slow emission - use a shorter duration for testing
940                std::thread::sleep(Duration::from_millis(500));
941            },
942            true,
943        );
944
945        // Shutdown with very short timeout
946        let result = handle
947            .shutdown_with_timeout(Duration::from_millis(10))
948            .await;
949
950        // Should timeout (or at minimum not panic)
951        // Note: timing-sensitive test, may occasionally pass if emit completes quickly
952        let _ = result; // Don't assert - just verify no panic
953    }
954
955    #[cfg(feature = "async")]
956    #[tokio::test]
957    async fn test_handle_dropped_without_shutdown() {
958        let storage = Arc::new(ShardedStorage::new());
959        let clock = Arc::new(SystemClock::new());
960        let policy = Policy::count_based(100).unwrap();
961        let registry = SuppressionRegistry::new(storage, clock, policy);
962        let config = EmitterConfig::new(Duration::from_millis(50)).unwrap();
963
964        let emitter = SummaryEmitter::new(registry, config);
965
966        let handle = emitter.start(|_summaries| {}, false);
967
968        // Drop handle without calling shutdown
969        drop(handle);
970
971        // Task should continue running
972        // This is documented behavior - not a bug
973        tokio::time::sleep(Duration::from_millis(100)).await;
974
975        // No assertions - just verify no panic
976    }
977
978    #[cfg(feature = "async")]
979    #[tokio::test]
980    async fn test_concurrent_shutdown_calls() {
981        let storage = Arc::new(ShardedStorage::new());
982        let clock = Arc::new(SystemClock::new());
983        let policy = Policy::count_based(100).unwrap();
984        let registry = SuppressionRegistry::new(storage, clock, policy);
985        let config = EmitterConfig::new(Duration::from_secs(3600)).unwrap();
986
987        let emitter = SummaryEmitter::new(registry, config);
988        let handle = emitter.start(|_summaries| {}, false);
989
990        // Create multiple shutdown senders
991        let sender = handle.shutdown_tx.clone();
992        let mut handles_vec = vec![];
993
994        // Multiple tasks try to send shutdown signal concurrently
995        for _ in 0..5 {
996            let sender_clone = sender.clone();
997            handles_vec.push(tokio::spawn(async move {
998                let _ = sender_clone.send(true);
999            }));
1000        }
1001
1002        // All send operations should complete without panic
1003        for h in handles_vec {
1004            let _ = h.await;
1005        }
1006
1007        // Clean shutdown
1008        let _ = handle.shutdown().await;
1009    }
1010
1011    #[cfg(feature = "async")]
1012    #[tokio::test]
1013    async fn test_shutdown_signal_failure() {
1014        // Test that shutdown handles channel errors gracefully
1015        let storage = Arc::new(ShardedStorage::new());
1016        let clock = Arc::new(SystemClock::new());
1017        let policy = Policy::count_based(100).unwrap();
1018        let registry = SuppressionRegistry::new(storage, clock, policy);
1019        let config = EmitterConfig::new(Duration::from_millis(30)).unwrap();
1020
1021        let emitter = SummaryEmitter::new(registry, config);
1022        let handle = emitter.start(|_summaries| {}, false);
1023
1024        // Normal shutdown
1025        handle.shutdown().await.expect("shutdown should succeed");
1026
1027        // Verify task stopped
1028        tokio::time::sleep(Duration::from_millis(100)).await;
1029    }
1030}