nv_runtime/output.rs
1//! Output types: [`OutputEnvelope`], [`OutputSink`] trait, [`SinkFactory`], and lag detection.
2
3use std::sync::Arc;
4use std::time::Instant;
5
6use nv_core::TypedMetadata;
7use nv_core::health::HealthEvent;
8use nv_core::id::FeedId;
9use nv_core::timestamp::{MonotonicTs, WallTs};
10use nv_frame::FrameEnvelope;
11use nv_perception::{DerivedSignal, DetectionSet, SceneFeature, Track};
12use nv_view::ViewState;
13use tokio::sync::broadcast;
14
15use crate::provenance::Provenance;
16
/// Factory for constructing fresh [`OutputSink`] instances.
///
/// When a sink thread times out or panics during shutdown, the feed
/// worker loses the original sink. If a `SinkFactory` was provided,
/// the next restart can construct a fresh sink rather than falling
/// back to a silent `NullSink`.
///
/// The factory is a boxed closure that takes no arguments and returns a
/// boxed [`OutputSink`]. It is `Fn` (not `FnOnce`), so it can be invoked
/// repeatedly, and `Send + Sync`, so it can be shared across threads.
pub type SinkFactory = Box<dyn Fn() -> Box<dyn OutputSink> + Send + Sync>;
24
/// Controls whether the source [`FrameEnvelope`] is included in the
/// [`OutputEnvelope`].
///
/// Default is [`Never`](FrameInclusion::Never) — output contains only
/// perception artifacts. Use [`Always`](FrameInclusion::Always) when
/// downstream consumers need access to the pixel data (e.g., annotation
/// overlays, frame archival, or visual debugging).
///
/// [`Sampled`](FrameInclusion::Sampled) provides a middle ground: frames
/// are included every `interval` outputs, keeping metadata (detections,
/// tracks, signals) at full rate while reducing the cost of host
/// materialization and downstream pixel processing in the sink thread.
/// For example, `Sampled { interval: 6 }` on a 30 fps source yields
/// ~5 fps of frame delivery while perception runs at full rate.
///
/// Because `FrameEnvelope` is `Arc`-backed, inclusion is zero-copy.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum FrameInclusion {
    /// Never include frames in output (default).
    #[default]
    Never,
    /// Always include the source frame in output.
    Always,
    /// Include the source frame every `interval` outputs.
    ///
    /// Perception artifacts (detections, tracks, signals, provenance)
    /// flow at full rate regardless. Only the pixel payload is gated.
    ///
    /// An `interval` of `1` behaves like [`Always`](Self::Always).
    /// An `interval` of `0` behaves like [`Never`](Self::Never).
    Sampled {
        /// Include a frame every N-th output envelope.
        interval: u32,
    },
    /// Include frames at a target preview FPS, resolved dynamically from
    /// the observed source rate.
    ///
    /// During a warmup window (first ~30 frames), `fallback_interval` is
    /// used. Once the source FPS is estimated, the interval is computed
    /// as `round(source_fps / target_fps)` and the variant is resolved
    /// via [`resolve_with_source_fps`](Self::resolve_with_source_fps),
    /// normally to [`Sampled`](Self::Sampled) (or to
    /// [`Always`](Self::Always) when the target meets or exceeds the
    /// source rate).
    ///
    /// This avoids hardcoding an assumed source FPS at config time.
    TargetFps {
        /// Desired preview frames per second.
        target: f32,
        /// Interval to use before the source rate is known.
        fallback_interval: u32,
    },
}

impl FrameInclusion {
    /// Create a sampled frame inclusion policy with edge-case normalization.
    ///
    /// - `interval == 0` → [`Never`](Self::Never)
    /// - `interval == 1` → [`Always`](Self::Always)
    /// - `interval > 1` → [`Sampled`](Self::Sampled)
    ///
    /// Prefer this over constructing [`Sampled`](Self::Sampled) directly
    /// to avoid footgun values that silently alias other variants.
    #[must_use]
    pub fn sampled(interval: u32) -> Self {
        match interval {
            0 => Self::Never,
            1 => Self::Always,
            n => Self::Sampled { interval: n },
        }
    }

    /// Create a target-FPS frame inclusion that resolves dynamically
    /// from the observed source rate.
    ///
    /// Until the source rate is known (warmup window), falls back to
    /// `fallback_interval`. Once observed, resolve the policy with
    /// [`resolve_with_source_fps`](Self::resolve_with_source_fps).
    ///
    /// A `target` that is zero, negative, or NaN yields
    /// [`Never`](Self::Never).
    ///
    /// `fallback_interval` is stored verbatim in the variant. It is only
    /// normalized (0 → `Never`, 1 → `Always`) when it is actually used as
    /// the resolution result, i.e. when the observed source rate is
    /// unusable.
    #[must_use]
    pub fn target_fps(target: f32, fallback_interval: u32) -> Self {
        // NaN fails every ordered comparison, so it must be rejected
        // explicitly; otherwise it would be stored inside the variant.
        if target.is_nan() || target <= 0.0 {
            return Self::Never;
        }
        Self::TargetFps {
            target,
            fallback_interval,
        }
    }

    /// Compute a sampled frame inclusion from a target preview FPS and
    /// an assumed source FPS.
    ///
    /// The interval is `round(source / target)`, clamped to valid range.
    ///
    /// - `target_fps` zero, negative, or NaN → [`Never`](Self::Never)
    /// - `assumed_source_fps` zero, negative, or NaN (unknown source), or
    ///   `target_fps >= assumed_source_fps` → [`Always`](Self::Always)
    ///
    /// For runtime-adaptive behavior, prefer [`target_fps`](Self::target_fps)
    /// which resolves from observed source rate instead of a static assumption.
    ///
    /// # Examples
    ///
    /// ```
    /// # use nv_runtime::FrameInclusion;
    /// assert_eq!(
    ///     FrameInclusion::from_target_fps(5.0, 30.0),
    ///     FrameInclusion::Sampled { interval: 6 },
    /// );
    /// assert_eq!(
    ///     FrameInclusion::from_target_fps(60.0, 30.0),
    ///     FrameInclusion::Always,
    /// );
    /// ```
    #[must_use]
    pub fn from_target_fps(target_fps: f32, assumed_source_fps: f32) -> Self {
        if target_fps.is_nan() || target_fps <= 0.0 {
            return Self::Never;
        }
        // A NaN source is treated exactly like a non-positive one: the
        // source rate is unknown, so do not gate frames at all.
        if assumed_source_fps.is_nan()
            || assumed_source_fps <= 0.0
            || target_fps >= assumed_source_fps
        {
            return Self::Always;
        }
        // Float → integer `as` casts saturate, so a huge ratio clamps to
        // `u32::MAX` instead of wrapping.
        let interval = (assumed_source_fps / target_fps).round() as u32;
        Self::sampled(interval)
    }

    /// The effective sample interval.
    ///
    /// - [`Never`](Self::Never) → `0`
    /// - [`Always`](Self::Always) → `1`
    /// - [`Sampled`](Self::Sampled) → the configured interval
    /// - [`TargetFps`](Self::TargetFps) → the fallback interval
    ///   (actual interval is determined at runtime)
    #[must_use]
    pub fn effective_interval(&self) -> u32 {
        match self {
            Self::Never => 0,
            Self::Always => 1,
            Self::Sampled { interval } => *interval,
            Self::TargetFps {
                fallback_interval, ..
            } => *fallback_interval,
        }
    }

    /// Resolve a [`TargetFps`](Self::TargetFps) variant to a concrete
    /// policy given the observed source FPS.
    ///
    /// If `source_fps` is not a usable rate (zero, negative, NaN, or
    /// infinite — e.g. an estimate computed over a zero-length window),
    /// the normalized `fallback_interval` is used instead.
    ///
    /// Returns the resolved variant. Non-`TargetFps` variants are
    /// returned unchanged.
    #[must_use]
    pub fn resolve_with_source_fps(self, source_fps: f32) -> Self {
        match self {
            Self::TargetFps {
                target,
                fallback_interval,
            } => {
                if source_fps.is_finite() && source_fps > 0.0 {
                    Self::from_target_fps(target, source_fps)
                } else {
                    // Unusable estimate: keep delivering at the fallback
                    // rate rather than silently disabling frames.
                    Self::sampled(fallback_interval)
                }
            }
            other => other,
        }
    }
}
186
/// Per-frame tally of temporal-store admission decisions.
///
/// Filled in during the temporal commit phase so that downstream
/// consumers can see how many tracks were admitted and how many were
/// turned away by the concurrent-track cap, without having to subscribe
/// to health events.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AdmissionSummary {
    /// Count of tracks that were committed to the temporal store.
    pub admitted: u32,
    /// Count of new tracks that were rejected because the store was full
    /// and no eviction victim was available.
    pub rejected: u32,
}
200
/// Structured output for one processed frame.
///
/// Contains the complete perception result, view state, and full provenance.
/// Delivered to the user via [`OutputSink::emit`].
///
/// Broadcast subscribers receive `Arc<OutputEnvelope>` to avoid cloning the
/// full payload on every send. The per-feed [`OutputSink`] is likewise
/// handed an `Arc<OutputEnvelope>` (see the signature of
/// [`OutputSink::emit`]); a sink that needs an owned value must clone it.
#[derive(Debug, Clone)]
pub struct OutputEnvelope {
    /// Which feed produced this output.
    pub feed_id: FeedId,
    /// Monotonic frame sequence number.
    pub frame_seq: u64,
    /// Monotonic timestamp of the source frame.
    pub ts: MonotonicTs,
    /// Wall-clock timestamp of the source frame.
    pub wall_ts: WallTs,
    /// Final detection set after all stages.
    pub detections: DetectionSet,
    /// Final track set after all stages.
    pub tracks: Vec<Track>,
    /// All derived signals from all stages.
    pub signals: Vec<DerivedSignal>,
    /// Scene-level features from all stages.
    pub scene_features: Vec<SceneFeature>,
    /// View state at the time of this frame.
    pub view: ViewState,
    /// Full provenance: stage timings, view decisions, pipeline latency.
    pub provenance: Provenance,
    /// Extensible output metadata.
    pub metadata: TypedMetadata,
    /// The source frame, present when [`FrameInclusion::Always`] is
    /// configured, or on the frames selected by a
    /// [`FrameInclusion::Sampled`] / [`FrameInclusion::TargetFps`]
    /// policy. `None` otherwise.
    ///
    /// This is a zero-copy `Arc` clone of the frame the pipeline processed.
    pub frame: Option<FrameEnvelope>,
    /// Temporal-store admission outcome for this frame's tracks.
    pub admission: AdmissionSummary,
}
242
/// User-implementable trait: receives structured outputs from the pipeline.
///
/// `emit()` is called on a dedicated per-feed sink thread, decoupled from
/// the feed's processing loop by a bounded queue. This isolation ensures
/// that a slow sink does not block perception.
///
/// The output arrives as `Arc<OutputEnvelope>` for zero-copy handoff.
/// Sinks that need an owned copy can call `Arc::unwrap_or_clone(output)`
/// (an associated function, not a method) or clone specific fields as
/// needed.
///
/// `emit()` is wrapped in `catch_unwind` — a panicking sink emits a
/// [`HealthEvent::SinkPanic`] and the output is dropped, but the feed
/// continues. If a sink blocks during shutdown, a
/// [`HealthEvent::SinkTimeout`] is emitted and the sink thread is detached.
///
/// `emit()` is deliberately **not** async and **not** fallible:
///
/// - If the sink needs async I/O, it should buffer and channel internally.
/// - If the sink fails, it should log and drop — the perception pipeline
///   must never block on downstream consumption.
///
/// Implementors must be `Send + 'static`. `emit()` takes `&self`, so a
/// sink that keeps mutable state needs interior mutability.
pub trait OutputSink: Send + 'static {
    /// Receive a processed output envelope.
    ///
    /// `output` is a shared handle to the envelope; there is no return
    /// value, so failures must be handled inside the sink.
    fn emit(&self, output: Arc<OutputEnvelope>);
}
267
/// Arc-wrapped output envelope for zero-copy broadcast fan-out.
///
/// Returned by [`Runtime::output_subscribe`](crate::Runtime::output_subscribe).
/// Subscribers share the same allocation; no full clone is needed per receiver.
///
/// This is a plain alias for `Arc<OutputEnvelope>`, the same type that
/// [`OutputSink::emit`] receives.
pub type SharedOutput = Arc<OutputEnvelope>;
273
274// ---------------------------------------------------------------------------
275// LagDetector — deterministic output-channel overflow detection
276// ---------------------------------------------------------------------------
277
/// Default lower bound on the spacing between two consecutive
/// `OutputLagged` health events while saturation persists (one second).
const LAG_THROTTLE_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1_000);
280
/// Detects output broadcast channel saturation using an internal sentinel
/// receiver.
///
/// A single instance is shared (via `Arc`) across all feed workers.
/// After every broadcast `send()`, each worker calls
/// [`check_after_send`](LagDetector::check_after_send) which:
///
/// 1. Increments a send counter (atomic, lock-free).
/// 2. When enough sends have accumulated for the ring buffer to have
///    wrapped (strictly more than `capacity`), acquires the sentinel
///    mutex via `try_lock`.
/// 3. Drains the sentinel receiver. A `TryRecvError::Lagged(n)` indicates
///    the ring buffer wrapped past the sentinel's read position by `n`
///    messages.
/// 4. Accumulates sentinel-observed wrap counts and emits a throttled
///    [`HealthEvent::OutputLagged`] event:
///    - **Transition into saturation:** always emitted immediately.
///    - **Sustained saturation:** at most once per second, carrying the
///      accumulated delta for the interval.
///    - **Recovery:** a final event flushes any remaining accumulated
///      delta.
///
/// The sentinel receiver intentionally does **not** consume messages between
/// checks, so it naturally falls behind when the channel overflows. This
/// makes saturation detection deterministic — it observes
/// `TryRecvError::Lagged(n)` rather than predicting overflow from queue
/// length.
///
/// **Semantics:** the sentinel reports ring-buffer wrap, **not** guaranteed
/// per-subscriber loss. The sentinel is the *slowest possible* consumer;
/// any external subscriber keeping up with production will experience less
/// (or no) loss. The emitted event is a channel-saturation /
/// backpressure-risk signal — not a per-subscriber loss report.
///
/// # Thread safety
///
/// The send counter is `AtomicU64`. All remaining mutable state is behind
/// a single `std::sync::Mutex` guarded by `try_lock`, so contention never
/// blocks the per-frame hot path.
///
/// # Hot-path cost
///
/// - **Most frames:** one `fetch_add` + one comparison (sends ≤ capacity).
/// - **Once more than `capacity` sends have accumulated:** one `try_lock`
///   + sentinel drain + optional throttled event emission.
pub(crate) struct LagDetector {
    /// Total sends since the last sentinel drain.
    sends_since_check: std::sync::atomic::AtomicU64,
    /// Sentinel receiver plus saturation bookkeeping; everything that is
    /// not the atomic send counter lives behind this mutex.
    inner: std::sync::Mutex<LagDetectorInner>,
    /// Broadcast channel capacity — determines drain interval.
    capacity: usize,
    /// Minimum interval between consecutive throttled events.
    throttle_interval: std::time::Duration,
}
334
/// Mutex-protected state of a [`LagDetector`].
struct LagDetectorInner {
    /// Internal receiver that is only read during a drain; a
    /// `Lagged(n)` result from it is the saturation signal.
    sentinel: broadcast::Receiver<SharedOutput>,
    /// `true` while the detector considers the channel saturated, i.e.
    /// from the first drain that observed loss until a drain that
    /// observed none (or an explicit realign/flush).
    in_lag: bool,
    /// Messages evicted since the last emitted `OutputLagged` event.
    accumulated_lost: u64,
    /// When the last `OutputLagged` event was emitted.
    last_event_time: Instant,
}
343
344impl LagDetector {
345 /// Create a new lag detector from a sentinel receiver.
346 ///
347 /// `capacity` must match the broadcast channel's capacity so the
348 /// detector knows when the ring buffer can wrap.
349 ///
350 /// The sentinel is a dedicated `broadcast::Receiver` created internally
351 /// by the runtime — it is never exposed to users. It counts as one
352 /// receiver in `Sender::receiver_count()`.
353 pub fn new(sentinel: broadcast::Receiver<SharedOutput>, capacity: usize) -> Self {
354 Self::with_config(sentinel, capacity, LAG_THROTTLE_INTERVAL)
355 }
356
357 /// Internal constructor with configurable throttle interval.
358 ///
359 /// `check_after_send` uses `throttle_interval` to cap emission rate
360 /// during sustained saturation.
361 fn with_config(
362 sentinel: broadcast::Receiver<SharedOutput>,
363 capacity: usize,
364 throttle_interval: std::time::Duration,
365 ) -> Self {
366 Self {
367 sends_since_check: std::sync::atomic::AtomicU64::new(0),
368 inner: std::sync::Mutex::new(LagDetectorInner {
369 sentinel,
370 in_lag: false,
371 accumulated_lost: 0,
372 last_event_time: Instant::now(),
373 }),
374 capacity,
375 throttle_interval,
376 }
377 }
378
379 /// Record a send and, when enough sends have accumulated, drain the
380 /// sentinel to detect channel saturation.
381 ///
382 /// Call this after every successful `broadcast::Sender::send()`.
383 pub fn check_after_send(&self, health_tx: &broadcast::Sender<HealthEvent>) {
384 use std::sync::atomic::Ordering;
385
386 let sends = self.sends_since_check.fetch_add(1, Ordering::Relaxed) + 1;
387
388 // The sentinel hasn't consumed any messages since the last drain.
389 // It can only observe Lagged(n) when the ring buffer wraps past
390 // its position, which requires more than `capacity` sends
391 // (since the ring buffer holds exactly `capacity` messages).
392 if (sends as usize) <= self.capacity {
393 return;
394 }
395
396 let Ok(mut inner) = self.inner.try_lock() else {
397 return;
398 };
399
400 // Reset send counter now that we're draining.
401 self.sends_since_check.store(0, Ordering::Relaxed);
402
403 // Drain the sentinel: it has been idle since the last drain, so
404 // any ring-buffer wrap shows up as Lagged(n).
405 let mut lost: u64 = 0;
406 loop {
407 match inner.sentinel.try_recv() {
408 Ok(_) => {} // consume to catch up
409 Err(broadcast::error::TryRecvError::Lagged(n)) => {
410 lost += n;
411 // Receiver position was advanced past the gap.
412 // Continue draining to fully catch up.
413 }
414 Err(broadcast::error::TryRecvError::Empty) => break,
415 Err(broadcast::error::TryRecvError::Closed) => break,
416 }
417 }
418
419 if lost == 0 {
420 // Sentinel caught up with no missed messages — buffer did not wrap.
421 if inner.in_lag && inner.accumulated_lost > 0 {
422 // Recovery: flush remaining accumulated delta.
423 let delta = inner.accumulated_lost;
424 inner.accumulated_lost = 0;
425 inner.in_lag = false;
426 inner.last_event_time = Instant::now();
427 drop(inner);
428 let _ = health_tx.send(HealthEvent::OutputLagged {
429 messages_lost: delta,
430 });
431 } else {
432 inner.in_lag = false;
433 }
434 return;
435 }
436
437 // Saturation detected: sentinel missed `lost` messages due to
438 // ring-buffer wrap.
439 inner.accumulated_lost += lost;
440
441 let should_emit = if !inner.in_lag {
442 // Transition into saturation — always emit immediately.
443 inner.in_lag = true;
444 true
445 } else {
446 // Already saturated — emit at most once per throttle interval.
447 inner.last_event_time.elapsed() >= self.throttle_interval
448 };
449
450 if should_emit {
451 let delta = inner.accumulated_lost;
452 inner.accumulated_lost = 0;
453 inner.last_event_time = Instant::now();
454 drop(inner);
455 let _ = health_tx.send(HealthEvent::OutputLagged {
456 messages_lost: delta,
457 });
458 }
459 }
460
461 /// Snapshot the current lag status without emitting any health events.
462 ///
463 /// Uses a blocking lock with poison recovery — safe for the
464 /// diagnostics polling path (off hot path, 1–5 s interval).
465 pub fn status(&self) -> crate::diagnostics::OutputLagStatus {
466 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
467 crate::diagnostics::OutputLagStatus {
468 in_lag: inner.in_lag,
469 pending_lost: inner.accumulated_lost,
470 }
471 }
472
473 /// Reset the detector after transitioning to a no-external-subscriber
474 /// state.
475 ///
476 /// If accumulated sentinel-observed wrap from a prior saturation period
477 /// is pending, it is flushed as a single final `OutputLagged` event
478 /// before the reset. The sentinel is then drained without reporting
479 /// (no subscriber cares about sentinel-only wrap), and all state is
480 /// cleared.
481 ///
482 /// Uses a blocking `lock()` — this is called off the hot path, only
483 /// when `receiver_count` drops below the external threshold.
484 pub fn realign(&self, health_tx: &broadcast::Sender<HealthEvent>) {
485 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
486
487 // Flush any pending sentinel-observed delta first.
488 if inner.accumulated_lost > 0 {
489 let delta = inner.accumulated_lost;
490 inner.accumulated_lost = 0;
491 let _ = health_tx.send(HealthEvent::OutputLagged {
492 messages_lost: delta,
493 });
494 }
495
496 // Drain the sentinel completely — discard everything, no external
497 // subscriber is present to care.
498 loop {
499 match inner.sentinel.try_recv() {
500 Ok(_) => {}
501 Err(broadcast::error::TryRecvError::Lagged(_)) => {}
502 Err(broadcast::error::TryRecvError::Empty) => break,
503 Err(broadcast::error::TryRecvError::Closed) => break,
504 }
505 }
506
507 // Reset saturation state.
508 inner.in_lag = false;
509 inner.last_event_time = Instant::now();
510 self.sends_since_check
511 .store(0, std::sync::atomic::Ordering::Relaxed);
512 }
513
514 /// Flush any pending accumulated sentinel-observed delta as a single
515 /// final event.
516 ///
517 /// Called on shutdown or any other point where no further sends will
518 /// occur. Does **not** drain the sentinel — only emits the delta that
519 /// was already accumulated by prior `check_after_send` calls.
520 ///
521 /// Uses a blocking `lock()`.
522 pub fn flush(&self, health_tx: &broadcast::Sender<HealthEvent>) {
523 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
524 if inner.accumulated_lost > 0 {
525 let delta = inner.accumulated_lost;
526 inner.accumulated_lost = 0;
527 inner.in_lag = false;
528 let _ = health_tx.send(HealthEvent::OutputLagged {
529 messages_lost: delta,
530 });
531 }
532 }
533}
534
535// ===========================================================================
536// Unit tests for LagDetector
537// ===========================================================================
538
#[cfg(test)]
mod tests {
    // Unit tests for `LagDetector` (driven through a real tokio broadcast
    // channel with a tiny capacity) and for the `FrameInclusion`
    // constructors / resolution helpers.
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;

    use nv_core::TypedMetadata;
    use nv_core::health::HealthEvent;
    use nv_core::id::FeedId;
    use nv_core::timestamp::{MonotonicTs, WallTs};
    use nv_perception::DetectionSet;
    use nv_view::ViewState;
    use tokio::sync::broadcast;

    use crate::provenance::{Provenance, ViewProvenance};

    /// Create a minimal dummy `SharedOutput` for broadcast sending.
    ///
    /// Every field is an empty / zero / initial value; the lag detector
    /// never inspects the payload, only the channel position.
    fn make_dummy_output() -> SharedOutput {
        Arc::new(OutputEnvelope {
            feed_id: FeedId::new(0),
            frame_seq: 0,
            ts: MonotonicTs::from_nanos(0),
            wall_ts: WallTs::from_micros(0),
            detections: DetectionSet::empty(),
            tracks: Vec::new(),
            signals: Vec::new(),
            scene_features: Vec::new(),
            view: ViewState::fixed_initial(),
            provenance: Provenance {
                stages: Vec::new(),
                view_provenance: ViewProvenance {
                    motion_source: nv_view::MotionSource::None,
                    epoch_decision: None,
                    transition: nv_view::TransitionPhase::Settled,
                    stability_score: 1.0,
                    epoch: nv_view::view_state::ViewEpoch::INITIAL,
                    version: nv_view::view_state::ViewVersion::INITIAL,
                },
                frame_receive_ts: MonotonicTs::from_nanos(0),
                pipeline_complete_ts: MonotonicTs::from_nanos(0),
                total_latency: nv_core::Duration::from_nanos(0),
                frame_age: None,
                queue_hold_time: std::time::Duration::ZERO,
                frame_included: false,
            },
            metadata: TypedMetadata::new(),
            frame: None,
            admission: AdmissionSummary::default(),
        })
    }

    /// Build an output channel of the given capacity together with a
    /// detector whose sentinel is that channel's only receiver.
    fn make_detector(
        capacity: usize,
        throttle: Duration,
    ) -> (broadcast::Sender<SharedOutput>, LagDetector) {
        let (tx, sentinel_rx) = broadcast::channel(capacity);
        let detector = LagDetector::with_config(sentinel_rx, capacity, throttle);
        (tx, detector)
    }

    /// Build a health-event channel large enough that the tests never
    /// overflow it.
    fn make_health() -> (
        broadcast::Sender<HealthEvent>,
        broadcast::Receiver<HealthEvent>,
    ) {
        broadcast::channel(128)
    }

    /// Collect all `OutputLagged` deltas from the health channel.
    ///
    /// Stops at the first `try_recv` error (normally `Empty`); events of
    /// any other kind are skipped.
    fn collect_lag_deltas(rx: &mut broadcast::Receiver<HealthEvent>) -> Vec<u64> {
        let mut deltas = Vec::new();
        while let Ok(evt) = rx.try_recv() {
            if let HealthEvent::OutputLagged { messages_lost } = evt {
                deltas.push(messages_lost);
            }
        }
        deltas
    }

    // D.1: delta_not_cumulative_exact
    //
    // Two equal loss intervals must produce equal deltas, not
    // cumulative totals.
    #[test]
    fn delta_not_cumulative_exact() {
        let capacity = 4;
        // Zero throttle — emit every check so we get crisp per-interval deltas.
        let (tx, detector) = make_detector(capacity, Duration::ZERO);
        let (health_tx, mut health_rx) = make_health();

        // First interval: capacity+1 sends ⇒ sentinel lags by 1.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d1 = collect_lag_deltas(&mut health_rx);

        // Second interval: same pattern.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d2 = collect_lag_deltas(&mut health_rx);

        // Both intervals should report exactly the same delta, not cumulative.
        assert!(!d1.is_empty(), "first interval should emit a lag event");
        assert!(!d2.is_empty(), "second interval should emit a lag event");

        let sum1: u64 = d1.iter().sum();
        let sum2: u64 = d2.iter().sum();
        assert_eq!(sum1, sum2, "equal loss intervals must produce equal deltas");
    }

    // D.2: throttle_blocks_event_storm
    //
    // Many overflow checks within one throttle interval must produce at most
    // one event (the transition event).
    #[test]
    fn throttle_blocks_event_storm() {
        let capacity = 4;
        // 1-second throttle — the loop below runs in well under 1 second.
        let (tx, detector) = make_detector(capacity, Duration::from_secs(1));
        let (health_tx, mut health_rx) = make_health();

        // 10 full drain cycles, each producing Lagged(1).
        for _ in 0..10 {
            for _ in 0..capacity + 1 {
                let _ = tx.send(make_dummy_output());
                detector.check_after_send(&health_tx);
            }
        }

        let deltas = collect_lag_deltas(&mut health_rx);

        // Only the first transition event should have been emitted;
        // all subsequent overflow within the same second is accumulated.
        assert_eq!(
            deltas.len(),
            1,
            "throttle should block storm: got {:?}",
            deltas
        );
        // The single event carries the first interval's delta.
        assert!(deltas[0] > 0, "emitted delta must be positive");
    }

    // D.3: throttle_allows_periodic_emission
    //
    // Sustained lag across >1 throttle interval should produce bounded
    // periodic emissions.
    #[test]
    fn throttle_allows_periodic_emission() {
        let capacity = 4;
        // Very short throttle so the test doesn't sleep long.
        let throttle = Duration::from_millis(10);
        let (tx, detector) = make_detector(capacity, throttle);
        let (health_tx, mut health_rx) = make_health();

        // First interval — triggers transition event.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d1 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d1.len(), 1, "transition event");

        // Wait past throttle interval.
        std::thread::sleep(throttle + Duration::from_millis(5));

        // Another drain cycle — should now be eligible for periodic emission.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d2 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d2.len(), 1, "periodic emission after interval elapsed");
        assert!(d2[0] > 0, "periodic delta must be positive");
    }

    // D.4: no_subscriber_reset_prevents_false_positive
    //
    // Induce a sentinel-only window via realign, then verify no lag is
    // reported from that window. A subsequent subscriber window must
    // report only its own losses.
    #[test]
    fn no_subscriber_reset_prevents_false_positive() {
        let capacity = 4;
        let (tx, detector) = make_detector(capacity, Duration::ZERO);
        let (health_tx, mut health_rx) = make_health();

        // Phase 1: normal sends — accumulate sentinel backlog.
        for _ in 0..capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }

        // No lag events yet (sentinel is exactly capacity behind, not lagged).
        let d = collect_lag_deltas(&mut health_rx);
        assert!(d.is_empty(), "no lag before buffer wraps");

        // Phase 2: realign — simulates no-subscriber transition.
        // Any stale sentinel state is discarded.
        detector.realign(&health_tx);
        let flushed = collect_lag_deltas(&mut health_rx);
        // No accumulated loss to flush (we never exceeded capacity).
        assert!(flushed.is_empty(), "no pending loss to flush on realign");

        // Phase 3: more sends in a "new subscriber" window.
        // After realign, sends_since_check is 0 and sentinel is caught up.
        // Send capacity+1 to trigger one drain cycle with Lagged(1).
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d3 = collect_lag_deltas(&mut health_rx);

        // Should report only the loss from this interval, not from the
        // pre-realign window.
        assert!(
            !d3.is_empty(),
            "new window should produce its own lag events"
        );
        let total: u64 = d3.iter().sum();
        assert!(
            total > 0 && total <= 2,
            "delta should reflect only new-window loss"
        );
    }

    // D.5: flush_pending_emits_final_delta
    //
    // Create pending accumulated loss via check_after_send (throttled,
    // so not yet emitted), then flush and verify the final event.
    #[test]
    fn flush_pending_emits_final_delta() {
        let capacity = 4;
        // Long throttle — the second drain interval's loss remains pending.
        let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
        let (health_tx, mut health_rx) = make_health();

        // First drain cycle — transition event emitted.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d1 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d1.len(), 1, "transition event emitted");

        // Second drain cycle — throttled, loss is accumulated but not emitted.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let d2 = collect_lag_deltas(&mut health_rx);
        assert!(d2.is_empty(), "throttled — no event emitted yet");

        // Flush — should emit the stranded accumulated loss.
        detector.flush(&health_tx);
        let d3 = collect_lag_deltas(&mut health_rx);
        assert_eq!(d3.len(), 1, "flush must emit exactly one event");
        assert!(d3[0] > 0, "flushed delta must be positive");

        // Flush again — nothing pending, no event.
        detector.flush(&health_tx);
        let d4 = collect_lag_deltas(&mut health_rx);
        assert!(d4.is_empty(), "double flush must not emit");
    }

    // Verify that realign flushes real pending loss before resetting.
    #[test]
    fn realign_flushes_pending_before_reset() {
        let capacity = 4;
        // Long throttle so second-interval loss stays pending.
        let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
        let (health_tx, mut health_rx) = make_health();

        // Transition event.
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let _ = collect_lag_deltas(&mut health_rx);

        // Accumulate more loss (throttled, stays pending).
        for _ in 0..capacity + 1 {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
        let pending = collect_lag_deltas(&mut health_rx);
        assert!(pending.is_empty(), "loss stays pending under throttle");

        // Realign should flush the pending loss, then reset.
        detector.realign(&health_tx);
        let flushed = collect_lag_deltas(&mut health_rx);
        assert_eq!(flushed.len(), 1, "realign must flush pending loss");
        assert!(flushed[0] > 0, "flushed delta must be positive");

        // After realign, no more events until new loss accumulates.
        detector.flush(&health_tx);
        let after = collect_lag_deltas(&mut health_rx);
        assert!(after.is_empty(), "no pending loss after realign + flush");
    }

    // ---------------------------------------------------------------
    // FrameInclusion normalization + constructor tests
    // ---------------------------------------------------------------

    // `sampled(0)` aliases to `Never`.
    #[test]
    fn sampled_zero_normalizes_to_never() {
        assert_eq!(FrameInclusion::sampled(0), FrameInclusion::Never);
    }

    // `sampled(1)` aliases to `Always`.
    #[test]
    fn sampled_one_normalizes_to_always() {
        assert_eq!(FrameInclusion::sampled(1), FrameInclusion::Always);
    }

    // Any interval above 1 is kept as an explicit `Sampled`.
    #[test]
    fn sampled_above_one_creates_sampled() {
        assert_eq!(
            FrameInclusion::sampled(6),
            FrameInclusion::Sampled { interval: 6 },
        );
    }

    // 30 / 5 = 6.
    #[test]
    fn from_target_fps_5_at_30_yields_interval_6() {
        assert_eq!(
            FrameInclusion::from_target_fps(5.0, 30.0),
            FrameInclusion::Sampled { interval: 6 },
        );
    }

    // 30 / 10 = 3.
    #[test]
    fn from_target_fps_10_at_30_yields_interval_3() {
        assert_eq!(
            FrameInclusion::from_target_fps(10.0, 30.0),
            FrameInclusion::Sampled { interval: 3 },
        );
    }

    // A zero target disables frame delivery.
    #[test]
    fn from_target_fps_zero_is_never() {
        assert_eq!(
            FrameInclusion::from_target_fps(0.0, 30.0),
            FrameInclusion::Never,
        );
    }

    // A negative target disables frame delivery.
    #[test]
    fn from_target_fps_negative_is_never() {
        assert_eq!(
            FrameInclusion::from_target_fps(-5.0, 30.0),
            FrameInclusion::Never,
        );
    }

    // A target faster than the source cannot be met; include every frame.
    #[test]
    fn from_target_fps_above_source_is_always() {
        assert_eq!(
            FrameInclusion::from_target_fps(60.0, 30.0),
            FrameInclusion::Always,
        );
    }

    // Target equal to the source rate also means every frame.
    #[test]
    fn from_target_fps_equal_to_source_is_always() {
        assert_eq!(
            FrameInclusion::from_target_fps(30.0, 30.0),
            FrameInclusion::Always,
        );
    }

    // A zero (unknown) source rate falls back to including every frame.
    #[test]
    fn from_target_fps_with_zero_source_is_always() {
        assert_eq!(
            FrameInclusion::from_target_fps(5.0, 0.0),
            FrameInclusion::Always,
        );
    }

    // `effective_interval` for each variant; `TargetFps` reports its
    // fallback interval.
    #[test]
    fn effective_interval_values() {
        assert_eq!(FrameInclusion::Never.effective_interval(), 0);
        assert_eq!(FrameInclusion::Always.effective_interval(), 1);
        assert_eq!(
            FrameInclusion::Sampled { interval: 6 }.effective_interval(),
            6,
        );
        assert_eq!(
            FrameInclusion::TargetFps {
                target: 5.0,
                fallback_interval: 6
            }
            .effective_interval(),
            6,
        );
    }

    // ---------------------------------------------------------------
    // TargetFps constructor + resolution tests
    // ---------------------------------------------------------------

    // The constructor collapses a zero target to `Never`.
    #[test]
    fn target_fps_zero_is_never() {
        assert_eq!(FrameInclusion::target_fps(0.0, 6), FrameInclusion::Never);
    }

    // The constructor collapses a negative target to `Never`.
    #[test]
    fn target_fps_negative_is_never() {
        assert_eq!(FrameInclusion::target_fps(-5.0, 6), FrameInclusion::Never);
    }

    // A positive target is stored verbatim, including the fallback interval.
    #[test]
    fn target_fps_positive_creates_variant() {
        assert_eq!(
            FrameInclusion::target_fps(5.0, 6),
            FrameInclusion::TargetFps {
                target: 5.0,
                fallback_interval: 6
            },
        );
    }

    // 30 / 5 = 6.
    #[test]
    fn resolve_target_fps_with_30_source() {
        let fi = FrameInclusion::target_fps(5.0, 6);
        let resolved = fi.resolve_with_source_fps(30.0);
        assert_eq!(resolved, FrameInclusion::Sampled { interval: 6 });
    }

    // 25 / 5 = 5 — the observed rate wins over the fallback of 6.
    #[test]
    fn resolve_target_fps_with_25_source() {
        let fi = FrameInclusion::target_fps(5.0, 6);
        let resolved = fi.resolve_with_source_fps(25.0);
        assert_eq!(resolved, FrameInclusion::Sampled { interval: 5 });
    }

    // 15 / 5 = 3.
    #[test]
    fn resolve_target_fps_with_15_source() {
        let fi = FrameInclusion::target_fps(5.0, 6);
        let resolved = fi.resolve_with_source_fps(15.0);
        assert_eq!(resolved, FrameInclusion::Sampled { interval: 3 });
    }

    // Target above the observed source resolves to `Always`.
    #[test]
    fn resolve_target_fps_above_source_is_always() {
        let fi = FrameInclusion::target_fps(60.0, 6);
        let resolved = fi.resolve_with_source_fps(30.0);
        assert_eq!(resolved, FrameInclusion::Always);
    }

    // With a zero observed source the fallback interval is used.
    #[test]
    fn resolve_target_fps_zero_source_uses_fallback() {
        let fi = FrameInclusion::target_fps(5.0, 6);
        let resolved = fi.resolve_with_source_fps(0.0);
        assert_eq!(resolved, FrameInclusion::Sampled { interval: 6 });
    }

    // Resolution leaves an already-concrete `Sampled` untouched.
    #[test]
    fn resolve_noop_for_sampled() {
        let fi = FrameInclusion::Sampled { interval: 3 };
        let resolved = fi.resolve_with_source_fps(30.0);
        assert_eq!(resolved, FrameInclusion::Sampled { interval: 3 });
    }

    // Resolution leaves `Never` untouched.
    #[test]
    fn resolve_noop_for_never() {
        let fi = FrameInclusion::Never;
        let resolved = fi.resolve_with_source_fps(30.0);
        assert_eq!(resolved, FrameInclusion::Never);
    }
}