use super::super::*;
use std::sync::Arc;
use nv_test_util::mock_stage::NoOpStage;
use tokio::sync::broadcast;
use crate::shutdown::{RestartPolicy, RestartTrigger};
use super::harness::*;
#[test]
fn output_subscription_receives_outputs() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(5)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
let feed_id = handle.id();
let mut outputs = Vec::new();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
match rx.try_recv() {
Ok(output) => outputs.push(output),
Err(broadcast::error::TryRecvError::Empty) => {
if !handle.is_alive() {
while let Ok(o) = rx.try_recv() {
outputs.push(o);
}
break;
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(broadcast::error::TryRecvError::Closed) => break,
}
if std::time::Instant::now() > deadline {
break;
}
}
assert!(
!outputs.is_empty(),
"should receive outputs via subscription"
);
for o in &outputs {
assert_eq!(o.feed_id, feed_id);
}
runtime.shutdown().unwrap();
}
#[test]
fn output_subscription_bounded_capacity() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 10,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut received = 0u64;
let mut lagged = false;
loop {
match rx.try_recv() {
Ok(_) => received += 1,
Err(broadcast::error::TryRecvError::Lagged(n)) => {
lagged = true;
received += n;
}
Err(_) => break,
}
}
assert!(received > 0 || lagged, "should receive or detect lag");
runtime.shutdown().unwrap();
}
#[test]
fn shared_output_broadcast_is_arc() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(3)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut received = Vec::new();
while let Ok(output) = rx.try_recv() {
received.push(output);
}
assert!(!received.is_empty(), "should receive at least one output");
for item in &received {
assert!(
Arc::strong_count(item) >= 1,
"SharedOutput should be Arc-wrapped"
);
}
runtime.shutdown().unwrap();
}
#[test]
fn provenance_has_valid_timestamps() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(1)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut output = None;
while let Ok(o) = rx.try_recv() {
output = Some(o);
}
let output = output.expect("should receive at least one output");
let prov = &output.provenance;
assert!(
prov.pipeline_complete_ts >= prov.frame_receive_ts,
"pipeline_complete_ts should be >= frame_receive_ts"
);
assert_eq!(prov.stages.len(), 1, "one stage provenance entry");
let sp = &prov.stages[0];
assert!(sp.end_ts >= sp.start_ts, "stage end >= start");
assert_eq!(sp.result, crate::provenance::StageResult::Ok);
runtime.shutdown().unwrap();
}
#[test]
fn output_lag_emits_health_event() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 50,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let _output_rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut saw_lag_event = false;
let mut total_lost: u64 = 0;
loop {
match health_rx.try_recv() {
Ok(event) => {
if let HealthEvent::OutputLagged { messages_lost } = event {
total_lost += messages_lost;
saw_lag_event = true;
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
saw_lag_event,
"should emit OutputLagged when output channel is saturated"
);
assert!(total_lost > 0, "messages_lost should be nonzero");
runtime.shutdown().unwrap();
}
#[test]
fn no_lag_event_without_subscribers() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 20,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut saw_lag = false;
loop {
match health_rx.try_recv() {
Ok(event) => {
if matches!(event, HealthEvent::OutputLagged { .. }) {
saw_lag = true;
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
!saw_lag,
"should not emit OutputLagged when no external subscribers"
);
runtime.shutdown().unwrap();
}
#[test]
fn lag_messages_lost_is_per_event_delta() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 30,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let _output_rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut deltas: Vec<u64> = Vec::new();
loop {
match health_rx.try_recv() {
Ok(event) => {
if let HealthEvent::OutputLagged { messages_lost } = event {
assert!(
messages_lost > 0,
"each lag event must have messages_lost > 0"
);
deltas.push(messages_lost);
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
!deltas.is_empty(),
"should have at least one OutputLagged event"
);
let total_lost: u64 = deltas.iter().sum();
assert!(total_lost > 0, "total messages lost should be > 0, got 0");
runtime.shutdown().unwrap();
}
#[test]
fn no_spurious_lag_on_subscriber_disconnect() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 1_000,
fail_on_start: false,
frame_delay: std::time::Duration::from_millis(1),
}))
.output_capacity(64)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let output_rx = runtime.output_subscribe();
drop(output_rx);
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
let feed_id = handle.id();
runtime.remove_feed(feed_id).unwrap();
let mut saw_lag = false;
loop {
match health_rx.try_recv() {
Ok(event) => {
if matches!(event, HealthEvent::OutputLagged { .. }) {
saw_lag = true;
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
!saw_lag,
"should not emit OutputLagged when external subscriber disconnects"
);
runtime.shutdown().unwrap();
}
#[test]
fn multi_feed_lag_attribution_is_global() {
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count: 30,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let _output_rx = runtime.output_subscribe();
let (s1, _) = CountingSink::new();
let (s2, _) = CountingSink::new();
let h1 = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(s1),
))
.unwrap();
let h2 = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(s2),
))
.unwrap();
wait_for_stop(&h1, std::time::Duration::from_secs(5));
wait_for_stop(&h2, std::time::Duration::from_secs(5));
let mut lag_count = 0u64;
loop {
match health_rx.try_recv() {
Ok(event) => {
if let HealthEvent::OutputLagged { messages_lost } = event {
assert!(messages_lost > 0);
lag_count += 1;
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
lag_count > 0,
"multi-feed should trigger OutputLagged with tiny capacity"
);
runtime.shutdown().unwrap();
}
#[test]
fn lag_throttling_bounds_event_count() {
let frame_count = 200u64;
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count,
fail_on_start: false,
frame_delay: std::time::Duration::ZERO,
}))
.output_capacity(2)
.health_capacity(4096)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let _output_rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config_with_restart(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
RestartPolicy {
max_restarts: 0,
restart_on: RestartTrigger::Never,
..RestartPolicy::default()
},
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(5));
let mut lag_event_count = 0u64;
loop {
match health_rx.try_recv() {
Ok(event) => {
if matches!(event, HealthEvent::OutputLagged { .. }) {
lag_event_count += 1;
}
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(lag_event_count > 0, "should see at least one lag event");
assert!(
lag_event_count < frame_count / 2,
"throttling should bound lag events: got {lag_event_count} for {frame_count} frames"
);
runtime.shutdown().unwrap();
}
use crate::output::{FrameInclusion, OutputSink};
use nv_core::config::{CameraMode, SourceSpec};
use nv_perception::Stage;
fn build_config_with_inclusion(
stages: Vec<Box<dyn Stage>>,
sink: Box<dyn OutputSink>,
inclusion: FrameInclusion,
) -> FeedConfig {
FeedConfig::builder()
.source(SourceSpec::rtsp("rtsp://mock/stream"))
.camera_mode(CameraMode::Fixed)
.stages(stages)
.output_sink(sink)
.frame_inclusion(inclusion)
.restart(RestartPolicy {
max_restarts: 0,
restart_on: RestartTrigger::Never,
..RestartPolicy::default()
})
.build()
.expect("valid config")
}
fn collect_outputs(
rx: &mut broadcast::Receiver<SharedOutput>,
handle: &FeedHandle,
timeout: std::time::Duration,
) -> Vec<SharedOutput> {
let deadline = std::time::Instant::now() + timeout;
let mut outputs = Vec::new();
loop {
match rx.try_recv() {
Ok(o) => outputs.push(o),
Err(broadcast::error::TryRecvError::Empty) => {
if !handle.is_alive() {
while let Ok(o) = rx.try_recv() {
outputs.push(o);
}
break;
}
if std::time::Instant::now() > deadline {
break;
}
std::thread::sleep(std::time::Duration::from_millis(5));
}
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
outputs
}
#[test]
fn sampled_inclusion_delivers_frame_periodically() {
let frame_count = 30u64;
let interval = 6u32;
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(frame_count)))
.output_capacity(64)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config_with_inclusion(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
FrameInclusion::Sampled { interval },
))
.unwrap();
let outputs = collect_outputs(&mut rx, &handle, std::time::Duration::from_secs(5));
assert_eq!(
outputs.len(),
frame_count as usize,
"should receive all {frame_count} outputs"
);
let with_frame: Vec<_> = outputs.iter().filter(|o| o.frame.is_some()).collect();
let without_frame: Vec<_> = outputs.iter().filter(|o| o.frame.is_none()).collect();
let expected_with_frame = frame_count / interval as u64;
assert_eq!(
with_frame.len() as u64,
expected_with_frame,
"expected {expected_with_frame} outputs with frame, got {}",
with_frame.len()
);
assert_eq!(
without_frame.len() as u64,
frame_count - expected_with_frame,
"remaining outputs should lack frame"
);
for output in &outputs {
assert_eq!(
output.provenance.frame_included,
output.frame.is_some(),
"provenance.frame_included should match frame presence for seq {}",
output.frame_seq,
);
}
runtime.shutdown().unwrap();
}
#[test]
fn sampled_interval_zero_behaves_like_never() {
let frame_count = 10u64;
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(frame_count)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config_with_inclusion(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
FrameInclusion::Sampled { interval: 0 },
))
.unwrap();
let outputs = collect_outputs(&mut rx, &handle, std::time::Duration::from_secs(5));
assert!(!outputs.is_empty(), "should receive outputs");
for output in &outputs {
assert!(
output.frame.is_none(),
"interval=0 should never include frames"
);
assert!(
!output.provenance.frame_included,
"provenance should report no frame"
);
}
runtime.shutdown().unwrap();
}
#[test]
fn sampled_interval_one_behaves_like_always() {
let frame_count = 10u64;
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(frame_count)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config_with_inclusion(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
FrameInclusion::Sampled { interval: 1 },
))
.unwrap();
let outputs = collect_outputs(&mut rx, &handle, std::time::Duration::from_secs(5));
assert_eq!(outputs.len(), frame_count as usize);
for output in &outputs {
assert!(
output.frame.is_some(),
"interval=1 should always include frames"
);
assert!(
output.provenance.frame_included,
"provenance should report frame included"
);
}
runtime.shutdown().unwrap();
}
#[test]
fn frame_inclusion_always_includes_every_frame() {
let frame_count = 10u64;
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(frame_count)))
.output_capacity(32)
.build()
.unwrap();
let mut rx = runtime.output_subscribe();
let (sink, _) = CountingSink::new();
let handle = runtime
.add_feed(build_config_with_inclusion(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
FrameInclusion::Always,
))
.unwrap();
let outputs = collect_outputs(&mut rx, &handle, std::time::Duration::from_secs(5));
assert_eq!(outputs.len(), frame_count as usize);
for output in &outputs {
assert!(output.frame.is_some(), "Always should include every frame");
assert!(output.provenance.frame_included);
}
runtime.shutdown().unwrap();
}