use std::sync::Arc;
use std::time::Instant;
use nv_core::TypedMetadata;
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use nv_core::timestamp::{MonotonicTs, WallTs};
use nv_frame::FrameEnvelope;
use nv_perception::{DerivedSignal, DetectionSet, SceneFeature, Track};
use nv_view::ViewState;
use tokio::sync::broadcast;
use crate::provenance::Provenance;
/// Thread-shareable factory that returns a boxed [`OutputSink`] each time it is called.
pub type SinkFactory = Box<dyn Fn() -> Box<dyn OutputSink> + Send + Sync>;
/// Policy describing how often a frame should be included with an output.
///
/// The cadence is expressed as an interval in source frames; see
/// [`FrameInclusion::effective_interval`]. The default is [`FrameInclusion::Never`].
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum FrameInclusion {
    /// No frames are included (effective interval `0`).
    #[default]
    Never,
    /// Every frame is included (effective interval `1`).
    Always,
    /// Frames are included on a fixed interval.
    Sampled {
        /// Interval in source frames. Prefer [`FrameInclusion::sampled`], which
        /// normalizes `0` and `1` to `Never` and `Always`.
        interval: u32,
    },
    /// Frames are included at a target rate; the concrete interval is derived
    /// later by [`FrameInclusion::resolve_with_source_fps`].
    TargetFps {
        /// Desired inclusion rate in frames per second.
        target: f32,
        /// Interval used when the source frame rate is unknown.
        fallback_interval: u32,
    },
}

impl FrameInclusion {
    /// Builds an interval-based policy, normalizing the degenerate intervals:
    /// `0` becomes [`Never`](Self::Never) and `1` becomes [`Always`](Self::Always).
    #[must_use]
    pub fn sampled(interval: u32) -> Self {
        match interval {
            0 => Self::Never,
            1 => Self::Always,
            n => Self::Sampled { interval: n },
        }
    }

    /// Builds a rate-based policy that is resolved once the source FPS is known.
    ///
    /// Returns [`Never`](Self::Never) when `target` is zero, negative, or NaN.
    /// NaN is rejected explicitly because it fails every ordered comparison and
    /// would otherwise be stored as an unusable target.
    #[must_use]
    pub fn target_fps(target: f32, fallback_interval: u32) -> Self {
        if target.is_nan() || target <= 0.0 {
            return Self::Never;
        }
        Self::TargetFps {
            target,
            fallback_interval,
        }
    }

    /// Converts a target rate into an interval policy for an assumed source rate.
    ///
    /// * `target_fps` zero, negative, or NaN → [`Never`](Self::Never).
    /// * `assumed_source_fps` unknown (zero, negative, NaN, or infinite), or
    ///   `target_fps >= assumed_source_fps` → [`Always`](Self::Always).
    /// * Otherwise the interval is `round(assumed_source_fps / target_fps)`,
    ///   normalized through [`FrameInclusion::sampled`].
    #[must_use]
    pub fn from_target_fps(target_fps: f32, assumed_source_fps: f32) -> Self {
        if target_fps.is_nan() || target_fps <= 0.0 {
            return Self::Never;
        }
        if !Self::is_known_fps(assumed_source_fps) || target_fps >= assumed_source_fps {
            return Self::Always;
        }
        // Float-to-int `as` casts saturate, so an extreme ratio clamps to
        // `u32::MAX` instead of wrapping.
        let interval = (assumed_source_fps / target_fps).round() as u32;
        Self::sampled(interval)
    }

    /// Returns the interval this policy currently implies: `0` for `Never`,
    /// `1` for `Always`, the stored interval for `Sampled`, and the fallback
    /// interval for an unresolved `TargetFps`.
    #[must_use]
    pub fn effective_interval(&self) -> u32 {
        match self {
            Self::Never => 0,
            Self::Always => 1,
            Self::Sampled { interval } => *interval,
            Self::TargetFps {
                fallback_interval, ..
            } => *fallback_interval,
        }
    }

    /// Resolves a `TargetFps` policy against the observed source frame rate.
    ///
    /// When `source_fps` is unknown (zero, negative, NaN, or infinite) the
    /// policy's `fallback_interval` is used; otherwise the interval is derived
    /// via [`FrameInclusion::from_target_fps`]. Other variants are returned
    /// unchanged.
    #[must_use]
    pub fn resolve_with_source_fps(self, source_fps: f32) -> Self {
        match self {
            Self::TargetFps {
                target,
                fallback_interval,
            } => {
                if Self::is_known_fps(source_fps) {
                    Self::from_target_fps(target, source_fps)
                } else {
                    Self::sampled(fallback_interval)
                }
            }
            other => other,
        }
    }

    /// True when `fps` is a finite, strictly positive rate that can be divided by.
    fn is_known_fps(fps: f32) -> bool {
        fps.is_finite() && fps > 0.0
    }
}
/// Pair of admitted/rejected counters carried on every [`OutputEnvelope`].
///
/// NOTE(review): what is being admitted or rejected is decided outside this
/// file — confirm against the producer before relying on the field docs.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AdmissionSummary {
/// Number of items that were admitted.
pub admitted: u32,
/// Number of items that were rejected.
pub rejected: u32,
}
/// One output record: perception results together with timing, view,
/// provenance, and admission data for a single frame of a feed.
///
/// NOTE(review): the producer of this struct is not visible in this file; the
/// field notes below are read off the names and types — confirm where marked.
#[derive(Debug, Clone)]
pub struct OutputEnvelope {
/// Feed this output belongs to.
pub feed_id: FeedId,
/// Sequence number of the frame (presumably per-feed — confirm).
pub frame_seq: u64,
/// Monotonic timestamp associated with the output.
pub ts: MonotonicTs,
/// Wall-clock timestamp associated with the output.
pub wall_ts: WallTs,
/// Detection set for this output.
pub detections: DetectionSet,
/// Tracks for this output.
pub tracks: Vec<Track>,
/// Derived signals for this output.
pub signals: Vec<DerivedSignal>,
/// Scene features for this output.
pub scene_features: Vec<SceneFeature>,
/// View state for this output.
pub view: ViewState,
/// Provenance record describing how the output was produced.
pub provenance: Provenance,
/// Typed metadata bag.
pub metadata: TypedMetadata,
/// The frame itself, when one is attached (presumably governed by
/// `FrameInclusion` — confirm against the producer).
pub frame: Option<FrameEnvelope>,
/// Admitted/rejected counters for this output.
pub admission: AdmissionSummary,
}
/// Consumer of output envelopes.
///
/// Implementors must be `Send + 'static`.
pub trait OutputSink: Send + 'static {
/// Receives one output. The envelope arrives behind an `Arc` and the method
/// takes `&self`, so any mutable state must use interior mutability.
fn emit(&self, output: Arc<OutputEnvelope>);
}
/// Reference-counted [`OutputEnvelope`]; this is the item type of the
/// broadcast receiver that `LagDetector` watches.
pub type SharedOutput = Arc<OutputEnvelope>;
/// Default throttle interval (one second) that `LagDetector::new` passes on:
/// the minimum time between lag events while the detector is already in lag.
const LAG_THROTTLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Detects message loss on the output broadcast channel by draining a sentinel
/// receiver and reports it as `HealthEvent::OutputLagged` events.
pub(crate) struct LagDetector {
/// Number of `check_after_send` calls since the sentinel was last drained
/// (reset to zero on drain and on `realign`).
sends_since_check: std::sync::atomic::AtomicU64,
/// Mutable detector state; `check_after_send` only ever `try_lock`s it.
inner: std::sync::Mutex<LagDetectorInner>,
/// A drain is attempted only once more than this many sends have been counted.
capacity: usize,
/// Minimum time between lag events while already in lag.
throttle_interval: std::time::Duration,
}
/// Mutex-protected state of a `LagDetector`.
struct LagDetectorInner {
/// Receiver whose received values are discarded; only its `Lagged(n)` errors
/// are used, as the count of lost messages.
sentinel: broadcast::Receiver<SharedOutput>,
/// True from the first drain that observes loss until a drain observes none
/// (or until `realign`, or a `flush` that had pending loss).
in_lag: bool,
/// Lost-message count observed but not yet reported in an event.
accumulated_lost: u64,
/// When the last event was emitted from `check_after_send`, or when the
/// detector was created or last realigned.
last_event_time: Instant,
}
impl LagDetector {
pub fn new(sentinel: broadcast::Receiver<SharedOutput>, capacity: usize) -> Self {
Self::with_config(sentinel, capacity, LAG_THROTTLE_INTERVAL)
}
fn with_config(
sentinel: broadcast::Receiver<SharedOutput>,
capacity: usize,
throttle_interval: std::time::Duration,
) -> Self {
Self {
sends_since_check: std::sync::atomic::AtomicU64::new(0),
inner: std::sync::Mutex::new(LagDetectorInner {
sentinel,
in_lag: false,
accumulated_lost: 0,
last_event_time: Instant::now(),
}),
capacity,
throttle_interval,
}
}
pub fn check_after_send(&self, health_tx: &broadcast::Sender<HealthEvent>) {
use std::sync::atomic::Ordering;
let sends = self.sends_since_check.fetch_add(1, Ordering::Relaxed) + 1;
if (sends as usize) <= self.capacity {
return;
}
let Ok(mut inner) = self.inner.try_lock() else {
return;
};
self.sends_since_check.store(0, Ordering::Relaxed);
let mut lost: u64 = 0;
loop {
match inner.sentinel.try_recv() {
Ok(_) => {} Err(broadcast::error::TryRecvError::Lagged(n)) => {
lost += n;
}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Closed) => break,
}
}
if lost == 0 {
if inner.in_lag && inner.accumulated_lost > 0 {
let delta = inner.accumulated_lost;
inner.accumulated_lost = 0;
inner.in_lag = false;
inner.last_event_time = Instant::now();
drop(inner);
let _ = health_tx.send(HealthEvent::OutputLagged {
messages_lost: delta,
});
} else {
inner.in_lag = false;
}
return;
}
inner.accumulated_lost += lost;
let should_emit = if !inner.in_lag {
inner.in_lag = true;
true
} else {
inner.last_event_time.elapsed() >= self.throttle_interval
};
if should_emit {
let delta = inner.accumulated_lost;
inner.accumulated_lost = 0;
inner.last_event_time = Instant::now();
drop(inner);
let _ = health_tx.send(HealthEvent::OutputLagged {
messages_lost: delta,
});
}
}
pub fn status(&self) -> crate::diagnostics::OutputLagStatus {
let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
crate::diagnostics::OutputLagStatus {
in_lag: inner.in_lag,
pending_lost: inner.accumulated_lost,
}
}
pub fn realign(&self, health_tx: &broadcast::Sender<HealthEvent>) {
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
if inner.accumulated_lost > 0 {
let delta = inner.accumulated_lost;
inner.accumulated_lost = 0;
let _ = health_tx.send(HealthEvent::OutputLagged {
messages_lost: delta,
});
}
loop {
match inner.sentinel.try_recv() {
Ok(_) => {}
Err(broadcast::error::TryRecvError::Lagged(_)) => {}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Closed) => break,
}
}
inner.in_lag = false;
inner.last_event_time = Instant::now();
self.sends_since_check
.store(0, std::sync::atomic::Ordering::Relaxed);
}
pub fn flush(&self, health_tx: &broadcast::Sender<HealthEvent>) {
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
if inner.accumulated_lost > 0 {
let delta = inner.accumulated_lost;
inner.accumulated_lost = 0;
inner.in_lag = false;
let _ = health_tx.send(HealthEvent::OutputLagged {
messages_lost: delta,
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::time::Duration;
use nv_core::TypedMetadata;
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use nv_core::timestamp::{MonotonicTs, WallTs};
use nv_perception::DetectionSet;
use nv_view::ViewState;
use tokio::sync::broadcast;
use crate::provenance::{Provenance, ViewProvenance};
fn make_dummy_output() -> SharedOutput {
Arc::new(OutputEnvelope {
feed_id: FeedId::new(0),
frame_seq: 0,
ts: MonotonicTs::from_nanos(0),
wall_ts: WallTs::from_micros(0),
detections: DetectionSet::empty(),
tracks: Vec::new(),
signals: Vec::new(),
scene_features: Vec::new(),
view: ViewState::fixed_initial(),
provenance: Provenance {
stages: Vec::new(),
view_provenance: ViewProvenance {
motion_source: nv_view::MotionSource::None,
epoch_decision: None,
transition: nv_view::TransitionPhase::Settled,
stability_score: 1.0,
epoch: nv_view::view_state::ViewEpoch::INITIAL,
version: nv_view::view_state::ViewVersion::INITIAL,
},
frame_receive_ts: MonotonicTs::from_nanos(0),
pipeline_complete_ts: MonotonicTs::from_nanos(0),
total_latency: nv_core::Duration::from_nanos(0),
frame_age: None,
queue_hold_time: std::time::Duration::ZERO,
frame_included: false,
},
metadata: TypedMetadata::new(),
frame: None,
admission: AdmissionSummary::default(),
})
}
/// Creates an output channel of the given capacity and a detector watching
/// that channel's initial receiver with the given throttle.
fn make_detector(
    capacity: usize,
    throttle: Duration,
) -> (broadcast::Sender<SharedOutput>, LagDetector) {
    let (output_tx, sentinel) = broadcast::channel(capacity);
    (
        output_tx,
        LagDetector::with_config(sentinel, capacity, throttle),
    )
}
/// Creates the health-event channel the tests listen on.
fn make_health() -> (
    broadcast::Sender<HealthEvent>,
    broadcast::Receiver<HealthEvent>,
) {
    const HEALTH_CHANNEL_CAPACITY: usize = 128;
    broadcast::channel(HEALTH_CHANNEL_CAPACITY)
}
/// Drains `rx` until the first receive error and returns the `messages_lost`
/// value of every `OutputLagged` event seen, in order.
fn collect_lag_deltas(rx: &mut broadcast::Receiver<HealthEvent>) -> Vec<u64> {
    std::iter::from_fn(|| rx.try_recv().ok())
        .filter_map(|event| match event {
            HealthEvent::OutputLagged { messages_lost } => Some(messages_lost),
            _ => None,
        })
        .collect()
}
/// Two identical overflowing bursts must report equal totals, i.e. events carry
/// per-interval deltas rather than a running sum.
#[test]
fn delta_not_cumulative_exact() {
    let capacity = 4;
    let (tx, detector) = make_detector(capacity, Duration::ZERO);
    let (health_tx, mut health_rx) = make_health();

    // One burst: capacity + 1 sends, each followed by a check.
    let overflow_once = || {
        for _ in 0..=capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
    };

    overflow_once();
    let first = collect_lag_deltas(&mut health_rx);
    overflow_once();
    let second = collect_lag_deltas(&mut health_rx);

    assert!(!first.is_empty(), "first interval should emit a lag event");
    assert!(!second.is_empty(), "second interval should emit a lag event");
    let first_total: u64 = first.iter().sum();
    let second_total: u64 = second.iter().sum();
    assert_eq!(
        first_total, second_total,
        "equal loss intervals must produce equal deltas"
    );
}
/// With a one-second throttle, ten back-to-back overflowing bursts must yield
/// exactly one lag event.
#[test]
fn throttle_blocks_event_storm() {
    let capacity = 4;
    let (tx, detector) = make_detector(capacity, Duration::from_secs(1));
    let (health_tx, mut health_rx) = make_health();

    // Ten bursts of capacity + 1 sends, flattened into one run.
    let total_sends = 10 * (capacity + 1);
    for _ in 0..total_sends {
        let _ = tx.send(make_dummy_output());
        detector.check_after_send(&health_tx);
    }

    let deltas = collect_lag_deltas(&mut health_rx);
    assert_eq!(
        deltas.len(),
        1,
        "throttle should block storm: got {:?}",
        deltas
    );
    assert!(deltas[0] > 0, "emitted delta must be positive");
}
/// After the throttle interval has elapsed, continued loss must produce another
/// event.
#[test]
fn throttle_allows_periodic_emission() {
    let capacity = 4;
    let throttle = Duration::from_millis(10);
    let (tx, detector) = make_detector(capacity, throttle);
    let (health_tx, mut health_rx) = make_health();

    let overflow_once = || {
        for _ in 0..=capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
    };

    overflow_once();
    let initial = collect_lag_deltas(&mut health_rx);
    assert_eq!(initial.len(), 1, "transition event");

    // Wait past the throttle window before losing more messages.
    std::thread::sleep(throttle + Duration::from_millis(5));

    overflow_once();
    let periodic = collect_lag_deltas(&mut health_rx);
    assert_eq!(periodic.len(), 1, "periodic emission after interval elapsed");
    assert!(periodic[0] > 0, "periodic delta must be positive");
}
/// After `realign`, a new overflowing burst must report only the loss from the
/// new window.
#[test]
fn no_subscriber_reset_prevents_false_positive() {
    let capacity = 4;
    let (tx, detector) = make_detector(capacity, Duration::ZERO);
    let (health_tx, mut health_rx) = make_health();

    let send_and_check = |count: usize| {
        for _ in 0..count {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
    };

    // Fill the buffer exactly; nothing has been overwritten yet.
    send_and_check(capacity);
    let before_wrap = collect_lag_deltas(&mut health_rx);
    assert!(before_wrap.is_empty(), "no lag before buffer wraps");

    detector.realign(&health_tx);
    let flushed = collect_lag_deltas(&mut health_rx);
    assert!(flushed.is_empty(), "no pending loss to flush on realign");

    send_and_check(capacity + 1);
    let new_window = collect_lag_deltas(&mut health_rx);
    assert!(
        !new_window.is_empty(),
        "new window should produce its own lag events"
    );
    let total: u64 = new_window.iter().sum();
    assert!(
        total > 0 && total <= 2,
        "delta should reflect only new-window loss"
    );
}
/// Loss held back by the throttle must be emitted by `flush` exactly once.
#[test]
fn flush_pending_emits_final_delta() {
    let capacity = 4;
    let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
    let (health_tx, mut health_rx) = make_health();

    let overflow_once = || {
        for _ in 0..=capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
    };

    overflow_once();
    let transition = collect_lag_deltas(&mut health_rx);
    assert_eq!(transition.len(), 1, "transition event emitted");

    overflow_once();
    let throttled = collect_lag_deltas(&mut health_rx);
    assert!(throttled.is_empty(), "throttled — no event emitted yet");

    detector.flush(&health_tx);
    let flushed = collect_lag_deltas(&mut health_rx);
    assert_eq!(flushed.len(), 1, "flush must emit exactly one event");
    assert!(flushed[0] > 0, "flushed delta must be positive");

    detector.flush(&health_tx);
    let second_flush = collect_lag_deltas(&mut health_rx);
    assert!(second_flush.is_empty(), "double flush must not emit");
}
/// `realign` must report throttled loss before resetting, leaving nothing for a
/// later `flush`.
#[test]
fn realign_flushes_pending_before_reset() {
    let capacity = 4;
    let (tx, detector) = make_detector(capacity, Duration::from_secs(60));
    let (health_tx, mut health_rx) = make_health();

    let overflow_once = || {
        for _ in 0..=capacity {
            let _ = tx.send(make_dummy_output());
            detector.check_after_send(&health_tx);
        }
    };

    // The first burst triggers the transition event, which is discarded here.
    overflow_once();
    let _ = collect_lag_deltas(&mut health_rx);

    overflow_once();
    let while_throttled = collect_lag_deltas(&mut health_rx);
    assert!(
        while_throttled.is_empty(),
        "loss stays pending under throttle"
    );

    detector.realign(&health_tx);
    let flushed = collect_lag_deltas(&mut health_rx);
    assert_eq!(flushed.len(), 1, "realign must flush pending loss");
    assert!(flushed[0] > 0, "flushed delta must be positive");

    detector.flush(&health_tx);
    let afterwards = collect_lag_deltas(&mut health_rx);
    assert!(
        afterwards.is_empty(),
        "no pending loss after realign + flush"
    );
}
/// An interval of zero collapses to `Never`.
#[test]
fn sampled_zero_normalizes_to_never() {
    let policy = FrameInclusion::sampled(0);
    assert_eq!(policy, FrameInclusion::Never);
}
/// An interval of one collapses to `Always`.
#[test]
fn sampled_one_normalizes_to_always() {
    let policy = FrameInclusion::sampled(1);
    assert_eq!(policy, FrameInclusion::Always);
}
/// Intervals above one are stored as-is in a `Sampled` variant.
#[test]
fn sampled_above_one_creates_sampled() {
    let policy = FrameInclusion::sampled(6);
    let expected = FrameInclusion::Sampled { interval: 6 };
    assert_eq!(policy, expected);
}
/// 30 / 5 gives a sampling interval of 6.
#[test]
fn from_target_fps_5_at_30_yields_interval_6() {
    let policy = FrameInclusion::from_target_fps(5.0, 30.0);
    let expected = FrameInclusion::Sampled { interval: 6 };
    assert_eq!(policy, expected);
}
/// 30 / 10 gives a sampling interval of 3.
#[test]
fn from_target_fps_10_at_30_yields_interval_3() {
    let policy = FrameInclusion::from_target_fps(10.0, 30.0);
    let expected = FrameInclusion::Sampled { interval: 3 };
    assert_eq!(policy, expected);
}
/// A zero target rate disables inclusion.
#[test]
fn from_target_fps_zero_is_never() {
    let policy = FrameInclusion::from_target_fps(0.0, 30.0);
    assert_eq!(policy, FrameInclusion::Never);
}
/// A negative target rate disables inclusion.
#[test]
fn from_target_fps_negative_is_never() {
    let policy = FrameInclusion::from_target_fps(-5.0, 30.0);
    assert_eq!(policy, FrameInclusion::Never);
}
/// A target faster than the source resolves to `Always`.
#[test]
fn from_target_fps_above_source_is_always() {
    let policy = FrameInclusion::from_target_fps(60.0, 30.0);
    assert_eq!(policy, FrameInclusion::Always);
}
/// A target equal to the source rate resolves to `Always`.
#[test]
fn from_target_fps_equal_to_source_is_always() {
    let policy = FrameInclusion::from_target_fps(30.0, 30.0);
    assert_eq!(policy, FrameInclusion::Always);
}
/// A zero source rate with a positive target resolves to `Always`.
#[test]
fn from_target_fps_with_zero_source_is_always() {
    let policy = FrameInclusion::from_target_fps(5.0, 0.0);
    assert_eq!(policy, FrameInclusion::Always);
}
/// Checks the interval reported by each variant.
#[test]
fn effective_interval_values() {
    let cases = [
        (FrameInclusion::Never, 0),
        (FrameInclusion::Always, 1),
        (FrameInclusion::Sampled { interval: 6 }, 6),
        (
            FrameInclusion::TargetFps {
                target: 5.0,
                fallback_interval: 6,
            },
            6,
        ),
    ];
    for (policy, expected) in &cases {
        assert_eq!(policy.effective_interval(), *expected);
    }
}
/// The `target_fps` constructor maps a zero target to `Never`.
#[test]
fn target_fps_zero_is_never() {
    let policy = FrameInclusion::target_fps(0.0, 6);
    assert_eq!(policy, FrameInclusion::Never);
}
/// The `target_fps` constructor maps a negative target to `Never`.
#[test]
fn target_fps_negative_is_never() {
    let policy = FrameInclusion::target_fps(-5.0, 6);
    assert_eq!(policy, FrameInclusion::Never);
}
/// A positive target is stored, with its fallback, in a `TargetFps` variant.
#[test]
fn target_fps_positive_creates_variant() {
    let policy = FrameInclusion::target_fps(5.0, 6);
    let expected = FrameInclusion::TargetFps {
        target: 5.0,
        fallback_interval: 6,
    };
    assert_eq!(policy, expected);
}
/// 5 fps resolved against a 30 fps source gives interval 6.
#[test]
fn resolve_target_fps_with_30_source() {
    let resolved = FrameInclusion::target_fps(5.0, 6).resolve_with_source_fps(30.0);
    let expected = FrameInclusion::Sampled { interval: 6 };
    assert_eq!(resolved, expected);
}
/// 5 fps resolved against a 25 fps source gives interval 5.
#[test]
fn resolve_target_fps_with_25_source() {
    let resolved = FrameInclusion::target_fps(5.0, 6).resolve_with_source_fps(25.0);
    let expected = FrameInclusion::Sampled { interval: 5 };
    assert_eq!(resolved, expected);
}
/// 5 fps resolved against a 15 fps source gives interval 3.
#[test]
fn resolve_target_fps_with_15_source() {
    let resolved = FrameInclusion::target_fps(5.0, 6).resolve_with_source_fps(15.0);
    let expected = FrameInclusion::Sampled { interval: 3 };
    assert_eq!(resolved, expected);
}
/// A target above the observed source rate resolves to `Always`.
#[test]
fn resolve_target_fps_above_source_is_always() {
    let resolved = FrameInclusion::target_fps(60.0, 6).resolve_with_source_fps(30.0);
    assert_eq!(resolved, FrameInclusion::Always);
}
/// With a zero source rate the fallback interval is used.
#[test]
fn resolve_target_fps_zero_source_uses_fallback() {
    let resolved = FrameInclusion::target_fps(5.0, 6).resolve_with_source_fps(0.0);
    let expected = FrameInclusion::Sampled { interval: 6 };
    assert_eq!(resolved, expected);
}
/// Resolving a `Sampled` policy leaves it unchanged.
#[test]
fn resolve_noop_for_sampled() {
    let original = FrameInclusion::Sampled { interval: 3 };
    let resolved = original.resolve_with_source_fps(30.0);
    assert_eq!(resolved, FrameInclusion::Sampled { interval: 3 });
}
/// Resolving `Never` leaves it unchanged.
#[test]
fn resolve_noop_for_never() {
    let original = FrameInclusion::Never;
    let resolved = original.resolve_with_source_fps(30.0);
    assert_eq!(resolved, FrameInclusion::Never);
}
}