1use std::collections::VecDeque;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35
36use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
37use crate::types::{
38 ChunkRequest, ChunkResult, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
39 StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
40};
41
/// Messages sent from a [`StreamControl`] handle to the [`Stream`] that owns
/// the receiving end of the channel.
///
/// The stream drains these in `process_control_messages`.
#[derive(Debug, Clone, Copy)]
enum ControlMsg {
    /// Ask the stream to open the shutter (if it is not already open).
    Arm,
    /// Ask the stream to close the shutter (if it is currently open).
    Disarm,
    /// Ask the stream loop to exit.
    Stop,
}
59
/// Cloneable handle used to arm, disarm, stop and tune a running [`Stream`].
///
/// All clones share the same `Arc`-backed state, so a change made through one
/// handle is observable through every other clone.
#[derive(Clone)]
pub struct StreamControl {
    /// Shared flags, color-delay setting and the control-channel sender.
    inner: Arc<StreamControlInner>,
}
71
/// State shared between every clone of a [`StreamControl`].
struct StreamControlInner {
    /// `true` while output is armed; written by `arm`/`disarm`.
    armed: AtomicBool,
    /// Set once `stop` has been called; never cleared in this file.
    stop_requested: AtomicBool,
    /// Sender half of the control channel, wrapped in a mutex so the handle
    /// can be shared across threads.
    control_tx: Mutex<Sender<ControlMsg>>,
    /// Current color delay, stored as whole microseconds.
    color_delay_micros: AtomicU64,
}
83
84impl StreamControl {
85 fn new(control_tx: Sender<ControlMsg>, color_delay: Duration) -> Self {
86 Self {
87 inner: Arc::new(StreamControlInner {
88 armed: AtomicBool::new(false),
89 stop_requested: AtomicBool::new(false),
90 control_tx: Mutex::new(control_tx),
91 color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
92 }),
93 }
94 }
95
96 pub fn arm(&self) -> Result<()> {
101 self.inner.armed.store(true, Ordering::SeqCst);
102 if let Ok(tx) = self.inner.control_tx.lock() {
104 let _ = tx.send(ControlMsg::Arm);
105 }
106 Ok(())
107 }
108
109 pub fn disarm(&self) -> Result<()> {
122 self.inner.armed.store(false, Ordering::SeqCst);
123 if let Ok(tx) = self.inner.control_tx.lock() {
125 let _ = tx.send(ControlMsg::Disarm);
126 }
127 Ok(())
128 }
129
130 pub fn is_armed(&self) -> bool {
132 self.inner.armed.load(Ordering::SeqCst)
133 }
134
135 pub fn set_color_delay(&self, delay: Duration) {
140 self.inner
141 .color_delay_micros
142 .store(delay.as_micros() as u64, Ordering::SeqCst);
143 }
144
145 pub fn color_delay(&self) -> Duration {
147 Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
148 }
149
150 pub fn stop(&self) -> Result<()> {
155 self.inner.stop_requested.store(true, Ordering::SeqCst);
156 if let Ok(tx) = self.inner.control_tx.lock() {
158 let _ = tx.send(ControlMsg::Stop);
159 }
160 Ok(())
161 }
162
163 pub fn is_stop_requested(&self) -> bool {
165 self.inner.stop_requested.load(Ordering::SeqCst)
166 }
167}
168
/// Mutable bookkeeping owned by a [`Stream`] while it runs.
struct StreamState {
    /// Stream position; advanced by the number of points on every recorded write.
    current_instant: StreamInstant,
    /// Software estimate of points written but not yet played; reduced by
    /// elapsed time in `run` and updated in `record_write`.
    scheduled_ahead: u64,

    /// Scratch buffer handed to the producer and then written to the backend.
    chunk_buffer: Vec<LaserPoint>,
    /// Copy of the most recent chunk written while armed.
    last_chunk: Vec<LaserPoint>,
    /// Number of valid points at the front of `last_chunk`.
    last_chunk_len: usize,

    /// FIFO of `(r, g, b, intensity)` tuples used to delay color relative to position.
    color_delay_line: VecDeque<(u16, u16, u16, u16)>,

    /// Points still to be color-blanked after the most recent arm transition.
    startup_blank_remaining: usize,
    /// Number of points to color-blank after each arm transition.
    startup_blank_points: usize,

    /// Target buffer level in seconds (from the stream configuration).
    target_buffer_secs: f64,
    /// Minimum buffer level in seconds (from the stream configuration).
    min_buffer_secs: f64,
    /// `target_buffer_secs` converted to points at the configured rate.
    target_buffer_points: u64,

    /// Running counters (chunks, points, underruns).
    stats: StreamStats,
    /// Armed state observed at the previous shutter-transition check.
    last_armed: bool,
    /// Whether this stream last commanded the shutter open.
    shutter_open: bool,
}
210
211impl StreamState {
212 fn new(
217 max_points_per_chunk: usize,
218 startup_blank_points: usize,
219 config: &StreamConfig,
220 ) -> Self {
221 let pps = config.pps as f64;
222 let target_buffer_secs = config.target_buffer.as_secs_f64();
223 let min_buffer_secs = config.min_buffer.as_secs_f64();
224 Self {
225 current_instant: StreamInstant::new(0),
226 scheduled_ahead: 0,
227 chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
228 last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
229 last_chunk_len: 0,
230 color_delay_line: VecDeque::new(),
231 startup_blank_remaining: 0,
232 startup_blank_points,
233 target_buffer_secs,
234 min_buffer_secs,
235 target_buffer_points: (target_buffer_secs * pps) as u64,
236 stats: StreamStats::default(),
237 last_armed: false,
238 shutter_open: false,
239 }
240 }
241}
242
/// An active output stream bound to one DAC backend.
pub struct Stream {
    /// Static description of the device this stream drives.
    info: DacInfo,
    /// The backend; `None` once it has been handed back via `into_dac`.
    backend: Option<Box<dyn StreamBackend>>,
    /// Configuration the stream was started with.
    config: StreamConfig,
    /// Control handle; clones are handed out by `control()`.
    control: StreamControl,
    /// Receiving end of the control channel.
    control_rx: Receiver<ControlMsg>,
    /// Mutable runtime bookkeeping.
    state: StreamState,
}
268
269impl Stream {
270 fn duration_micros_to_points(micros: u64, pps: u32) -> usize {
272 if micros == 0 {
273 0
274 } else {
275 (micros as f64 * pps as f64 / 1_000_000.0).ceil() as usize
276 }
277 }
278
279 pub(crate) fn with_backend(
281 info: DacInfo,
282 backend: Box<dyn StreamBackend>,
283 config: StreamConfig,
284 ) -> Self {
285 let (control_tx, control_rx) = mpsc::channel();
286 let max_points = info.caps.max_points_per_chunk;
287 let startup_blank_points =
288 Self::duration_micros_to_points(config.startup_blank.as_micros() as u64, config.pps);
289 Self {
290 info,
291 backend: Some(backend),
292 config: config.clone(),
293 control: StreamControl::new(control_tx, config.color_delay),
294 control_rx,
295 state: StreamState::new(max_points, startup_blank_points, &config),
296 }
297 }
298
    /// Returns the description of the device this stream is bound to.
    pub fn info(&self) -> &DacInfo {
        &self.info
    }
303
    /// Returns the configuration this stream was created with.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }
308
    /// Returns a new clone of the control handle; it shares state with every
    /// other handle for this stream.
    pub fn control(&self) -> StreamControl {
        self.control.clone()
    }
313
314 pub fn status(&self) -> Result<StreamStatus> {
316 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
317
318 Ok(StreamStatus {
319 connected: self
320 .backend
321 .as_ref()
322 .map(|b| b.is_connected())
323 .unwrap_or(false),
324 scheduled_ahead_points: self.state.scheduled_ahead,
325 device_queued_points,
326 stats: Some(self.state.stats.clone()),
327 })
328 }
329
330 fn handle_shutter_transition(&mut self, is_armed: bool) {
332 let was_armed = self.state.last_armed;
333 self.state.last_armed = is_armed;
334
335 if was_armed && !is_armed {
336 self.state.color_delay_line.clear();
338 if self.state.shutter_open {
339 if let Some(backend) = &mut self.backend {
340 let _ = backend.set_shutter(false); }
342 self.state.shutter_open = false;
343 }
344 } else if !was_armed && is_armed {
345 let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
349 let delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
350 self.state.color_delay_line.clear();
351 for _ in 0..delay_points {
352 self.state.color_delay_line.push_back((0, 0, 0, 0));
353 }
354
355 self.state.startup_blank_remaining = self.state.startup_blank_points;
356
357 if !self.state.shutter_open {
358 if let Some(backend) = &mut self.backend {
359 let _ = backend.set_shutter(true); }
361 self.state.shutter_open = true;
362 }
363 }
364 }
365
366 pub fn stop(&mut self) -> Result<()> {
372 self.control.disarm()?;
374
375 self.control.stop()?;
376
377 if let Some(backend) = &mut self.backend {
379 let _ = backend.set_shutter(false);
380 backend.stop()?;
381 }
382
383 Ok(())
384 }
385
386 pub fn into_dac(mut self) -> (Dac, StreamStats) {
408 let _ = self.control.disarm();
410 let _ = self.control.stop();
411 if let Some(backend) = &mut self.backend {
412 let _ = backend.set_shutter(false);
413 let _ = backend.stop();
414 }
415
416 let backend = self.backend.take();
418 let stats = self.state.stats.clone();
419
420 let dac = Dac {
421 info: self.info.clone(),
422 backend,
423 };
424
425 (dac, stats)
426 }
427
428 pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
469 where
470 F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
471 E: FnMut(Error) + Send + 'static,
472 {
473 use std::time::Instant;
474
475 let pps = self.config.pps as f64;
476 let max_points = self.info.caps.max_points_per_chunk;
477
478 let mut last_iteration = Instant::now();
482
483 loop {
484 if self.control.is_stop_requested() {
486 return Ok(RunExit::Stopped);
487 }
488
489 let now = Instant::now();
493 let elapsed = now.duration_since(last_iteration);
494 let points_consumed = (elapsed.as_secs_f64() * pps) as u64;
495 self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_consumed);
496 last_iteration = now;
497
498 let buffered = self.estimate_buffer_points();
500 let target_points = self.state.target_buffer_points;
501
502 if buffered > target_points {
505 let excess_points = buffered - target_points;
506 let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
507 if self.sleep_with_control_check(sleep_time)? {
508 return Ok(RunExit::Stopped);
509 }
510 continue; }
512
513 if let Some(backend) = &self.backend {
515 if !backend.is_connected() {
516 log::warn!("backend.is_connected() = false, exiting with Disconnected");
517 on_error(Error::disconnected("backend disconnected"));
518 return Ok(RunExit::Disconnected);
519 }
520 } else {
521 log::warn!("no backend, exiting with Disconnected");
522 on_error(Error::disconnected("no backend"));
523 return Ok(RunExit::Disconnected);
524 }
525
526 if self.process_control_messages() {
528 return Ok(RunExit::Stopped);
529 }
530
531 let req = self.build_fill_request(max_points, buffered);
533
534 let buffer = &mut self.state.chunk_buffer[..max_points];
536 let result = producer(&req, buffer);
537
538 match result {
540 ChunkResult::Filled(n) => {
541 let n = n.min(max_points);
543
544 if n == 0 && req.target_points > 0 {
546 self.handle_underrun(&req)?;
547 continue;
548 }
549
550 if n > 0 {
552 self.write_fill_points(n, &mut on_error)?;
553 }
554 }
555 ChunkResult::Starved => {
556 self.handle_underrun(&req)?;
557 }
558 ChunkResult::End => {
559 self.drain_and_blank();
561 return Ok(RunExit::ProducerEnded);
562 }
563 }
564 }
565 }
566
567 fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
571 const SLEEP_SLICE: Duration = Duration::from_millis(2);
572 let mut remaining = duration;
573
574 while remaining > Duration::ZERO {
575 let slice = remaining.min(SLEEP_SLICE);
576 std::thread::sleep(slice);
577 remaining = remaining.saturating_sub(slice);
578
579 if self.process_control_messages() {
581 return Ok(true);
582 }
583 }
584
585 Ok(false)
586 }
587
    /// Post-processes the first `n` points of the chunk buffer and writes
    /// them to the backend, retrying while the backend reports `WouldBlock`.
    ///
    /// Processing order: shutter transition handling, blanking of the whole
    /// chunk when disarmed, startup color blanking when armed, then the color
    /// delay line.
    ///
    /// # Errors
    ///
    /// * `Error::disconnected` if there is no backend.
    /// * `Error::Stopped` if the backend reports it was stopped, or a stop
    ///   message arrives while retrying.
    /// * The backend's error if it reports a disconnect (`on_error` is also
    ///   called).
    ///
    /// Any other backend error disconnects the backend, is passed to
    /// `on_error`, and the function returns `Ok(())`.
    fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
    where
        E: FnMut(Error),
    {
        let is_armed = self.control.is_armed();
        let pps = self.config.pps;

        self.handle_shutter_transition(is_armed);

        // Disarmed: keep the positions but replace every point with a blanked one.
        if !is_armed {
            for p in &mut self.state.chunk_buffer[..n] {
                *p = LaserPoint::blanked(p.x, p.y);
            }
        }

        // Armed with startup blanking pending: zero the color of the leading points.
        if is_armed && self.state.startup_blank_remaining > 0 {
            let blank_count = n.min(self.state.startup_blank_remaining);
            for p in &mut self.state.chunk_buffer[..blank_count] {
                p.r = 0;
                p.g = 0;
                p.b = 0;
                p.intensity = 0;
            }
            self.state.startup_blank_remaining -= blank_count;
        }

        // The delay is re-read on every chunk, so changes made through the
        // control handle take effect while running.
        let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
        let color_delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);

        if color_delay_points > 0 {
            // Keep the line exactly `color_delay_points` long (zero-padded or truncated).
            self.state
                .color_delay_line
                .resize(color_delay_points, (0, 0, 0, 0));
            for p in &mut self.state.chunk_buffer[..n] {
                // Push the current color in, pop the delayed color out. The
                // line is non-empty after the push, so `unwrap` cannot fail.
                self.state
                    .color_delay_line
                    .push_back((p.r, p.g, p.b, p.intensity));
                let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
                p.r = r;
                p.g = g;
                p.b = b;
                p.intensity = i;
            }
        } else if !self.state.color_delay_line.is_empty() {
            // Delay switched off: discard whatever is still queued.
            self.state.color_delay_line.clear();
        }

        // Retry loop: repeats only on `WouldBlock`.
        loop {
            let backend = match self.backend.as_mut() {
                Some(b) => b,
                None => return Err(Error::disconnected("no backend")),
            };

            match backend.try_write_chunk(pps, &self.state.chunk_buffer[..n]) {
                Ok(WriteOutcome::Written) => {
                    self.record_write(n, is_armed);
                    return Ok(());
                }
                Ok(WriteOutcome::WouldBlock) => {
                    // Fall through to the back-off below and try again.
                }
                Err(e) if e.is_stopped() => {
                    return Err(Error::Stopped);
                }
                Err(e) if e.is_disconnected() => {
                    log::warn!("write got Disconnected error, exiting stream: {e}");
                    on_error(Error::disconnected("backend disconnected"));
                    return Err(e);
                }
                Err(e) => {
                    log::warn!("write error, disconnecting backend: {e}");
                    let _ = backend.disconnect();
                    on_error(e);
                    return Ok(());
                }
            }

            std::thread::yield_now();
            // Stay responsive to stop requests while the backend is busy.
            if self.process_control_messages() {
                return Err(Error::Stopped);
            }
            std::thread::sleep(Duration::from_micros(100));
        }
    }
685
686 fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
688 self.state.stats.underrun_count += 1;
689
690 let is_armed = self.control.is_armed();
691 self.handle_shutter_transition(is_armed);
692
693 let n_points = req.target_points.max(1);
695
696 let fill_start = if !is_armed {
698 for i in 0..n_points {
700 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
701 }
702 n_points
703 } else {
704 match &self.config.underrun {
705 UnderrunPolicy::RepeatLast => {
706 if self.state.last_chunk_len > 0 {
707 for i in 0..n_points {
709 self.state.chunk_buffer[i] =
710 self.state.last_chunk[i % self.state.last_chunk_len];
711 }
712 n_points
713 } else {
714 for i in 0..n_points {
716 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
717 }
718 n_points
719 }
720 }
721 UnderrunPolicy::Blank => {
722 for i in 0..n_points {
723 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
724 }
725 n_points
726 }
727 UnderrunPolicy::Park { x, y } => {
728 for i in 0..n_points {
729 self.state.chunk_buffer[i] = LaserPoint::blanked(*x, *y);
730 }
731 n_points
732 }
733 UnderrunPolicy::Stop => {
734 self.control.stop()?;
735 return Err(Error::Stopped);
736 }
737 }
738 };
739
740 if let Some(backend) = &mut self.backend {
742 match backend.try_write_chunk(self.config.pps, &self.state.chunk_buffer[..fill_start]) {
743 Ok(WriteOutcome::Written) => {
744 self.record_write(fill_start, is_armed);
745 }
746 Ok(WriteOutcome::WouldBlock) => {
747 }
749 Err(_) => {
750 }
752 }
753 }
754
755 Ok(())
756 }
757
758 fn record_write(&mut self, n: usize, is_armed: bool) {
764 if is_armed {
765 debug_assert!(
766 n <= self.state.last_chunk.len(),
767 "n ({}) exceeds last_chunk capacity ({})",
768 n,
769 self.state.last_chunk.len()
770 );
771 self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
772 self.state.last_chunk_len = n;
773 }
774 self.state.current_instant += n as u64;
775 if self.info.caps.output_model == OutputModel::UsbFrameSwap {
776 self.state.scheduled_ahead = n as u64;
779 } else {
780 self.state.scheduled_ahead += n as u64;
781 }
782 self.state.stats.chunks_written += 1;
783 self.state.stats.points_written += n as u64;
784 }
785
786 fn process_control_messages(&mut self) -> bool {
799 loop {
800 match self.control_rx.try_recv() {
801 Ok(ControlMsg::Arm) => {
802 if !self.state.shutter_open {
804 if let Some(backend) = &mut self.backend {
805 let _ = backend.set_shutter(true);
806 }
807 self.state.shutter_open = true;
808 }
809 }
810 Ok(ControlMsg::Disarm) => {
811 if self.state.shutter_open {
813 if let Some(backend) = &mut self.backend {
814 let _ = backend.set_shutter(false);
815 }
816 self.state.shutter_open = false;
817 }
818 }
819 Ok(ControlMsg::Stop) => {
820 return true;
821 }
822 Err(TryRecvError::Empty) => break,
823 Err(TryRecvError::Disconnected) => break,
824 }
825 }
826 false
827 }
828
829 fn estimate_buffer_points(&self) -> u64 {
840 let software = self.state.scheduled_ahead;
841
842 if let Some(device_queue) = self.backend.as_ref().and_then(|b| b.queued_points()) {
844 return device_queue.min(software);
845 }
846
847 software
848 }
849
850 fn build_fill_request(&self, max_points: usize, buffered_points: u64) -> ChunkRequest {
864 let pps = self.config.pps;
865 let pps_f64 = pps as f64;
866
867 let buffered_secs = buffered_points as f64 / pps_f64;
869 let buffered = Duration::from_secs_f64(buffered_secs);
870
871 let start = self.state.current_instant;
873
874 let target_buffer_secs = self.state.target_buffer_secs;
876 let min_buffer_secs = self.state.min_buffer_secs;
877
878 let deficit_target = (target_buffer_secs - buffered_secs).max(0.0);
880 let target_points = (deficit_target * pps_f64).ceil() as usize;
881 let target_points = target_points.min(max_points);
882
883 let deficit_min = (min_buffer_secs - buffered_secs).max(0.0);
885 let min_points = (deficit_min * pps_f64).ceil() as usize;
886 let min_points = min_points.min(max_points);
887
888 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
892
893 ChunkRequest {
894 start,
895 pps,
896 min_points,
897 target_points,
898 buffered_points,
899 buffered,
900 device_queued_points,
901 }
902 }
903
904 fn drain_and_blank(&mut self) {
914 use std::time::Instant;
915
916 let timeout = self.config.drain_timeout;
917 if timeout.is_zero() {
918 self.blank_and_close_shutter();
920 return;
921 }
922
923 let deadline = Instant::now() + timeout;
924 let pps = self.config.pps;
925
926 let has_queue_depth = self
928 .backend
929 .as_ref()
930 .and_then(|b| b.queued_points())
931 .is_some();
932
933 if has_queue_depth {
934 const POLL_INTERVAL: Duration = Duration::from_millis(5);
936 while Instant::now() < deadline {
937 if let Some(queued) = self.backend.as_ref().and_then(|b| b.queued_points()) {
938 if queued == 0 {
939 break;
940 }
941 } else {
942 break;
944 }
945
946 if self.process_control_messages() {
948 break;
949 }
950
951 std::thread::sleep(POLL_INTERVAL);
952 }
953 } else {
954 let estimated_drain =
956 Duration::from_secs_f64(self.state.scheduled_ahead as f64 / pps as f64);
957 let wait_time = estimated_drain.min(timeout);
958
959 const SLEEP_SLICE: Duration = Duration::from_millis(10);
961 let mut remaining = wait_time;
962 while remaining > Duration::ZERO && Instant::now() < deadline {
963 let slice = remaining.min(SLEEP_SLICE);
964 std::thread::sleep(slice);
965 remaining = remaining.saturating_sub(slice);
966
967 if self.process_control_messages() {
968 break;
969 }
970 }
971 }
972
973 self.blank_and_close_shutter();
974 }
975
976 fn blank_and_close_shutter(&mut self) {
981 if let Some(backend) = &mut self.backend {
983 let _ = backend.set_shutter(false);
984 }
985 self.state.shutter_open = false;
986
987 if let Some(backend) = &mut self.backend {
990 let blank_point = LaserPoint::blanked(0.0, 0.0);
991 let blank_chunk = [blank_point; 16];
992 let _ = backend.try_write_chunk(self.config.pps, &blank_chunk);
993 }
994 }
995}
996
/// Dropping a stream stops it: the stream is disarmed, a stop is requested,
/// and the shutter is closed and the backend stopped if one is still attached.
impl Drop for Stream {
    fn drop(&mut self) {
        // Errors cannot be reported from `drop`, so the result is discarded.
        let _ = self.stop();
    }
}
1002
/// A DAC device that is not currently streaming.
pub struct Dac {
    /// Static description of the device.
    info: DacInfo,
    /// The backend used to talk to the device; taken when a stream is started.
    backend: Option<Box<dyn StreamBackend>>,
}
1027
1028impl Dac {
1029 pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
1031 Self {
1032 info,
1033 backend: Some(backend),
1034 }
1035 }
1036
1037 pub fn info(&self) -> &DacInfo {
1039 &self.info
1040 }
1041
1042 pub fn id(&self) -> &str {
1044 &self.info.id
1045 }
1046
1047 pub fn name(&self) -> &str {
1049 &self.info.name
1050 }
1051
1052 pub fn kind(&self) -> &DacType {
1054 &self.info.kind
1055 }
1056
1057 pub fn caps(&self) -> &DacCapabilities {
1059 &self.info.caps
1060 }
1061
1062 pub fn has_backend(&self) -> bool {
1064 self.backend.is_some()
1065 }
1066
1067 pub fn is_connected(&self) -> bool {
1069 self.backend
1070 .as_ref()
1071 .map(|b| b.is_connected())
1072 .unwrap_or(false)
1073 }
1074
1075 pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1100 let mut backend = self.backend.take().ok_or_else(|| {
1101 Error::invalid_config("device backend has already been used for a stream")
1102 })?;
1103
1104 Self::validate_config(&self.info.caps, &cfg)?;
1105
1106 if !backend.is_connected() {
1108 backend.connect()?;
1109 }
1110
1111 let stream = Stream::with_backend(self.info.clone(), backend, cfg);
1112
1113 Ok((stream, self.info))
1114 }
1115
1116 fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
1117 if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
1118 return Err(Error::invalid_config(format!(
1119 "PPS {} is outside device range [{}, {}]",
1120 cfg.pps, caps.pps_min, caps.pps_max
1121 )));
1122 }
1123
1124 Ok(())
1125 }
1126}
1127
1128#[cfg(test)]
1129mod tests {
1130 use super::*;
1131 use crate::backend::{StreamBackend, WriteOutcome};
1132 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1133 use std::sync::{Arc, Mutex};
1134
1135 struct TestBackend {
1137 caps: DacCapabilities,
1138 connected: bool,
1139 write_count: Arc<AtomicUsize>,
1141 would_block_count: Arc<AtomicUsize>,
1143 queued: Arc<AtomicU64>,
1145 shutter_open: Arc<AtomicBool>,
1147 }
1148
1149 impl TestBackend {
1150 fn new() -> Self {
1151 Self {
1152 caps: DacCapabilities {
1153 pps_min: 1000,
1154 pps_max: 100000,
1155 max_points_per_chunk: 1000,
1156 output_model: crate::types::OutputModel::NetworkFifo,
1157 },
1158 connected: false,
1159 write_count: Arc::new(AtomicUsize::new(0)),
1160 would_block_count: Arc::new(AtomicUsize::new(0)),
1161 queued: Arc::new(AtomicU64::new(0)),
1162 shutter_open: Arc::new(AtomicBool::new(false)),
1163 }
1164 }
1165
1166 fn with_would_block_count(mut self, count: usize) -> Self {
1167 self.would_block_count = Arc::new(AtomicUsize::new(count));
1168 self
1169 }
1170
1171 fn with_output_model(mut self, model: OutputModel) -> Self {
1172 self.caps.output_model = model;
1173 self
1174 }
1175
1176 fn with_initial_queue(mut self, queue: u64) -> Self {
1178 self.queued = Arc::new(AtomicU64::new(queue));
1179 self
1180 }
1181 }
1182
1183 struct NoQueueTestBackend {
1194 inner: TestBackend,
1195 }
1196
1197 impl NoQueueTestBackend {
1198 fn new() -> Self {
1199 Self {
1200 inner: TestBackend::new(),
1201 }
1202 }
1203
1204 fn with_output_model(mut self, model: OutputModel) -> Self {
1205 self.inner = self.inner.with_output_model(model);
1206 self
1207 }
1208 }
1209
1210 impl StreamBackend for NoQueueTestBackend {
1211 fn dac_type(&self) -> DacType {
1212 self.inner.dac_type()
1213 }
1214
1215 fn caps(&self) -> &DacCapabilities {
1216 self.inner.caps()
1217 }
1218
1219 fn connect(&mut self) -> Result<()> {
1220 self.inner.connect()
1221 }
1222
1223 fn disconnect(&mut self) -> Result<()> {
1224 self.inner.disconnect()
1225 }
1226
1227 fn is_connected(&self) -> bool {
1228 self.inner.is_connected()
1229 }
1230
1231 fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1232 self.inner.try_write_chunk(pps, points)
1233 }
1234
1235 fn stop(&mut self) -> Result<()> {
1236 self.inner.stop()
1237 }
1238
1239 fn set_shutter(&mut self, open: bool) -> Result<()> {
1240 self.inner.set_shutter(open)
1241 }
1242
1243 fn queued_points(&self) -> Option<u64> {
1245 None
1246 }
1247 }
1248
1249 impl StreamBackend for TestBackend {
1250 fn dac_type(&self) -> DacType {
1251 DacType::Custom("Test".to_string())
1252 }
1253
1254 fn caps(&self) -> &DacCapabilities {
1255 &self.caps
1256 }
1257
1258 fn connect(&mut self) -> Result<()> {
1259 self.connected = true;
1260 Ok(())
1261 }
1262
1263 fn disconnect(&mut self) -> Result<()> {
1264 self.connected = false;
1265 Ok(())
1266 }
1267
1268 fn is_connected(&self) -> bool {
1269 self.connected
1270 }
1271
1272 fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1273 self.write_count.fetch_add(1, Ordering::SeqCst);
1274
1275 let remaining = self.would_block_count.load(Ordering::SeqCst);
1277 if remaining > 0 {
1278 self.would_block_count.fetch_sub(1, Ordering::SeqCst);
1279 return Ok(WriteOutcome::WouldBlock);
1280 }
1281
1282 self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
1283 Ok(WriteOutcome::Written)
1284 }
1285
1286 fn stop(&mut self) -> Result<()> {
1287 Ok(())
1288 }
1289
1290 fn set_shutter(&mut self, open: bool) -> Result<()> {
1291 self.shutter_open.store(open, Ordering::SeqCst);
1292 Ok(())
1293 }
1294
1295 fn queued_points(&self) -> Option<u64> {
1296 Some(self.queued.load(Ordering::SeqCst))
1297 }
1298 }
1299
1300 #[test]
1301 fn test_stream_control_arm_disarm() {
1302 let (tx, _rx) = mpsc::channel();
1303 let control = StreamControl::new(tx, Duration::ZERO);
1304 assert!(!control.is_armed());
1305
1306 control.arm().unwrap();
1307 assert!(control.is_armed());
1308
1309 control.disarm().unwrap();
1310 assert!(!control.is_armed());
1311 }
1312
1313 #[test]
1314 fn test_stream_control_stop() {
1315 let (tx, _rx) = mpsc::channel();
1316 let control = StreamControl::new(tx, Duration::ZERO);
1317 assert!(!control.is_stop_requested());
1318
1319 control.stop().unwrap();
1320 assert!(control.is_stop_requested());
1321 }
1322
1323 #[test]
1324 fn test_stream_control_clone_shares_state() {
1325 let (tx, _rx) = mpsc::channel();
1326 let control1 = StreamControl::new(tx, Duration::ZERO);
1327 let control2 = control1.clone();
1328
1329 control1.arm().unwrap();
1330 assert!(control2.is_armed());
1331
1332 control2.stop().unwrap();
1333 assert!(control1.is_stop_requested());
1334 }
1335
1336 #[test]
1337 fn test_device_start_stream_connects_backend() {
1338 let backend = TestBackend::new();
1339 let info = DacInfo {
1340 id: "test".to_string(),
1341 name: "Test Device".to_string(),
1342 kind: DacType::Custom("Test".to_string()),
1343 caps: backend.caps().clone(),
1344 };
1345 let device = Dac::new(info, Box::new(backend));
1346
1347 assert!(!device.is_connected());
1349
1350 let cfg = StreamConfig::new(30000);
1352 let result = device.start_stream(cfg);
1353 assert!(result.is_ok());
1354
1355 let (stream, _info) = result.unwrap();
1356 assert!(stream.backend.as_ref().unwrap().is_connected());
1357 }
1358
1359 #[test]
1360 fn test_handle_underrun_advances_state() {
1361 let mut backend = TestBackend::new();
1362 backend.connected = true;
1363 let info = DacInfo {
1364 id: "test".to_string(),
1365 name: "Test Device".to_string(),
1366 kind: DacType::Custom("Test".to_string()),
1367 caps: backend.caps().clone(),
1368 };
1369
1370 let cfg = StreamConfig::new(30000);
1371 let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1372
1373 let initial_instant = stream.state.current_instant;
1375 let initial_scheduled = stream.state.scheduled_ahead;
1376 let initial_chunks = stream.state.stats.chunks_written;
1377 let initial_points = stream.state.stats.points_written;
1378
1379 let req = ChunkRequest {
1381 start: StreamInstant::new(0),
1382 pps: 30000,
1383 min_points: 100,
1384 target_points: 100,
1385 buffered_points: 0,
1386 buffered: Duration::ZERO,
1387 device_queued_points: None,
1388 };
1389 stream.handle_underrun(&req).unwrap();
1390
1391 assert!(stream.state.current_instant > initial_instant);
1393 assert!(stream.state.scheduled_ahead > initial_scheduled);
1394 assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
1395 assert_eq!(stream.state.stats.points_written, initial_points + 100);
1396 assert_eq!(stream.state.stats.underrun_count, 1);
1397 }
1398
1399 #[test]
1400 fn test_run_retries_on_would_block() {
1401 let backend = TestBackend::new().with_would_block_count(3);
1403 let write_count = backend.write_count.clone();
1404
1405 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1406 backend_box.connect().unwrap();
1407
1408 let info = DacInfo {
1409 id: "test".to_string(),
1410 name: "Test Device".to_string(),
1411 kind: DacType::Custom("Test".to_string()),
1412 caps: backend_box.caps().clone(),
1413 };
1414
1415 let cfg = StreamConfig::new(30000);
1416 let stream = Stream::with_backend(info, backend_box, cfg);
1417
1418 let produced_count = Arc::new(AtomicUsize::new(0));
1419 let produced_count_clone = produced_count.clone();
1420 let result = stream.run(
1421 move |req, buffer| {
1422 let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
1423 if count < 1 {
1424 let n = req.target_points.min(buffer.len());
1425 for i in 0..n {
1426 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1427 }
1428 ChunkResult::Filled(n)
1429 } else {
1430 ChunkResult::End
1431 }
1432 },
1433 |_e| {},
1434 );
1435
1436 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1437 assert!(write_count.load(Ordering::SeqCst) >= 1);
1440 }
1441
1442 #[test]
1443 fn test_arm_opens_shutter_disarm_closes_shutter() {
1444 let backend = TestBackend::new();
1445 let shutter_open = backend.shutter_open.clone();
1446
1447 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1448 backend_box.connect().unwrap();
1449
1450 let info = DacInfo {
1451 id: "test".to_string(),
1452 name: "Test Device".to_string(),
1453 kind: DacType::Custom("Test".to_string()),
1454 caps: backend_box.caps().clone(),
1455 };
1456
1457 let cfg = StreamConfig::new(30000);
1458 let mut stream = Stream::with_backend(info, backend_box, cfg);
1459
1460 assert!(!shutter_open.load(Ordering::SeqCst));
1462
1463 let control = stream.control();
1465 control.arm().unwrap();
1466
1467 let stopped = stream.process_control_messages();
1469 assert!(!stopped);
1470 assert!(shutter_open.load(Ordering::SeqCst));
1471
1472 control.disarm().unwrap();
1474
1475 let stopped = stream.process_control_messages();
1477 assert!(!stopped);
1478 assert!(!shutter_open.load(Ordering::SeqCst));
1479 }
1480
1481 #[test]
1482 fn test_handle_underrun_blanks_when_disarmed() {
1483 let backend = TestBackend::new();
1484
1485 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1486 backend_box.connect().unwrap();
1487
1488 let info = DacInfo {
1489 id: "test".to_string(),
1490 name: "Test Device".to_string(),
1491 kind: DacType::Custom("Test".to_string()),
1492 caps: backend_box.caps().clone(),
1493 };
1494
1495 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
1497 let mut stream = Stream::with_backend(info, backend_box, cfg);
1498
1499 let colored_point = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
1501 for i in 0..100 {
1502 stream.state.last_chunk[i] = colored_point;
1503 }
1504 stream.state.last_chunk_len = 100;
1505
1506 assert!(!stream.control.is_armed());
1508
1509 let req = ChunkRequest {
1510 start: StreamInstant::new(0),
1511 pps: 30000,
1512 min_points: 100,
1513 target_points: 100,
1514 buffered_points: 0,
1515 buffered: Duration::ZERO,
1516 device_queued_points: None,
1517 };
1518
1519 stream.handle_underrun(&req).unwrap();
1521
1522 assert_eq!(stream.state.last_chunk[0].r, 65535); }
1527
1528 #[test]
1529 fn test_stop_closes_shutter() {
1530 let backend = TestBackend::new();
1531 let shutter_open = backend.shutter_open.clone();
1532
1533 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1534 backend_box.connect().unwrap();
1535
1536 let info = DacInfo {
1537 id: "test".to_string(),
1538 name: "Test Device".to_string(),
1539 kind: DacType::Custom("Test".to_string()),
1540 caps: backend_box.caps().clone(),
1541 };
1542
1543 let cfg = StreamConfig::new(30000);
1544 let mut stream = Stream::with_backend(info, backend_box, cfg);
1545
1546 stream.control.arm().unwrap();
1548 stream.process_control_messages();
1549 assert!(shutter_open.load(Ordering::SeqCst));
1550
1551 stream.stop().unwrap();
1553 assert!(!shutter_open.load(Ordering::SeqCst));
1554 }
1555
1556 #[test]
1557 fn test_arm_disarm_arm_cycle() {
1558 let backend = TestBackend::new();
1559 let shutter_open = backend.shutter_open.clone();
1560
1561 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1562 backend_box.connect().unwrap();
1563
1564 let info = DacInfo {
1565 id: "test".to_string(),
1566 name: "Test Device".to_string(),
1567 kind: DacType::Custom("Test".to_string()),
1568 caps: backend_box.caps().clone(),
1569 };
1570
1571 let cfg = StreamConfig::new(30000);
1572 let mut stream = Stream::with_backend(info, backend_box, cfg);
1573 let control = stream.control();
1574
1575 assert!(!control.is_armed());
1577 assert!(!shutter_open.load(Ordering::SeqCst));
1578
1579 control.arm().unwrap();
1581 stream.process_control_messages();
1582 assert!(control.is_armed());
1583 assert!(shutter_open.load(Ordering::SeqCst));
1584
1585 control.disarm().unwrap();
1587 stream.process_control_messages();
1588 assert!(!control.is_armed());
1589 assert!(!shutter_open.load(Ordering::SeqCst));
1590
1591 control.arm().unwrap();
1593 stream.process_control_messages();
1594 assert!(control.is_armed());
1595 assert!(shutter_open.load(Ordering::SeqCst));
1596 }
1597
1598 #[test]
1603 fn test_run_buffer_driven_behavior() {
1604 let mut backend = NoQueueTestBackend::new();
1607 backend.inner.connected = true;
1608 let write_count = backend.inner.write_count.clone();
1609
1610 let info = DacInfo {
1611 id: "test".to_string(),
1612 name: "Test Device".to_string(),
1613 kind: DacType::Custom("Test".to_string()),
1614 caps: backend.inner.caps().clone(),
1615 };
1616
1617 let cfg = StreamConfig::new(30000)
1619 .with_target_buffer(Duration::from_millis(10))
1620 .with_min_buffer(Duration::from_millis(5));
1621 let stream = Stream::with_backend(info, Box::new(backend), cfg);
1622
1623 let call_count = Arc::new(AtomicUsize::new(0));
1624 let call_count_clone = call_count.clone();
1625
1626 let result = stream.run(
1627 move |req, buffer| {
1628 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1629
1630 if count >= 4 {
1632 ChunkResult::End
1633 } else {
1634 let n = req.target_points.min(buffer.len()).min(100);
1635 for i in 0..n {
1636 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1637 }
1638 ChunkResult::Filled(n)
1639 }
1640 },
1641 |_e| {},
1642 );
1643
1644 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1645 assert!(
1646 write_count.load(Ordering::SeqCst) >= 4,
1647 "Should have written multiple chunks"
1648 );
1649 }
1650
1651 #[test]
1652 fn test_run_sleeps_when_buffer_healthy() {
1653 use std::time::Instant;
1656
1657 let mut backend = NoQueueTestBackend::new();
1658 backend.inner.connected = true;
1659
1660 let info = DacInfo {
1661 id: "test".to_string(),
1662 name: "Test Device".to_string(),
1663 kind: DacType::Custom("Test".to_string()),
1664 caps: backend.inner.caps().clone(),
1665 };
1666
1667 let cfg = StreamConfig::new(30000)
1669 .with_target_buffer(Duration::from_millis(5))
1670 .with_min_buffer(Duration::from_millis(2))
1671 .with_drain_timeout(Duration::ZERO);
1672 let stream = Stream::with_backend(info, Box::new(backend), cfg);
1673
1674 let call_count = Arc::new(AtomicUsize::new(0));
1675 let call_count_clone = call_count.clone();
1676 let start_time = Instant::now();
1677
1678 let result = stream.run(
1679 move |req, buffer| {
1680 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1681
1682 if count >= 2 {
1684 ChunkResult::End
1685 } else {
1686 let n = req.target_points.min(buffer.len());
1688 for i in 0..n {
1689 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1690 }
1691 ChunkResult::Filled(n)
1692 }
1693 },
1694 |_e| {},
1695 );
1696
1697 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1698
1699 let elapsed = start_time.elapsed();
1701 assert!(
1704 elapsed.as_millis() < 100,
1705 "Elapsed time {:?} is too long for test",
1706 elapsed
1707 );
1708 }
1709
1710 #[test]
1711 fn test_run_stops_on_control_stop() {
1712 use std::thread;
1714
1715 let backend = TestBackend::new();
1716
1717 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1718 backend_box.connect().unwrap();
1719
1720 let info = DacInfo {
1721 id: "test".to_string(),
1722 name: "Test Device".to_string(),
1723 kind: DacType::Custom("Test".to_string()),
1724 caps: backend_box.caps().clone(),
1725 };
1726
1727 let cfg = StreamConfig::new(30000);
1728 let stream = Stream::with_backend(info, backend_box, cfg);
1729 let control = stream.control();
1730
1731 let control_clone = control.clone();
1733 thread::spawn(move || {
1734 thread::sleep(Duration::from_millis(20));
1735 control_clone.stop().unwrap();
1736 });
1737
1738 let result = stream.run(
1739 |req, buffer| {
1740 let n = req.target_points.min(buffer.len()).min(10);
1741 for i in 0..n {
1742 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1743 }
1744 ChunkResult::Filled(n)
1745 },
1746 |_e| {},
1747 );
1748
1749 assert_eq!(result.unwrap(), RunExit::Stopped);
1751 }
1752
1753 #[test]
1754 fn test_run_producer_ended() {
1755 let backend = TestBackend::new();
1757
1758 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1759 backend_box.connect().unwrap();
1760
1761 let info = DacInfo {
1762 id: "test".to_string(),
1763 name: "Test Device".to_string(),
1764 kind: DacType::Custom("Test".to_string()),
1765 caps: backend_box.caps().clone(),
1766 };
1767
1768 let cfg = StreamConfig::new(30000);
1769 let stream = Stream::with_backend(info, backend_box, cfg);
1770
1771 let call_count = Arc::new(AtomicUsize::new(0));
1772 let call_count_clone = call_count.clone();
1773
1774 let result = stream.run(
1775 move |req, buffer| {
1776 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1777
1778 if count == 0 {
1779 let n = req.target_points.min(buffer.len()).min(100);
1781 for i in 0..n {
1782 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1783 }
1784 ChunkResult::Filled(n)
1785 } else {
1786 ChunkResult::End
1788 }
1789 },
1790 |_e| {},
1791 );
1792
1793 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1794 assert_eq!(call_count.load(Ordering::SeqCst), 2);
1795 }
1796
1797 #[test]
1798 fn test_run_starved_applies_underrun_policy() {
1799 let backend = TestBackend::new();
1801 let queued = backend.queued.clone();
1802
1803 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1804 backend_box.connect().unwrap();
1805
1806 let info = DacInfo {
1807 id: "test".to_string(),
1808 name: "Test Device".to_string(),
1809 kind: DacType::Custom("Test".to_string()),
1810 caps: backend_box.caps().clone(),
1811 };
1812
1813 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1815 let stream = Stream::with_backend(info, backend_box, cfg);
1816
1817 let call_count = Arc::new(AtomicUsize::new(0));
1818 let call_count_clone = call_count.clone();
1819
1820 let result = stream.run(
1821 move |_req, _buffer| {
1822 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1823
1824 if count == 0 {
1825 ChunkResult::Starved
1827 } else {
1828 ChunkResult::End
1830 }
1831 },
1832 |_e| {},
1833 );
1834
1835 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1836
1837 assert!(
1839 queued.load(Ordering::SeqCst) > 0,
1840 "Underrun policy should have written blank points"
1841 );
1842 }
1843
1844 #[test]
1845 fn test_run_filled_zero_with_target_treated_as_starved() {
1846 let backend = TestBackend::new();
1848 let queued = backend.queued.clone();
1849
1850 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1851 backend_box.connect().unwrap();
1852
1853 let info = DacInfo {
1854 id: "test".to_string(),
1855 name: "Test Device".to_string(),
1856 kind: DacType::Custom("Test".to_string()),
1857 caps: backend_box.caps().clone(),
1858 };
1859
1860 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1861 let stream = Stream::with_backend(info, backend_box, cfg);
1862
1863 let call_count = Arc::new(AtomicUsize::new(0));
1864 let call_count_clone = call_count.clone();
1865
1866 let result = stream.run(
1867 move |_req, _buffer| {
1868 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1869
1870 if count == 0 {
1871 ChunkResult::Filled(0)
1873 } else {
1874 ChunkResult::End
1875 }
1876 },
1877 |_e| {},
1878 );
1879
1880 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1881
1882 assert!(
1884 queued.load(Ordering::SeqCst) > 0,
1885 "Filled(0) with target > 0 should trigger underrun and write blank points"
1886 );
1887 }
1888
1889 #[test]
1894 fn test_estimate_buffer_uses_software_when_no_hardware() {
1895 let mut backend = NoQueueTestBackend::new();
1897 backend.inner.connected = true;
1898
1899 let info = DacInfo {
1900 id: "test".to_string(),
1901 name: "Test Device".to_string(),
1902 kind: DacType::Custom("Test".to_string()),
1903 caps: backend.inner.caps().clone(),
1904 };
1905
1906 let cfg = StreamConfig::new(30000);
1907 let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1908
1909 stream.state.scheduled_ahead = 500;
1911
1912 let estimate = stream.estimate_buffer_points();
1914 assert_eq!(estimate, 500);
1915 }
1916
1917 #[test]
1918 fn test_estimate_buffer_uses_min_of_hardware_and_software() {
1919 let backend = TestBackend::new().with_initial_queue(300);
1921 let queued = backend.queued.clone();
1922
1923 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1924 backend_box.connect().unwrap();
1925
1926 let info = DacInfo {
1927 id: "test".to_string(),
1928 name: "Test Device".to_string(),
1929 kind: DacType::Custom("Test".to_string()),
1930 caps: backend_box.caps().clone(),
1931 };
1932
1933 let cfg = StreamConfig::new(30000);
1934 let mut stream = Stream::with_backend(info, backend_box, cfg);
1935
1936 stream.state.scheduled_ahead = 500;
1938 let estimate = stream.estimate_buffer_points();
1939 assert_eq!(
1940 estimate, 300,
1941 "Should use hardware (300) when it's less than software (500)"
1942 );
1943
1944 queued.store(800, Ordering::SeqCst);
1946 let estimate = stream.estimate_buffer_points();
1947 assert_eq!(
1948 estimate, 500,
1949 "Should use software (500) when it's less than hardware (800)"
1950 );
1951 }
1952
1953 #[test]
1954 fn test_estimate_buffer_conservative_prevents_underrun() {
1955 let backend = TestBackend::new().with_initial_queue(100);
1958 let queued = backend.queued.clone();
1959
1960 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1961 backend_box.connect().unwrap();
1962
1963 let info = DacInfo {
1964 id: "test".to_string(),
1965 name: "Test Device".to_string(),
1966 kind: DacType::Custom("Test".to_string()),
1967 caps: backend_box.caps().clone(),
1968 };
1969
1970 let cfg = StreamConfig::new(30000);
1971 let mut stream = Stream::with_backend(info, backend_box, cfg);
1972
1973 stream.state.scheduled_ahead = 1000;
1976
1977 let estimate = stream.estimate_buffer_points();
1978
1979 assert_eq!(
1981 estimate, 100,
1982 "Should use conservative estimate (100) not optimistic (1000)"
1983 );
1984
1985 queued.store(2000, Ordering::SeqCst);
1988 stream.state.scheduled_ahead = 500;
1989
1990 let estimate = stream.estimate_buffer_points();
1991 assert_eq!(
1992 estimate, 500,
1993 "Should use conservative estimate (500) not hardware (2000)"
1994 );
1995 }
1996
1997 #[test]
1998 fn test_build_fill_request_uses_conservative_estimation() {
1999 let backend = TestBackend::new().with_initial_queue(200);
2001
2002 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2003 backend_box.connect().unwrap();
2004
2005 let info = DacInfo {
2006 id: "test".to_string(),
2007 name: "Test Device".to_string(),
2008 kind: DacType::Custom("Test".to_string()),
2009 caps: backend_box.caps().clone(),
2010 };
2011
2012 let cfg = StreamConfig::new(30000)
2013 .with_target_buffer(Duration::from_millis(40))
2014 .with_min_buffer(Duration::from_millis(10));
2015 let mut stream = Stream::with_backend(info, backend_box, cfg);
2016
2017 stream.state.scheduled_ahead = 500;
2019
2020 let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2021
2022 assert_eq!(req.buffered_points, 200);
2024 assert_eq!(req.device_queued_points, Some(200));
2025 }
2026
2027 #[test]
2028 fn test_build_fill_request_calculates_min_and_target_points() {
2029 let mut backend = NoQueueTestBackend::new();
2032 backend.inner.connected = true;
2033
2034 let info = DacInfo {
2035 id: "test".to_string(),
2036 name: "Test Device".to_string(),
2037 kind: DacType::Custom("Test".to_string()),
2038 caps: backend.inner.caps().clone(),
2039 };
2040
2041 let cfg = StreamConfig::new(30000)
2045 .with_target_buffer(Duration::from_millis(40))
2046 .with_min_buffer(Duration::from_millis(10));
2047 let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2048
2049 stream.state.scheduled_ahead = 0;
2051 let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2052
2053 assert_eq!(req.target_points, 1000);
2055 assert_eq!(req.min_points, 300);
2057
2058 stream.state.scheduled_ahead = 500;
2060 let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2061
2062 assert_eq!(req.target_points, 700);
2064 assert_eq!(req.min_points, 0);
2066
2067 stream.state.scheduled_ahead = 1200;
2069 let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2070
2071 assert_eq!(req.target_points, 0);
2073 assert_eq!(req.min_points, 0);
2075 }
2076
2077 #[test]
2078 fn test_build_fill_request_ceiling_rounds_min_points() {
2079 let mut backend = NoQueueTestBackend::new();
2082 backend.inner.connected = true;
2083
2084 let info = DacInfo {
2085 id: "test".to_string(),
2086 name: "Test Device".to_string(),
2087 kind: DacType::Custom("Test".to_string()),
2088 caps: backend.inner.caps().clone(),
2089 };
2090
2091 let cfg = StreamConfig::new(30000)
2093 .with_target_buffer(Duration::from_millis(40))
2094 .with_min_buffer(Duration::from_millis(10));
2095 let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2096
2097 stream.state.scheduled_ahead = 299;
2099 let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2100
2101 assert!(
2104 req.min_points >= 1,
2105 "min_points should be at least 1 to reach min_buffer"
2106 );
2107 }
2108
2109 #[test]
2114 fn test_fill_result_filled_writes_points_and_updates_state() {
2115 let backend = TestBackend::new();
2117 let queued = backend.queued.clone();
2118
2119 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2120 backend_box.connect().unwrap();
2121
2122 let info = DacInfo {
2123 id: "test".to_string(),
2124 name: "Test Device".to_string(),
2125 kind: DacType::Custom("Test".to_string()),
2126 caps: backend_box.caps().clone(),
2127 };
2128
2129 let cfg = StreamConfig::new(30000);
2130 let stream = Stream::with_backend(info, backend_box, cfg);
2131
2132 let points_written = Arc::new(AtomicUsize::new(0));
2133 let points_written_clone = points_written.clone();
2134 let call_count = Arc::new(AtomicUsize::new(0));
2135 let call_count_clone = call_count.clone();
2136
2137 let result = stream.run(
2138 move |req, buffer| {
2139 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2140
2141 if count < 3 {
2142 let n = req.target_points.min(50);
2144 for i in 0..n {
2145 buffer[i] =
2146 LaserPoint::new(0.1 * i as f32, 0.2 * i as f32, 1000, 2000, 3000, 4000);
2147 }
2148 points_written_clone.fetch_add(n, Ordering::SeqCst);
2149 ChunkResult::Filled(n)
2150 } else {
2151 ChunkResult::End
2152 }
2153 },
2154 |_e| {},
2155 );
2156
2157 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2158
2159 let total_queued = queued.load(Ordering::SeqCst);
2162 let total_written = points_written.load(Ordering::SeqCst);
2163 assert!(
2164 total_queued > 0,
2165 "Points should have been queued to backend"
2166 );
2167 assert!(
2168 total_queued as usize >= total_written,
2169 "Queued points ({}) should be at least written points ({})",
2170 total_queued,
2171 total_written
2172 );
2173 }
2174
2175 #[test]
2176 fn test_fill_result_filled_updates_last_chunk_when_armed() {
2177 let backend = TestBackend::new();
2179
2180 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2181 backend_box.connect().unwrap();
2182
2183 let info = DacInfo {
2184 id: "test".to_string(),
2185 name: "Test Device".to_string(),
2186 kind: DacType::Custom("Test".to_string()),
2187 caps: backend_box.caps().clone(),
2188 };
2189
2190 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2191 let stream = Stream::with_backend(info, backend_box, cfg);
2192
2193 let control = stream.control();
2195 control.arm().unwrap();
2196
2197 let call_count = Arc::new(AtomicUsize::new(0));
2198 let call_count_clone = call_count.clone();
2199
2200 let result = stream.run(
2201 move |req, buffer| {
2202 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2203
2204 if count == 0 {
2205 let n = req.target_points.min(10);
2207 for i in 0..n {
2208 buffer[i] = LaserPoint::new(0.5, 0.5, 10000, 20000, 30000, 40000);
2209 }
2210 ChunkResult::Filled(n)
2211 } else if count == 1 {
2212 ChunkResult::Starved
2214 } else {
2215 ChunkResult::End
2216 }
2217 },
2218 |_e| {},
2219 );
2220
2221 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2222 }
2225
2226 #[test]
2227 fn test_fill_result_starved_repeat_last_with_stored_chunk() {
2228 let backend = TestBackend::new();
2230 let queued = backend.queued.clone();
2231
2232 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2233 backend_box.connect().unwrap();
2234
2235 let info = DacInfo {
2236 id: "test".to_string(),
2237 name: "Test Device".to_string(),
2238 kind: DacType::Custom("Test".to_string()),
2239 caps: backend_box.caps().clone(),
2240 };
2241
2242 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2243 let stream = Stream::with_backend(info, backend_box, cfg);
2244
2245 let control = stream.control();
2247 control.arm().unwrap();
2248
2249 let call_count = Arc::new(AtomicUsize::new(0));
2250 let call_count_clone = call_count.clone();
2251
2252 let result = stream.run(
2253 move |req, buffer| {
2254 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2255
2256 if count == 0 {
2257 let n = req.target_points.min(50);
2259 for i in 0..n {
2260 buffer[i] = LaserPoint::new(0.3, 0.3, 5000, 5000, 5000, 5000);
2261 }
2262 ChunkResult::Filled(n)
2263 } else if count == 1 {
2264 ChunkResult::Starved
2266 } else {
2267 ChunkResult::End
2268 }
2269 },
2270 |_e| {},
2271 );
2272
2273 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2274
2275 let total_queued = queued.load(Ordering::SeqCst);
2277 assert!(
2278 total_queued >= 50,
2279 "Should have written initial chunk plus repeated chunk"
2280 );
2281 }
2282
2283 #[test]
2284 fn test_fill_result_starved_repeat_last_without_stored_chunk_falls_back_to_blank() {
2285 let backend = TestBackend::new();
2287 let queued = backend.queued.clone();
2288
2289 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2290 backend_box.connect().unwrap();
2291
2292 let info = DacInfo {
2293 id: "test".to_string(),
2294 name: "Test Device".to_string(),
2295 kind: DacType::Custom("Test".to_string()),
2296 caps: backend_box.caps().clone(),
2297 };
2298
2299 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2300 let stream = Stream::with_backend(info, backend_box, cfg);
2301
2302 let control = stream.control();
2304 control.arm().unwrap();
2305
2306 let call_count = Arc::new(AtomicUsize::new(0));
2307 let call_count_clone = call_count.clone();
2308
2309 let result = stream.run(
2310 move |_req, _buffer| {
2311 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2312
2313 if count == 0 {
2314 ChunkResult::Starved
2316 } else {
2317 ChunkResult::End
2318 }
2319 },
2320 |_e| {},
2321 );
2322
2323 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2324
2325 let total_queued = queued.load(Ordering::SeqCst);
2327 assert!(
2328 total_queued > 0,
2329 "Should have written blank points as fallback"
2330 );
2331 }
2332
2333 #[test]
2334 fn test_fill_result_starved_with_park_policy() {
2335 let backend = TestBackend::new();
2337 let queued = backend.queued.clone();
2338
2339 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2340 backend_box.connect().unwrap();
2341
2342 let info = DacInfo {
2343 id: "test".to_string(),
2344 name: "Test Device".to_string(),
2345 kind: DacType::Custom("Test".to_string()),
2346 caps: backend_box.caps().clone(),
2347 };
2348
2349 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Park { x: 0.5, y: -0.5 });
2351 let stream = Stream::with_backend(info, backend_box, cfg);
2352
2353 let control = stream.control();
2355 control.arm().unwrap();
2356
2357 let call_count = Arc::new(AtomicUsize::new(0));
2358 let call_count_clone = call_count.clone();
2359
2360 let result = stream.run(
2361 move |_req, _buffer| {
2362 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2363
2364 if count == 0 {
2365 ChunkResult::Starved
2366 } else {
2367 ChunkResult::End
2368 }
2369 },
2370 |_e| {},
2371 );
2372
2373 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2374
2375 let total_queued = queued.load(Ordering::SeqCst);
2377 assert!(total_queued > 0, "Should have written parked points");
2378 }
2379
2380 #[test]
2381 fn test_fill_result_starved_with_stop_policy() {
2382 let backend = TestBackend::new();
2384
2385 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2386 backend_box.connect().unwrap();
2387
2388 let info = DacInfo {
2389 id: "test".to_string(),
2390 name: "Test Device".to_string(),
2391 kind: DacType::Custom("Test".to_string()),
2392 caps: backend_box.caps().clone(),
2393 };
2394
2395 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Stop);
2396 let stream = Stream::with_backend(info, backend_box, cfg);
2397
2398 let control = stream.control();
2401 control.arm().unwrap();
2402
2403 let result = stream.run(
2404 |_req, _buffer| {
2405 ChunkResult::Starved
2407 },
2408 |_e| {},
2409 );
2410
2411 assert!(result.is_err(), "Stop policy should return an error");
2414 assert!(
2415 result.unwrap_err().is_stopped(),
2416 "Error should be Stopped variant"
2417 );
2418 }
2419
2420 #[test]
2421 fn test_fill_result_end_returns_producer_ended() {
2422 let backend = TestBackend::new();
2424
2425 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2426 backend_box.connect().unwrap();
2427
2428 let info = DacInfo {
2429 id: "test".to_string(),
2430 name: "Test Device".to_string(),
2431 kind: DacType::Custom("Test".to_string()),
2432 caps: backend_box.caps().clone(),
2433 };
2434
2435 let cfg = StreamConfig::new(30000);
2436 let stream = Stream::with_backend(info, backend_box, cfg);
2437
2438 let result = stream.run(
2439 |_req, _buffer| {
2440 ChunkResult::End
2442 },
2443 |_e| {},
2444 );
2445
2446 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2447 }
2448
2449 #[test]
2450 fn test_fill_result_filled_exceeds_buffer_clamped() {
2451 let backend = TestBackend::new();
2453 let queued = backend.queued.clone();
2454
2455 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2456 backend_box.connect().unwrap();
2457
2458 let info = DacInfo {
2459 id: "test".to_string(),
2460 name: "Test Device".to_string(),
2461 kind: DacType::Custom("Test".to_string()),
2462 caps: backend_box.caps().clone(),
2463 };
2464
2465 let cfg = StreamConfig::new(30000);
2466 let stream = Stream::with_backend(info, backend_box, cfg);
2467
2468 let call_count = Arc::new(AtomicUsize::new(0));
2469 let call_count_clone = call_count.clone();
2470
2471 let result = stream.run(
2472 move |_req, buffer| {
2473 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2474
2475 if count == 0 {
2476 for i in 0..buffer.len() {
2478 buffer[i] = LaserPoint::blanked(0.0, 0.0);
2479 }
2480 ChunkResult::Filled(buffer.len() + 1000)
2482 } else {
2483 ChunkResult::End
2484 }
2485 },
2486 |_e| {},
2487 );
2488
2489 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2490
2491 let total_queued = queued.load(Ordering::SeqCst);
2494 assert!(total_queued > 0, "Should have written some points");
2495 assert!(
2497 total_queued <= 1016,
2498 "Points should be clamped to max_points_per_chunk (+ drain)"
2499 );
2500 }
2501
2502 #[test]
2507 fn test_full_stream_lifecycle_create_arm_stream_stop() {
2508 let backend = TestBackend::new();
2510 let queued = backend.queued.clone();
2511 let shutter_open = backend.shutter_open.clone();
2512
2513 let info = DacInfo {
2514 id: "test".to_string(),
2515 name: "Test Device".to_string(),
2516 kind: DacType::Custom("Test".to_string()),
2517 caps: backend.caps().clone(),
2518 };
2519
2520 let device = Dac::new(info, Box::new(backend));
2522 assert!(!device.is_connected());
2523
2524 let cfg = StreamConfig::new(30000);
2525 let (stream, returned_info) = device.start_stream(cfg).unwrap();
2526 assert_eq!(returned_info.id, "test");
2527
2528 let control = stream.control();
2530 assert!(!control.is_armed());
2531 assert!(!shutter_open.load(Ordering::SeqCst));
2532
2533 control.arm().unwrap();
2535 assert!(control.is_armed());
2536
2537 let call_count = Arc::new(AtomicUsize::new(0));
2538 let call_count_clone = call_count.clone();
2539
2540 let result = stream.run(
2542 move |req, buffer| {
2543 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2544
2545 if count < 5 {
2546 let n = req.target_points.min(buffer.len()).min(100);
2548 for i in 0..n {
2549 let t = i as f32 / 100.0;
2550 buffer[i] = LaserPoint::new(t, t, 10000, 20000, 30000, 40000);
2551 }
2552 ChunkResult::Filled(n)
2553 } else {
2554 ChunkResult::End
2555 }
2556 },
2557 |_e| {},
2558 );
2559
2560 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2562 assert!(
2563 queued.load(Ordering::SeqCst) > 0,
2564 "Should have written points"
2565 );
2566 assert!(
2567 call_count.load(Ordering::SeqCst) >= 5,
2568 "Should have called producer multiple times"
2569 );
2570 }
2571
2572 #[test]
2573 fn test_full_stream_lifecycle_with_underrun_recovery() {
2574 let backend = TestBackend::new();
2576 let queued = backend.queued.clone();
2577
2578 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2579 backend_box.connect().unwrap();
2580
2581 let info = DacInfo {
2582 id: "test".to_string(),
2583 name: "Test Device".to_string(),
2584 kind: DacType::Custom("Test".to_string()),
2585 caps: backend_box.caps().clone(),
2586 };
2587
2588 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2589 let stream = Stream::with_backend(info, backend_box, cfg);
2590
2591 let control = stream.control();
2593 control.arm().unwrap();
2594
2595 let call_count = Arc::new(AtomicUsize::new(0));
2596 let call_count_clone = call_count.clone();
2597
2598 let result = stream.run(
2599 move |req, buffer| {
2600 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2601
2602 match count {
2603 0 => {
2604 let n = req.target_points.min(buffer.len()).min(50);
2606 for i in 0..n {
2607 buffer[i] = LaserPoint::new(0.5, 0.5, 30000, 30000, 30000, 30000);
2608 }
2609 ChunkResult::Filled(n)
2610 }
2611 1 => {
2612 ChunkResult::Starved
2614 }
2615 2 => {
2616 let n = req.target_points.min(buffer.len()).min(50);
2618 for i in 0..n {
2619 buffer[i] = LaserPoint::new(-0.5, -0.5, 20000, 20000, 20000, 20000);
2620 }
2621 ChunkResult::Filled(n)
2622 }
2623 _ => ChunkResult::End,
2624 }
2625 },
2626 |_e| {},
2627 );
2628
2629 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2630 let total = queued.load(Ordering::SeqCst);
2632 assert!(
2633 total >= 100,
2634 "Should have written multiple chunks including underrun recovery"
2635 );
2636 }
2637
2638 #[test]
2639 fn test_full_stream_lifecycle_external_stop() {
2640 use std::thread;
2642
2643 let backend = TestBackend::new();
2644
2645 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2646 backend_box.connect().unwrap();
2647
2648 let info = DacInfo {
2649 id: "test".to_string(),
2650 name: "Test Device".to_string(),
2651 kind: DacType::Custom("Test".to_string()),
2652 caps: backend_box.caps().clone(),
2653 };
2654
2655 let cfg = StreamConfig::new(30000);
2656 let stream = Stream::with_backend(info, backend_box, cfg);
2657
2658 let control = stream.control();
2659 let control_clone = control.clone();
2660
2661 thread::spawn(move || {
2663 thread::sleep(Duration::from_millis(30));
2664 control_clone.stop().unwrap();
2665 });
2666
2667 let result = stream.run(
2668 |req, buffer| {
2669 let n = req.target_points.min(buffer.len()).min(10);
2671 for i in 0..n {
2672 buffer[i] = LaserPoint::blanked(0.0, 0.0);
2673 }
2674 ChunkResult::Filled(n)
2675 },
2676 |_e| {},
2677 );
2678
2679 assert_eq!(result.unwrap(), RunExit::Stopped);
2680 }
2681
2682 #[test]
2683 fn test_full_stream_lifecycle_into_dac_recovery() {
2684 let backend = TestBackend::new();
2686
2687 let info = DacInfo {
2688 id: "test".to_string(),
2689 name: "Test Device".to_string(),
2690 kind: DacType::Custom("Test".to_string()),
2691 caps: backend.caps().clone(),
2692 };
2693
2694 let device = Dac::new(info, Box::new(backend));
2696 let cfg = StreamConfig::new(30000);
2697 let (stream, _) = device.start_stream(cfg).unwrap();
2698
2699 let call_count = Arc::new(AtomicUsize::new(0));
2700 let call_count_clone = call_count.clone();
2701
2702 let result = stream.run(
2703 move |req, buffer| {
2704 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2705 if count < 2 {
2706 let n = req.target_points.min(buffer.len()).min(50);
2707 for i in 0..n {
2708 buffer[i] = LaserPoint::blanked(0.0, 0.0);
2709 }
2710 ChunkResult::Filled(n)
2711 } else {
2712 ChunkResult::End
2713 }
2714 },
2715 |_e| {},
2716 );
2717
2718 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2719
2720 }
2724
2725 #[test]
2726 fn test_stream_stats_tracking() {
2727 let backend = TestBackend::new();
2729
2730 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2731 backend_box.connect().unwrap();
2732
2733 let info = DacInfo {
2734 id: "test".to_string(),
2735 name: "Test Device".to_string(),
2736 kind: DacType::Custom("Test".to_string()),
2737 caps: backend_box.caps().clone(),
2738 };
2739
2740 let cfg = StreamConfig::new(30000);
2741 let stream = Stream::with_backend(info, backend_box, cfg);
2742
2743 let control = stream.control();
2745 control.arm().unwrap();
2746
2747 let call_count = Arc::new(AtomicUsize::new(0));
2748 let call_count_clone = call_count.clone();
2749 let points_per_call = 50;
2750
2751 let result = stream.run(
2752 move |req, buffer| {
2753 let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2754 if count < 3 {
2755 let n = req.target_points.min(buffer.len()).min(points_per_call);
2756 for i in 0..n {
2757 buffer[i] = LaserPoint::blanked(0.0, 0.0);
2758 }
2759 ChunkResult::Filled(n)
2760 } else {
2761 ChunkResult::End
2762 }
2763 },
2764 |_e| {},
2765 );
2766
2767 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2768 }
2771
2772 #[test]
2773 fn test_stream_disarm_during_streaming() {
2774 use std::thread;
2776
2777 let backend = TestBackend::new();
2778 let shutter_open = backend.shutter_open.clone();
2779
2780 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2781 backend_box.connect().unwrap();
2782
2783 let info = DacInfo {
2784 id: "test".to_string(),
2785 name: "Test Device".to_string(),
2786 kind: DacType::Custom("Test".to_string()),
2787 caps: backend_box.caps().clone(),
2788 };
2789
2790 let cfg = StreamConfig::new(30000);
2791 let stream = Stream::with_backend(info, backend_box, cfg);
2792
2793 let control = stream.control();
2794 let control_clone = control.clone();
2795
2796 control.arm().unwrap();
2798 assert!(control.is_armed());
2799
2800 thread::spawn(move || {
2802 thread::sleep(Duration::from_millis(15));
2803 control_clone.disarm().unwrap();
2804 thread::sleep(Duration::from_millis(15));
2805 control_clone.stop().unwrap();
2806 });
2807
2808 let result = stream.run(
2809 |req, buffer| {
2810 let n = req.target_points.min(buffer.len()).min(10);
2811 for i in 0..n {
2812 buffer[i] = LaserPoint::new(0.1, 0.1, 50000, 50000, 50000, 50000);
2813 }
2814 ChunkResult::Filled(n)
2815 },
2816 |_e| {},
2817 );
2818
2819 assert_eq!(result.unwrap(), RunExit::Stopped);
2820 assert!(!shutter_open.load(Ordering::SeqCst));
2822 }
2823
/// Runs a stream over a backend whose `is_connected()` turns `false` once three
/// chunks have been written, and expects `run` to return `RunExit::Disconnected`.
#[test]
fn test_stream_with_mock_backend_disconnect() {
    use std::sync::atomic::AtomicBool;

    // Test double: forwards every call to a `TestBackend`, but reports itself as
    // disconnected once `call_count` has reached `disconnect_after`.
    struct DisconnectingBackend {
        inner: TestBackend,
        disconnect_after: Arc<AtomicUsize>,
        call_count: Arc<AtomicUsize>,
    }

    impl StreamBackend for DisconnectingBackend {
        fn dac_type(&self) -> DacType {
            self.inner.dac_type()
        }

        fn caps(&self) -> &DacCapabilities {
            self.inner.caps()
        }

        fn connect(&mut self) -> Result<()> {
            self.inner.connect()
        }

        fn disconnect(&mut self) -> Result<()> {
            self.inner.disconnect()
        }

        fn is_connected(&self) -> bool {
            let writes_so_far = self.call_count.load(Ordering::SeqCst);
            let limit = self.disconnect_after.load(Ordering::SeqCst);
            // Below the limit we defer to the wrapped backend; at or past it we
            // always report a lost connection.
            writes_so_far < limit && self.inner.is_connected()
        }

        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            // Count the attempt before forwarding it.
            self.call_count.fetch_add(1, Ordering::SeqCst);
            self.inner.try_write_chunk(pps, points)
        }

        fn stop(&mut self) -> Result<()> {
            self.inner.stop()
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.inner.set_shutter(open)
        }

        fn queued_points(&self) -> Option<u64> {
            self.inner.queued_points()
        }
    }

    // Mark the wrapped backend as connected up front, then wrap it.
    let mut inner = TestBackend::new();
    inner.connected = true;
    let caps = inner.caps().clone();
    let backend = DisconnectingBackend {
        inner,
        disconnect_after: Arc::new(AtomicUsize::new(3)),
        call_count: Arc::new(AtomicUsize::new(0)),
    };

    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let stream = Stream::with_backend(info, Box::new(backend), StreamConfig::new(30000));

    // Flag flipped by the error callback; this test does not assert on it.
    let error_occurred = Arc::new(AtomicBool::new(false));
    let error_occurred_clone = error_occurred.clone();

    let exit = stream.run(
        |req, buffer| {
            let count = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..count] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(count)
        },
        move |_e| {
            error_occurred_clone.store(true, Ordering::SeqCst);
        },
    );

    assert_eq!(exit.unwrap(), RunExit::Disconnected);
}
2915
/// Test backend that wraps a `TestBackend` and makes `try_write_chunk` return an
/// error once a configured number of writes has been made.
struct FailingWriteBackend {
    /// Backend that receives every call that is not made to fail.
    inner: TestBackend,
    /// Number of `try_write_chunk` calls forwarded to `inner` before writes start
    /// returning errors.
    fail_after: usize,
    /// Count of `try_write_chunk` calls made so far (incremented on every call).
    write_count: Arc<AtomicUsize>,
    /// Set to `true` when `disconnect()` is called; shared so tests can observe it.
    disconnect_called: Arc<AtomicBool>,
    /// `std::io::ErrorKind` used to build the simulated write error.
    error_kind: std::io::ErrorKind,
    /// Message used to build the simulated write error.
    error_message: &'static str,
    /// When `false`, `queued_points()` returns `None` instead of asking `inner`.
    report_queue_depth: bool,
}
2941
2942 impl FailingWriteBackend {
2943 fn new(fail_after: usize) -> Self {
2945 Self {
2946 inner: TestBackend::new(),
2947 fail_after,
2948 write_count: Arc::new(AtomicUsize::new(0)),
2949 disconnect_called: Arc::new(AtomicBool::new(false)),
2950 error_kind: std::io::ErrorKind::BrokenPipe,
2951 error_message: "simulated write failure",
2952 report_queue_depth: true,
2953 }
2954 }
2955
2956 fn with_error(mut self, kind: std::io::ErrorKind, message: &'static str) -> Self {
2958 self.error_kind = kind;
2959 self.error_message = message;
2960 self
2961 }
2962
2963 fn without_queue_depth(mut self) -> Self {
2965 self.report_queue_depth = false;
2966 self
2967 }
2968 }
2969
2970 impl StreamBackend for FailingWriteBackend {
2971 fn dac_type(&self) -> DacType {
2972 DacType::Custom("FailingTest".to_string())
2973 }
2974
2975 fn caps(&self) -> &DacCapabilities {
2976 self.inner.caps()
2977 }
2978
2979 fn connect(&mut self) -> Result<()> {
2980 self.inner.connect()
2981 }
2982
2983 fn disconnect(&mut self) -> Result<()> {
2984 self.disconnect_called.store(true, Ordering::SeqCst);
2985 self.inner.disconnect()
2986 }
2987
2988 fn is_connected(&self) -> bool {
2989 self.inner.is_connected()
2990 }
2991
2992 fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2993 let count = self.write_count.fetch_add(1, Ordering::SeqCst);
2994 if count >= self.fail_after {
2995 Err(Error::backend(std::io::Error::new(
2996 self.error_kind,
2997 self.error_message,
2998 )))
2999 } else {
3000 self.inner.try_write_chunk(pps, points)
3001 }
3002 }
3003
3004 fn stop(&mut self) -> Result<()> {
3005 self.inner.stop()
3006 }
3007
3008 fn set_shutter(&mut self, open: bool) -> Result<()> {
3009 self.inner.set_shutter(open)
3010 }
3011
3012 fn queued_points(&self) -> Option<u64> {
3013 if self.report_queue_depth {
3014 self.inner.queued_points()
3015 } else {
3016 None
3017 }
3018 }
3019 }
3020
3021 fn blank_producer(req: &ChunkRequest, buffer: &mut [LaserPoint]) -> ChunkResult {
3023 let n = req.target_points.min(buffer.len()).min(10);
3024 for i in 0..n {
3025 buffer[i] = LaserPoint::blanked(0.0, 0.0);
3026 }
3027 ChunkResult::Filled(n)
3028 }
3029
3030 fn make_test_stream(mut backend: impl StreamBackend + 'static) -> Stream {
3032 backend.connect().unwrap();
3033 let info = DacInfo {
3034 id: "test".to_string(),
3035 name: "Test Device".to_string(),
3036 kind: backend.dac_type(),
3037 caps: backend.caps().clone(),
3038 };
3039 Stream::with_backend(info, Box::new(backend), StreamConfig::new(30000))
3040 }
3041
/// A backend that fails from its third write onward must make `run` return
/// `RunExit::Disconnected` and must have `disconnect()` called on it.
#[test]
fn test_backend_write_error_exits_with_disconnected() {
    use std::thread;

    let backend = FailingWriteBackend::new(2);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    // Run the stream on its own thread and wait for it to finish.
    let result = thread::spawn(move || stream.run(blank_producer, |_err| {}))
        .join()
        .expect("stream thread panicked");

    let exit = result.unwrap();
    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Write error should cause stream to exit with Disconnected"
    );

    let was_disconnected = disconnect_flag.load(Ordering::SeqCst);
    assert!(
        was_disconnected,
        "backend.disconnect() should have been called after write error"
    );
}
3068
/// When a write fails, the `on_error` callback must receive an
/// `Error::Backend` and `run` must return `RunExit::Disconnected`.
#[test]
fn test_backend_write_error_fires_on_error() {
    let stream = make_test_stream(FailingWriteBackend::new(1));

    let got_backend_error = Arc::new(AtomicBool::new(false));
    let callback_flag = Arc::clone(&got_backend_error);

    let result = stream.run(blank_producer, move |err| {
        // Only backend errors count; anything else leaves the flag untouched.
        if !matches!(err, Error::Backend(_)) {
            return;
        }
        callback_flag.store(true, Ordering::SeqCst);
    });

    assert_eq!(result.unwrap(), RunExit::Disconnected);

    let saw_backend_error = got_backend_error.load(Ordering::SeqCst);
    assert!(
        saw_backend_error,
        "on_error should have received the Backend error"
    );
}
3090
/// A backend that fails on its very first write must still make `run` return
/// `RunExit::Disconnected`.
#[test]
fn test_backend_write_error_immediate_fail() {
    let backend = FailingWriteBackend::new(0);
    let stream = make_test_stream(backend);

    let exit = stream.run(blank_producer, |_err| {}).unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate write failure should exit with Disconnected"
    );
}
3104
3105 fn helios_like_backend(fail_after: usize) -> FailingWriteBackend {
3116 FailingWriteBackend::new(fail_after)
3117 .with_error(
3118 std::io::ErrorKind::TimedOut,
3119 "usb connection error: Operation timed out",
3120 )
3121 .without_queue_depth()
3122 }
3123
/// With the Helios-like backend failing from its fourth write onward, `run`
/// must return `RunExit::Disconnected` and `disconnect()` must have been called.
#[test]
fn test_helios_status_timeout_exits_with_disconnected() {
    use std::thread;

    let backend = helios_like_backend(3);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    // Run the stream on its own thread and wait for it to finish.
    let result = thread::spawn(move || stream.run(blank_producer, |_err| {}))
        .join()
        .expect("stream thread panicked");

    let exit = result.unwrap();
    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Helios status timeout should cause stream to exit with Disconnected"
    );

    let was_disconnected = disconnect_flag.load(Ordering::SeqCst);
    assert!(
        was_disconnected,
        "backend.disconnect() should have been called on status timeout"
    );
}
3152
/// The Helios-like timeout must reach `on_error` as an `Error::Backend` whose
/// text contains "Operation timed out", and `run` must return
/// `RunExit::Disconnected`.
#[test]
fn test_helios_status_timeout_fires_on_error_with_backend_variant() {
    let backend = helios_like_backend(1);
    let stream = make_test_stream(backend);

    // Shared state written by the error callback and read by the assertions.
    let got_backend_error = Arc::new(AtomicBool::new(false));
    let error_received: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
    let callback_flag = Arc::clone(&got_backend_error);
    let callback_text = Arc::clone(&error_received);

    let result = stream.run(blank_producer, move |err| {
        if !matches!(err, Error::Backend(_)) {
            return;
        }
        callback_flag.store(true, Ordering::SeqCst);
        let mut slot = callback_text.lock().unwrap();
        *slot = Some(err.to_string());
    });

    assert_eq!(result.unwrap(), RunExit::Disconnected);

    let saw_backend_error = got_backend_error.load(Ordering::SeqCst);
    assert!(
        saw_backend_error,
        "on_error should receive Error::Backend for Helios timeout"
    );

    let msg = error_received.lock().unwrap();
    let mentions_timeout = msg.as_ref().unwrap().contains("Operation timed out");
    assert!(
        mentions_timeout,
        "Error message should mention timeout, got: {:?}",
        msg
    );
}
3183
/// Even when the Helios-like backend fails on its first write, `run` must
/// return `RunExit::Disconnected` and `disconnect()` must have been called.
#[test]
fn test_helios_immediate_status_timeout() {
    let backend = helios_like_backend(0);
    let disconnect_flag = Arc::clone(&backend.disconnect_called);
    let stream = make_test_stream(backend);

    let exit = stream.run(blank_producer, |_err| {}).unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate status timeout should exit with Disconnected"
    );

    let was_disconnected = disconnect_flag.load(Ordering::SeqCst);
    assert!(
        was_disconnected,
        "backend.disconnect() should be called even on first-write failure"
    );
}
3204
/// With the reported queue set to zero before the run, a producer that ends
/// immediately must finish in under 50 ms despite a 100 ms drain timeout.
#[test]
fn test_fill_result_end_drains_with_queue_depth() {
    use std::time::Instant;

    let backend = TestBackend::new().with_initial_queue(1000);
    // Keep a handle on the queue counter so it can be changed after boxing.
    let queue_depth = backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let drain_timeout = Duration::from_millis(100);
    let cfg = StreamConfig::new(30000).with_drain_timeout(drain_timeout);
    let stream = Stream::with_backend(info, boxed, cfg);

    // Empty the queue so there is nothing left to drain.
    queue_depth.store(0, Ordering::SeqCst);

    let started = Instant::now();
    let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(50),
        "Should return quickly when queue is empty, took {:?}",
        elapsed
    );
}
3247
/// With a large initial queue and a 50 ms drain timeout, a producer that ends
/// immediately must make `run` take at least 40 ms and less than 150 ms.
#[test]
fn test_fill_result_end_respects_drain_timeout() {
    use std::time::Instant;

    let backend = TestBackend::new().with_initial_queue(100000);
    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let drain_timeout = Duration::from_millis(50);
    let cfg = StreamConfig::new(30000).with_drain_timeout(drain_timeout);
    let stream = Stream::with_backend(info, boxed, cfg);

    let started = Instant::now();
    let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    // Accept 40 ms (inclusive) up to 150 ms (exclusive).
    let accepted = Duration::from_millis(40)..Duration::from_millis(150);
    assert!(
        accepted.contains(&elapsed),
        "Should respect drain timeout (~50ms), took {:?}",
        elapsed
    );
}
3282
/// With a zero drain timeout, a producer that ends immediately must make `run`
/// return in under 20 ms even though the backend starts with a large queue.
#[test]
fn test_fill_result_end_skips_drain_with_zero_timeout() {
    use std::time::Instant;

    let backend = TestBackend::new().with_initial_queue(100000);
    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::ZERO);
    let stream = Stream::with_backend(info, boxed, cfg);

    let started = Instant::now();
    let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(20),
        "Should skip drain with zero timeout, took {:?}",
        elapsed
    );
}
3317
/// Using `NoQueueTestBackend`, a producer that ends immediately must make
/// `run` return in under 50 ms despite a 100 ms drain timeout.
#[test]
fn test_fill_result_end_drains_without_queue_depth() {
    use std::time::Instant;

    let mut backend = NoQueueTestBackend::new();
    backend.inner.connected = true;

    let caps = backend.inner.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let drain_timeout = Duration::from_millis(100);
    let cfg = StreamConfig::new(30000).with_drain_timeout(drain_timeout);
    let stream = Stream::with_backend(info, Box::new(backend), cfg);

    let started = Instant::now();
    let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(50),
        "Should return quickly with empty buffer estimate, took {:?}",
        elapsed
    );
}
3351
/// After arming the stream and running a producer that returns
/// `ChunkResult::End`, the backend's shutter flag must read `false`.
#[test]
fn test_fill_result_end_closes_shutter() {
    let backend = TestBackend::new();
    // Keep a handle on the shutter flag so it can be read after boxing.
    let shutter_state = backend.shutter_open.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(10));
    let stream = Stream::with_backend(info, boxed, cfg);

    stream.control().arm().unwrap();

    let result = stream.run(
        |req, buffer| {
            // Fill a few blanked points, then signal the end of the stream.
            let count = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..count] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::End
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let still_open = shutter_state.load(Ordering::SeqCst);
    assert!(!still_open, "Shutter should be closed after drain");
}
3394
/// With the default `StreamConfig::new(30000)`, writing a chunk while armed
/// must leave `color_delay_line` empty.
#[test]
fn test_color_delay_zero_is_passthrough() {
    let mut backend = TestBackend::new();
    backend.connected = true;

    let caps = backend.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let mut stream = Stream::with_backend(info, Box::new(backend), StreamConfig::new(30000));

    // Arm, apply the control message, and mark the stream as already armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    let point_count: usize = 5;
    for (i, slot) in stream.state.chunk_buffer[..point_count]
        .iter_mut()
        .enumerate()
    {
        *slot = LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 1000, 0, 0, 65535);
    }

    let mut on_error = |_: Error| {};
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert!(stream.state.color_delay_line.is_empty());
}
3434
/// With a 300 µs color delay at 10 000 pps and a delay line pre-filled with
/// three zero entries, writing five points must leave the colors of the last
/// three input points in `color_delay_line`.
#[test]
fn test_color_delay_shifts_colors() {
    let mut backend = TestBackend::new();
    backend.connected = true;

    let caps = backend.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(300));
    let mut stream = Stream::with_backend(info, Box::new(backend), cfg);

    // Arm, apply the control message, and mark the stream as already armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    // Start from a delay line holding exactly three zero entries.
    stream.state.color_delay_line.clear();
    stream
        .state
        .color_delay_line
        .extend(std::iter::repeat((0u16, 0u16, 0u16, 0u16)).take(3));

    let point_count: usize = 5;
    for (i, slot) in stream.state.chunk_buffer[..point_count]
        .iter_mut()
        .enumerate()
    {
        let step = i as u16 + 1;
        *slot = LaserPoint::new(
            i as f32 * 0.1,
            0.0,
            step * 10000,
            step * 5000,
            step * 2000,
            65535,
        );
    }

    let mut on_error = |_: Error| {};
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert_eq!(stream.state.color_delay_line.len(), 3);

    // Colors of input points 3, 4 and 5 (1-based).
    let expected: Vec<(u16, u16, u16, u16)> = vec![
        (30000, 15000, 6000, 65535),
        (40000, 20000, 8000, 65535),
        (50000, 25000, 10000, 65535),
    ];
    let actual = stream
        .state
        .color_delay_line
        .iter()
        .copied()
        .collect::<Vec<(u16, u16, u16, u16)>>();
    assert_eq!(actual, expected);
}
3495
/// With a 200 µs color delay at 10 000 pps, `handle_shutter_transition(true)`
/// must give a two-entry delay line starting with a zero entry,
/// `handle_shutter_transition(false)` must empty it, and arming again must
/// restore two entries.
#[test]
fn test_color_delay_resets_on_disarm_arm() {
    let mut backend = TestBackend::new();
    backend.connected = true;

    let caps = backend.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
    let mut stream = Stream::with_backend(info, Box::new(backend), cfg);

    // Arm.
    stream.handle_shutter_transition(true);
    assert_eq!(stream.state.color_delay_line.len(), 2);
    assert_eq!(
        stream.state.color_delay_line.front().copied(),
        Some((0, 0, 0, 0))
    );

    // Disarm.
    stream.handle_shutter_transition(false);
    assert!(stream.state.color_delay_line.is_empty());

    // Re-arm.
    stream.handle_shutter_transition(true);
    assert_eq!(stream.state.color_delay_line.len(), 2);
}
3526
/// Changing the color delay through `StreamControl::set_color_delay` between
/// writes must resize the delay line: 500 µs at 10 000 pps gives five entries,
/// and a zero delay leaves it empty.
#[test]
fn test_color_delay_dynamic_change() {
    let mut backend = TestBackend::new();
    backend.connected = true;

    let caps = backend.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
    let mut stream = Stream::with_backend(info, Box::new(backend), cfg);

    // Arm, apply the control message, and mark the stream as already armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    // Start from a delay line holding exactly two zero entries.
    stream.state.color_delay_line.clear();
    stream
        .state
        .color_delay_line
        .extend(std::iter::repeat((0u16, 0u16, 0u16, 0u16)).take(2));

    let point_count: usize = 3;
    let mut on_error = |_: Error| {};

    // First chunk, written with the initial 200 µs delay.
    for (i, slot) in stream.state.chunk_buffer[..point_count]
        .iter_mut()
        .enumerate()
    {
        *slot = LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 10000, 0, 0, 65535);
    }
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    // Second chunk, written after raising the delay to 500 µs.
    stream.control.set_color_delay(Duration::from_micros(500));
    for (i, slot) in stream.state.chunk_buffer[..point_count]
        .iter_mut()
        .enumerate()
    {
        *slot = LaserPoint::new(0.0, 0.0, (i as u16 + 4) * 10000, 0, 0, 65535);
    }
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert_eq!(stream.state.color_delay_line.len(), 5);

    // Third chunk, written after disabling the delay.
    stream.control.set_color_delay(Duration::ZERO);
    for slot in &mut stream.state.chunk_buffer[..point_count] {
        *slot = LaserPoint::new(0.0, 0.0, 50000, 0, 0, 65535);
    }
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert!(stream.state.color_delay_line.is_empty());
}
3587
/// With a 500 µs startup blank at 10 000 pps (`startup_blank_points == 5`),
/// the first ten-point write after arming must bring
/// `startup_blank_remaining` to zero, and a later write must leave the first
/// point's red and green values intact in `chunk_buffer`.
#[test]
fn test_startup_blank_blanks_first_n_points() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::from_micros(500))
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    assert_eq!(stream.state.startup_blank_points, 5);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    // First write: `last_armed` is false while the control is armed.
    stream.state.last_armed = false;
    let point_count: usize = 10;
    for (i, slot) in stream.state.chunk_buffer[..point_count]
        .iter_mut()
        .enumerate()
    {
        *slot = LaserPoint::new(i as f32 * 0.1, 0.0, 65535, 32000, 16000, 65535);
    }

    let mut on_error = |_: Error| {};
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert_eq!(stream.state.startup_blank_remaining, 0);

    // Second write: `last_armed` is true.
    stream.state.last_armed = true;
    for slot in &mut stream.state.chunk_buffer[..point_count] {
        *slot = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
    }
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
}
3648
/// Arms, writes ten points, disarms, re-arms and writes ten points again;
/// after each write `startup_blank_remaining` must be zero.
#[test]
fn test_startup_blank_resets_on_rearm() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::from_micros(500))
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    // First arm cycle.
    stream.state.last_armed = false;
    stream.control.arm().unwrap();
    stream.process_control_messages();

    let point_count: usize = 10;
    for slot in &mut stream.state.chunk_buffer[..point_count] {
        *slot = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
    }
    let mut on_error = |_: Error| {};
    stream.state.last_armed = false;
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();
    assert_eq!(stream.state.startup_blank_remaining, 0);

    // Disarm, then arm again.
    stream.control.disarm().unwrap();
    stream.process_control_messages();

    stream.control.arm().unwrap();
    stream.process_control_messages();

    // Second arm cycle.
    stream.state.last_armed = false;
    for slot in &mut stream.state.chunk_buffer[..point_count] {
        *slot = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
    }
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    assert_eq!(stream.state.startup_blank_remaining, 0);
}
3700
/// With a zero startup blank, `startup_blank_points` is zero and the first
/// write after arming must leave the first point's color and intensity intact
/// in `chunk_buffer`.
#[test]
fn test_startup_blank_zero_is_noop() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::ZERO)
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    assert_eq!(stream.state.startup_blank_points, 0);

    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = false;

    let point_count: usize = 5;
    for slot in &mut stream.state.chunk_buffer[..point_count] {
        *slot = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
    }
    let mut on_error = |_: Error| {};
    stream
        .write_fill_points(point_count, &mut on_error)
        .unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
    assert_eq!(first.b, 16000);
    assert_eq!(first.intensity, 65535);
    assert_eq!(stream.state.startup_blank_remaining, 0);
}
3741
/// With `OutputModel::UsbFrameSwap`, two 50-point writes must leave
/// `scheduled_ahead` at 50, while the stats count both chunks and all 100 points.
#[test]
fn test_usb_frame_swap_replaces_scheduled_ahead() {
    let backend = TestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let point_count: usize = 50;
    let mut on_error = |_: Error| {};
    for _ in 0..2 {
        for slot in &mut stream.state.chunk_buffer[..point_count] {
            *slot = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream
            .write_fill_points(point_count, &mut on_error)
            .unwrap();
    }

    let per_chunk = point_count as u64;
    assert_eq!(stream.state.scheduled_ahead, per_chunk);
    assert_eq!(stream.state.stats.chunks_written, 2);
    assert_eq!(stream.state.stats.points_written, 2 * per_chunk);
}
3780
/// With `NoQueueTestBackend` in `OutputModel::UsbFrameSwap`, two 50-point
/// writes must leave both `scheduled_ahead` and `estimate_buffer_points()` at 50.
#[test]
fn test_usb_frame_swap_no_queue_reporting() {
    let backend = NoQueueTestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
    let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let point_count: usize = 50;
    let mut on_error = |_: Error| {};
    for _ in 0..2 {
        for slot in &mut stream.state.chunk_buffer[..point_count] {
            *slot = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream
            .write_fill_points(point_count, &mut on_error)
            .unwrap();
    }

    let per_chunk = point_count as u64;
    assert_eq!(stream.state.scheduled_ahead, per_chunk);

    let estimate = stream.estimate_buffer_points();
    assert_eq!(estimate, per_chunk);
}
3819
/// With a plain `TestBackend::new()`, two 50-point writes must leave
/// `scheduled_ahead` at 100, with the stats counting both chunks and all 100
/// points.
#[test]
fn test_network_fifo_accumulates_scheduled_ahead() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let caps = boxed.caps().clone();
    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps,
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let point_count: usize = 50;
    let mut on_error = |_: Error| {};
    for _ in 0..2 {
        for slot in &mut stream.state.chunk_buffer[..point_count] {
            *slot = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream
            .write_fill_points(point_count, &mut on_error)
            .unwrap();
    }

    let total = 2 * point_count as u64;
    assert_eq!(stream.state.scheduled_ahead, total);
    assert_eq!(stream.state.stats.chunks_written, 2);
    assert_eq!(stream.state.stats.points_written, total);
}
3854
/// `Dac::validate_config` must reject a PPS of 5 when `pps_min` is 7, and the
/// error text must contain "PPS 5".
#[test]
fn test_validate_config_rejects_pps_below_min() {
    let caps = DacCapabilities {
        output_model: OutputModel::NetworkFifo,
        max_points_per_chunk: 1000,
        pps_min: 7,
        pps_max: 65535,
    };
    let too_slow = StreamConfig::new(5);

    let outcome = Dac::validate_config(&caps, &too_slow);
    assert!(outcome.is_err());

    let msg = outcome.unwrap_err().to_string();
    assert!(msg.contains("PPS 5"), "expected PPS 5 in error: {msg}");
}
3870
/// `Dac::validate_config` must reject a PPS of 50 000 when `pps_max` is
/// 35 000, and the error text must contain "PPS 50000".
#[test]
fn test_validate_config_rejects_pps_above_max() {
    let caps = DacCapabilities {
        output_model: OutputModel::NetworkFifo,
        max_points_per_chunk: 1000,
        pps_min: 1,
        pps_max: 35_000,
    };
    let too_fast = StreamConfig::new(50_000);

    let outcome = Dac::validate_config(&caps, &too_fast);
    assert!(outcome.is_err());

    let msg = outcome.unwrap_err().to_string();
    let names_rate = msg.contains("PPS 50000");
    assert!(names_rate, "expected PPS 50000 in error: {msg}");
}
3889
/// `Dac::validate_config` must accept 30 000 pps when the capabilities allow
/// 1 to 100 000 pps.
#[test]
fn test_validate_config_avb_accepts_standard_pps() {
    let caps = DacCapabilities {
        output_model: OutputModel::NetworkFifo,
        max_points_per_chunk: 4096,
        pps_min: 1,
        pps_max: 100_000,
    };
    let cfg = StreamConfig::new(30_000);

    let outcome = Dac::validate_config(&caps, &cfg);
    assert!(outcome.is_ok());
}
3902}