// nv_runtime/output.rs
1//! Output types: [`OutputEnvelope`], [`OutputSink`] trait, [`SinkFactory`], and lag detection.
2
3use std::sync::Arc;
4use std::time::Instant;
5
6use nv_core::TypedMetadata;
7use nv_core::health::HealthEvent;
8use nv_core::id::FeedId;
9use nv_core::timestamp::{MonotonicTs, WallTs};
10use nv_frame::FrameEnvelope;
11use nv_perception::{DerivedSignal, DetectionSet, SceneFeature, Track};
12use nv_view::ViewState;
13use tokio::sync::broadcast;
14
15use crate::provenance::Provenance;
16
/// Factory for constructing fresh [`OutputSink`] instances.
///
/// When a sink thread times out or panics during shutdown, the feed
/// worker loses the original sink. If a `SinkFactory` was provided,
/// the next restart can construct a fresh sink rather than falling
/// back to a silent `NullSink`.
///
/// The `Send + Sync` bounds allow the factory itself to be shared and
/// invoked from any feed-worker thread.
pub type SinkFactory = Box<dyn Fn() -> Box<dyn OutputSink> + Send + Sync>;
24
/// Controls whether the source [`FrameEnvelope`] is included in the
/// [`OutputEnvelope`].
///
/// Default is [`Never`](FrameInclusion::Never) — output contains only
/// perception artifacts. Use [`Always`](FrameInclusion::Always) when
/// downstream consumers need access to the pixel data (e.g., annotation
/// overlays, frame archival, or visual debugging).
///
/// [`Sampled`](FrameInclusion::Sampled) provides a middle ground: frames
/// are included every `interval` outputs, keeping metadata (detections,
/// tracks, signals) at full rate while reducing the cost of host
/// materialization and downstream pixel processing in the sink thread.
/// For example, `Sampled { interval: 6 }` on a 30 fps source yields
/// ~5 fps of frame delivery while perception runs at full rate.
///
/// Because `FrameEnvelope` is `Arc`-backed, inclusion is zero-copy.
// NOTE: `Eq` cannot be derived — `TargetFps` carries an `f32`.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum FrameInclusion {
    /// Never include frames in output (default).
    #[default]
    Never,
    /// Always include the source frame in output.
    Always,
    /// Include the source frame every `interval` outputs.
    ///
    /// Perception artifacts (detections, tracks, signals, provenance)
    /// flow at full rate regardless. Only the pixel payload is gated.
    ///
    /// An `interval` of `1` behaves like [`Always`](Self::Always).
    /// An `interval` of `0` behaves like [`Never`](Self::Never).
    /// Prefer [`FrameInclusion::sampled`] which normalizes those edge
    /// values to the canonical variants.
    Sampled {
        /// Include a frame every N-th output envelope.
        interval: u32,
    },
    /// Include frames at a target preview FPS, resolved dynamically from
    /// the observed source rate.
    ///
    /// During a warmup window (first ~30 frames), `fallback_interval` is
    /// used. Once the source FPS is estimated, the interval is computed
    /// as `round(source_fps / target_fps)` and the variant is resolved
    /// in-place to [`Sampled`](Self::Sampled).
    ///
    /// This avoids hardcoding an assumed source FPS at config time.
    TargetFps {
        /// Desired preview frames per second.
        target: f32,
        /// Interval to use before the source rate is known.
        fallback_interval: u32,
    },
}
75
76impl FrameInclusion {
77    /// Create a sampled frame inclusion policy with edge-case normalization.
78    ///
79    /// - `interval == 0` → [`Never`](Self::Never)
80    /// - `interval == 1` → [`Always`](Self::Always)
81    /// - `interval > 1` → [`Sampled`](Self::Sampled)
82    ///
83    /// Prefer this over constructing [`Sampled`](Self::Sampled) directly
84    /// to avoid footgun values that silently alias other variants.
85    #[must_use]
86    pub fn sampled(interval: u32) -> Self {
87        match interval {
88            0 => Self::Never,
89            1 => Self::Always,
90            n => Self::Sampled { interval: n },
91        }
92    }
93
94    /// Create a target-FPS frame inclusion that resolves dynamically
95    /// from the observed source rate.
96    ///
97    /// Until the source rate is known (warmup window), falls back to
98    /// `fallback_interval`. Once observed, resolves to `Sampled`.
99    ///
100    /// `fallback_interval` is normalized: 0 → Never, 1 → Always.
101    #[must_use]
102    pub fn target_fps(target: f32, fallback_interval: u32) -> Self {
103        if target <= 0.0 {
104            return Self::Never;
105        }
106        Self::TargetFps {
107            target,
108            fallback_interval,
109        }
110    }
111
112    /// Compute a sampled frame inclusion from a target preview FPS and
113    /// an assumed source FPS.
114    ///
115    /// The interval is `round(source / target)`, clamped to valid range.
116    ///
117    /// For runtime-adaptive behavior, prefer [`target_fps`](Self::target_fps)
118    /// which resolves from observed source rate instead of a static assumption.
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// # use nv_runtime::FrameInclusion;
124    /// assert_eq!(
125    ///     FrameInclusion::from_target_fps(5.0, 30.0),
126    ///     FrameInclusion::Sampled { interval: 6 },
127    /// );
128    /// assert_eq!(
129    ///     FrameInclusion::from_target_fps(60.0, 30.0),
130    ///     FrameInclusion::Always,
131    /// );
132    /// ```
133    #[must_use]
134    pub fn from_target_fps(target_fps: f32, assumed_source_fps: f32) -> Self {
135        if target_fps <= 0.0 {
136            return Self::Never;
137        }
138        if assumed_source_fps <= 0.0 || target_fps >= assumed_source_fps {
139            return Self::Always;
140        }
141        let interval = (assumed_source_fps / target_fps).round() as u32;
142        Self::sampled(interval)
143    }
144
145    /// The effective sample interval.
146    ///
147    /// - [`Never`](Self::Never) → `0`
148    /// - [`Always`](Self::Always) → `1`
149    /// - [`Sampled`](Self::Sampled) → the configured interval
150    /// - [`TargetFps`](Self::TargetFps) → the fallback interval
151    ///   (actual interval is determined at runtime)
152    #[must_use]
153    pub fn effective_interval(&self) -> u32 {
154        match self {
155            Self::Never => 0,
156            Self::Always => 1,
157            Self::Sampled { interval } => *interval,
158            Self::TargetFps {
159                fallback_interval, ..
160            } => *fallback_interval,
161        }
162    }
163
164    /// Resolve a [`TargetFps`](Self::TargetFps) variant to a concrete
165    /// [`Sampled`](Self::Sampled) interval given the observed source FPS.
166    ///
167    /// Returns the resolved variant. Non-`TargetFps` variants are
168    /// returned unchanged.
169    #[must_use]
170    pub fn resolve_with_source_fps(self, source_fps: f32) -> Self {
171        match self {
172            Self::TargetFps {
173                target,
174                fallback_interval,
175            } => {
176                if source_fps <= 0.0 {
177                    Self::sampled(fallback_interval)
178                } else {
179                    Self::from_target_fps(target, source_fps)
180                }
181            }
182            other => other,
183        }
184    }
185}
186
/// Summary of temporal-store admission for this frame.
///
/// Populated during the temporal commit phase. Tells downstream consumers
/// how many tracks were admitted vs. rejected due to the concurrent-track
/// cap, without requiring them to subscribe to health events.
///
/// `Default` yields `{ admitted: 0, rejected: 0 }` — the correct value
/// for a frame with no tracks.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AdmissionSummary {
    /// Number of tracks successfully committed to the temporal store.
    pub admitted: u32,
    /// Number of new tracks rejected because the store was at capacity
    /// and no eviction victim was available.
    pub rejected: u32,
}
200
/// Structured output for one processed frame.
///
/// Contains the complete perception result, view state, and full provenance.
/// Delivered to the user via [`OutputSink::emit`].
///
/// Broadcast subscribers receive `Arc<OutputEnvelope>` to avoid cloning the
/// full payload on every send. The per-feed [`OutputSink`] receives owned
/// values.
#[derive(Debug, Clone)]
pub struct OutputEnvelope {
    /// Which feed produced this output.
    pub feed_id: FeedId,
    /// Monotonic frame sequence number.
    pub frame_seq: u64,
    /// Monotonic timestamp of the source frame.
    pub ts: MonotonicTs,
    /// Wall-clock timestamp of the source frame.
    pub wall_ts: WallTs,
    /// Final detection set after all stages.
    pub detections: DetectionSet,
    /// Final track set after all stages.
    pub tracks: Vec<Track>,
    /// All derived signals from all stages.
    pub signals: Vec<DerivedSignal>,
    /// Scene-level features from all stages.
    pub scene_features: Vec<SceneFeature>,
    /// View state at the time of this frame.
    pub view: ViewState,
    /// Full provenance: stage timings, view decisions, pipeline latency.
    pub provenance: Provenance,
    /// Extensible output metadata.
    pub metadata: TypedMetadata,
    /// The source frame, present when [`FrameInclusion::Always`] is
    /// configured, or on sampled frames when [`FrameInclusion::Sampled`]
    /// is configured. `None` otherwise.
    ///
    /// This is a zero-copy `Arc` clone of the frame the pipeline processed.
    pub frame: Option<FrameEnvelope>,
    /// Temporal-store admission outcome for this frame's tracks.
    pub admission: AdmissionSummary,
}
242
/// User-implementable trait: receives structured outputs from the pipeline.
///
/// `emit()` is called on a dedicated per-feed sink thread, decoupled from
/// the feed's processing loop by a bounded queue. This isolation ensures
/// that a slow sink does not block perception.
///
/// The output arrives as `Arc<OutputEnvelope>` for zero-copy handoff.
/// Sinks that need an owned copy can call `Arc::unwrap_or_clone()` or
/// clone specific fields as needed.
///
/// `emit()` is wrapped in `catch_unwind` — a panicking sink emits a
/// [`HealthEvent::SinkPanic`] and the output is dropped, but the feed
/// continues. If a sink blocks during shutdown, a
/// [`HealthEvent::SinkTimeout`] is emitted and the sink thread is detached.
///
/// `emit()` is deliberately **not** async and **not** fallible:
///
/// - If the sink needs async I/O, it should buffer and channel internally.
/// - If the sink fails, it should log and drop — the perception pipeline
///   must never block on downstream consumption.
///
/// The `Send + 'static` bound allows the sink to be moved onto its
/// dedicated thread.
pub trait OutputSink: Send + 'static {
    /// Receive a processed output envelope.
    fn emit(&self, output: Arc<OutputEnvelope>);
}
267
/// Arc-wrapped output envelope for zero-copy broadcast fan-out.
///
/// Returned by [`Runtime::output_subscribe`](crate::Runtime::output_subscribe).
/// Subscribers share the same allocation; no full clone is needed per receiver.
pub type SharedOutput = Arc<OutputEnvelope>;
273
274// ---------------------------------------------------------------------------
275// LagDetector — deterministic output-channel overflow detection
276// ---------------------------------------------------------------------------
277
/// Minimum interval between consecutive `OutputLagged` health events.
///
/// Default throttle for [`LagDetector`]: sustained saturation emits at
/// most one event per this interval (tests override via `with_config`).
const LAG_THROTTLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
280
/// Detects output broadcast channel saturation using an internal sentinel
/// receiver.
///
/// A single instance is shared (via `Arc`) across all feed workers.
/// After every broadcast `send()`, each worker calls
/// [`check_after_send`](LagDetector::check_after_send) which:
///
/// 1. Increments a send counter (atomic, lock-free).
/// 2. When enough sends have accumulated for the ring buffer to potentially
///    wrap (≥ `capacity`), acquires the sentinel mutex via `try_lock`.
/// 3. Drains the sentinel receiver. A `TryRecvError::Lagged(n)` indicates
///    the ring buffer wrapped past the sentinel's read position by `n`
///    messages.
/// 4. Accumulates sentinel-observed wrap counts and emits a throttled
///    [`HealthEvent::OutputLagged`] event:
///    - **Transition into saturation:** always emitted immediately.
///    - **Sustained saturation:** at most once per second, carrying the
///      accumulated delta for the interval.
///    - **Recovery:** a final event flushes any remaining accumulated
///      delta.
///
/// The sentinel receiver intentionally does **not** consume messages between
/// checks, so it naturally falls behind when the channel overflows. This
/// makes saturation detection deterministic — it observes
/// `TryRecvError::Lagged(n)` rather than predicting overflow from queue
/// length.
///
/// **Semantics:** the sentinel reports ring-buffer wrap, **not** guaranteed
/// per-subscriber loss. The sentinel is the *slowest possible* consumer;
/// any external subscriber keeping up with production will experience less
/// (or no) loss. The emitted event is a channel-saturation /
/// backpressure-risk signal — not a per-subscriber loss report.
///
/// # Thread safety
///
/// The send counter is `AtomicU64`. All remaining mutable state is behind
/// a single `std::sync::Mutex` guarded by `try_lock`, so contention never
/// blocks the per-frame hot path.
///
/// # Hot-path cost
///
/// - **Most frames:** one `fetch_add` + one comparison (sends < capacity).
/// - **Every ~capacity frames:** one `try_lock` + sentinel drain + optional
///   throttled event emission.
pub(crate) struct LagDetector {
    /// Total sends since the last sentinel drain.
    sends_since_check: std::sync::atomic::AtomicU64,
    /// All non-atomic detector state; only contended on drain cycles.
    inner: std::sync::Mutex<LagDetectorInner>,
    /// Broadcast channel capacity — determines drain interval.
    capacity: usize,
    /// Minimum interval between consecutive throttled events.
    throttle_interval: std::time::Duration,
}
334
/// Mutable detector state guarded by `LagDetector::inner`.
struct LagDetectorInner {
    /// Internal sentinel receiver; deliberately left un-drained between
    /// checks so ring-buffer wrap surfaces as `TryRecvError::Lagged(n)`.
    sentinel: broadcast::Receiver<SharedOutput>,
    /// Whether the detector is currently inside a saturation episode.
    in_lag: bool,
    /// Messages evicted since the last emitted `OutputLagged` event.
    accumulated_lost: u64,
    /// When the last `OutputLagged` event was emitted.
    last_event_time: Instant,
}
343
impl LagDetector {
    /// Create a new lag detector from a sentinel receiver.
    ///
    /// `capacity` must match the broadcast channel's capacity so the
    /// detector knows when the ring buffer can wrap.
    ///
    /// The sentinel is a dedicated `broadcast::Receiver` created internally
    /// by the runtime — it is never exposed to users. It counts as one
    /// receiver in `Sender::receiver_count()`.
    pub fn new(sentinel: broadcast::Receiver<SharedOutput>, capacity: usize) -> Self {
        Self::with_config(sentinel, capacity, LAG_THROTTLE_INTERVAL)
    }

    /// Internal constructor with configurable throttle interval.
    ///
    /// `check_after_send` uses `throttle_interval` to cap emission rate
    /// during sustained saturation. Exposed separately so unit tests can
    /// use short (or zero) throttles.
    fn with_config(
        sentinel: broadcast::Receiver<SharedOutput>,
        capacity: usize,
        throttle_interval: std::time::Duration,
    ) -> Self {
        Self {
            sends_since_check: std::sync::atomic::AtomicU64::new(0),
            inner: std::sync::Mutex::new(LagDetectorInner {
                sentinel,
                in_lag: false,
                accumulated_lost: 0,
                last_event_time: Instant::now(),
            }),
            capacity,
            throttle_interval,
        }
    }

    /// Record a send and, when enough sends have accumulated, drain the
    /// sentinel to detect channel saturation.
    ///
    /// Call this after every successful `broadcast::Sender::send()`.
    pub fn check_after_send(&self, health_tx: &broadcast::Sender<HealthEvent>) {
        use std::sync::atomic::Ordering;

        // Relaxed is sufficient: the counter is only a heuristic trigger
        // for when to drain; it carries no cross-thread data dependency.
        let sends = self.sends_since_check.fetch_add(1, Ordering::Relaxed) + 1;

        // The sentinel hasn't consumed any messages since the last drain.
        // It can only observe Lagged(n) when the ring buffer wraps past
        // its position, which requires more than `capacity` sends
        // (since the ring buffer holds exactly `capacity` messages).
        if (sends as usize) <= self.capacity {
            return;
        }

        // Another worker already holds the lock and is draining — skip;
        // its drain covers our sends too.
        let Ok(mut inner) = self.inner.try_lock() else {
            return;
        };

        // Reset send counter now that we're draining. Increments racing
        // with this store are lost, which only delays the next drain
        // slightly — acceptable for a heuristic trigger.
        self.sends_since_check.store(0, Ordering::Relaxed);

        // Drain the sentinel: it has been idle since the last drain, so
        // any ring-buffer wrap shows up as Lagged(n).
        let mut lost: u64 = 0;
        loop {
            match inner.sentinel.try_recv() {
                Ok(_) => {} // consume to catch up
                Err(broadcast::error::TryRecvError::Lagged(n)) => {
                    lost += n;
                    // Receiver position was advanced past the gap.
                    // Continue draining to fully catch up.
                }
                Err(broadcast::error::TryRecvError::Empty) => break,
                Err(broadcast::error::TryRecvError::Closed) => break,
            }
        }

        if lost == 0 {
            // Sentinel caught up with no missed messages — buffer did not wrap.
            if inner.in_lag && inner.accumulated_lost > 0 {
                // Recovery: flush remaining accumulated delta.
                let delta = inner.accumulated_lost;
                inner.accumulated_lost = 0;
                inner.in_lag = false;
                inner.last_event_time = Instant::now();
                // Release the lock before sending so a slow broadcast
                // never extends the critical section.
                drop(inner);
                let _ = health_tx.send(HealthEvent::OutputLagged {
                    messages_lost: delta,
                });
            } else {
                inner.in_lag = false;
            }
            return;
        }

        // Saturation detected: sentinel missed `lost` messages due to
        // ring-buffer wrap.
        inner.accumulated_lost += lost;

        let should_emit = if !inner.in_lag {
            // Transition into saturation — always emit immediately.
            inner.in_lag = true;
            true
        } else {
            // Already saturated — emit at most once per throttle interval.
            inner.last_event_time.elapsed() >= self.throttle_interval
        };

        if should_emit {
            let delta = inner.accumulated_lost;
            inner.accumulated_lost = 0;
            inner.last_event_time = Instant::now();
            drop(inner);
            let _ = health_tx.send(HealthEvent::OutputLagged {
                messages_lost: delta,
            });
        }
    }

    /// Snapshot the current lag status without emitting any health events.
    ///
    /// Uses a blocking lock with poison recovery — safe for the
    /// diagnostics polling path (off hot path, 1–5 s interval).
    pub fn status(&self) -> crate::diagnostics::OutputLagStatus {
        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        crate::diagnostics::OutputLagStatus {
            in_lag: inner.in_lag,
            pending_lost: inner.accumulated_lost,
        }
    }

    /// Reset the detector after transitioning to a no-external-subscriber
    /// state.
    ///
    /// If accumulated sentinel-observed wrap from a prior saturation period
    /// is pending, it is flushed as a single final `OutputLagged` event
    /// before the reset. The sentinel is then drained without reporting
    /// (no subscriber cares about sentinel-only wrap), and all state is
    /// cleared.
    ///
    /// Uses a blocking `lock()` — this is called off the hot path, only
    /// when `receiver_count` drops below the external threshold.
    pub fn realign(&self, health_tx: &broadcast::Sender<HealthEvent>) {
        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());

        // Flush any pending sentinel-observed delta first.
        if inner.accumulated_lost > 0 {
            let delta = inner.accumulated_lost;
            inner.accumulated_lost = 0;
            let _ = health_tx.send(HealthEvent::OutputLagged {
                messages_lost: delta,
            });
        }

        // Drain the sentinel completely — discard everything, no external
        // subscriber is present to care.
        loop {
            match inner.sentinel.try_recv() {
                Ok(_) => {}
                Err(broadcast::error::TryRecvError::Lagged(_)) => {}
                Err(broadcast::error::TryRecvError::Empty) => break,
                Err(broadcast::error::TryRecvError::Closed) => break,
            }
        }

        // Reset saturation state.
        inner.in_lag = false;
        inner.last_event_time = Instant::now();
        self.sends_since_check
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// Flush any pending accumulated sentinel-observed delta as a single
    /// final event.
    ///
    /// Called on shutdown or any other point where no further sends will
    /// occur. Does **not** drain the sentinel — only emits the delta that
    /// was already accumulated by prior `check_after_send` calls.
    ///
    /// Uses a blocking `lock()`.
    pub fn flush(&self, health_tx: &broadcast::Sender<HealthEvent>) {
        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        if inner.accumulated_lost > 0 {
            let delta = inner.accumulated_lost;
            inner.accumulated_lost = 0;
            inner.in_lag = false;
            let _ = health_tx.send(HealthEvent::OutputLagged {
                messages_lost: delta,
            });
        }
        // Note: `in_lag` with zero accumulated loss is left untouched —
        // there is nothing to report, and no further sends will occur.
    }
}
534
535// ===========================================================================
536// Unit tests for LagDetector
537// ===========================================================================
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542    use std::sync::Arc;
543    use std::time::Duration;
544
545    use nv_core::TypedMetadata;
546    use nv_core::health::HealthEvent;
547    use nv_core::id::FeedId;
548    use nv_core::timestamp::{MonotonicTs, WallTs};
549    use nv_perception::DetectionSet;
550    use nv_view::ViewState;
551    use tokio::sync::broadcast;
552
553    use crate::provenance::{Provenance, ViewProvenance};
554
    /// Create a minimal dummy `SharedOutput` for broadcast sending.
    ///
    /// Field values are arbitrary but deterministic — these tests exercise
    /// channel mechanics only, never the payload contents.
    fn make_dummy_output() -> SharedOutput {
        Arc::new(OutputEnvelope {
            feed_id: FeedId::new(0),
            frame_seq: 0,
            ts: MonotonicTs::from_nanos(0),
            wall_ts: WallTs::from_micros(0),
            detections: DetectionSet::empty(),
            tracks: Vec::new(),
            signals: Vec::new(),
            scene_features: Vec::new(),
            view: ViewState::fixed_initial(),
            provenance: Provenance {
                stages: Vec::new(),
                view_provenance: ViewProvenance {
                    motion_source: nv_view::MotionSource::None,
                    epoch_decision: None,
                    transition: nv_view::TransitionPhase::Settled,
                    stability_score: 1.0,
                    epoch: nv_view::view_state::ViewEpoch::INITIAL,
                    version: nv_view::view_state::ViewVersion::INITIAL,
                },
                frame_receive_ts: MonotonicTs::from_nanos(0),
                pipeline_complete_ts: MonotonicTs::from_nanos(0),
                total_latency: nv_core::Duration::from_nanos(0),
                frame_age: None,
                queue_hold_time: std::time::Duration::ZERO,
                frame_included: false,
            },
            metadata: TypedMetadata::new(),
            frame: None,
            admission: AdmissionSummary::default(),
        })
    }
589
590    fn make_detector(
591        capacity: usize,
592        throttle: Duration,
593    ) -> (broadcast::Sender<SharedOutput>, LagDetector) {
594        let (tx, sentinel_rx) = broadcast::channel(capacity);
595        let detector = LagDetector::with_config(sentinel_rx, capacity, throttle);
596        (tx, detector)
597    }
598
    /// Create a health event channel (sender + one receiver) with ample
    /// capacity so test emissions are never dropped.
    fn make_health() -> (
        broadcast::Sender<HealthEvent>,
        broadcast::Receiver<HealthEvent>,
    ) {
        broadcast::channel(128)
    }
605
606    /// Collect all `OutputLagged` deltas from the health channel.
607    fn collect_lag_deltas(rx: &mut broadcast::Receiver<HealthEvent>) -> Vec<u64> {
608        let mut deltas = Vec::new();
609        while let Ok(evt) = rx.try_recv() {
610            if let HealthEvent::OutputLagged { messages_lost } = evt {
611                deltas.push(messages_lost);
612            }
613        }
614        deltas
615    }
616
617    // D.1: delta_not_cumulative_exact
618    //
619    // Two equal loss intervals must produce equal deltas, not
620    // cumulative totals.
621    #[test]
622    fn delta_not_cumulative_exact() {
623        let capacity = 4;
624        // Zero throttle — emit every check so we get crisp per-interval deltas.
625        let (tx, detector) = make_detector(capacity, Duration::ZERO);
626        let (health_tx, mut health_rx) = make_health();
627
628        // First interval: capacity+1 sends ⇒ sentinel lags by 1.
629        for _ in 0..capacity + 1 {
630            let _ = tx.send(make_dummy_output());
631            detector.check_after_send(&health_tx);
632        }
633        let d1 = collect_lag_deltas(&mut health_rx);
634
635        // Second interval: same pattern.
636        for _ in 0..capacity + 1 {
637            let _ = tx.send(make_dummy_output());
638            detector.check_after_send(&health_tx);
639        }
640        let d2 = collect_lag_deltas(&mut health_rx);
641
642        // Both intervals should report exactly the same delta, not cumulative.
643        assert!(!d1.is_empty(), "first interval should emit a lag event");
644        assert!(!d2.is_empty(), "second interval should emit a lag event");
645
646        let sum1: u64 = d1.iter().sum();
647        let sum2: u64 = d2.iter().sum();
648        assert_eq!(sum1, sum2, "equal loss intervals must produce equal deltas");
649    }
650
651    // D.2: throttle_blocks_event_storm
652    //
653    // Many overflow checks within one throttle interval must produce at most
654    // one event (the transition event).
655    #[test]
656    fn throttle_blocks_event_storm() {
657        let capacity = 4;
658        // 1-second throttle — the loop below runs in well under 1 second.
659        let (tx, detector) = make_detector(capacity, Duration::from_secs(1));
660        let (health_tx, mut health_rx) = make_health();
661
662        // 10 full drain cycles, each producing Lagged(1).
663        for _ in 0..10 {
664            for _ in 0..capacity + 1 {
665                let _ = tx.send(make_dummy_output());
666                detector.check_after_send(&health_tx);
667            }
668        }
669
670        let deltas = collect_lag_deltas(&mut health_rx);
671
672        // Only the first transition event should have been emitted;
673        // all subsequent overflow within the same second is accumulated.
674        assert_eq!(
675            deltas.len(),
676            1,
677            "throttle should block storm: got {:?}",
678            deltas
679        );
680        // The single event carries the first interval's delta.
681        assert!(deltas[0] > 0, "emitted delta must be positive");
682    }
683
    // D.3: throttle_allows_periodic_emission
    //
    // Sustained lag across >1 throttle interval should produce bounded
    // periodic emissions. Timing-sensitive: relies on a real sleep past
    // the throttle window.
    #[test]
    fn throttle_allows_periodic_emission() {
        let capacity = 4;
        // Very short throttle so the test doesn't sleep long.
        let throttle = Duration::from_millis(10);
        let (tx, detector) = make_detector(capacity, throttle);
        let (health_tx, mut health_rx) = make_health();

        // First interval — triggers transition event.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d1 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d1.len(), 1, "transition event");

        // Wait past throttle interval (with margin against timer jitter).
        std::thread::sleep(throttle + Duration::from_millis(5));

        // Another drain cycle — should now be eligible for periodic emission.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d2 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d2.len(), 1, "periodic emission after interval elapsed");
        assert!(d2[0] > 0, "periodic delta must be positive");
    }
716
    // D.4: no_subscriber_reset_prevents_false_positive
    //
    // Induce a sentinel-only window via realign, then verify no lag is
    // reported from that window. A subsequent subscriber window must
    // report only its own losses. Depends on exact send counts relative
    // to `capacity`.
    #[test]
    fn no_subscriber_reset_prevents_false_positive() {
        let capacity = 4;
        let (tx, detector) = make_detector(capacity, Duration::ZERO);
        let (health_tx, mut health_rx) = make_health();

        // Phase 1: normal sends — accumulate sentinel backlog.
        // Exactly `capacity` sends: the ring fills but does not wrap.
        for _ in 0..capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }

        // No lag events yet (sentinel is exactly capacity behind, not lagged).
        let d = collect_lag_deltas(&mut health_rx);
        assert!(d.is_empty(), "no lag before buffer wraps");

        // Phase 2: realign — simulates no-subscriber transition.
        // Any stale sentinel state is discarded.
        detector.realign(&health_tx);
        let flushed = collect_lag_deltas(&mut health_rx);
        // No accumulated loss to flush (we never exceeded capacity).
        assert!(flushed.is_empty(), "no pending loss to flush on realign");

        // Phase 3: more sends in a "new subscriber" window.
        // After realign, sends_since_check is 0 and sentinel is caught up.
        // Send capacity+1 to trigger one drain cycle with Lagged(1).
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d3 = collect_lag_deltas(&mut health_rx);

        // Should report only the loss from this interval, not from the
        // pre-realign window.
        assert!(
            !d3.is_empty(),
            "new window should produce its own lag events"
        );
        let total: u64 = d3.iter().sum();
        assert!(
            total > 0 && total <= 2,
            "delta should reflect only new-window loss"
        );
    }
766
    // D.5: flush_pending_emits_final_delta
    //
    // Create pending accumulated loss via check_after_send (throttled,
    // so not yet emitted), then flush and verify the final event.
    // Also verifies flush is idempotent once drained.
    #[test]
    fn flush_pending_emits_final_delta() {
        let capacity = 4;
        // Long throttle — the second drain interval's loss remains pending.
        let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
        let (health_tx, mut health_rx) = make_health();

        // First drain cycle — transition event emitted.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d1 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d1.len(), 1, "transition event emitted");

        // Second drain cycle — throttled, loss is accumulated but not emitted.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d2 = collect_lag_deltas(&mut health_rx);
        assert!(d2.is_empty(), "throttled — no event emitted yet");

        // Flush — should emit the stranded accumulated loss.
        detector.flush(&health_tx);
        let d3 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d3.len(), 1, "flush must emit exactly one event");
        assert!(d3[0] > 0, "flushed delta must be positive");

        // Flush again — nothing pending, no event.
        detector.flush(&health_tx);
        let d4 = collect_lag_deltas(&mut health_rx);
        assert!(d4.is_empty(), "double flush must not emit");
    }
805
806    // Verify that realign flushes real pending loss before resetting.
807    #[test]
808    fn realign_flushes_pending_before_reset() {
809        let capacity = 4;
810        // Long throttle so second-interval loss stays pending.
811        let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
812        let (health_tx, mut health_rx) = make_health();
813
814        // Transition event.
815        for _ in 0..capacity + 1 {
816            let _ = tx.send(make_dummy_output());
817            detector.check_after_send(&health_tx);
818        }
819        let _ = collect_lag_deltas(&mut health_rx);
820
821        // Accumulate more loss (throttled, stays pending).
822        for _ in 0..capacity + 1 {
823            let _ = tx.send(make_dummy_output());
824            detector.check_after_send(&health_tx);
825        }
826        let pending = collect_lag_deltas(&mut health_rx);
827        assert!(pending.is_empty(), "loss stays pending under throttle");
828
829        // Realign should flush the pending loss, then reset.
830        detector.realign(&health_tx);
831        let flushed = collect_lag_deltas(&mut health_rx);
832        assert_eq!(flushed.len(), 1, "realign must flush pending loss");
833        assert!(flushed[0] > 0, "flushed delta must be positive");
834
835        // After realign, no more events until new loss accumulates.
836        detector.flush(&health_tx);
837        let after = collect_lag_deltas(&mut health_rx);
838        assert!(after.is_empty(), "no pending loss after realign + flush");
839    }
840
841    // ---------------------------------------------------------------
842    // FrameInclusion normalization + constructor tests
843    // ---------------------------------------------------------------
844
845    #[test]
846    fn sampled_zero_normalizes_to_never() {
847        assert_eq!(FrameInclusion::sampled(0), FrameInclusion::Never);
848    }
849
850    #[test]
851    fn sampled_one_normalizes_to_always() {
852        assert_eq!(FrameInclusion::sampled(1), FrameInclusion::Always);
853    }
854
855    #[test]
856    fn sampled_above_one_creates_sampled() {
857        assert_eq!(
858            FrameInclusion::sampled(6),
859            FrameInclusion::Sampled { interval: 6 },
860        );
861    }
862
863    #[test]
864    fn from_target_fps_5_at_30_yields_interval_6() {
865        assert_eq!(
866            FrameInclusion::from_target_fps(5.0, 30.0),
867            FrameInclusion::Sampled { interval: 6 },
868        );
869    }
870
871    #[test]
872    fn from_target_fps_10_at_30_yields_interval_3() {
873        assert_eq!(
874            FrameInclusion::from_target_fps(10.0, 30.0),
875            FrameInclusion::Sampled { interval: 3 },
876        );
877    }
878
879    #[test]
880    fn from_target_fps_zero_is_never() {
881        assert_eq!(
882            FrameInclusion::from_target_fps(0.0, 30.0),
883            FrameInclusion::Never,
884        );
885    }
886
887    #[test]
888    fn from_target_fps_negative_is_never() {
889        assert_eq!(
890            FrameInclusion::from_target_fps(-5.0, 30.0),
891            FrameInclusion::Never,
892        );
893    }
894
895    #[test]
896    fn from_target_fps_above_source_is_always() {
897        assert_eq!(
898            FrameInclusion::from_target_fps(60.0, 30.0),
899            FrameInclusion::Always,
900        );
901    }
902
903    #[test]
904    fn from_target_fps_equal_to_source_is_always() {
905        assert_eq!(
906            FrameInclusion::from_target_fps(30.0, 30.0),
907            FrameInclusion::Always,
908        );
909    }
910
911    #[test]
912    fn from_target_fps_with_zero_source_is_always() {
913        assert_eq!(
914            FrameInclusion::from_target_fps(5.0, 0.0),
915            FrameInclusion::Always,
916        );
917    }
918
919    #[test]
920    fn effective_interval_values() {
921        assert_eq!(FrameInclusion::Never.effective_interval(), 0);
922        assert_eq!(FrameInclusion::Always.effective_interval(), 1);
923        assert_eq!(
924            FrameInclusion::Sampled { interval: 6 }.effective_interval(),
925            6,
926        );
927        assert_eq!(
928            FrameInclusion::TargetFps {
929                target: 5.0,
930                fallback_interval: 6
931            }
932            .effective_interval(),
933            6,
934        );
935    }
936
937    // ---------------------------------------------------------------
938    // TargetFps constructor + resolution tests
939    // ---------------------------------------------------------------
940
941    #[test]
942    fn target_fps_zero_is_never() {
943        assert_eq!(FrameInclusion::target_fps(0.0, 6), FrameInclusion::Never);
944    }
945
946    #[test]
947    fn target_fps_negative_is_never() {
948        assert_eq!(FrameInclusion::target_fps(-5.0, 6), FrameInclusion::Never);
949    }
950
951    #[test]
952    fn target_fps_positive_creates_variant() {
953        assert_eq!(
954            FrameInclusion::target_fps(5.0, 6),
955            FrameInclusion::TargetFps {
956                target: 5.0,
957                fallback_interval: 6
958            },
959        );
960    }
961
962    #[test]
963    fn resolve_target_fps_with_30_source() {
964        let fi = FrameInclusion::target_fps(5.0, 6);
965        let resolved = fi.resolve_with_source_fps(30.0);
966        assert_eq!(resolved, FrameInclusion::Sampled { interval: 6 });
967    }
968
969    #[test]
970    fn resolve_target_fps_with_25_source() {
971        let fi = FrameInclusion::target_fps(5.0, 6);
972        let resolved = fi.resolve_with_source_fps(25.0);
973        assert_eq!(resolved, FrameInclusion::Sampled { interval: 5 });
974    }
975
976    #[test]
977    fn resolve_target_fps_with_15_source() {
978        let fi = FrameInclusion::target_fps(5.0, 6);
979        let resolved = fi.resolve_with_source_fps(15.0);
980        assert_eq!(resolved, FrameInclusion::Sampled { interval: 3 });
981    }
982
983    #[test]
984    fn resolve_target_fps_above_source_is_always() {
985        let fi = FrameInclusion::target_fps(60.0, 6);
986        let resolved = fi.resolve_with_source_fps(30.0);
987        assert_eq!(resolved, FrameInclusion::Always);
988    }
989
990    #[test]
991    fn resolve_target_fps_zero_source_uses_fallback() {
992        let fi = FrameInclusion::target_fps(5.0, 6);
993        let resolved = fi.resolve_with_source_fps(0.0);
994        assert_eq!(resolved, FrameInclusion::Sampled { interval: 6 });
995    }
996
997    #[test]
998    fn resolve_noop_for_sampled() {
999        let fi = FrameInclusion::Sampled { interval: 3 };
1000        let resolved = fi.resolve_with_source_fps(30.0);
1001        assert_eq!(resolved, FrameInclusion::Sampled { interval: 3 });
1002    }
1003
1004    #[test]
1005    fn resolve_noop_for_never() {
1006        let fi = FrameInclusion::Never;
1007        let resolved = fi.resolve_with_source_fps(30.0);
1008        assert_eq!(resolved, FrameInclusion::Never);
1009    }
1010}