use super::super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use nv_test_util::mock_stage::NoOpStage;
use tokio::sync::broadcast;
use crate::output::{OutputSink, SharedOutput};
use crate::shutdown::{RestartPolicy, RestartTrigger};
use super::harness::*;
struct SlowSink {
count: Arc<AtomicU64>,
delay: std::time::Duration,
}
impl OutputSink for SlowSink {
fn emit(&self, _output: SharedOutput) {
self.count.fetch_add(1, Ordering::Relaxed);
std::thread::sleep(self.delay);
}
}
#[test]
fn slow_sink_does_not_block_processing() {
let count = Arc::new(AtomicU64::new(0));
let sink = SlowSink {
count: Arc::clone(&count),
delay: std::time::Duration::from_millis(100),
};
let runtime = build_runtime(50);
let handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(10));
let m = handle.metrics();
assert!(
m.frames_processed >= 50,
"processing should complete all frames regardless of sink speed (got {})",
m.frames_processed
);
let emitted = count.load(Ordering::Relaxed);
assert!(
emitted > 0 && emitted <= m.frames_processed,
"sink should have received some (not all) outputs: got {emitted}"
);
runtime.shutdown().unwrap();
}
struct BlockingSink {
count: Arc<AtomicU64>,
}
impl OutputSink for BlockingSink {
fn emit(&self, _output: SharedOutput) {
self.count.fetch_add(1, Ordering::Relaxed);
loop {
std::thread::sleep(std::time::Duration::from_secs(60));
}
}
}
#[test]
fn bounded_shutdown_when_sink_blocks() {
let count = Arc::new(AtomicU64::new(0));
let sink = BlockingSink {
count: Arc::clone(&count),
};
let runtime = build_runtime(20);
let _handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(200));
let start = std::time::Instant::now();
runtime.shutdown().unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < std::time::Duration::from_secs(10),
"shutdown should be bounded even with blocking sink (took {:?})",
elapsed,
);
}
#[test]
fn blocking_sink_emits_sink_timeout_event() {
let count = Arc::new(AtomicU64::new(0));
let sink = BlockingSink {
count: Arc::clone(&count),
};
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory::new(20)))
.health_capacity(256)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let _handle = runtime
.add_feed(build_config(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(200));
runtime.shutdown().unwrap();
let mut saw_timeout = false;
let mut saw_panic = false;
loop {
match health_rx.try_recv() {
Ok(HealthEvent::SinkTimeout { .. }) => saw_timeout = true,
Ok(HealthEvent::SinkPanic { .. }) => saw_panic = true,
Ok(_) => {}
Err(broadcast::error::TryRecvError::Lagged(n)) => {
tracing::warn!("health subscriber lagged by {n}");
}
Err(_) => break,
}
}
assert!(
saw_timeout,
"should emit SinkTimeout when the sink thread is blocked during shutdown"
);
assert!(
!saw_panic,
"a blocked (not panicked) sink should emit SinkTimeout, not SinkPanic"
);
}
struct VerySlowSink {
count: Arc<AtomicU64>,
}
impl OutputSink for VerySlowSink {
fn emit(&self, _output: SharedOutput) {
self.count.fetch_add(1, Ordering::Relaxed);
std::thread::sleep(std::time::Duration::from_millis(500));
}
}
#[test]
fn sink_backpressure_throttling() {
let frame_count = 100u64;
let sink_count = Arc::new(AtomicU64::new(0));
let sink = VerySlowSink {
count: Arc::clone(&sink_count),
};
let runtime = Runtime::builder()
.ingress_factory(Box::new(MockFactory {
frame_count,
fail_on_start: false,
frame_delay: std::time::Duration::from_millis(1),
}))
.health_capacity(4096)
.build()
.unwrap();
let mut health_rx = runtime.health_subscribe();
let handle = runtime
.add_feed(build_config_with_restart(
vec![Box::new(NoOpStage::new("noop"))],
Box::new(sink),
RestartPolicy {
max_restarts: 0,
restart_on: RestartTrigger::Never,
..Default::default()
},
))
.unwrap();
wait_for_stop(&handle, std::time::Duration::from_secs(10));
let mut bp_events = 0u64;
let mut total_dropped_reported = 0u64;
loop {
match health_rx.try_recv() {
Ok(HealthEvent::SinkBackpressure {
outputs_dropped, ..
}) => {
bp_events += 1;
total_dropped_reported += outputs_dropped;
}
Ok(_) => continue,
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
assert!(
bp_events > 0,
"should see at least one SinkBackpressure event"
);
assert!(
bp_events < frame_count / 2,
"throttling should coalesce SinkBackpressure events: got {bp_events} for {frame_count} frames"
);
assert!(
total_dropped_reported > 0,
"coalesced events should carry accumulated drop counts"
);
runtime.shutdown().unwrap();
}