use super::*;
use crate::backend::{BackendKind, DacBackend, FifoBackend, WriteOutcome};
use crate::buffer_estimate::{BufferEstimator, SoftwareDecayEstimator};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Cloneable handle to a mutex-guarded [`SoftwareDecayEstimator`], so a test
/// can keep one handle while the backend under test owns another.
#[derive(Clone)]
struct SharedEstimator(Arc<Mutex<SoftwareDecayEstimator>>);

impl SharedEstimator {
    /// Wraps a freshly constructed estimator.
    fn new() -> Self {
        let fresh = SoftwareDecayEstimator::new();
        Self(Arc::new(Mutex::new(fresh)))
    }

    /// Replaces the wrapped estimator with a new one and, when `n` is
    /// non-zero, records a single send of `n` points at `pps` stamped "now".
    fn seed(&self, n: u64, pps: u32) {
        let mut guard = self.0.lock().unwrap();
        *guard = SoftwareDecayEstimator::new();
        if n == 0 {
            return;
        }
        guard.record_send(Instant::now(), n, pps);
    }

    /// Forwards a send record to the wrapped estimator.
    fn record_send(&self, now: Instant, n: u64, pps: u32) {
        let Self(shared) = self;
        shared.lock().unwrap().record_send(now, n, pps);
    }
}
impl BufferEstimator for SharedEstimator {
    /// Locks the wrapped estimator and returns its fullness estimate for
    /// `now` at `pps`.
    fn estimated_fullness(&self, now: Instant, pps: u32) -> u64 {
        let Self(shared) = self;
        shared.lock().unwrap().estimated_fullness(now, pps)
    }
}
/// In-memory FIFO backend for stream tests. Its counters live behind `Arc`s so
/// a test can clone them before handing the backend over to a stream.
struct TestBackend {
    /// Capabilities reported through `caps()`.
    caps: DacCapabilities,
    /// Toggled by `connect` / `disconnect`.
    connected: bool,
    /// Number of `try_write_points` calls, including those that would block.
    write_count: Arc<AtomicUsize>,
    /// Remaining number of writes that will report `WouldBlock`.
    would_block_count: Arc<AtomicUsize>,
    /// Running total of points accepted by successful writes.
    queued: Arc<AtomicU64>,
    /// Last value passed to `set_shutter`.
    shutter_open: Arc<AtomicBool>,
    /// Estimator handed out by `estimator()`.
    estimator: SharedEstimator,
}

impl TestBackend {
    /// Builds a disconnected backend with zeroed counters, a closed shutter,
    /// and `NetworkFifo` capabilities (1000..=100000 pps, 1000 points/chunk).
    fn new() -> Self {
        let caps = DacCapabilities {
            pps_min: 1000,
            pps_max: 100000,
            max_points_per_chunk: 1000,
            output_model: crate::device::OutputModel::NetworkFifo,
        };
        Self {
            caps,
            connected: false,
            write_count: Arc::default(),
            would_block_count: Arc::default(),
            queued: Arc::default(),
            shutter_open: Arc::default(),
            estimator: SharedEstimator::new(),
        }
    }

    /// Overrides the advertised chunk size limit.
    fn with_max_points_per_chunk(mut self, max_points_per_chunk: usize) -> Self {
        self.caps.max_points_per_chunk = max_points_per_chunk;
        self
    }

    /// Makes the next `count` writes report `WouldBlock`. Installs a fresh
    /// counter, so clones of the previous counter are no longer connected.
    fn with_would_block_count(mut self, count: usize) -> Self {
        self.would_block_count = Arc::new(AtomicUsize::new(count));
        self
    }

    /// Overrides the advertised output model.
    fn with_output_model(mut self, model: OutputModel) -> Self {
        self.caps.output_model = model;
        self
    }

    /// Presets the queued-point counter and seeds the estimator with the same
    /// amount at 30 000 pps.
    fn with_initial_queue(self, queue: u64) -> Self {
        self.estimator.seed(queue, 30_000);
        self.queued.store(queue, Ordering::SeqCst);
        self
    }
}
impl DacBackend for TestBackend {
    /// Always reports a custom DAC type labelled "Test".
    fn dac_type(&self) -> DacType {
        DacType::Custom(String::from("Test"))
    }

    /// Returns the stored capabilities.
    fn caps(&self) -> &DacCapabilities {
        &self.caps
    }

    /// Marks the backend as connected; never fails.
    fn connect(&mut self) -> Result<()> {
        self.connected = true;
        Ok(())
    }

    /// Marks the backend as disconnected; never fails.
    fn disconnect(&mut self) -> Result<()> {
        self.connected = false;
        Ok(())
    }

    /// Reports the flag maintained by `connect` / `disconnect`.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// No-op stop that always succeeds.
    fn stop(&mut self) -> Result<()> {
        Ok(())
    }

    /// Records the requested shutter state so tests can observe it.
    fn set_shutter(&mut self, open: bool) -> Result<()> {
        let shutter = &self.shutter_open;
        shutter.store(open, Ordering::SeqCst);
        Ok(())
    }
}
impl FifoBackend for TestBackend {
    /// Counts the call, then either consumes one pending `WouldBlock` or
    /// accepts all `points` (adding them to `queued` and the estimator).
    fn try_write_points(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
        self.write_count.fetch_add(1, Ordering::SeqCst);

        // While simulated back-pressure remains, use up one unit of it.
        if self.would_block_count.load(Ordering::SeqCst) > 0 {
            self.would_block_count.fetch_sub(1, Ordering::SeqCst);
            return Ok(WriteOutcome::WouldBlock);
        }

        let accepted = points.len() as u64;
        self.queued.fetch_add(accepted, Ordering::SeqCst);
        self.estimator.record_send(Instant::now(), accepted, pps);
        Ok(WriteOutcome::Written)
    }

    /// Exposes the shared estimator.
    fn estimator(&self) -> &dyn BufferEstimator {
        &self.estimator
    }
}
/// Frame-swap flavoured test backend that delegates to a [`TestBackend`].
struct FrameSwapTestBackend {
    /// Backend that receives all delegated calls.
    inner: TestBackend,
    /// Value reported by `frame_capacity()`.
    frame_capacity: usize,
    /// Whether the backend currently accepts a new frame.
    ready: Arc<AtomicBool>,
}

impl FrameSwapTestBackend {
    /// Builds a ready backend with a 4095-point frame capacity whose inner
    /// backend advertises `OutputModel::UsbFrameSwap`.
    fn new() -> Self {
        let inner = TestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
        let ready = Arc::new(AtomicBool::new(true));
        Self {
            inner,
            frame_capacity: 4095,
            ready,
        }
    }
}
/// Every method forwards unchanged to the wrapped `TestBackend`.
impl DacBackend for FrameSwapTestBackend {
    fn dac_type(&self) -> DacType {
        let inner = &self.inner;
        inner.dac_type()
    }

    fn caps(&self) -> &DacCapabilities {
        let inner = &self.inner;
        inner.caps()
    }

    fn connect(&mut self) -> Result<()> {
        let inner = &mut self.inner;
        inner.connect()
    }

    fn disconnect(&mut self) -> Result<()> {
        let inner = &mut self.inner;
        inner.disconnect()
    }

    fn is_connected(&self) -> bool {
        let inner = &self.inner;
        inner.is_connected()
    }

    fn stop(&mut self) -> Result<()> {
        let inner = &mut self.inner;
        inner.stop()
    }

    fn set_shutter(&mut self, open: bool) -> Result<()> {
        let inner = &mut self.inner;
        inner.set_shutter(open)
    }
}
impl crate::backend::FrameSwapBackend for FrameSwapTestBackend {
    /// Returns the configured frame capacity.
    fn frame_capacity(&self) -> usize {
        self.frame_capacity
    }

    /// Reports the current value of the `ready` flag.
    fn is_ready_for_frame(&mut self) -> bool {
        self.ready.load(Ordering::SeqCst)
    }

    /// Writes through the inner FIFO backend when ready; otherwise reports
    /// `WouldBlock` without touching the inner backend.
    fn write_frame(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
        let ready = self.ready.load(Ordering::SeqCst);
        if ready {
            self.inner.try_write_points(pps, points)
        } else {
            Ok(WriteOutcome::WouldBlock)
        }
    }
}
/// `arm` / `disarm` on a `StreamControl` are reflected by `is_armed`.
#[test]
fn test_stream_control_arm_disarm() {
    let (sender, _receiver) = mpsc::channel();
    let control = StreamControl::new(sender, Duration::ZERO, 30_000);

    // A new control starts out disarmed.
    assert!(!control.is_armed());

    control.arm().unwrap();
    assert!(control.is_armed());

    control.disarm().unwrap();
    assert!(!control.is_armed());
}
/// `stop` flips `is_stop_requested` from false to true.
#[test]
fn test_stream_control_stop() {
    let (sender, _receiver) = mpsc::channel();
    let control = StreamControl::new(sender, Duration::ZERO, 30_000);

    assert!(!control.is_stop_requested());

    control.stop().unwrap();

    assert!(control.is_stop_requested());
}
/// State changes made through one `StreamControl` clone are visible through
/// the other.
#[test]
fn test_stream_control_clone_shares_state() {
    let (sender, _receiver) = mpsc::channel();
    let original = StreamControl::new(sender, Duration::ZERO, 30_000);
    let duplicate = original.clone();

    // Arm via the original, observe via the clone.
    original.arm().unwrap();
    assert!(duplicate.is_armed());

    // Stop via the clone, observe via the original.
    duplicate.stop().unwrap();
    assert!(original.is_stop_requested());
}
/// A `Dac` built around a disconnected backend reports not connected, and the
/// stream returned by `start_stream` holds a connected backend.
#[test]
fn test_device_start_stream_connects_backend() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    assert!(!device.is_connected());

    let (stream, _info) = device.start_stream(StreamConfig::new(30000)).unwrap();

    let active_backend = stream.backend.as_ref().unwrap();
    assert!(active_backend.is_connected());
}
/// With a `NetworkFifo` backend and a default config, the started stream's
/// target buffer equals `StreamConfig::NETWORK_DEFAULT_TARGET_BUFFER`.
#[test]
fn test_device_start_stream_promotes_untouched_defaults_for_network_backends() {
    let backend = TestBackend::new().with_output_model(OutputModel::NetworkFifo);
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));

    let (stream, _info) = device.start_stream(StreamConfig::new(30_000)).unwrap();

    let expected = StreamConfig::NETWORK_DEFAULT_TARGET_BUFFER;
    assert_eq!(stream.config.target_buffer, expected);
}
/// An explicitly configured 12 ms target buffer survives `start_stream` on a
/// `UdpTimed` backend.
#[test]
fn test_device_start_stream_keeps_explicit_network_buffer_settings() {
    let explicit = Duration::from_millis(12);

    let backend = TestBackend::new().with_output_model(OutputModel::UdpTimed);
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));

    let cfg = StreamConfig::new(30_000).with_target_buffer(explicit);
    let (stream, _info) = device.start_stream(cfg).unwrap();

    assert_eq!(stream.config.target_buffer, explicit);
}
/// `handle_underrun` for a 100-point request advances the stream instant,
/// queues points on the backend, and bumps the chunk, point and underrun
/// statistics.
#[test]
fn test_handle_underrun_advances_state() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let mut stream = make_test_stream(backend);

    // Snapshot everything the underrun is expected to move.
    let instant_before = stream.state.current_instant;
    let queued_before = queued.load(Ordering::SeqCst);
    let chunks_before = stream.state.stats.chunks_written;
    let points_before = stream.state.stats.points_written;

    let req = ChunkRequest {
        start: StreamInstant::new(0),
        pps: 30000,
        target_points: 100,
    };
    stream.handle_underrun(&req).unwrap();

    let stats = &stream.state.stats;
    assert!(stream.state.current_instant > instant_before);
    assert!(queued.load(Ordering::SeqCst) > queued_before);
    assert_eq!(stats.chunks_written, chunks_before + 1);
    assert_eq!(stats.points_written, points_before + 100);
    assert_eq!(stats.underrun_count, 1);
}
/// With a backend that reports `WouldBlock` three times, `run` still ends
/// with `ProducerEnded` and the backend sees at least one write attempt.
#[test]
fn test_run_retries_on_would_block() {
    let backend = TestBackend::new().with_would_block_count(3);
    let write_count = Arc::clone(&backend.write_count);
    let stream = make_test_stream(backend);

    let produced_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&produced_count);

    let result = stream.run(
        move |req, buffer| {
            // First call fills one blank chunk; every later call ends the stream.
            match producer_calls.fetch_add(1, Ordering::SeqCst) {
                0 => {
                    let n = req.target_points.min(buffer.len());
                    buffer
                        .iter_mut()
                        .take(n)
                        .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
                    ChunkResult::Filled(n)
                }
                _ => ChunkResult::End,
            }
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert!(write_count.load(Ordering::SeqCst) >= 1);
}
/// Processing an arm message opens the backend shutter and processing a
/// disarm message closes it; neither reports a stop.
#[test]
fn test_arm_opens_shutter_disarm_closes_shutter() {
    let backend = TestBackend::new();
    let shutter_open = Arc::clone(&backend.shutter_open);
    let mut stream = make_test_stream(backend);
    assert!(!shutter_open.load(Ordering::SeqCst));

    let control = stream.control();

    control.arm().unwrap();
    assert!(!stream.process_control_messages());
    assert!(shutter_open.load(Ordering::SeqCst));

    control.disarm().unwrap();
    assert!(!stream.process_control_messages());
    assert!(!shutter_open.load(Ordering::SeqCst));
}
/// With `RepeatLast` configured and the stream disarmed, `handle_underrun`
/// leaves the stored last chunk untouched (its first point keeps full red).
#[test]
fn test_handle_underrun_blanks_when_disarmed() {
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);

    // Pretend a lit 100-point chunk was stored earlier.
    let colored_point = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
    stream.state.last_chunk[..100].fill(colored_point);
    stream.state.last_chunk_len = 100;

    assert!(!stream.control.is_armed());

    let req = ChunkRequest {
        start: StreamInstant::new(0),
        pps: 30000,
        target_points: 100,
    };
    stream.handle_underrun(&req).unwrap();

    assert_eq!(stream.state.last_chunk[0].r, 65535);
}
/// After arming (shutter open), `stop` on the stream closes the shutter.
#[test]
fn test_stop_closes_shutter() {
    let backend = TestBackend::new();
    let shutter = Arc::clone(&backend.shutter_open);
    let mut stream = make_test_stream(backend);

    stream.control.arm().unwrap();
    stream.process_control_messages();
    assert!(shutter.load(Ordering::SeqCst));

    stream.stop().unwrap();
    assert!(!shutter.load(Ordering::SeqCst));
}
/// Cycles arm -> disarm -> arm and checks after each step that both the
/// armed flag and the backend shutter follow the requested state.
#[test]
fn test_arm_disarm_arm_cycle() {
    let backend = TestBackend::new();
    let shutter_open = Arc::clone(&backend.shutter_open);
    let mut stream = make_test_stream(backend);
    let control = stream.control();

    // Initial state: disarmed, shutter closed.
    assert!(!control.is_armed());
    assert!(!shutter_open.load(Ordering::SeqCst));

    for want_armed in [true, false, true] {
        if want_armed {
            control.arm().unwrap();
        } else {
            control.disarm().unwrap();
        }
        stream.process_control_messages();

        assert_eq!(control.is_armed(), want_armed);
        assert_eq!(shutter_open.load(Ordering::SeqCst), want_armed);
    }
}
/// A producer that fills four chunks of up to 100 points and then ends
/// results in at least four backend write calls.
#[test]
fn test_run_buffer_driven_behavior() {
    let backend = TestBackend::new();
    let write_count = Arc::clone(&backend.write_count);
    let cfg = StreamConfig::new(30000).with_target_buffer(Duration::from_millis(10));
    let stream = make_test_stream_with_cfg(backend, cfg);

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index < 4 {
                let cap = buffer.len().min(100);
                let n = req.target_points.min(cap);
                buffer
                    .iter_mut()
                    .take(n)
                    .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
                ChunkResult::Filled(n)
            } else {
                ChunkResult::End
            }
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let writes = write_count.load(Ordering::SeqCst);
    assert!(writes >= 4, "Should have written multiple chunks");
}
/// At 1000 pps with a 100 ms target buffer, the request handed to the
/// producer asks for 100 points.
#[test]
fn test_run_uses_configured_target_buffer() {
    let cfg = StreamConfig::new(1_000)
        .with_target_buffer(Duration::from_millis(100))
        .with_drain_timeout(Duration::ZERO);
    let stream = make_test_stream_with_cfg(TestBackend::new(), cfg);

    // Records the target of the most recent request seen by the producer.
    let observed = Arc::new(AtomicUsize::new(0));
    let sink = Arc::clone(&observed);

    let result = stream.run(
        move |req, _buffer| {
            sink.store(req.target_points, Ordering::SeqCst);
            ChunkResult::End
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert_eq!(observed.load(Ordering::SeqCst), 100);
}
/// Runs a two-chunk producer against a 5 ms target buffer and checks that the
/// whole run finishes in under 100 ms.
#[test]
fn test_run_sleeps_when_buffer_healthy() {
    let cfg = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(5))
        .with_drain_timeout(Duration::ZERO);
    let stream = make_test_stream_with_cfg(TestBackend::new(), cfg);

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let started = Instant::now();
    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index < 2 {
                let n = req.target_points.min(buffer.len());
                buffer
                    .iter_mut()
                    .take(n)
                    .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
                ChunkResult::Filled(n)
            } else {
                ChunkResult::End
            }
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let elapsed = started.elapsed();
    assert!(
        elapsed < Duration::from_millis(100),
        "Elapsed time {:?} is too long for test",
        elapsed
    );
}
/// A stop issued from another thread after 20 ms makes `run` return
/// `RunExit::Stopped` even though the producer never ends.
#[test]
fn test_run_stops_on_control_stop() {
    let stream = make_test_stream(TestBackend::new());
    let control = stream.control();
    let remote = control.clone();

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(20));
        remote.stop().unwrap();
    });

    let result = stream.run(
        |req, buffer| {
            let cap = buffer.len().min(10);
            let n = req.target_points.min(cap);
            buffer
                .iter_mut()
                .take(n)
                .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::Stopped);
}
/// A producer that fills once and then returns `End` is called exactly twice
/// and `run` reports `ProducerEnded`.
#[test]
fn test_run_producer_ended() {
    let stream = make_test_stream(TestBackend::new());

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => {
                let cap = buffer.len().min(100);
                let n = req.target_points.min(cap);
                buffer
                    .iter_mut()
                    .take(n)
                    .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
                ChunkResult::Filled(n)
            }
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
/// A `Starved` result under `IdlePolicy::Blank` still causes points to be
/// queued on the backend.
#[test]
fn test_run_starved_applies_underrun_policy() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::Blank);
    let stream = make_test_stream_with_cfg(backend, cfg);

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(
        total_queued > 0,
        "Underrun policy should have written blank points"
    );
}
/// Returning `Filled(0)` on the first call still results in points being
/// queued on the backend under `IdlePolicy::Blank`.
#[test]
fn test_run_filled_zero_with_target_treated_as_starved() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::Blank);
    let stream = make_test_stream_with_cfg(backend, cfg);

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Filled(0),
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(
        total_queued > 0,
        "Filled(0) with target > 0 should trigger underrun and write blank points"
    );
}
/// `estimate_buffer_points` reflects whatever the backend's shared estimator
/// was last seeded with.
#[test]
fn test_estimate_buffer_reads_backend_estimator() {
    let backend = TestBackend::new().with_initial_queue(300);
    let estimator_handle = backend.estimator.clone();
    let stream = make_test_stream(backend);

    // Value seeded at construction time.
    assert_eq!(stream.estimate_buffer_points(), 300);

    // Re-seed through the shared handle and read again.
    estimator_handle.seed(800, stream.config.pps);
    assert_eq!(stream.estimate_buffer_points(), 800);
}
/// With a 40 ms target at 30 000 pps, `build_fill_request(1000, ..)` yields
/// 1000, 700 and 0 target points for estimated fullness of 0, 500 and 1200.
#[test]
fn test_build_fill_request_calculates_target_points() {
    let cfg = StreamConfig::new(30000).with_target_buffer(Duration::from_millis(40));
    let backend = TestBackend::new();
    let estimator = backend.estimator.clone();
    let stream = make_test_stream_with_cfg(backend, cfg);

    // (seeded fullness, expected target_points)
    let cases = [(0_u64, 1000), (500, 700), (1200, 0)];
    for (seeded, expected) in cases {
        estimator.seed(seeded, 30_000);
        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
        assert_eq!(req.target_points, expected);
    }
}
/// Three filled chunks of up to 50 points each end up queued on the backend;
/// the backend total is at least what the producer reported.
#[test]
fn test_fill_result_filled_writes_points_and_updates_state() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let stream = make_test_stream(backend);

    let points_written = Arc::new(AtomicUsize::new(0));
    let produced_tally = Arc::clone(&points_written);
    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 3 {
                return ChunkResult::End;
            }
            let n = req.target_points.min(50);
            buffer.iter_mut().take(n).enumerate().for_each(|(i, pt)| {
                *pt = LaserPoint::new(0.1 * i as f32, 0.2 * i as f32, 1000, 2000, 3000, 4000);
            });
            produced_tally.fetch_add(n, Ordering::SeqCst);
            ChunkResult::Filled(n)
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    let total_written = points_written.load(Ordering::SeqCst);
    assert!(
        total_queued > 0,
        "Points should have been queued to backend"
    );
    assert!(
        total_queued as usize >= total_written,
        "Queued points ({}) should be at least written points ({})",
        total_queued,
        total_written
    );
}
/// Armed stream with `RepeatLast`: one filled chunk, one `Starved`, then
/// `End`. Only checks that the run finishes with `ProducerEnded`.
#[test]
fn test_fill_result_filled_updates_last_chunk_when_armed() {
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => {
                let n = req.target_points.min(10);
                buffer.iter_mut().take(n).for_each(|pt| {
                    *pt = LaserPoint::new(0.5, 0.5, 10000, 20000, 30000, 40000);
                });
                ChunkResult::Filled(n)
            }
            1 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
}
/// Armed stream with `RepeatLast`: a filled chunk of up to 50 points followed
/// by `Starved` leaves at least 50 points queued on the backend.
#[test]
fn test_fill_result_starved_repeat_last_with_stored_chunk() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let stream = make_test_stream_with_cfg(backend, cfg);
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => {
                let n = req.target_points.min(50);
                buffer.iter_mut().take(n).for_each(|pt| {
                    *pt = LaserPoint::new(0.3, 0.3, 5000, 5000, 5000, 5000);
                });
                ChunkResult::Filled(n)
            }
            1 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(
        total_queued >= 50,
        "Should have written initial chunk plus repeated chunk"
    );
}
/// Armed stream with `RepeatLast` but no chunk produced yet: a `Starved`
/// result still queues points on the backend.
#[test]
fn test_fill_result_starved_repeat_last_without_stored_chunk_falls_back_to_blank() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let stream = make_test_stream_with_cfg(backend, cfg);
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(
        total_queued > 0,
        "Should have written blank points as fallback"
    );
}
/// Armed stream with `IdlePolicy::Park`: a `Starved` result queues points on
/// the backend.
#[test]
fn test_fill_result_starved_with_park_policy() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let park = IdlePolicy::Park { x: 0.5, y: -0.5 };
    let cfg = StreamConfig::new(30000).with_idle_policy(park);
    let stream = make_test_stream_with_cfg(backend, cfg);
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(total_queued > 0, "Should have written parked points");
}
/// With `IdlePolicy::Stop`, a starving producer makes `run` return an error
/// for which `is_stopped()` is true.
#[test]
fn test_fill_result_starved_with_stop_policy() {
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::Stop);
    let stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    let control = stream.control();
    control.arm().unwrap();

    let outcome = stream.run(|_req, _buffer| ChunkResult::Starved, |_e| {});

    assert!(outcome.is_err(), "Stop policy should return an error");
    let err = outcome.unwrap_err();
    assert!(err.is_stopped(), "Error should be Stopped variant");
}
/// A producer that immediately returns `End` makes `run` report
/// `ProducerEnded`.
#[test]
fn test_fill_result_end_returns_producer_ended() {
    let stream = make_test_stream(TestBackend::new());

    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
}
/// A producer that claims to have filled 1000 points more than the buffer
/// holds must not cause more than 1016 points to be queued.
#[test]
fn test_fill_result_filled_exceeds_buffer_clamped() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let stream = make_test_stream(backend);

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |_req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => {
                buffer
                    .iter_mut()
                    .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
                // Deliberately over-report the number of points filled.
                ChunkResult::Filled(buffer.len() + 1000)
            }
            _ => ChunkResult::End,
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(total_queued > 0, "Should have written some points");
    assert!(
        total_queued <= 1016,
        "Points should be clamped to max_points_per_chunk (+ drain)"
    );
}
/// End-to-end path: build a `Dac`, start a stream, arm it, run a producer for
/// five chunks, then check that points were queued and the producer was
/// called at least five times.
#[test]
fn test_full_stream_lifecycle_create_arm_stream_stop() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let shutter_open = Arc::clone(&backend.shutter_open);
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    assert!(!device.is_connected());

    let (stream, returned_info) = device.start_stream(StreamConfig::new(30000)).unwrap();
    assert_eq!(returned_info.id, "test");

    // Freshly started stream: disarmed, shutter closed.
    let control = stream.control();
    assert!(!control.is_armed());
    assert!(!shutter_open.load(Ordering::SeqCst));

    control.arm().unwrap();
    assert!(control.is_armed());

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);
    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 5 {
                return ChunkResult::End;
            }
            let cap = buffer.len().min(100);
            let n = req.target_points.min(cap);
            buffer.iter_mut().take(n).enumerate().for_each(|(i, pt)| {
                let t = i as f32 / 100.0;
                *pt = LaserPoint::new(t, t, 10000, 20000, 30000, 40000);
            });
            ChunkResult::Filled(n)
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(total_queued > 0, "Should have written points");

    let total_calls = call_count.load(Ordering::SeqCst);
    assert!(
        total_calls >= 5,
        "Should have called producer multiple times"
    );
}
/// Armed `RepeatLast` stream: fill, starve, fill again, end. At least 100
/// points must have reached the backend in total.
#[test]
fn test_full_stream_lifecycle_with_idle_policy_recovery() {
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let stream = make_test_stream_with_cfg(backend, cfg);
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index == 0 {
                // First chunk of real content.
                let cap = buffer.len().min(50);
                let n = req.target_points.min(cap);
                buffer.iter_mut().take(n).for_each(|pt| {
                    *pt = LaserPoint::new(0.5, 0.5, 30000, 30000, 30000, 30000);
                });
                ChunkResult::Filled(n)
            } else if call_index == 1 {
                // Simulated underrun.
                ChunkResult::Starved
            } else if call_index == 2 {
                // Producer recovers with different content.
                let cap = buffer.len().min(50);
                let n = req.target_points.min(cap);
                buffer.iter_mut().take(n).for_each(|pt| {
                    *pt = LaserPoint::new(-0.5, -0.5, 20000, 20000, 20000, 20000);
                });
                ChunkResult::Filled(n)
            } else {
                ChunkResult::End
            }
        },
        |_e| {},
    );
    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total = queued.load(Ordering::SeqCst);
    assert!(
        total >= 100,
        "Should have written multiple chunks including underrun recovery"
    );
}
/// A stop issued from another thread after 30 ms makes a never-ending run
/// return `RunExit::Stopped`.
#[test]
fn test_full_stream_lifecycle_external_stop() {
    let stream = make_test_stream(TestBackend::new());
    let control = stream.control();
    let remote = control.clone();

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(30));
        remote.stop().unwrap();
    });

    let result = stream.run(
        |req, buffer| {
            let cap = buffer.len().min(10);
            let n = req.target_points.min(cap);
            buffer
                .iter_mut()
                .take(n)
                .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::Stopped);
}
/// Starts a stream from a `Dac`, runs a producer for two blank chunks of up
/// to 50 points, and expects `ProducerEnded`.
#[test]
fn test_full_stream_lifecycle_into_dac_recovery() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let (stream, _) = device.start_stream(StreamConfig::new(30000)).unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 2 {
                return ChunkResult::End;
            }
            let cap = buffer.len().min(50);
            let n = req.target_points.min(cap);
            buffer
                .iter_mut()
                .take(n)
                .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
}
/// Armed stream fed three blank chunks of up to 50 points; only checks that
/// the run finishes with `ProducerEnded`.
#[test]
fn test_stream_stats_tracking() {
    let stream = make_test_stream(TestBackend::new());
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&call_count);
    let points_per_call = 50;

    let result = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 3 {
                return ChunkResult::End;
            }
            let cap = buffer.len().min(points_per_call);
            let n = req.target_points.min(cap);
            buffer
                .iter_mut()
                .take(n)
                .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
}
/// An armed, running stream is disarmed after 15 ms and stopped 15 ms later
/// from another thread; the run exits `Stopped` with the shutter closed.
#[test]
fn test_stream_disarm_during_streaming() {
    let backend = TestBackend::new();
    let shutter_open = Arc::clone(&backend.shutter_open);
    let stream = make_test_stream(backend);
    let control = stream.control();
    let remote = control.clone();

    control.arm().unwrap();
    assert!(control.is_armed());

    std::thread::spawn(move || {
        let pause = Duration::from_millis(15);
        std::thread::sleep(pause);
        remote.disarm().unwrap();
        std::thread::sleep(pause);
        remote.stop().unwrap();
    });

    let result = stream.run(
        |req, buffer| {
            let cap = buffer.len().min(10);
            let n = req.target_points.min(cap);
            buffer.iter_mut().take(n).for_each(|pt| {
                *pt = LaserPoint::new(0.1, 0.1, 50000, 50000, 50000, 50000);
            });
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::Stopped);
    assert!(!shutter_open.load(Ordering::SeqCst));
}
/// With the default config and the stream disarmed, `write_fill_points`
/// rewrites lit circle points in the chunk buffer to blanked points at (0, 0).
#[test]
fn test_disarm_parks_scanners_at_origin_default_policy() {
    let mut stream = make_test_stream(TestBackend::new());
    let n = 10;

    // Lit points on a circle, stepping 0.628 rad per point.
    for (i, slot) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        let angle = i as f32 * 0.628;
        *slot = LaserPoint::new(angle.cos(), angle.sin(), 65535, 0, 0, 65535);
    }
    assert!(!stream.control.is_armed());

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    for (i, p) in stream.state.chunk_buffer[..n].iter().enumerate() {
        assert_eq!(p.x, 0.0, "point {i}: x must be 0.0 (parked), got {}", p.x);
        assert_eq!(p.y, 0.0, "point {i}: y must be 0.0 (parked), got {}", p.y);
        assert_eq!(p.r, 0, "point {i}: must be blanked");
        assert_eq!(p.g, 0, "point {i}: must be blanked");
        assert_eq!(p.b, 0, "point {i}: must be blanked");
        assert_eq!(p.intensity, 0, "point {i}: must be blanked");
    }
}
/// With `IdlePolicy::Park { x: 0.5, y: -0.3 }` and the stream disarmed,
/// `write_fill_points` rewrites the chunk buffer to blanked points at the
/// park position.
#[test]
fn test_disarm_parks_scanners_at_configured_park_position() {
    let park = IdlePolicy::Park { x: 0.5, y: -0.3 };
    let cfg = StreamConfig::new(30000).with_idle_policy(park);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    let n = 10;

    // Fully lit points along a diagonal.
    for (i, slot) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *slot = LaserPoint::new(i as f32 * 0.1, i as f32 * -0.1, 65535, 65535, 65535, 65535);
    }
    assert!(!stream.control.is_armed());

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    for (i, p) in stream.state.chunk_buffer[..n].iter().enumerate() {
        assert_eq!(p.x, 0.5, "point {i}: x must be park position 0.5");
        assert_eq!(p.y, -0.3, "point {i}: y must be park position -0.3");
        assert_eq!(p.r, 0, "point {i}: must be blanked");
    }
}
/// With `RepeatLast` and the stream disarmed, `write_fill_points` outputs
/// blanked points at the origin rather than the stored lit chunk.
#[test]
fn test_disarm_repeat_last_falls_back_to_blank() {
    let cfg = StreamConfig::new(30000).with_idle_policy(IdlePolicy::RepeatLast);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    let lit = LaserPoint::new(0.8, 0.8, 65535, 65535, 65535, 65535);

    // Stored "last chunk": 100 lit points.
    stream.state.last_chunk[..100].fill(lit);
    stream.state.last_chunk_len = 100;

    // Pending chunk: 10 lit points.
    let n = 10;
    stream.state.chunk_buffer[..n].fill(lit);
    assert!(!stream.control.is_armed());

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    for (i, p) in stream.state.chunk_buffer[..n].iter().enumerate() {
        assert_eq!(
            p.x, 0.0,
            "point {i}: must be parked at origin, not shape position"
        );
        assert_eq!(
            p.y, 0.0,
            "point {i}: must be parked at origin, not shape position"
        );
        assert_eq!(
            p.r, 0,
            "point {i}: must be blanked, not repeating lit content"
        );
    }
}
/// With `IdlePolicy::Park { x: -0.5, y: 0.5 }` and the stream disarmed,
/// `handle_underrun` fills the chunk buffer with blanked points at the park
/// position.
#[test]
fn test_disarm_underrun_respects_park_policy() {
    let park = IdlePolicy::Park { x: -0.5, y: 0.5 };
    let cfg = StreamConfig::new(30000).with_idle_policy(park);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    assert!(!stream.control.is_armed());

    let req = ChunkRequest {
        start: StreamInstant::new(0),
        pps: 30000,
        target_points: 50,
    };
    stream.handle_underrun(&req).unwrap();

    for (i, p) in stream.state.chunk_buffer[..50].iter().enumerate() {
        assert_eq!(p.x, -0.5, "point {i}: must park at configured x");
        assert_eq!(p.y, 0.5, "point {i}: must park at configured y");
        assert_eq!(p.r, 0, "point {i}: must be blanked");
    }
}
/// A backend that reports itself disconnected once three writes have been
/// attempted makes `run` return `RunExit::Disconnected`.
#[test]
fn test_stream_with_mock_backend_disconnect() {
    use std::sync::atomic::AtomicBool;

    /// Wraps a `TestBackend` and reports "not connected" once `call_count`
    /// reaches `disconnect_after`.
    struct DisconnectingBackend {
        inner: TestBackend,
        disconnect_after: Arc<AtomicUsize>,
        call_count: Arc<AtomicUsize>,
    }

    impl DacBackend for DisconnectingBackend {
        fn dac_type(&self) -> DacType {
            self.inner.dac_type()
        }

        fn caps(&self) -> &DacCapabilities {
            self.inner.caps()
        }

        fn connect(&mut self) -> Result<()> {
            self.inner.connect()
        }

        fn disconnect(&mut self) -> Result<()> {
            self.inner.disconnect()
        }

        fn is_connected(&self) -> bool {
            let writes_seen = self.call_count.load(Ordering::SeqCst);
            let limit = self.disconnect_after.load(Ordering::SeqCst);
            // Below the limit the inner backend decides; at or above, always false.
            writes_seen < limit && self.inner.is_connected()
        }

        fn stop(&mut self) -> Result<()> {
            self.inner.stop()
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.inner.set_shutter(open)
        }
    }

    impl FifoBackend for DisconnectingBackend {
        fn try_write_points(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            // Count every write attempt before delegating.
            self.call_count.fetch_add(1, Ordering::SeqCst);
            self.inner.try_write_points(pps, points)
        }

        fn estimator(&self) -> &dyn BufferEstimator {
            self.inner.estimator()
        }
    }

    let backend = DisconnectingBackend {
        inner: TestBackend::new(),
        disconnect_after: Arc::new(AtomicUsize::new(3)),
        call_count: Arc::new(AtomicUsize::new(0)),
    };
    let stream = make_test_stream(backend);

    let error_occurred = Arc::new(AtomicBool::new(false));
    let error_flag = Arc::clone(&error_occurred);

    let result = stream.run(
        |req, buffer| {
            let cap = buffer.len().min(10);
            let n = req.target_points.min(cap);
            buffer
                .iter_mut()
                .take(n)
                .for_each(|pt| *pt = LaserPoint::blanked(0.0, 0.0));
            ChunkResult::Filled(n)
        },
        move |_e| {
            error_flag.store(true, Ordering::SeqCst);
        },
    );

    assert_eq!(result.unwrap(), RunExit::Disconnected);
}
/// Test backend that wraps a `TestBackend` and starts returning a backend
/// error from `try_write_points` after a configurable number of writes.
struct FailingWriteBackend {
inner: TestBackend,
// Write attempts whose zero-based index is >= this value fail.
fail_after: usize,
// Total write attempts so far (successful or not).
write_count: Arc<AtomicUsize>,
// Set to true when `disconnect()` is invoked on this backend.
disconnect_called: Arc<AtomicBool>,
// Kind and message of the `std::io::Error` wrapped into the returned error.
error_kind: std::io::ErrorKind,
error_message: &'static str,
}
impl FailingWriteBackend {
/// Creates a backend whose writes fail from attempt index `fail_after`
/// onward (so `0` fails the very first write). The default failure is a
/// `BrokenPipe` I/O error with the message "simulated write failure".
fn new(fail_after: usize) -> Self {
Self {
inner: TestBackend::new(),
fail_after,
write_count: Arc::new(AtomicUsize::new(0)),
disconnect_called: Arc::new(AtomicBool::new(false)),
error_kind: std::io::ErrorKind::BrokenPipe,
error_message: "simulated write failure",
}
}
/// Builder-style override of the I/O error kind and message used for the
/// simulated failure.
fn with_error(mut self, kind: std::io::ErrorKind, message: &'static str) -> Self {
self.error_kind = kind;
self.error_message = message;
self
}
}
/// `DacBackend` implementation that forwards to the wrapped `TestBackend`,
/// except for `dac_type` (fixed custom name) and `disconnect` (which also
/// records that it was called).
impl DacBackend for FailingWriteBackend {
fn dac_type(&self) -> DacType {
DacType::Custom("FailingTest".to_string())
}
fn caps(&self) -> &DacCapabilities {
self.inner.caps()
}
fn connect(&mut self) -> Result<()> {
self.inner.connect()
}
fn disconnect(&mut self) -> Result<()> {
// Record the call so tests can assert that the stream disconnected the backend.
self.disconnect_called.store(true, Ordering::SeqCst);
self.inner.disconnect()
}
fn is_connected(&self) -> bool {
self.inner.is_connected()
}
fn stop(&mut self) -> Result<()> {
self.inner.stop()
}
fn set_shutter(&mut self, open: bool) -> Result<()> {
self.inner.set_shutter(open)
}
}
impl FifoBackend for FailingWriteBackend {
fn try_write_points(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
let count = self.write_count.fetch_add(1, Ordering::SeqCst);
if count >= self.fail_after {
Err(Error::backend(std::io::Error::new(
self.error_kind,
self.error_message,
)))
} else {
self.inner.try_write_points(pps, points)
}
}
fn estimator(&self) -> &dyn BufferEstimator {
self.inner.estimator()
}
}
/// Producer callback that writes up to 10 blanked points at the origin.
///
/// The count is the smallest of the requested `target_points`, the buffer
/// length, and 10; it is reported back as `ChunkResult::Filled`.
fn blank_producer(req: &ChunkRequest, buffer: &mut [LaserPoint]) -> ChunkResult {
    let count = req.target_points.min(buffer.len()).min(10);
    // `count` never exceeds `buffer.len()`, so the slice is always in bounds.
    for slot in &mut buffer[..count] {
        *slot = LaserPoint::blanked(0.0, 0.0);
    }
    ChunkResult::Filled(count)
}
/// `start_stream` must fail for a reconnect-enabled config whose PPS is 500,
/// which is below the `pps_min` of 1000 advertised by `TestBackend::new`.
#[test]
fn test_start_stream_with_reconnect_rejects_invalid_pps() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let mut device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let target = crate::reconnect::ReconnectTarget {
        device_id: "test".to_string(),
        discovery_factory: None,
    };
    device.reconnect_target = Some(target);

    let reconnect = crate::config::ReconnectConfig::new();
    let cfg = StreamConfig::new(500).with_reconnect(reconnect);
    let outcome = device.start_stream(cfg);
    assert!(outcome.is_err());
}
/// Requesting reconnect on a device that has no reconnect target must fail
/// with an error message that mentions both `open_device()` and
/// `with_discovery_factory`.
#[test]
fn test_start_stream_reconnect_without_target_errors() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let cfg = StreamConfig::new(30_000).with_reconnect(crate::config::ReconnectConfig::new());

    let msg = match device.start_stream(cfg) {
        Err(e) => e.to_string(),
        Ok(_) => panic!("expected error for reconnect without target"),
    };
    let mentions_both = msg.contains("open_device()") && msg.contains("with_discovery_factory");
    assert!(
        mentions_both,
        "error should mention open_device and alternatives: {}",
        msg
    );
}
/// With a reconnect target set on the device, a reconnect-enabled config is
/// accepted and the resulting stream carries a reconnect policy.
#[test]
fn test_start_stream_reconnect_with_target_succeeds() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let mut device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let target = crate::reconnect::ReconnectTarget {
        device_id: "test".to_string(),
        discovery_factory: None,
    };
    device.reconnect_target = Some(target);

    let cfg = StreamConfig::new(30_000).with_reconnect(crate::config::ReconnectConfig::new());
    let started = device.start_stream(cfg);
    assert!(started.is_ok());
    let (stream, _) = started.unwrap();
    assert!(stream.reconnect_policy.is_some());
}
/// After `max_points_per_chunk` in the stream's caps changes from 1000 to 500,
/// `reset_state_for_reconnect` must resize both chunk buffers, clear
/// `last_chunk_len`, and count one reconnect.
#[test]
fn test_reset_state_for_reconnect_resizes_buffers() {
    let mut stream = make_test_stream(TestBackend::new().with_max_points_per_chunk(1000));
    assert_eq!(stream.state.chunk_buffer.len(), 1000);
    assert_eq!(stream.state.last_chunk.len(), 1000);

    // Pretend the reconnected device advertises a smaller chunk size.
    stream.info.caps.max_points_per_chunk = 500;
    stream.reset_state_for_reconnect();

    let state = &stream.state;
    assert_eq!(state.chunk_buffer.len(), 500);
    assert_eq!(state.last_chunk.len(), 500);
    assert_eq!(state.last_chunk_len, 0);
    assert_eq!(state.stats.reconnect_count, 1);
}
/// `reset_state_for_reconnect` must clear the shutter/armed flags, the
/// remaining startup-blank counter, and the color delay line.
#[test]
fn test_reset_state_for_reconnect_clears_timing() {
    let mut stream = make_test_stream(TestBackend::new());
    {
        // Dirty the fields that the reset is expected to clear.
        let state = &mut stream.state;
        state.shutter_open = true;
        state.last_armed = true;
        state.startup_blank_remaining = 10;
    }

    stream.reset_state_for_reconnect();

    let state = &stream.state;
    assert!(!state.shutter_open);
    assert!(!state.last_armed);
    assert_eq!(state.startup_blank_remaining, 0);
    assert!(state.color_delay_line.is_empty());
}
/// `ReconnectPolicy::sleep_with_stop` must return early (reporting that it
/// was stopped) when the stop predicate flips to true during a 5 s sleep.
///
/// A helper thread sets the stop flag after 50 ms. The helper is joined
/// before the test returns so it is never left detached and any panic inside
/// it surfaces as a test failure.
#[test]
fn test_sleep_with_stop_exits_on_stop() {
    use crate::reconnect::ReconnectPolicy;
    use std::sync::atomic::AtomicBool;

    let stopped = Arc::new(AtomicBool::new(false));
    let stopper = {
        let flag = Arc::clone(&stopped);
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            flag.store(true, Ordering::SeqCst);
        })
    };

    let start = std::time::Instant::now();
    let mut on_progress = || {};
    let was_stopped = ReconnectPolicy::sleep_with_stop(
        Duration::from_secs(5),
        || stopped.load(Ordering::SeqCst),
        &mut on_progress,
    );
    // Capture elapsed time before joining so the join cannot skew the measurement.
    let elapsed = start.elapsed();
    stopper.join().expect("stopper thread panicked");

    assert!(was_stopped);
    assert!(elapsed < Duration::from_secs(1));
}
/// Converting a reconnect-enabled stream back with `into_dac` keeps the
/// reconnect target (same `device_id`) and reports zero reconnects.
#[test]
fn test_into_dac_preserves_reconnect_target() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let mut device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let target = crate::reconnect::ReconnectTarget {
        device_id: "test-id".to_string(),
        discovery_factory: None,
    };
    device.reconnect_target = Some(target);

    let cfg = StreamConfig::new(30_000).with_reconnect(crate::config::ReconnectConfig::new());
    let (stream, _) = device.start_stream(cfg).unwrap();
    let (dac, stats) = stream.into_dac();

    let restored = dac.reconnect_target.as_ref();
    assert!(restored.is_some());
    assert_eq!(restored.unwrap().device_id, "test-id");
    assert_eq!(stats.reconnect_count, 0);
}
/// Even when the stream config does not enable reconnect, the device's
/// reconnect target travels through the stream (without a policy) and is
/// handed back by `into_dac`.
#[test]
fn test_into_dac_preserves_target_without_reconnect() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let mut device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let target = crate::reconnect::ReconnectTarget {
        device_id: "test-id".to_string(),
        discovery_factory: None,
    };
    device.reconnect_target = Some(target);

    let (stream, _) = device.start_stream(StreamConfig::new(30_000)).unwrap();
    assert!(stream.reconnect_target.is_some());
    assert!(stream.reconnect_policy.is_none());

    let (dac, _) = stream.into_dac();
    let restored = dac.reconnect_target.as_ref();
    assert!(restored.is_some());
    assert_eq!(restored.unwrap().device_id, "test-id");
}
/// A freshly constructed `Dac` has no reconnect target.
#[test]
fn test_dac_new_has_no_reconnect_target() {
    let backend = TestBackend::new();
    let info = test_info(backend.caps());
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    assert!(device.reconnect_target.is_none());
}
/// Builds a `DacInfo` for tests with fixed id/name/kind and a clone of `caps`.
fn test_info(caps: &DacCapabilities) -> DacInfo {
    let id = String::from("test");
    let name = String::from("Test Device");
    let kind = DacType::Custom(String::from("Test"));
    DacInfo {
        id,
        name,
        kind,
        caps: caps.clone(),
    }
}
fn make_test_stream(mut backend: impl FifoBackend + 'static) -> Stream {
backend.connect().unwrap();
let info = test_info(backend.caps());
Stream::with_backend(
info,
BackendKind::Fifo(Box::new(backend)),
StreamConfig::new(30000),
)
}
/// Connects `backend`, derives a `DacInfo` from its capabilities, and builds
/// a `Stream` around it with the supplied configuration.
///
/// Panics if `backend.connect()` fails.
fn make_test_stream_with_cfg(mut backend: impl FifoBackend + 'static, cfg: StreamConfig) -> Stream {
    backend.connect().unwrap();
    let info = test_info(backend.caps());
    let boxed = BackendKind::Fifo(Box::new(backend));
    Stream::with_backend(info, boxed, cfg)
}
/// A backend write error (after two successful writes) must end `run` with
/// `RunExit::Disconnected` and cause `disconnect()` to be called on the backend.
#[test]
fn test_backend_write_error_exits_with_disconnected() {
    let backend = FailingWriteBackend::new(2);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    // Run the stream on its own thread and wait for it to finish.
    let worker = std::thread::spawn(move || stream.run(blank_producer, |_err| {}));
    let run_result = worker.join().expect("stream thread panicked");

    assert_eq!(
        run_result.unwrap(),
        RunExit::Disconnected,
        "Write error should cause stream to exit with Disconnected"
    );
    assert!(
        disconnect_flag.load(Ordering::SeqCst),
        "backend.disconnect() should have been called after write error"
    );
}
/// When a write fails, the `on_error` callback must be invoked with an
/// `Error::Backend` value and `run` must exit with `RunExit::Disconnected`.
#[test]
fn test_backend_write_error_fires_on_error() {
    let stream = make_test_stream(FailingWriteBackend::new(1));
    let saw_backend_error = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&saw_backend_error);

    let result = stream.run(blank_producer, move |err| {
        if let Error::Backend(_) = err {
            flag.store(true, Ordering::SeqCst);
        }
    });

    assert_eq!(result.unwrap(), RunExit::Disconnected);
    assert!(
        saw_backend_error.load(Ordering::SeqCst),
        "on_error should have received the Backend error"
    );
}
/// A backend that fails on its very first write still yields
/// `RunExit::Disconnected` from `run`.
#[test]
fn test_backend_write_error_immediate_fail() {
    let backend = FailingWriteBackend::new(0);
    let stream = make_test_stream(backend);
    let exit = stream.run(blank_producer, |_err| {}).unwrap();
    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate write failure should exit with Disconnected"
    );
}
/// Builds a `FailingWriteBackend` whose failure is a `TimedOut` I/O error
/// carrying the message "usb connection error: Operation timed out".
fn helios_like_backend(fail_after: usize) -> FailingWriteBackend {
    let kind = std::io::ErrorKind::TimedOut;
    let message = "usb connection error: Operation timed out";
    FailingWriteBackend::new(fail_after).with_error(kind, message)
}
/// A timeout-style write error (after three successful writes) must end `run`
/// with `RunExit::Disconnected` and trigger `disconnect()` on the backend.
#[test]
fn test_helios_status_timeout_exits_with_disconnected() {
    let backend = helios_like_backend(3);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    // Run the stream on its own thread and wait for it to finish.
    let worker = std::thread::spawn(move || stream.run(blank_producer, |_err| {}));
    let run_result = worker.join().expect("stream thread panicked");

    assert_eq!(
        run_result.unwrap(),
        RunExit::Disconnected,
        "Helios status timeout should cause stream to exit with Disconnected"
    );
    assert!(
        disconnect_flag.load(Ordering::SeqCst),
        "backend.disconnect() should have been called on status timeout"
    );
}
/// The timeout-style failure must reach `on_error` as `Error::Backend`, and
/// its display text must contain "Operation timed out".
#[test]
fn test_helios_status_timeout_fires_on_error_with_backend_variant() {
    let stream = make_test_stream(helios_like_backend(1));
    let saw_backend_error = Arc::new(AtomicBool::new(false));
    let captured_text: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));

    let flag = Arc::clone(&saw_backend_error);
    let sink = Arc::clone(&captured_text);
    let result = stream.run(blank_producer, move |err| {
        if let Error::Backend(_) = err {
            flag.store(true, Ordering::SeqCst);
            // Keep the rendered message so it can be inspected after `run` returns.
            *sink.lock().unwrap() = Some(err.to_string());
        }
    });

    assert_eq!(result.unwrap(), RunExit::Disconnected);
    assert!(
        saw_backend_error.load(Ordering::SeqCst),
        "on_error should receive Error::Backend for Helios timeout"
    );
    let msg = captured_text.lock().unwrap();
    assert!(
        msg.as_ref().unwrap().contains("Operation timed out"),
        "Error message should mention timeout, got: {:?}",
        msg
    );
}
/// A timeout-style failure on the very first write still yields
/// `RunExit::Disconnected` and still triggers `disconnect()` on the backend.
#[test]
fn test_helios_immediate_status_timeout() {
    let backend = helios_like_backend(0);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    let exit = stream.run(blank_producer, |_err| {}).unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate status timeout should exit with Disconnected"
    );
    assert!(
        disconnect_flag.load(Ordering::SeqCst),
        "backend.disconnect() should be called even on first-write failure"
    );
}
/// A producer that returns `ChunkResult::End` immediately should make `run`
/// exit with `RunExit::ProducerEnded` in under 50 ms, even though the drain
/// timeout is configured to 100 ms.
#[test]
fn test_fill_result_end_drains_with_queue_depth() {
use std::time::Instant;
// `with_initial_queue` stores 1000 in the backend's `queued` counter and
// seeds its estimator with the same amount at 30_000 pps.
let backend = TestBackend::new().with_initial_queue(1000);
let queued = backend.queued.clone();
let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
let stream = make_test_stream_with_cfg(backend, cfg);
// Reset the shared queue counter before running; the estimator is left as seeded.
// NOTE(review): whether the drain consults `queued` or the estimator is not
// visible here; confirm against the drain implementation.
queued.store(0, Ordering::SeqCst);
let start = Instant::now();
let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
let elapsed = start.elapsed();
assert_eq!(result.unwrap(), RunExit::ProducerEnded);
assert!(
elapsed.as_millis() < 50,
"Should return quickly when queue is empty, took {:?}",
elapsed
);
}
/// With a 50 ms drain timeout and an estimator reporting a large backlog,
/// `run` should take roughly the timeout (between 40 ms and 150 ms) before
/// returning `RunExit::ProducerEnded`.
#[test]
fn test_fill_result_end_respects_drain_timeout() {
use std::time::Instant;
let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(50));
let backend = TestBackend::new();
// Shared handle onto the backend's estimator so the producer can seed it.
let estimator = backend.estimator.clone();
let stream = make_test_stream_with_cfg(backend, cfg);
let start = Instant::now();
let result = stream.run(
move |_req, _buffer| {
// Seed 100_000 points at 30_000 pps right before ending, so the
// estimated backlog cannot empty within the 50 ms timeout.
estimator.seed(100_000, 30_000);
ChunkResult::End
},
|_e| {},
);
let elapsed = start.elapsed();
assert_eq!(result.unwrap(), RunExit::ProducerEnded);
assert!(
elapsed.as_millis() >= 40 && elapsed.as_millis() < 150,
"Should respect drain timeout (~50ms), took {:?}",
elapsed
);
}
/// With a zero drain timeout, `run` should return `RunExit::ProducerEnded`
/// in under 20 ms even though the estimator reports a large backlog.
#[test]
fn test_fill_result_end_skips_drain_with_zero_timeout() {
use std::time::Instant;
let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::ZERO);
let backend = TestBackend::new();
// Shared handle onto the backend's estimator so the producer can seed it.
let estimator = backend.estimator.clone();
let stream = make_test_stream_with_cfg(backend, cfg);
let start = Instant::now();
let result = stream.run(
move |_req, _buffer| {
// Seed a large backlog right before signalling the end of production.
estimator.seed(100_000, 30_000);
ChunkResult::End
},
|_e| {},
);
let elapsed = start.elapsed();
assert_eq!(result.unwrap(), RunExit::ProducerEnded);
assert!(
elapsed.as_millis() < 20,
"Should skip drain with zero timeout, took {:?}",
elapsed
);
}
/// With a never-seeded estimator, a producer that ends immediately should
/// make `run` return `RunExit::ProducerEnded` in under 50 ms despite the
/// 100 ms drain timeout.
#[test]
fn test_fill_result_end_drains_with_empty_estimator() {
use std::time::Instant;
let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
let stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
let start = Instant::now();
let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
let elapsed = start.elapsed();
assert_eq!(result.unwrap(), RunExit::ProducerEnded);
assert!(
elapsed.as_millis() < 50,
"Should return quickly with empty buffer estimate, took {:?}",
elapsed
);
}
/// After an armed stream's producer returns `ChunkResult::End`, the backend's
/// shared `shutter_open` flag must read false once `run` has returned.
#[test]
fn test_fill_result_end_closes_shutter() {
let backend = TestBackend::new();
// Shared handle onto the backend's shutter flag, inspected after `run`.
let shutter_open = backend.shutter_open.clone();
let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(10));
let stream = make_test_stream_with_cfg(backend, cfg);
let control = stream.control();
control.arm().unwrap();
let result = stream.run(
|req, buffer| {
// Write up to 10 blanked points, then signal end of production.
let n = req.target_points.min(buffer.len()).min(10);
for pt in buffer.iter_mut().take(n) {
*pt = LaserPoint::blanked(0.0, 0.0);
}
ChunkResult::End
},
|_e| {},
);
assert_eq!(result.unwrap(), RunExit::ProducerEnded);
assert!(
!shutter_open.load(Ordering::SeqCst),
"Shutter should be closed after drain"
);
}
/// With the default stream config, writing points must leave the color delay
/// line empty.
// NOTE(review): this presumes the default color delay is zero; confirm
// against `StreamConfig::new`.
#[test]
fn test_color_delay_zero_is_passthrough() {
let mut stream = make_test_stream(TestBackend::new());
stream.control.arm().unwrap();
stream.process_control_messages();
// Mark the previous state as armed so this write is not treated as an arm transition.
// NOTE(review): inferred from the field name; confirm against `write_fill_points`.
stream.state.last_armed = true;
let n = 5;
for i in 0..n {
stream.state.chunk_buffer[i] =
LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 1000, 0, 0, 65535);
}
let mut on_error = |_: Error| {};
stream.write_fill_points(n, &mut on_error).unwrap();
assert!(stream.state.color_delay_line.is_empty());
}
/// With a 300 µs color delay at 10000 pps and a delay line pre-filled with
/// three blank entries, writing five colored points must leave the colors of
/// points 3..=5 in the delay line.
#[test]
fn test_color_delay_shifts_colors() {
let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(300));
let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
stream.control.arm().unwrap();
stream.process_control_messages();
stream.state.last_armed = true;
// Put the delay line into a known state: exactly three zeroed color tuples.
stream.state.color_delay_line.clear();
for _ in 0..3 {
stream.state.color_delay_line.push_back((0, 0, 0, 0));
}
let n = 5;
// Point i (1-based) carries colors (i*10000, i*5000, i*2000) at full intensity.
for i in 0..n {
stream.state.chunk_buffer[i] = LaserPoint::new(
i as f32 * 0.1,
0.0,
(i as u16 + 1) * 10000,
(i as u16 + 1) * 5000,
(i as u16 + 1) * 2000,
65535,
);
}
let mut on_error = |_: Error| {};
stream.write_fill_points(n, &mut on_error).unwrap();
assert_eq!(stream.state.color_delay_line.len(), 3);
// The last three input colors are expected to still be waiting in the line.
let expected: Vec<(u16, u16, u16, u16)> = (3..=5)
.map(|i| (i * 10000u16, i * 5000, i * 2000, 65535))
.collect();
let actual: Vec<(u16, u16, u16, u16)> = stream.state.color_delay_line.iter().copied().collect();
assert_eq!(actual, expected);
}
/// With a 200 µs color delay at 10000 pps, `handle_shutter_transition(true)`
/// must fill the delay line with two zeroed entries, and
/// `handle_shutter_transition(false)` must empty it.
#[test]
fn test_color_delay_resets_on_disarm_arm() {
let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
// First transition with `true`: the line is primed with blanks.
stream.handle_shutter_transition(true);
assert_eq!(stream.state.color_delay_line.len(), 2);
assert_eq!(stream.state.color_delay_line.front(), Some(&(0, 0, 0, 0)));
// Transition with `false`: the line is cleared.
stream.handle_shutter_transition(false);
assert!(stream.state.color_delay_line.is_empty());
// Second transition with `true`: the line is primed again.
stream.handle_shutter_transition(true);
assert_eq!(stream.state.color_delay_line.len(), 2);
}
/// Changing the color delay through the control handle between writes must
/// resize the delay line: to five entries for 500 µs at 10000 pps, and to
/// empty for a zero delay.
#[test]
fn test_color_delay_dynamic_change() {
let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
stream.control.arm().unwrap();
stream.process_control_messages();
stream.state.last_armed = true;
// Start from a known delay line of two zeroed entries.
stream.state.color_delay_line.clear();
for _ in 0..2 {
stream.state.color_delay_line.push_back((0, 0, 0, 0));
}
let n = 3;
for i in 0..n {
stream.state.chunk_buffer[i] =
LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 10000, 0, 0, 65535);
}
let mut on_error = |_: Error| {};
stream.write_fill_points(n, &mut on_error).unwrap();
// Raise the delay to 500 µs; the next write is expected to grow the line to 5.
stream.control.set_color_delay(Duration::from_micros(500));
for i in 0..n {
stream.state.chunk_buffer[i] =
LaserPoint::new(0.0, 0.0, (i as u16 + 4) * 10000, 0, 0, 65535);
}
stream.write_fill_points(n, &mut on_error).unwrap();
assert_eq!(stream.state.color_delay_line.len(), 5);
// Drop the delay to zero; the next write is expected to empty the line.
stream.control.set_color_delay(Duration::ZERO);
for i in 0..n {
stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 50000, 0, 0, 65535);
}
stream.write_fill_points(n, &mut on_error).unwrap();
assert!(stream.state.color_delay_line.is_empty());
}
/// A 500 µs startup blank at 10000 pps is configured as 5 points. After the
/// first post-arm write of 10 points the remaining blank count must be zero,
/// and a second write must leave the first point's red/green unchanged.
#[test]
fn test_startup_blank_blanks_first_n_points() {
    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::from_micros(500))
        .with_color_delay(Duration::ZERO);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    assert_eq!(stream.state.startup_blank_points, 5);

    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = false;

    let n = 10;
    let mut on_error = |_: Error| {};

    // First write: lit points spread along x.
    for (i, slot) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *slot = LaserPoint::new(i as f32 * 0.1, 0.0, 65535, 32000, 16000, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();
    assert_eq!(stream.state.startup_blank_remaining, 0);

    // Second write: already armed, lit points at the origin.
    stream.state.last_armed = true;
    for slot in stream.state.chunk_buffer[..n].iter_mut() {
        *slot = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
}
/// Arms, writes, disarms, re-arms and writes again, checking after each write
/// that `startup_blank_remaining` has reached zero.
#[test]
fn test_startup_blank_resets_on_rearm() {
let cfg = StreamConfig::new(10000)
.with_startup_blank(Duration::from_micros(500))
.with_color_delay(Duration::ZERO);
let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
stream.state.last_armed = false;
stream.control.arm().unwrap();
stream.process_control_messages();
let n = 10;
for i in 0..n {
stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
}
let mut on_error = |_: Error| {};
// Force `last_armed` back to false right before the first write.
stream.state.last_armed = false;
stream.write_fill_points(n, &mut on_error).unwrap();
assert_eq!(stream.state.startup_blank_remaining, 0);
// Disarm and re-arm, processing control messages after each step.
stream.control.disarm().unwrap();
stream.process_control_messages();
stream.control.arm().unwrap();
stream.process_control_messages();
stream.state.last_armed = false;
for i in 0..n {
stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
}
stream.write_fill_points(n, &mut on_error).unwrap();
assert_eq!(stream.state.startup_blank_remaining, 0);
}
/// With a zero startup blank (and zero color delay) the configured blank
/// point count is 0 and the first written point keeps its original colors
/// and intensity.
#[test]
fn test_startup_blank_zero_is_noop() {
    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::ZERO)
        .with_color_delay(Duration::ZERO);
    let mut stream = make_test_stream_with_cfg(TestBackend::new(), cfg);
    assert_eq!(stream.state.startup_blank_points, 0);

    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = false;

    let n = 5;
    for i in 0..n {
        let lit = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
        stream.state.chunk_buffer[i] = lit;
    }
    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
    assert_eq!(first.b, 16000);
    assert_eq!(first.intensity, 65535);
    assert_eq!(stream.state.startup_blank_remaining, 0);
}
/// `start_stream` on a device backed by a frame-swap backend must fail with
/// an error whose text mentions "frame-swap".
#[test]
fn test_device_start_stream_rejects_frame_swap_backend() {
    let backend = FrameSwapTestBackend::new();
    let info = test_info(&backend.inner.caps);
    let device = Dac::new(info, BackendKind::FrameSwap(Box::new(backend)));

    let err_msg = match device.start_stream(StreamConfig::new(30_000)) {
        Err(e) => e.to_string(),
        Ok(_) => panic!("expected start_stream to reject frame-swap backend"),
    };
    assert!(
        err_msg.contains("frame-swap"),
        "error should mention frame-swap: {err_msg}"
    );
}
/// Two writes of 50 points each must add up in the backend's `queued`
/// counter and in the stream's chunk/point statistics.
#[test]
fn test_network_fifo_accumulates_via_estimator() {
    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let backend = TestBackend::new();
    let queued = Arc::clone(&backend.queued);
    let mut stream = make_test_stream_with_cfg(backend, cfg);
    stream.control.arm().unwrap();
    stream.process_control_messages();

    let n = 50;
    let mut on_error = |_: Error| {};
    for _round in 0..2 {
        // Refill the first `n` slots with dark points before each write.
        for i in 0..n {
            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream.write_fill_points(n, &mut on_error).unwrap();
    }

    let expected_points = 2 * n as u64;
    assert_eq!(queued.load(Ordering::SeqCst), expected_points);
    assert_eq!(stream.state.stats.chunks_written, 2);
    assert_eq!(stream.state.stats.points_written, expected_points);
}
/// For a `UdpTimed` backend with `max_points_per_chunk` of 179, simulates the
/// fill loop by recording sends directly into the shared estimator, and
/// expects exactly two full-size requests before the estimate exceeds the
/// scheduler target.
#[test]
fn test_udp_timed_prefills_to_max_points_per_chunk() {
let backend = TestBackend::new()
.with_output_model(OutputModel::UdpTimed)
.with_max_points_per_chunk(179);
// Shared handle onto the backend's estimator, used to record simulated sends.
let estimator = backend.estimator.clone();
let cfg = StreamConfig::new(1000).with_target_buffer(Duration::from_millis(500));
let stream = make_test_stream_with_cfg(backend, cfg);
let mut writes = 0;
// Keep "sending" while the estimate has not yet exceeded the scheduler target.
while stream.estimate_buffer_points() <= stream.scheduler_target_buffer_points() {
let buffered = stream.estimate_buffer_points();
let req = stream.build_fill_request(179, buffered);
assert_eq!(req.target_points, 179);
// No real write happens; the send is only recorded in the estimator.
estimator.record_send(Instant::now(), req.target_points as u64, stream.config.pps);
writes += 1;
}
assert_eq!(
writes, 2,
"UdpTimed target is max_points_per_chunk (179) — two writes to exceed"
);
}
/// For a `UdpTimed` backend, the scheduler's target buffer size equals the
/// backend's `max_points_per_chunk` (179 here).
#[test]
fn test_udp_timed_uses_max_points_per_chunk_for_lead() {
    let stream = make_test_stream(
        TestBackend::new()
            .with_output_model(OutputModel::UdpTimed)
            .with_max_points_per_chunk(179),
    );
    let lead_points = stream.scheduler_target_buffer_points();
    assert_eq!(lead_points, 179);
}
/// For a `UdpTimed` backend, `build_fill_request` asks for a full 179-point
/// packet even when 120 points are already buffered.
#[test]
fn test_udp_timed_build_fill_request_uses_full_packet() {
    let backend = TestBackend::new()
        .with_output_model(OutputModel::UdpTimed)
        .with_max_points_per_chunk(179);
    let shared_estimator = backend.estimator.clone();
    let stream = make_test_stream(backend);

    // Make the estimator agree with the `buffered` value passed below.
    shared_estimator.seed(120, stream.config.pps);
    let request = stream.build_fill_request(179, 120);
    assert_eq!(request.target_points, 179);
}
/// With 5 ms remaining, `udp_timed_sleep_slice` returns a 1 ms sleep slice.
#[test]
fn test_udp_timed_sleep_slice_caps_coarse_sleep() {
    let slice = Stream::udp_timed_sleep_slice(Duration::from_millis(5));
    assert_eq!(slice, Some(Duration::from_millis(1)));
}
/// With only 400 µs remaining, `udp_timed_sleep_slice` returns `None`
/// (no sleep slice).
#[test]
fn test_udp_timed_sleep_slice_switches_to_busy_wait_near_deadline() {
    let slice = Stream::udp_timed_sleep_slice(Duration::from_micros(400));
    assert_eq!(slice, None);
}
/// For a `NetworkFifo` backend with a 5700-point chunk limit, a 50 ms target
/// buffer at 30_000 pps (1500 points) should produce a 1500-point request
/// rather than a full 5700-point chunk.
#[test]
fn test_network_fifo_lasercube_default_target_requests_topup() {
    let cfg = StreamConfig::new(30_000).with_target_buffer(Duration::from_millis(50));
    let backend = TestBackend::new()
        .with_output_model(OutputModel::NetworkFifo)
        .with_max_points_per_chunk(5700);
    let stream = make_test_stream_with_cfg(backend, cfg);

    let already_buffered = stream.estimate_buffer_points();
    let request = stream.build_fill_request(5700, already_buffered);
    assert_eq!(
        request.target_points, 1500,
        "should request ~1500 top-up, not full 5700"
    );
}
/// For a `NetworkFifo` backend with a 5700-point chunk limit, a 200 ms target
/// buffer at 30_000 pps should produce a full 5700-point request.
#[test]
fn test_network_fifo_lasercube_large_target_uses_more_capacity() {
    let cfg = StreamConfig::new(30_000).with_target_buffer(Duration::from_millis(200));
    let backend = TestBackend::new()
        .with_output_model(OutputModel::NetworkFifo)
        .with_max_points_per_chunk(5700);
    let stream = make_test_stream_with_cfg(backend, cfg);

    let already_buffered = stream.estimate_buffer_points();
    let request = stream.build_fill_request(5700, already_buffered);
    assert_eq!(request.target_points, 5700);
}
/// `Dac::validate_pps` rejects a PPS of 5 when `pps_min` is 7, and the error
/// text contains "PPS 5".
#[test]
fn test_validate_config_rejects_pps_below_min() {
    let caps = DacCapabilities {
        pps_min: 7,
        pps_max: 65535,
        max_points_per_chunk: 1000,
        output_model: OutputModel::NetworkFifo,
    };
    let verdict = Dac::validate_pps(&caps, 5);
    assert!(verdict.is_err());
    let msg = verdict.unwrap_err().to_string();
    assert!(msg.contains("PPS 5"), "expected PPS 5 in error: {msg}");
}
/// `Dac::validate_pps` rejects a PPS of 50_000 when `pps_max` is 35_000, and
/// the error text contains "PPS 50000".
#[test]
fn test_validate_config_rejects_pps_above_max() {
    let caps = DacCapabilities {
        pps_min: 1,
        pps_max: 35_000,
        max_points_per_chunk: 1000,
        output_model: OutputModel::NetworkFifo,
    };
    let verdict = Dac::validate_pps(&caps, 50_000);
    assert!(verdict.is_err());
    let msg = verdict.unwrap_err().to_string();
    assert!(
        msg.contains("PPS 50000"),
        "expected PPS 50000 in error: {msg}"
    );
}
/// `Dac::validate_pps` accepts 30_000 pps for capabilities spanning
/// 1..=100_000 pps.
#[test]
fn test_validate_config_avb_accepts_standard_pps() {
    let caps = DacCapabilities {
        pps_min: 1,
        pps_max: 100_000,
        max_points_per_chunk: 4096,
        output_model: OutputModel::NetworkFifo,
    };
    let verdict = Dac::validate_pps(&caps, 30_000);
    assert!(verdict.is_ok());
}
/// On a device without a reconnect target, `with_discovery_factory` creates
/// one whose `device_id` is the device's info id and whose factory is set.
#[test]
fn with_discovery_factory_creates_target_when_none() {
    let backend = TestBackend::new();
    let info = DacInfo {
        id: "test-factory".to_string(),
        ..test_info(backend.caps())
    };
    let device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    assert!(device.reconnect_target.is_none());

    let make_discovery =
        || crate::discovery::DacDiscovery::new(crate::device::EnabledDacTypes::all());
    let device = device.with_discovery_factory(make_discovery);

    let target = device.reconnect_target.as_ref().unwrap();
    assert_eq!(target.device_id, "test-factory");
    assert!(target.discovery_factory.is_some());
}
/// On a device that already has a reconnect target, `with_discovery_factory`
/// keeps the existing `device_id` and only installs the factory.
#[test]
fn with_discovery_factory_replaces_factory_on_existing_target() {
    let backend = TestBackend::new();
    let info = DacInfo {
        id: "test-replace".to_string(),
        ..test_info(backend.caps())
    };
    let mut device = Dac::new(info, BackendKind::Fifo(Box::new(backend)));
    let existing = crate::reconnect::ReconnectTarget {
        device_id: "original-id".to_string(),
        discovery_factory: None,
    };
    device.reconnect_target = Some(existing);

    let make_discovery =
        || crate::discovery::DacDiscovery::new(crate::device::EnabledDacTypes::all());
    let device = device.with_discovery_factory(make_discovery);

    let target = device.reconnect_target.as_ref().unwrap();
    assert_eq!(target.device_id, "original-id");
    assert!(target.discovery_factory.is_some());
}
/// A device configured via `with_discovery_factory` can start a frame
/// session whose config enables reconnect.
#[test]
fn with_discovery_factory_enables_reconnect_for_frame_session() {
    let backend = TestBackend::new();
    let info = DacInfo {
        id: "test-session".to_string(),
        ..test_info(backend.caps())
    };
    let make_discovery =
        || crate::discovery::DacDiscovery::new(crate::device::EnabledDacTypes::all());
    let device =
        Dac::new(info, BackendKind::Fifo(Box::new(backend))).with_discovery_factory(make_discovery);

    let reconnect = crate::config::ReconnectConfig::new();
    let config = crate::presentation::FrameSessionConfig::new(30_000).with_reconnect(reconnect);
    let result = device.start_frame_session(config);
    assert!(
        result.is_ok(),
        "start_frame_session should succeed with discovery factory: {:?}",
        result.err()
    );
    let (session, _info) = result.unwrap();
    drop(session);
}
/// Minimal FIFO backend whose writes always succeed; `TrackingDiscoverer::connect`
/// hands out instances of it.
struct ReconnectFifoBackend {
// Toggled by `connect()` / `disconnect()`.
connected: bool,
// Estimator exposed through `FifoBackend::estimator`; this backend never records sends into it.
estimator: SoftwareDecayEstimator,
}
impl ReconnectFifoBackend {
/// Creates a disconnected backend with a fresh estimator.
fn new() -> Self {
Self {
connected: false,
estimator: SoftwareDecayEstimator::new(),
}
}
}
/// `DacBackend` implementation with fixed capabilities; only the `connected`
/// flag changes, and `stop` / `set_shutter` are no-ops that return `Ok(())`.
impl DacBackend for ReconnectFifoBackend {
fn dac_type(&self) -> DacType {
DacType::Custom("TrackingTest".into())
}
fn caps(&self) -> &DacCapabilities {
// A function-local static so a `&DacCapabilities` can be returned
// without storing the capabilities in the struct.
static CAPS: DacCapabilities = DacCapabilities {
pps_min: 1000,
pps_max: 100000,
max_points_per_chunk: 1000,
output_model: crate::device::OutputModel::NetworkFifo,
};
&CAPS
}
fn connect(&mut self) -> Result<()> {
self.connected = true;
Ok(())
}
fn disconnect(&mut self) -> Result<()> {
self.connected = false;
Ok(())
}
fn is_connected(&self) -> bool {
self.connected
}
fn stop(&mut self) -> Result<()> {
Ok(())
}
fn set_shutter(&mut self, _open: bool) -> Result<()> {
Ok(())
}
}
impl FifoBackend for ReconnectFifoBackend {
/// Accepts every write unconditionally; the points are discarded.
fn try_write_points(&mut self, _pps: u32, _points: &[LaserPoint]) -> Result<WriteOutcome> {
Ok(WriteOutcome::Written)
}
/// Returns this backend's own estimator.
fn estimator(&self) -> &dyn BufferEstimator {
&self.estimator
}
}
/// FIFO backend that accepts a fixed number of writes and then reports a
/// disconnect error on every later write.
struct DisconnectAfterNBackend {
// Toggled by `connect()` / `disconnect()`, and cleared when a write fails.
connected: bool,
// Write attempts whose zero-based index is >= this value fail.
fail_after: usize,
// Total write attempts so far.
write_count: AtomicUsize,
// Estimator exposed through `FifoBackend::estimator`; this backend never records sends into it.
estimator: SoftwareDecayEstimator,
}
impl DisconnectAfterNBackend {
/// Creates a disconnected backend that will accept `fail_after` writes
/// before it starts failing.
fn new(fail_after: usize) -> Self {
Self {
connected: false,
fail_after,
write_count: AtomicUsize::new(0),
estimator: SoftwareDecayEstimator::new(),
}
}
}
/// `DacBackend` implementation with fixed capabilities; only the `connected`
/// flag changes, and `stop` / `set_shutter` are no-ops that return `Ok(())`.
impl DacBackend for DisconnectAfterNBackend {
fn dac_type(&self) -> DacType {
DacType::Custom("TrackingTest".into())
}
fn caps(&self) -> &DacCapabilities {
// A function-local static so a `&DacCapabilities` can be returned
// without storing the capabilities in the struct.
static CAPS: DacCapabilities = DacCapabilities {
pps_min: 1000,
pps_max: 100000,
max_points_per_chunk: 1000,
output_model: crate::device::OutputModel::NetworkFifo,
};
&CAPS
}
fn connect(&mut self) -> Result<()> {
self.connected = true;
Ok(())
}
fn disconnect(&mut self) -> Result<()> {
self.connected = false;
Ok(())
}
fn is_connected(&self) -> bool {
self.connected
}
fn stop(&mut self) -> Result<()> {
Ok(())
}
fn set_shutter(&mut self, _open: bool) -> Result<()> {
Ok(())
}
}
impl FifoBackend for DisconnectAfterNBackend {
    /// Counts the attempt and accepts it while the zero-based attempt index
    /// is below `fail_after`; afterwards marks the backend as disconnected
    /// and returns a disconnect error.
    fn try_write_points(&mut self, _pps: u32, _points: &[LaserPoint]) -> Result<WriteOutcome> {
        let attempt = self.write_count.fetch_add(1, Ordering::SeqCst);
        if attempt < self.fail_after {
            return Ok(WriteOutcome::Written);
        }
        self.connected = false;
        Err(Error::disconnected("simulated disconnect"))
    }

    /// Returns this backend's own estimator.
    fn estimator(&self) -> &dyn BufferEstimator {
        &self.estimator
    }
}
/// Discoverer used by the reconnect test; it counts how often `scan` and
/// `connect` are invoked through shared atomic counters.
struct TrackingDiscoverer {
// Incremented on every `scan()` call.
scan_count: Arc<AtomicUsize>,
// Incremented on every `connect()` call.
connect_count: Arc<AtomicUsize>,
}
/// `Discoverer` implementation that always reports one fixed device and
/// always connects successfully with a fresh `ReconnectFifoBackend`.
impl crate::discovery::Discoverer for TrackingDiscoverer {
fn dac_type(&self) -> DacType {
DacType::Custom("TrackingTest".into())
}
fn prefix(&self) -> &str {
"trackingtest"
}
fn scan(&mut self) -> Vec<crate::discovery::DiscoveredDevice> {
// Count the scan, then report a single device at a fixed id and IP.
self.scan_count.fetch_add(1, Ordering::SeqCst);
let info = crate::discovery::DiscoveredDeviceInfo::new(
DacType::Custom("TrackingTest".into()),
"trackingtest:10.0.0.99",
"Tracking Test Device",
)
.with_ip("10.0.0.99".parse().unwrap())
.with_hardware_name("Tracking Test Device");
// The opaque payload is a unit value; `connect` below ignores it.
vec![crate::discovery::DiscoveredDevice::new(info, Box::new(()))]
}
fn connect(&mut self, _opaque: Box<dyn std::any::Any + Send>) -> Result<BackendKind> {
// Count the connect and hand back a backend whose writes always succeed.
self.connect_count.fetch_add(1, Ordering::SeqCst);
Ok(BackendKind::Fifo(Box::new(ReconnectFifoBackend::new())))
}
}
/// End-to-end reconnect test for a custom backend.
///
/// The initial backend fails with a disconnect error after two writes. The
/// device carries a discovery factory that registers a `TrackingDiscoverer`,
/// so the test expects `scan()` and `connect()` to be called on it and the
/// `on_reconnect` callback to fire.
///
/// The test polls for the reconnect flag with a 5 s deadline instead of
/// sleeping for a fixed interval: it finishes as soon as the reconnect is
/// observed and tolerates slow machines without failing spuriously.
#[test]
fn reconnect_rediscovers_custom_backend_via_factory() {
    let scan_count = Arc::new(AtomicUsize::new(0));
    let connect_count = Arc::new(AtomicUsize::new(0));
    let reconnected = Arc::new(AtomicBool::new(false));
    let scan_count_factory = scan_count.clone();
    let connect_count_factory = connect_count.clone();
    let reconnected_cb = reconnected.clone();

    let initial_backend = DisconnectAfterNBackend::new(2);
    let caps = initial_backend.caps().clone();
    // Must match the id reported by `TrackingDiscoverer::scan`.
    let device_id = "trackingtest:10.0.0.99";
    let info = DacInfo {
        id: device_id.to_string(),
        name: "Tracking Test Device".to_string(),
        kind: DacType::Custom("TrackingTest".to_string()),
        caps,
    };
    let device = Dac::new(info, BackendKind::Fifo(Box::new(initial_backend)))
        .with_discovery_factory(move || {
            // Start with no built-in DAC types and register only the tracking discoverer.
            let mut d = crate::discovery::DacDiscovery::new(crate::device::EnabledDacTypes::none());
            d.register(Box::new(TrackingDiscoverer {
                scan_count: scan_count_factory.clone(),
                connect_count: connect_count_factory.clone(),
            }));
            d
        });

    let config = crate::presentation::FrameSessionConfig::new(30_000).with_reconnect(
        crate::config::ReconnectConfig::new()
            .max_retries(3)
            .backoff(Duration::from_millis(50))
            .on_reconnect(move |_info| {
                reconnected_cb.store(true, Ordering::SeqCst);
            }),
    );
    let (session, _info) = device.start_frame_session(config).unwrap();
    session.control().arm().unwrap();
    session.send_frame(crate::presentation::Frame::new(vec![LaserPoint::blanked(
        0.0, 0.0,
    )]));

    // Wait (bounded) for the reconnect callback rather than sleeping a fixed time.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while !reconnected.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(10));
    }

    assert!(
        scan_count.load(Ordering::SeqCst) > 0,
        "discoverer scan() should have been called during reconnect"
    );
    assert!(
        connect_count.load(Ordering::SeqCst) > 0,
        "discoverer connect() should have been called to reopen the device"
    );
    assert!(
        reconnected.load(Ordering::SeqCst),
        "on_reconnect callback should have fired, proving successful reconnect"
    );
    drop(session);
}