1use std::collections::VecDeque;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35
36use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
37use crate::types::{
38 ChunkRequest, ChunkResult, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
39 StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
40};
41
/// Commands sent from a [`StreamControl`] handle to the streaming loop over
/// the control channel.
#[derive(Clone, Copy, Debug)]
enum ControlMsg {
    /// Sent by `StreamControl::arm`.
    Arm,
    /// Sent by `StreamControl::disarm`.
    Disarm,
    /// Sent by `StreamControl::stop`.
    Stop,
}
59
60#[derive(Clone)]
68pub struct StreamControl {
69 inner: Arc<StreamControlInner>,
70}
71
72struct StreamControlInner {
73 armed: AtomicBool,
75 stop_requested: AtomicBool,
77 control_tx: Mutex<Sender<ControlMsg>>,
80 color_delay_micros: AtomicU64,
82}
83
84impl StreamControl {
85 fn new(control_tx: Sender<ControlMsg>, color_delay: Duration) -> Self {
86 Self {
87 inner: Arc::new(StreamControlInner {
88 armed: AtomicBool::new(false),
89 stop_requested: AtomicBool::new(false),
90 control_tx: Mutex::new(control_tx),
91 color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
92 }),
93 }
94 }
95
96 pub fn arm(&self) -> Result<()> {
101 self.inner.armed.store(true, Ordering::SeqCst);
102 if let Ok(tx) = self.inner.control_tx.lock() {
104 let _ = tx.send(ControlMsg::Arm);
105 }
106 Ok(())
107 }
108
109 pub fn disarm(&self) -> Result<()> {
122 self.inner.armed.store(false, Ordering::SeqCst);
123 if let Ok(tx) = self.inner.control_tx.lock() {
125 let _ = tx.send(ControlMsg::Disarm);
126 }
127 Ok(())
128 }
129
130 pub fn is_armed(&self) -> bool {
132 self.inner.armed.load(Ordering::SeqCst)
133 }
134
135 pub fn set_color_delay(&self, delay: Duration) {
140 self.inner
141 .color_delay_micros
142 .store(delay.as_micros() as u64, Ordering::SeqCst);
143 }
144
145 pub fn color_delay(&self) -> Duration {
147 Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
148 }
149
150 pub fn stop(&self) -> Result<()> {
155 self.inner.stop_requested.store(true, Ordering::SeqCst);
156 if let Ok(tx) = self.inner.control_tx.lock() {
158 let _ = tx.send(ControlMsg::Stop);
159 }
160 Ok(())
161 }
162
163 pub fn is_stop_requested(&self) -> bool {
165 self.inner.stop_requested.load(Ordering::SeqCst)
166 }
167}
168
169struct StreamState {
174 current_instant: StreamInstant,
176 scheduled_ahead: u64,
178
179 chunk_buffer: Vec<LaserPoint>,
182 last_chunk: Vec<LaserPoint>,
184 last_chunk_len: usize,
186
187 color_delay_line: VecDeque<(u16, u16, u16, u16)>,
189
190 startup_blank_remaining: usize,
192 startup_blank_points: usize,
194
195 stats: StreamStats,
197 last_armed: bool,
199 shutter_open: bool,
201}
202
203impl StreamState {
204 fn new(max_points_per_chunk: usize, startup_blank_points: usize) -> Self {
209 Self {
210 current_instant: StreamInstant::new(0),
211 scheduled_ahead: 0,
212 chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
213 last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
214 last_chunk_len: 0,
215 color_delay_line: VecDeque::new(),
216 startup_blank_remaining: 0,
217 startup_blank_points,
218 stats: StreamStats::default(),
219 last_armed: false,
220 shutter_open: false,
221 }
222 }
223}
224
225pub struct Stream {
237 info: DacInfo,
239 backend: Option<Box<dyn StreamBackend>>,
241 config: StreamConfig,
243 control: StreamControl,
245 control_rx: Receiver<ControlMsg>,
247 state: StreamState,
249}
250
251impl Stream {
252 pub(crate) fn with_backend(
254 info: DacInfo,
255 backend: Box<dyn StreamBackend>,
256 config: StreamConfig,
257 ) -> Self {
258 let (control_tx, control_rx) = mpsc::channel();
259 let max_points = info.caps.max_points_per_chunk;
260 let startup_blank_points = if config.startup_blank.is_zero() {
261 0
262 } else {
263 (config.startup_blank.as_secs_f64() * config.pps as f64).ceil() as usize
264 };
265 Self {
266 info,
267 backend: Some(backend),
268 config: config.clone(),
269 control: StreamControl::new(control_tx, config.color_delay),
270 control_rx,
271 state: StreamState::new(max_points, startup_blank_points),
272 }
273 }
274
275 pub fn info(&self) -> &DacInfo {
277 &self.info
278 }
279
280 pub fn config(&self) -> &StreamConfig {
282 &self.config
283 }
284
285 pub fn control(&self) -> StreamControl {
287 self.control.clone()
288 }
289
290 pub fn status(&self) -> Result<StreamStatus> {
292 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
293
294 Ok(StreamStatus {
295 connected: self
296 .backend
297 .as_ref()
298 .map(|b| b.is_connected())
299 .unwrap_or(false),
300 scheduled_ahead_points: self.state.scheduled_ahead,
301 device_queued_points,
302 stats: Some(self.state.stats.clone()),
303 })
304 }
305
306 fn handle_shutter_transition(&mut self, is_armed: bool) {
308 let was_armed = self.state.last_armed;
309 self.state.last_armed = is_armed;
310
311 if was_armed && !is_armed {
312 self.state.color_delay_line.clear();
314 if self.state.shutter_open {
315 if let Some(backend) = &mut self.backend {
316 let _ = backend.set_shutter(false); }
318 self.state.shutter_open = false;
319 }
320 } else if !was_armed && is_armed {
321 let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
325 let delay_points = if delay_micros == 0 {
326 0
327 } else {
328 (Duration::from_micros(delay_micros).as_secs_f64() * self.config.pps as f64).ceil()
329 as usize
330 };
331 self.state.color_delay_line.clear();
332 for _ in 0..delay_points {
333 self.state.color_delay_line.push_back((0, 0, 0, 0));
334 }
335
336 self.state.startup_blank_remaining = self.state.startup_blank_points;
337
338 if !self.state.shutter_open {
339 if let Some(backend) = &mut self.backend {
340 let _ = backend.set_shutter(true); }
342 self.state.shutter_open = true;
343 }
344 }
345 }
346
347 pub fn stop(&mut self) -> Result<()> {
353 self.control.disarm()?;
355
356 self.control.stop()?;
357
358 if let Some(backend) = &mut self.backend {
360 let _ = backend.set_shutter(false);
361 backend.stop()?;
362 }
363
364 Ok(())
365 }
366
367 pub fn into_dac(mut self) -> (Dac, StreamStats) {
389 let _ = self.control.disarm();
391 let _ = self.control.stop();
392 if let Some(backend) = &mut self.backend {
393 let _ = backend.set_shutter(false);
394 let _ = backend.stop();
395 }
396
397 let backend = self.backend.take();
399 let stats = self.state.stats.clone();
400
401 let dac = Dac {
402 info: self.info.clone(),
403 backend,
404 };
405
406 (dac, stats)
407 }
408
409 pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
450 where
451 F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
452 E: FnMut(Error) + Send + 'static,
453 {
454 use std::time::Instant;
455
456 let pps = self.config.pps as f64;
457 let max_points = self.info.caps.max_points_per_chunk;
458
459 let mut last_iteration = Instant::now();
463
464 loop {
465 if self.control.is_stop_requested() {
467 return Ok(RunExit::Stopped);
468 }
469
470 let now = Instant::now();
474 let elapsed = now.duration_since(last_iteration);
475 let points_consumed = (elapsed.as_secs_f64() * pps) as u64;
476 self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_consumed);
477 last_iteration = now;
478
479 let buffered = self.estimate_buffer_points();
481 let target_points = (self.config.target_buffer.as_secs_f64() * pps) as u64;
482
483 if buffered > target_points {
486 let excess_points = buffered - target_points;
487 let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
488 if self.sleep_with_control_check(sleep_time)? {
489 return Ok(RunExit::Stopped);
490 }
491 continue; }
493
494 if let Some(backend) = &self.backend {
496 if !backend.is_connected() {
497 log::warn!("backend.is_connected() = false, exiting with Disconnected");
498 on_error(Error::disconnected("backend disconnected"));
499 return Ok(RunExit::Disconnected);
500 }
501 } else {
502 log::warn!("no backend, exiting with Disconnected");
503 on_error(Error::disconnected("no backend"));
504 return Ok(RunExit::Disconnected);
505 }
506
507 if self.process_control_messages() {
509 return Ok(RunExit::Stopped);
510 }
511
512 let req = self.build_fill_request(max_points);
514
515 let buffer = &mut self.state.chunk_buffer[..max_points];
517 let result = producer(&req, buffer);
518
519 match result {
521 ChunkResult::Filled(n) => {
522 let n = n.min(max_points);
524
525 if n == 0 && req.target_points > 0 {
527 self.handle_underrun(&req)?;
528 continue;
529 }
530
531 if n > 0 {
533 self.write_fill_points(n, &mut on_error)?;
534 }
535 }
536 ChunkResult::Starved => {
537 self.handle_underrun(&req)?;
538 }
539 ChunkResult::End => {
540 self.drain_and_blank();
542 return Ok(RunExit::ProducerEnded);
543 }
544 }
545 }
546 }
547
548 fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
552 const SLEEP_SLICE: Duration = Duration::from_millis(2);
553 let mut remaining = duration;
554
555 while remaining > Duration::ZERO {
556 let slice = remaining.min(SLEEP_SLICE);
557 std::thread::sleep(slice);
558 remaining = remaining.saturating_sub(slice);
559
560 if self.process_control_messages() {
562 return Ok(true);
563 }
564 }
565
566 Ok(false)
567 }
568
569 fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
573 where
574 E: FnMut(Error),
575 {
576 let is_armed = self.control.is_armed();
577 let pps = self.config.pps;
578
579 self.handle_shutter_transition(is_armed);
581
582 if !is_armed {
584 for p in &mut self.state.chunk_buffer[..n] {
585 *p = LaserPoint::blanked(p.x, p.y);
586 }
587 }
588
589 if is_armed && self.state.startup_blank_remaining > 0 {
591 let blank_count = n.min(self.state.startup_blank_remaining);
592 for p in &mut self.state.chunk_buffer[..blank_count] {
593 p.r = 0;
594 p.g = 0;
595 p.b = 0;
596 p.intensity = 0;
597 }
598 self.state.startup_blank_remaining -= blank_count;
599 }
600
601 let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
603 let color_delay_points = if delay_micros == 0 {
604 0
605 } else {
606 (Duration::from_micros(delay_micros).as_secs_f64() * self.config.pps as f64).ceil()
607 as usize
608 };
609
610 if color_delay_points > 0 {
611 self.state
613 .color_delay_line
614 .resize(color_delay_points, (0, 0, 0, 0));
615 for p in &mut self.state.chunk_buffer[..n] {
616 self.state
617 .color_delay_line
618 .push_back((p.r, p.g, p.b, p.intensity));
619 let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
620 p.r = r;
621 p.g = g;
622 p.b = b;
623 p.intensity = i;
624 }
625 } else if !self.state.color_delay_line.is_empty() {
626 self.state.color_delay_line.clear();
628 }
629
630 loop {
632 let backend = match self.backend.as_mut() {
634 Some(b) => b,
635 None => return Err(Error::disconnected("no backend")),
636 };
637
638 match backend.try_write_chunk(pps, &self.state.chunk_buffer[..n]) {
639 Ok(WriteOutcome::Written) => {
640 self.record_write(n, is_armed);
641 return Ok(());
642 }
643 Ok(WriteOutcome::WouldBlock) => {
644 }
647 Err(e) if e.is_stopped() => {
648 return Err(Error::Stopped);
649 }
650 Err(e) if e.is_disconnected() => {
651 log::warn!("write got Disconnected error, exiting stream: {e}");
652 on_error(Error::disconnected("backend disconnected"));
653 return Err(e);
654 }
655 Err(e) => {
656 log::warn!("write error, disconnecting backend: {e}");
657 let _ = backend.disconnect();
658 on_error(e);
659 return Ok(());
660 }
661 }
662
663 std::thread::yield_now();
665 if self.process_control_messages() {
666 return Err(Error::Stopped);
667 }
668 std::thread::sleep(Duration::from_micros(100));
669 }
670 }
671
672 fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
674 self.state.stats.underrun_count += 1;
675
676 let is_armed = self.control.is_armed();
677 self.handle_shutter_transition(is_armed);
678
679 let n_points = req.target_points.max(1);
681
682 let fill_start = if !is_armed {
684 for i in 0..n_points {
686 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
687 }
688 n_points
689 } else {
690 match &self.config.underrun {
691 UnderrunPolicy::RepeatLast => {
692 if self.state.last_chunk_len > 0 {
693 for i in 0..n_points {
695 self.state.chunk_buffer[i] =
696 self.state.last_chunk[i % self.state.last_chunk_len];
697 }
698 n_points
699 } else {
700 for i in 0..n_points {
702 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
703 }
704 n_points
705 }
706 }
707 UnderrunPolicy::Blank => {
708 for i in 0..n_points {
709 self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
710 }
711 n_points
712 }
713 UnderrunPolicy::Park { x, y } => {
714 for i in 0..n_points {
715 self.state.chunk_buffer[i] = LaserPoint::blanked(*x, *y);
716 }
717 n_points
718 }
719 UnderrunPolicy::Stop => {
720 self.control.stop()?;
721 return Err(Error::Stopped);
722 }
723 }
724 };
725
726 if let Some(backend) = &mut self.backend {
728 match backend.try_write_chunk(self.config.pps, &self.state.chunk_buffer[..fill_start]) {
729 Ok(WriteOutcome::Written) => {
730 self.record_write(fill_start, is_armed);
731 }
732 Ok(WriteOutcome::WouldBlock) => {
733 }
735 Err(_) => {
736 }
738 }
739 }
740
741 Ok(())
742 }
743
744 fn record_write(&mut self, n: usize, is_armed: bool) {
750 if is_armed {
751 debug_assert!(
752 n <= self.state.last_chunk.len(),
753 "n ({}) exceeds last_chunk capacity ({})",
754 n,
755 self.state.last_chunk.len()
756 );
757 self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
758 self.state.last_chunk_len = n;
759 }
760 self.state.current_instant += n as u64;
761 if self.info.caps.output_model == OutputModel::UsbFrameSwap {
762 self.state.scheduled_ahead = n as u64;
765 } else {
766 self.state.scheduled_ahead += n as u64;
767 }
768 self.state.stats.chunks_written += 1;
769 self.state.stats.points_written += n as u64;
770 }
771
772 fn process_control_messages(&mut self) -> bool {
785 loop {
786 match self.control_rx.try_recv() {
787 Ok(ControlMsg::Arm) => {
788 if !self.state.shutter_open {
790 if let Some(backend) = &mut self.backend {
791 let _ = backend.set_shutter(true);
792 }
793 self.state.shutter_open = true;
794 }
795 }
796 Ok(ControlMsg::Disarm) => {
797 if self.state.shutter_open {
799 if let Some(backend) = &mut self.backend {
800 let _ = backend.set_shutter(false);
801 }
802 self.state.shutter_open = false;
803 }
804 }
805 Ok(ControlMsg::Stop) => {
806 return true;
807 }
808 Err(TryRecvError::Empty) => break,
809 Err(TryRecvError::Disconnected) => break,
810 }
811 }
812 false
813 }
814
815 fn estimate_buffer_points(&self) -> u64 {
826 let software = self.state.scheduled_ahead;
827
828 if let Some(device_queue) = self.backend.as_ref().and_then(|b| b.queued_points()) {
830 return device_queue.min(software);
831 }
832
833 software
834 }
835
836 fn build_fill_request(&self, max_points: usize) -> ChunkRequest {
849 let pps = self.config.pps;
850
851 let buffered_points = self.estimate_buffer_points();
853 let buffered = Duration::from_secs_f64(buffered_points as f64 / pps as f64);
854
855 let start = self.state.current_instant;
860
861 let target_buffer_secs = self.config.target_buffer.as_secs_f64();
864 let min_buffer_secs = self.config.min_buffer.as_secs_f64();
865 let buffered_secs = buffered.as_secs_f64();
866
867 let deficit_target = (target_buffer_secs - buffered_secs).max(0.0);
869 let target_points = (deficit_target * pps as f64).ceil() as usize;
870 let target_points = target_points.min(max_points);
871
872 let deficit_min = (min_buffer_secs - buffered_secs).max(0.0);
874 let min_points = (deficit_min * pps as f64).ceil() as usize;
875 let min_points = min_points.min(max_points);
876
877 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
879
880 ChunkRequest {
881 start,
882 pps,
883 min_points,
884 target_points,
885 buffered_points,
886 buffered,
887 device_queued_points,
888 }
889 }
890
891 fn drain_and_blank(&mut self) {
901 use std::time::Instant;
902
903 let timeout = self.config.drain_timeout;
904 if timeout.is_zero() {
905 self.blank_and_close_shutter();
907 return;
908 }
909
910 let deadline = Instant::now() + timeout;
911 let pps = self.config.pps;
912
913 let has_queue_depth = self
915 .backend
916 .as_ref()
917 .and_then(|b| b.queued_points())
918 .is_some();
919
920 if has_queue_depth {
921 const POLL_INTERVAL: Duration = Duration::from_millis(5);
923 while Instant::now() < deadline {
924 if let Some(queued) = self.backend.as_ref().and_then(|b| b.queued_points()) {
925 if queued == 0 {
926 break;
927 }
928 } else {
929 break;
931 }
932
933 if self.process_control_messages() {
935 break;
936 }
937
938 std::thread::sleep(POLL_INTERVAL);
939 }
940 } else {
941 let estimated_drain =
943 Duration::from_secs_f64(self.state.scheduled_ahead as f64 / pps as f64);
944 let wait_time = estimated_drain.min(timeout);
945
946 const SLEEP_SLICE: Duration = Duration::from_millis(10);
948 let mut remaining = wait_time;
949 while remaining > Duration::ZERO && Instant::now() < deadline {
950 let slice = remaining.min(SLEEP_SLICE);
951 std::thread::sleep(slice);
952 remaining = remaining.saturating_sub(slice);
953
954 if self.process_control_messages() {
955 break;
956 }
957 }
958 }
959
960 self.blank_and_close_shutter();
961 }
962
963 fn blank_and_close_shutter(&mut self) {
968 if let Some(backend) = &mut self.backend {
970 let _ = backend.set_shutter(false);
971 }
972 self.state.shutter_open = false;
973
974 if let Some(backend) = &mut self.backend {
977 let blank_point = LaserPoint::blanked(0.0, 0.0);
978 let blank_chunk = [blank_point; 16];
979 let _ = backend.try_write_chunk(self.config.pps, &blank_chunk);
980 }
981 }
982}
983
984impl Drop for Stream {
985 fn drop(&mut self) {
986 let _ = self.stop();
987 }
988}
989
990pub struct Dac {
1011 info: DacInfo,
1012 backend: Option<Box<dyn StreamBackend>>,
1013}
1014
1015impl Dac {
1016 pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
1018 Self {
1019 info,
1020 backend: Some(backend),
1021 }
1022 }
1023
1024 pub fn info(&self) -> &DacInfo {
1026 &self.info
1027 }
1028
1029 pub fn id(&self) -> &str {
1031 &self.info.id
1032 }
1033
1034 pub fn name(&self) -> &str {
1036 &self.info.name
1037 }
1038
1039 pub fn kind(&self) -> &DacType {
1041 &self.info.kind
1042 }
1043
1044 pub fn caps(&self) -> &DacCapabilities {
1046 &self.info.caps
1047 }
1048
1049 pub fn has_backend(&self) -> bool {
1051 self.backend.is_some()
1052 }
1053
1054 pub fn is_connected(&self) -> bool {
1056 self.backend
1057 .as_ref()
1058 .map(|b| b.is_connected())
1059 .unwrap_or(false)
1060 }
1061
1062 pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1087 let mut backend = self.backend.take().ok_or_else(|| {
1088 Error::invalid_config("device backend has already been used for a stream")
1089 })?;
1090
1091 Self::validate_config(&self.info.caps, &cfg)?;
1092
1093 if !backend.is_connected() {
1095 backend.connect()?;
1096 }
1097
1098 let stream = Stream::with_backend(self.info.clone(), backend, cfg);
1099
1100 Ok((stream, self.info))
1101 }
1102
1103 fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
1104 if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
1105 return Err(Error::invalid_config(format!(
1106 "PPS {} is outside device range [{}, {}]",
1107 cfg.pps, caps.pps_min, caps.pps_max
1108 )));
1109 }
1110
1111 Ok(())
1112 }
1113}
1114
1115#[cfg(test)]
1116mod tests {
1117 use super::*;
1118 use crate::backend::{StreamBackend, WriteOutcome};
1119 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1120 use std::sync::{Arc, Mutex};
1121
1122 struct TestBackend {
1124 caps: DacCapabilities,
1125 connected: bool,
1126 write_count: Arc<AtomicUsize>,
1128 would_block_count: Arc<AtomicUsize>,
1130 queued: Arc<AtomicU64>,
1132 shutter_open: Arc<AtomicBool>,
1134 }
1135
1136 impl TestBackend {
1137 fn new() -> Self {
1138 Self {
1139 caps: DacCapabilities {
1140 pps_min: 1000,
1141 pps_max: 100000,
1142 max_points_per_chunk: 1000,
1143 output_model: crate::types::OutputModel::NetworkFifo,
1144 },
1145 connected: false,
1146 write_count: Arc::new(AtomicUsize::new(0)),
1147 would_block_count: Arc::new(AtomicUsize::new(0)),
1148 queued: Arc::new(AtomicU64::new(0)),
1149 shutter_open: Arc::new(AtomicBool::new(false)),
1150 }
1151 }
1152
1153 fn with_would_block_count(mut self, count: usize) -> Self {
1154 self.would_block_count = Arc::new(AtomicUsize::new(count));
1155 self
1156 }
1157
1158 fn with_output_model(mut self, model: OutputModel) -> Self {
1159 self.caps.output_model = model;
1160 self
1161 }
1162
1163 fn with_initial_queue(mut self, queue: u64) -> Self {
1165 self.queued = Arc::new(AtomicU64::new(queue));
1166 self
1167 }
1168 }
1169
1170 struct NoQueueTestBackend {
1181 inner: TestBackend,
1182 }
1183
1184 impl NoQueueTestBackend {
1185 fn new() -> Self {
1186 Self {
1187 inner: TestBackend::new(),
1188 }
1189 }
1190
1191 fn with_output_model(mut self, model: OutputModel) -> Self {
1192 self.inner = self.inner.with_output_model(model);
1193 self
1194 }
1195 }
1196
1197 impl StreamBackend for NoQueueTestBackend {
1198 fn dac_type(&self) -> DacType {
1199 self.inner.dac_type()
1200 }
1201
1202 fn caps(&self) -> &DacCapabilities {
1203 self.inner.caps()
1204 }
1205
1206 fn connect(&mut self) -> Result<()> {
1207 self.inner.connect()
1208 }
1209
1210 fn disconnect(&mut self) -> Result<()> {
1211 self.inner.disconnect()
1212 }
1213
1214 fn is_connected(&self) -> bool {
1215 self.inner.is_connected()
1216 }
1217
1218 fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1219 self.inner.try_write_chunk(pps, points)
1220 }
1221
1222 fn stop(&mut self) -> Result<()> {
1223 self.inner.stop()
1224 }
1225
1226 fn set_shutter(&mut self, open: bool) -> Result<()> {
1227 self.inner.set_shutter(open)
1228 }
1229
1230 fn queued_points(&self) -> Option<u64> {
1232 None
1233 }
1234 }
1235
1236 impl StreamBackend for TestBackend {
1237 fn dac_type(&self) -> DacType {
1238 DacType::Custom("Test".to_string())
1239 }
1240
1241 fn caps(&self) -> &DacCapabilities {
1242 &self.caps
1243 }
1244
1245 fn connect(&mut self) -> Result<()> {
1246 self.connected = true;
1247 Ok(())
1248 }
1249
1250 fn disconnect(&mut self) -> Result<()> {
1251 self.connected = false;
1252 Ok(())
1253 }
1254
1255 fn is_connected(&self) -> bool {
1256 self.connected
1257 }
1258
1259 fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1260 self.write_count.fetch_add(1, Ordering::SeqCst);
1261
1262 let remaining = self.would_block_count.load(Ordering::SeqCst);
1264 if remaining > 0 {
1265 self.would_block_count.fetch_sub(1, Ordering::SeqCst);
1266 return Ok(WriteOutcome::WouldBlock);
1267 }
1268
1269 self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
1270 Ok(WriteOutcome::Written)
1271 }
1272
1273 fn stop(&mut self) -> Result<()> {
1274 Ok(())
1275 }
1276
1277 fn set_shutter(&mut self, open: bool) -> Result<()> {
1278 self.shutter_open.store(open, Ordering::SeqCst);
1279 Ok(())
1280 }
1281
1282 fn queued_points(&self) -> Option<u64> {
1283 Some(self.queued.load(Ordering::SeqCst))
1284 }
1285 }
1286
1287 #[test]
1288 fn test_stream_control_arm_disarm() {
1289 let (tx, _rx) = mpsc::channel();
1290 let control = StreamControl::new(tx, Duration::ZERO);
1291 assert!(!control.is_armed());
1292
1293 control.arm().unwrap();
1294 assert!(control.is_armed());
1295
1296 control.disarm().unwrap();
1297 assert!(!control.is_armed());
1298 }
1299
1300 #[test]
1301 fn test_stream_control_stop() {
1302 let (tx, _rx) = mpsc::channel();
1303 let control = StreamControl::new(tx, Duration::ZERO);
1304 assert!(!control.is_stop_requested());
1305
1306 control.stop().unwrap();
1307 assert!(control.is_stop_requested());
1308 }
1309
1310 #[test]
1311 fn test_stream_control_clone_shares_state() {
1312 let (tx, _rx) = mpsc::channel();
1313 let control1 = StreamControl::new(tx, Duration::ZERO);
1314 let control2 = control1.clone();
1315
1316 control1.arm().unwrap();
1317 assert!(control2.is_armed());
1318
1319 control2.stop().unwrap();
1320 assert!(control1.is_stop_requested());
1321 }
1322
1323 #[test]
1324 fn test_device_start_stream_connects_backend() {
1325 let backend = TestBackend::new();
1326 let info = DacInfo {
1327 id: "test".to_string(),
1328 name: "Test Device".to_string(),
1329 kind: DacType::Custom("Test".to_string()),
1330 caps: backend.caps().clone(),
1331 };
1332 let device = Dac::new(info, Box::new(backend));
1333
1334 assert!(!device.is_connected());
1336
1337 let cfg = StreamConfig::new(30000);
1339 let result = device.start_stream(cfg);
1340 assert!(result.is_ok());
1341
1342 let (stream, _info) = result.unwrap();
1343 assert!(stream.backend.as_ref().unwrap().is_connected());
1344 }
1345
1346 #[test]
1347 fn test_handle_underrun_advances_state() {
1348 let mut backend = TestBackend::new();
1349 backend.connected = true;
1350 let info = DacInfo {
1351 id: "test".to_string(),
1352 name: "Test Device".to_string(),
1353 kind: DacType::Custom("Test".to_string()),
1354 caps: backend.caps().clone(),
1355 };
1356
1357 let cfg = StreamConfig::new(30000);
1358 let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1359
1360 let initial_instant = stream.state.current_instant;
1362 let initial_scheduled = stream.state.scheduled_ahead;
1363 let initial_chunks = stream.state.stats.chunks_written;
1364 let initial_points = stream.state.stats.points_written;
1365
1366 let req = ChunkRequest {
1368 start: StreamInstant::new(0),
1369 pps: 30000,
1370 min_points: 100,
1371 target_points: 100,
1372 buffered_points: 0,
1373 buffered: Duration::ZERO,
1374 device_queued_points: None,
1375 };
1376 stream.handle_underrun(&req).unwrap();
1377
1378 assert!(stream.state.current_instant > initial_instant);
1380 assert!(stream.state.scheduled_ahead > initial_scheduled);
1381 assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
1382 assert_eq!(stream.state.stats.points_written, initial_points + 100);
1383 assert_eq!(stream.state.stats.underrun_count, 1);
1384 }
1385
1386 #[test]
1387 fn test_run_retries_on_would_block() {
1388 let backend = TestBackend::new().with_would_block_count(3);
1390 let write_count = backend.write_count.clone();
1391
1392 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1393 backend_box.connect().unwrap();
1394
1395 let info = DacInfo {
1396 id: "test".to_string(),
1397 name: "Test Device".to_string(),
1398 kind: DacType::Custom("Test".to_string()),
1399 caps: backend_box.caps().clone(),
1400 };
1401
1402 let cfg = StreamConfig::new(30000);
1403 let stream = Stream::with_backend(info, backend_box, cfg);
1404
1405 let produced_count = Arc::new(AtomicUsize::new(0));
1406 let produced_count_clone = produced_count.clone();
1407 let result = stream.run(
1408 move |req, buffer| {
1409 let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
1410 if count < 1 {
1411 let n = req.target_points.min(buffer.len());
1412 for i in 0..n {
1413 buffer[i] = LaserPoint::blanked(0.0, 0.0);
1414 }
1415 ChunkResult::Filled(n)
1416 } else {
1417 ChunkResult::End
1418 }
1419 },
1420 |_e| {},
1421 );
1422
1423 assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1424 assert!(write_count.load(Ordering::SeqCst) >= 1);
1427 }
1428
1429 #[test]
1430 fn test_arm_opens_shutter_disarm_closes_shutter() {
1431 let backend = TestBackend::new();
1432 let shutter_open = backend.shutter_open.clone();
1433
1434 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1435 backend_box.connect().unwrap();
1436
1437 let info = DacInfo {
1438 id: "test".to_string(),
1439 name: "Test Device".to_string(),
1440 kind: DacType::Custom("Test".to_string()),
1441 caps: backend_box.caps().clone(),
1442 };
1443
1444 let cfg = StreamConfig::new(30000);
1445 let mut stream = Stream::with_backend(info, backend_box, cfg);
1446
1447 assert!(!shutter_open.load(Ordering::SeqCst));
1449
1450 let control = stream.control();
1452 control.arm().unwrap();
1453
1454 let stopped = stream.process_control_messages();
1456 assert!(!stopped);
1457 assert!(shutter_open.load(Ordering::SeqCst));
1458
1459 control.disarm().unwrap();
1461
1462 let stopped = stream.process_control_messages();
1464 assert!(!stopped);
1465 assert!(!shutter_open.load(Ordering::SeqCst));
1466 }
1467
1468 #[test]
1469 fn test_handle_underrun_blanks_when_disarmed() {
1470 let backend = TestBackend::new();
1471
1472 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1473 backend_box.connect().unwrap();
1474
1475 let info = DacInfo {
1476 id: "test".to_string(),
1477 name: "Test Device".to_string(),
1478 kind: DacType::Custom("Test".to_string()),
1479 caps: backend_box.caps().clone(),
1480 };
1481
1482 let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
1484 let mut stream = Stream::with_backend(info, backend_box, cfg);
1485
1486 let colored_point = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
1488 for i in 0..100 {
1489 stream.state.last_chunk[i] = colored_point;
1490 }
1491 stream.state.last_chunk_len = 100;
1492
1493 assert!(!stream.control.is_armed());
1495
1496 let req = ChunkRequest {
1497 start: StreamInstant::new(0),
1498 pps: 30000,
1499 min_points: 100,
1500 target_points: 100,
1501 buffered_points: 0,
1502 buffered: Duration::ZERO,
1503 device_queued_points: None,
1504 };
1505
1506 stream.handle_underrun(&req).unwrap();
1508
1509 assert_eq!(stream.state.last_chunk[0].r, 65535); }
1514
1515 #[test]
1516 fn test_stop_closes_shutter() {
1517 let backend = TestBackend::new();
1518 let shutter_open = backend.shutter_open.clone();
1519
1520 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1521 backend_box.connect().unwrap();
1522
1523 let info = DacInfo {
1524 id: "test".to_string(),
1525 name: "Test Device".to_string(),
1526 kind: DacType::Custom("Test".to_string()),
1527 caps: backend_box.caps().clone(),
1528 };
1529
1530 let cfg = StreamConfig::new(30000);
1531 let mut stream = Stream::with_backend(info, backend_box, cfg);
1532
1533 stream.control.arm().unwrap();
1535 stream.process_control_messages();
1536 assert!(shutter_open.load(Ordering::SeqCst));
1537
1538 stream.stop().unwrap();
1540 assert!(!shutter_open.load(Ordering::SeqCst));
1541 }
1542
1543 #[test]
1544 fn test_arm_disarm_arm_cycle() {
1545 let backend = TestBackend::new();
1546 let shutter_open = backend.shutter_open.clone();
1547
1548 let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1549 backend_box.connect().unwrap();
1550
1551 let info = DacInfo {
1552 id: "test".to_string(),
1553 name: "Test Device".to_string(),
1554 kind: DacType::Custom("Test".to_string()),
1555 caps: backend_box.caps().clone(),
1556 };
1557
1558 let cfg = StreamConfig::new(30000);
1559 let mut stream = Stream::with_backend(info, backend_box, cfg);
1560 let control = stream.control();
1561
1562 assert!(!control.is_armed());
1564 assert!(!shutter_open.load(Ordering::SeqCst));
1565
1566 control.arm().unwrap();
1568 stream.process_control_messages();
1569 assert!(control.is_armed());
1570 assert!(shutter_open.load(Ordering::SeqCst));
1571
1572 control.disarm().unwrap();
1574 stream.process_control_messages();
1575 assert!(!control.is_armed());
1576 assert!(!shutter_open.load(Ordering::SeqCst));
1577
1578 control.arm().unwrap();
1580 stream.process_control_messages();
1581 assert!(control.is_armed());
1582 assert!(shutter_open.load(Ordering::SeqCst));
1583 }
1584
/// Runs a stream on a `NoQueueTestBackend` with a producer that fills four
/// chunks and then ends; expects `ProducerEnded` and at least four backend writes.
#[test]
fn test_run_buffer_driven_behavior() {
    let mut no_queue = NoQueueTestBackend::new();
    // Flip the wrapped backend's flag directly rather than calling connect().
    no_queue.inner.connected = true;
    let writes = no_queue.inner.write_count.clone();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: no_queue.inner.caps().clone(),
    };

    let config = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(10))
        .with_min_buffer(Duration::from_millis(5));
    let stream = Stream::with_backend(dac_info, Box::new(no_queue), config);

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            // Calls 0..=3 supply up to 100 blanked points each.
            0..=3 => {
                let n = req.target_points.min(buffer.len()).min(100);
                for slot in &mut buffer[..n] {
                    *slot = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(n)
            }
            // From the fifth call on, the producer is finished.
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        writes.load(Ordering::SeqCst) >= 4,
        "Should have written multiple chunks"
    );
}
1637
/// Runs a short two-chunk stream with a zero drain timeout and checks that the
/// whole run finishes in under 100 ms of wall-clock time.
#[test]
fn test_run_sleeps_when_buffer_healthy() {
    let mut no_queue = NoQueueTestBackend::new();
    no_queue.inner.connected = true;

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: no_queue.inner.caps().clone(),
    };

    let config = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(5))
        .with_min_buffer(Duration::from_millis(2))
        .with_drain_timeout(Duration::ZERO);
    let stream = Stream::with_backend(dac_info, Box::new(no_queue), config);

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);
    // The clock starts after the stream is built so only `run` is timed.
    let started = std::time::Instant::now();

    let outcome = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 2 {
                return ChunkResult::End;
            }

            // Fill everything that was asked for (bounded by the buffer).
            let n = req.target_points.min(buffer.len());
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    let elapsed = started.elapsed();
    assert!(
        elapsed.as_millis() < 100,
        "Elapsed time {:?} is too long for test",
        elapsed
    );
}
1696
/// Requests a stop from another thread about 20 ms into the run and expects
/// `run` to return `RunExit::Stopped`.
#[test]
fn test_run_stops_on_control_stop() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));
    let control = stream.control();

    // Detached helper thread: sleep briefly, then ask the stream to stop.
    let stopper = control.clone();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(20));
        stopper.stop().unwrap();
    });

    // The producer never ends by itself; it supplies up to 10 blanked points per call.
    let outcome = stream.run(
        |req, buffer| {
            let n = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::Stopped);
}
1739
/// A producer that fills one chunk and then returns `End` should make `run`
/// exit with `ProducerEnded` after exactly two producer calls.
#[test]
fn test_run_producer_ended() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            // First call: up to 100 blanked points.
            0 => {
                let n = req.target_points.min(buffer.len()).min(100);
                for slot in &mut buffer[..n] {
                    *slot = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(n)
            }
            // Every later call signals the end of the stream.
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert_eq!(calls.load(Ordering::SeqCst), 2);
}
1783
/// With `UnderrunPolicy::Blank`, a producer that reports `Starved` once and then
/// ends should still leave points queued on the backend.
#[test]
fn test_run_starved_applies_underrun_policy() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
    let stream = Stream::with_backend(dac_info, boxed, config);

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    // The producer wrote nothing itself, so anything queued came from the stream.
    assert!(
        queued_points.load(Ordering::SeqCst) > 0,
        "Underrun policy should have written blank points"
    );
}
1830
/// A producer returning `Filled(0)` on its first call (then `End`) is expected
/// to be handled like an underrun: points must still reach the backend.
#[test]
fn test_run_filled_zero_with_target_treated_as_starved() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
    let stream = Stream::with_backend(dac_info, boxed, config);

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |_req, _buffer| {
            let first_call = producer_calls.fetch_add(1, Ordering::SeqCst) == 0;
            if first_call {
                ChunkResult::Filled(0)
            } else {
                ChunkResult::End
            }
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    assert!(
        queued_points.load(Ordering::SeqCst) > 0,
        "Filled(0) with target > 0 should trigger underrun and write blank points"
    );
}
1875
/// On a `NoQueueTestBackend`, `estimate_buffer_points` is expected to return the
/// stream's own `scheduled_ahead` counter unchanged.
#[test]
fn test_estimate_buffer_uses_software_when_no_hardware() {
    let mut no_queue = NoQueueTestBackend::new();
    no_queue.inner.connected = true;

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: no_queue.inner.caps().clone(),
    };

    let mut stream = Stream::with_backend(dac_info, Box::new(no_queue), StreamConfig::new(30000));

    // Pretend 500 points have already been scheduled.
    stream.state.scheduled_ahead = 500;

    assert_eq!(stream.estimate_buffer_points(), 500);
}
1903
/// The buffer estimate should be the smaller of the backend's queue depth and
/// the stream's `scheduled_ahead`, whichever side happens to be lower.
#[test]
fn test_estimate_buffer_uses_min_of_hardware_and_software() {
    let test_backend = TestBackend::new().with_initial_queue(300);
    let hw_queue = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let mut stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    // Software says 500, backend queue says 300 -> expect 300.
    stream.state.scheduled_ahead = 500;
    let low_hardware = stream.estimate_buffer_points();
    assert_eq!(
        low_hardware, 300,
        "Should use hardware (300) when it's less than software (500)"
    );

    // Raise the backend queue above the software figure -> expect 500.
    hw_queue.store(800, Ordering::SeqCst);
    let low_software = stream.estimate_buffer_points();
    assert_eq!(
        low_software, 500,
        "Should use software (500) when it's less than hardware (800)"
    );
}
1939
/// Checks that `estimate_buffer_points` picks the lower of the two figures in
/// both directions: backend queue below software, and software below backend queue.
#[test]
fn test_estimate_buffer_conservative_prevents_underrun() {
    let test_backend = TestBackend::new().with_initial_queue(100);
    let hw_queue = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let mut stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    // Case 1: software figure (1000) is well above the backend queue (100).
    stream.state.scheduled_ahead = 1000;
    let first = stream.estimate_buffer_points();
    assert_eq!(
        first, 100,
        "Should use conservative estimate (100) not optimistic (1000)"
    );

    // Case 2: backend queue (2000) is well above the software figure (500).
    hw_queue.store(2000, Ordering::SeqCst);
    stream.state.scheduled_ahead = 500;
    let second = stream.estimate_buffer_points();
    assert_eq!(
        second, 500,
        "Should use conservative estimate (500) not hardware (2000)"
    );
}
1983
/// `build_fill_request` should report the lower buffer figure (backend queue of
/// 200 vs. 500 scheduled) and expose the backend queue depth as `Some(200)`.
#[test]
fn test_build_fill_request_uses_conservative_estimation() {
    let mut boxed: Box<dyn StreamBackend> =
        Box::new(TestBackend::new().with_initial_queue(200));
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(40))
        .with_min_buffer(Duration::from_millis(10));
    let mut stream = Stream::with_backend(dac_info, boxed, config);

    stream.state.scheduled_ahead = 500;

    let request = stream.build_fill_request(1000);

    assert_eq!(request.buffered_points, 200);
    assert_eq!(request.device_queued_points, Some(200));
}
2013
/// Table-driven check of `target_points` / `min_points` from `build_fill_request(1000)`
/// for several `scheduled_ahead` values.
///
/// At 30000 pps the 40 ms target buffer is 1200 points and the 10 ms minimum
/// buffer is 300 points; the expected values below follow from those figures.
#[test]
fn test_build_fill_request_calculates_min_and_target_points() {
    let mut no_queue = NoQueueTestBackend::new();
    no_queue.inner.connected = true;

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: no_queue.inner.caps().clone(),
    };

    let config = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(40))
        .with_min_buffer(Duration::from_millis(10));
    let mut stream = Stream::with_backend(dac_info, Box::new(no_queue), config);

    // (scheduled_ahead, expected target_points, expected min_points)
    let cases = [
        // Empty buffer: target limited to 1000, full 300-point minimum needed.
        (0, 1000, 300),
        // 500 ahead: 700 short of the target, already above the minimum.
        (500, 700, 0),
        // 1200 ahead: target reached, nothing requested.
        (1200, 0, 0),
    ];

    for &(ahead, want_target, want_min) in &cases {
        stream.state.scheduled_ahead = ahead;
        let request = stream.build_fill_request(1000);

        assert_eq!(request.target_points, want_target);
        assert_eq!(request.min_points, want_min);
    }
}
2063
/// With 299 points scheduled and a 10 ms minimum buffer at 30000 pps (300 points),
/// the request must ask for at least one more point.
#[test]
fn test_build_fill_request_ceiling_rounds_min_points() {
    let mut no_queue = NoQueueTestBackend::new();
    no_queue.inner.connected = true;

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: no_queue.inner.caps().clone(),
    };

    let config = StreamConfig::new(30000)
        .with_target_buffer(Duration::from_millis(40))
        .with_min_buffer(Duration::from_millis(10));
    let mut stream = Stream::with_backend(dac_info, Box::new(no_queue), config);

    // One point short of the minimum buffer.
    stream.state.scheduled_ahead = 299;
    let request = stream.build_fill_request(1000);

    assert!(
        request.min_points >= 1,
        "min_points should be at least 1 to reach min_buffer"
    );
}
2095
/// Verifies that points returned via `ChunkResult::Filled` reach the backend.
///
/// The producer fills up to 50 points on each of its first three calls and then
/// ends. Afterwards the backend's queued counter must be non-zero and at least
/// as large as the number of points the producer reported.
#[test]
fn test_fill_result_filled_writes_points_and_updates_state() {
    let backend = TestBackend::new();
    let queued = backend.queued.clone();

    let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
    backend_box.connect().unwrap();

    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps: backend_box.caps().clone(),
    };

    let cfg = StreamConfig::new(30000);
    let stream = Stream::with_backend(info, backend_box, cfg);

    let points_written = Arc::new(AtomicUsize::new(0));
    let points_written_clone = Arc::clone(&points_written);
    let call_count = Arc::new(AtomicUsize::new(0));
    let call_count_clone = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| {
            let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
            if count >= 3 {
                return ChunkResult::End;
            }

            // Clamp to the buffer length as the sibling tests do, so the
            // producer can never index past the slice it was handed.
            let n = req.target_points.min(buffer.len()).min(50);
            for (i, slot) in buffer[..n].iter_mut().enumerate() {
                *slot = LaserPoint::new(0.1 * i as f32, 0.2 * i as f32, 1000, 2000, 3000, 4000);
            }
            points_written_clone.fetch_add(n, Ordering::SeqCst);
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    let total_written = points_written.load(Ordering::SeqCst);
    assert!(
        total_queued > 0,
        "Points should have been queued to backend"
    );
    assert!(
        total_queued as usize >= total_written,
        "Queued points ({}) should be at least written points ({})",
        total_queued,
        total_written
    );
}
2161
/// Runs an armed stream with `UnderrunPolicy::RepeatLast` through the sequence
/// fill -> starve -> end and checks that the run finishes with `ProducerEnded`.
///
/// Only the exit value is asserted; the backend contents are not inspected.
#[test]
fn test_fill_result_filled_updates_last_chunk_when_armed() {
    let backend = TestBackend::new();

    let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
    backend_box.connect().unwrap();

    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps: backend_box.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
    let stream = Stream::with_backend(info, backend_box, cfg);

    // Arm before running so the filled chunk is produced while armed.
    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let call_count_clone = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| match call_count_clone.fetch_add(1, Ordering::SeqCst) {
            0 => {
                // Clamp to the buffer length as the sibling tests do, so the
                // producer can never index past the slice it was handed.
                let n = req.target_points.min(buffer.len()).min(10);
                for slot in &mut buffer[..n] {
                    *slot = LaserPoint::new(0.5, 0.5, 10000, 20000, 30000, 40000);
                }
                ChunkResult::Filled(n)
            }
            1 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);
}
2212
/// With `UnderrunPolicy::RepeatLast` on an armed stream, a `Starved` result that
/// follows a filled chunk should still leave at least 50 points on the backend.
///
/// Producer sequence: fill up to 50 points, report `Starved`, then `End`.
#[test]
fn test_fill_result_starved_repeat_last_with_stored_chunk() {
    let backend = TestBackend::new();
    let queued = backend.queued.clone();

    let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
    backend_box.connect().unwrap();

    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("Test".to_string()),
        caps: backend_box.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
    let stream = Stream::with_backend(info, backend_box, cfg);

    let control = stream.control();
    control.arm().unwrap();

    let call_count = Arc::new(AtomicUsize::new(0));
    let call_count_clone = Arc::clone(&call_count);

    let result = stream.run(
        move |req, buffer| match call_count_clone.fetch_add(1, Ordering::SeqCst) {
            0 => {
                // Clamp to the buffer length as the sibling tests do, so the
                // producer can never index past the slice it was handed.
                let n = req.target_points.min(buffer.len()).min(50);
                for slot in &mut buffer[..n] {
                    *slot = LaserPoint::new(0.3, 0.3, 5000, 5000, 5000, 5000);
                }
                ChunkResult::Filled(n)
            }
            1 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(result.unwrap(), RunExit::ProducerEnded);

    let total_queued = queued.load(Ordering::SeqCst);
    assert!(
        total_queued >= 50,
        "Should have written initial chunk plus repeated chunk"
    );
}
2269
/// With `UnderrunPolicy::RepeatLast` and a producer that starves on its very
/// first call (so no chunk was ever filled), points must still be queued.
#[test]
fn test_fill_result_starved_repeat_last_without_stored_chunk_falls_back_to_blank() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
    let stream = Stream::with_backend(dac_info, boxed, config);

    let control = stream.control();
    control.arm().unwrap();

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |_req, _buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => ChunkResult::Starved,
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    let total = queued_points.load(Ordering::SeqCst);
    assert!(
        total > 0,
        "Should have written blank points as fallback"
    );
}
2319
/// With `UnderrunPolicy::Park { x: 0.5, y: -0.5 }`, a starving producer on an
/// armed stream should still result in points being queued on the backend.
#[test]
fn test_fill_result_starved_with_park_policy() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let park = UnderrunPolicy::Park { x: 0.5, y: -0.5 };
    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000).with_underrun(park));

    let control = stream.control();
    control.arm().unwrap();

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |_req, _buffer| {
            let first_call = producer_calls.fetch_add(1, Ordering::SeqCst) == 0;
            if first_call {
                ChunkResult::Starved
            } else {
                ChunkResult::End
            }
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    let total = queued_points.load(Ordering::SeqCst);
    assert!(total > 0, "Should have written parked points");
}
2366
/// With `UnderrunPolicy::Stop`, a producer that always starves must make `run`
/// return an error for which `is_stopped()` is true.
#[test]
fn test_fill_result_starved_with_stop_policy() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Stop);
    let stream = Stream::with_backend(dac_info, boxed, config);

    let control = stream.control();
    control.arm().unwrap();

    // The producer never supplies any data.
    let outcome = stream.run(|_req, _buffer| ChunkResult::Starved, |_e| {});

    assert!(outcome.is_err(), "Stop policy should return an error");
    let err = outcome.unwrap_err();
    assert!(err.is_stopped(), "Error should be Stopped variant");
}
2406
/// A producer that immediately returns `ChunkResult::End` should make `run`
/// exit with `RunExit::ProducerEnded`.
#[test]
fn test_fill_result_end_returns_producer_ended() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
}
2435
/// A producer that claims `Filled(buffer.len() + 1000)` must not cause more
/// than 1016 points to be queued; some points must still be written.
#[test]
fn test_fill_result_filled_exceeds_buffer_clamped() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |_req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            0 => {
                // Fill the whole buffer, then over-report the count on purpose.
                for slot in buffer.iter_mut() {
                    *slot = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(buffer.len() + 1000)
            }
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    let total = queued_points.load(Ordering::SeqCst);
    assert!(total > 0, "Should have written some points");
    assert!(
        total <= 1016,
        "Points should be clamped to max_points_per_chunk (+ drain)"
    );
}
2488
/// End-to-end path through `Dac`: create the device, start a stream, arm it,
/// run a producer for five chunks, and check that points were written.
#[test]
fn test_full_stream_lifecycle_create_arm_stream_stop() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();
    let shutter = test_backend.shutter_open.clone();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: test_backend.caps().clone(),
    };

    // A freshly constructed Dac has not connected yet.
    let dac = Dac::new(dac_info, Box::new(test_backend));
    assert!(!dac.is_connected());

    let (stream, started_info) = dac.start_stream(StreamConfig::new(30000)).unwrap();
    assert_eq!(started_info.id, "test");

    // New streams start disarmed with the shutter closed.
    let control = stream.control();
    assert!(!control.is_armed());
    assert!(!shutter.load(Ordering::SeqCst));

    control.arm().unwrap();
    assert!(control.is_armed());

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 5 {
                return ChunkResult::End;
            }

            // Up to 100 lit points on a short diagonal ramp.
            let n = req.target_points.min(buffer.len()).min(100);
            for (i, slot) in buffer[..n].iter_mut().enumerate() {
                let t = i as f32 / 100.0;
                *slot = LaserPoint::new(t, t, 10000, 20000, 30000, 40000);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        queued_points.load(Ordering::SeqCst) > 0,
        "Should have written points"
    );
    assert!(
        calls.load(Ordering::SeqCst) >= 5,
        "Should have called producer multiple times"
    );
}
2558
/// Armed stream with `UnderrunPolicy::RepeatLast`: fill, starve, fill again,
/// then end. At least 100 points must have been queued in total.
#[test]
fn test_full_stream_lifecycle_with_underrun_recovery() {
    let test_backend = TestBackend::new();
    let queued_points = test_backend.queued.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let config = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
    let stream = Stream::with_backend(dac_info, boxed, config);

    let control = stream.control();
    control.arm().unwrap();

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);

            // Call 1 starves; calls 3+ end the stream.
            if call_index == 1 {
                return ChunkResult::Starved;
            }
            if call_index > 2 {
                return ChunkResult::End;
            }

            // Calls 0 and 2 each supply up to 50 points.
            let n = req.target_points.min(buffer.len()).min(50);
            for slot in &mut buffer[..n] {
                *slot = if call_index == 0 {
                    LaserPoint::new(0.5, 0.5, 30000, 30000, 30000, 30000)
                } else {
                    LaserPoint::new(-0.5, -0.5, 20000, 20000, 20000, 20000)
                };
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    let total = queued_points.load(Ordering::SeqCst);
    assert!(
        total >= 100,
        "Should have written multiple chunks including underrun recovery"
    );
}
2624
/// A stop request issued from a second thread about 30 ms into the run should
/// end an otherwise endless producer with `RunExit::Stopped`.
#[test]
fn test_full_stream_lifecycle_external_stop() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let control = stream.control();
    let stopper = control.clone();

    // Detached helper thread: wait, then request the stop.
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(30));
        stopper.stop().unwrap();
    });

    let outcome = stream.run(
        |req, buffer| {
            let n = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::Stopped);
}
2668
/// Starts a stream through `Dac::start_stream`, runs a producer for two chunks,
/// and checks that the run ends with `ProducerEnded`.
#[test]
fn test_full_stream_lifecycle_into_dac_recovery() {
    let test_backend = TestBackend::new();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: test_backend.caps().clone(),
    };

    let dac = Dac::new(dac_info, Box::new(test_backend));
    let (stream, _) = dac.start_stream(StreamConfig::new(30000)).unwrap();

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);

    let outcome = stream.run(
        move |req, buffer| match producer_calls.fetch_add(1, Ordering::SeqCst) {
            // Two chunks of up to 50 blanked points each.
            0 | 1 => {
                let n = req.target_points.min(buffer.len()).min(50);
                for slot in &mut buffer[..n] {
                    *slot = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(n)
            }
            _ => ChunkResult::End,
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
}
2711
/// Runs an armed stream for three chunks of up to 50 points and checks the exit
/// value. No statistics are inspected in the visible assertions.
#[test]
fn test_stream_stats_tracking() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let control = stream.control();
    control.arm().unwrap();

    let calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::clone(&calls);
    let points_per_call = 50;

    let outcome = stream.run(
        move |req, buffer| {
            let call_index = producer_calls.fetch_add(1, Ordering::SeqCst);
            if call_index >= 3 {
                return ChunkResult::End;
            }

            let n = req.target_points.min(buffer.len()).min(points_per_call);
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
}
2758
/// Arms a stream, then disarms it and stops it from a second thread while the
/// producer keeps emitting lit points. The run must end with `Stopped` and the
/// backend shutter must be closed afterwards.
#[test]
fn test_stream_disarm_during_streaming() {
    let test_backend = TestBackend::new();
    let shutter = test_backend.shutter_open.clone();

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let dac_info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, boxed, StreamConfig::new(30000));

    let control = stream.control();
    let remote = control.clone();

    control.arm().unwrap();
    assert!(control.is_armed());

    // Detached helper thread: disarm after ~15 ms, stop ~15 ms after that.
    std::thread::spawn(move || {
        let pause = Duration::from_millis(15);
        std::thread::sleep(pause);
        remote.disarm().unwrap();
        std::thread::sleep(pause);
        remote.stop().unwrap();
    });

    let outcome = stream.run(
        |req, buffer| {
            let n = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::new(0.1, 0.1, 50000, 50000, 50000, 50000);
            }
            ChunkResult::Filled(n)
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::Stopped);
    assert!(!shutter.load(Ordering::SeqCst));
}
2810
/// Uses a backend that starts reporting itself as disconnected after three
/// chunk writes and expects `run` to return `RunExit::Disconnected`.
#[test]
fn test_stream_with_mock_backend_disconnect() {
    use std::sync::atomic::AtomicBool;

    /// Wraps a `TestBackend` and counts `try_write_chunk` calls; once the count
    /// reaches `disconnect_after`, `is_connected` returns `false`.
    struct DisconnectingBackend {
        inner: TestBackend,
        disconnect_after: Arc<AtomicUsize>,
        call_count: Arc<AtomicUsize>,
    }

    impl StreamBackend for DisconnectingBackend {
        fn dac_type(&self) -> DacType {
            self.inner.dac_type()
        }

        fn caps(&self) -> &DacCapabilities {
            self.inner.caps()
        }

        fn connect(&mut self) -> Result<()> {
            self.inner.connect()
        }

        fn disconnect(&mut self) -> Result<()> {
            self.inner.disconnect()
        }

        fn is_connected(&self) -> bool {
            let writes_so_far = self.call_count.load(Ordering::SeqCst);
            let threshold = self.disconnect_after.load(Ordering::SeqCst);
            // Below the threshold, defer to the wrapped backend's own state.
            writes_so_far < threshold && self.inner.is_connected()
        }

        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            // Count the attempt before forwarding it.
            self.call_count.fetch_add(1, Ordering::SeqCst);
            self.inner.try_write_chunk(pps, points)
        }

        fn stop(&mut self) -> Result<()> {
            self.inner.stop()
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.inner.set_shutter(open)
        }

        fn queued_points(&self) -> Option<u64> {
            self.inner.queued_points()
        }
    }

    let mut flaky = DisconnectingBackend {
        inner: TestBackend::new(),
        disconnect_after: Arc::new(AtomicUsize::new(3)),
        call_count: Arc::new(AtomicUsize::new(0)),
    };
    flaky.inner.connected = true;

    let dac_info = DacInfo {
        id: "test".to_owned(),
        name: "Test Device".to_owned(),
        kind: DacType::Custom("Test".to_owned()),
        caps: flaky.inner.caps().clone(),
    };

    let stream = Stream::with_backend(dac_info, Box::new(flaky), StreamConfig::new(30000));

    // Set by the error callback; not asserted on in this test.
    let error_occurred = Arc::new(AtomicBool::new(false));
    let error_flag = Arc::clone(&error_occurred);

    let outcome = stream.run(
        |req, buffer| {
            let n = req.target_points.min(buffer.len()).min(10);
            for slot in &mut buffer[..n] {
                *slot = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        move |_e| error_flag.store(true, Ordering::SeqCst),
    );

    assert_eq!(outcome.unwrap(), RunExit::Disconnected);
}
2902
2903 struct FailingWriteBackend {
2919 inner: TestBackend,
2920 fail_after: usize,
2921 write_count: Arc<AtomicUsize>,
2922 disconnect_called: Arc<AtomicBool>,
2923 }
2924
2925 impl FailingWriteBackend {
2926 fn new(fail_after: usize) -> Self {
2927 Self {
2928 inner: TestBackend::new(),
2929 fail_after,
2930 write_count: Arc::new(AtomicUsize::new(0)),
2931 disconnect_called: Arc::new(AtomicBool::new(false)),
2932 }
2933 }
2934 }
2935
2936 impl StreamBackend for FailingWriteBackend {
2937 fn dac_type(&self) -> DacType {
2938 DacType::Custom("FailingTest".to_string())
2939 }
2940
2941 fn caps(&self) -> &DacCapabilities {
2942 self.inner.caps()
2943 }
2944
2945 fn connect(&mut self) -> Result<()> {
2946 self.inner.connect()
2947 }
2948
2949 fn disconnect(&mut self) -> Result<()> {
2950 self.disconnect_called.store(true, Ordering::SeqCst);
2951 self.inner.disconnect()
2952 }
2953
2954 fn is_connected(&self) -> bool {
2955 self.inner.is_connected()
2956 }
2957
2958 fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2959 let count = self.write_count.fetch_add(1, Ordering::SeqCst);
2960 if count >= self.fail_after {
2961 Err(Error::backend(std::io::Error::new(
2964 std::io::ErrorKind::BrokenPipe,
2965 "simulated write failure",
2966 )))
2967 } else {
2968 self.inner.try_write_chunk(pps, points)
2969 }
2970 }
2971
2972 fn stop(&mut self) -> Result<()> {
2973 self.inner.stop()
2974 }
2975
2976 fn set_shutter(&mut self, open: bool) -> Result<()> {
2977 self.inner.set_shutter(open)
2978 }
2979
2980 fn queued_points(&self) -> Option<u64> {
2981 self.inner.queued_points()
2982 }
2983 }
2984
/// A backend write failure must end the stream with `RunExit::Disconnected`
/// and cause `disconnect()` to be called on the backend.
///
/// The backend is configured so that its third write fails. The stream runs
/// on its own thread, which is joined before the assertions.
#[test]
fn test_backend_write_error_exits_with_disconnected() {
    let mut failing = FailingWriteBackend::new(2);
    failing.inner.connected = true;
    let disconnect_seen = Arc::clone(&failing.disconnect_called);

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("FailingTest")),
        caps: failing.inner.caps().clone(),
    };

    let stream = Stream::with_backend(info, Box::new(failing), StreamConfig::new(30000));

    let worker = std::thread::spawn(move || {
        stream.run(
            |req, buffer| {
                // Emit at most 10 blanked points per request.
                let count = req.target_points.min(buffer.len()).min(10);
                for point in buffer.iter_mut().take(count) {
                    *point = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(count)
            },
            |_err| {},
        )
    });

    let exit = worker.join().expect("stream thread panicked").unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Write error should cause stream to exit with Disconnected"
    );
    assert!(
        disconnect_seen.load(Ordering::SeqCst),
        "backend.disconnect() should have been called after write error"
    );
}
3037
/// A backend write failure must be reported through the `on_error` callback
/// as an `Error::Backend`, and the stream must exit with
/// `RunExit::Disconnected`.
///
/// The backend is configured so that its second write fails. The callback
/// counts every invocation and separately records whether any of the errors
/// it received was the `Backend` variant; both facts are asserted so that a
/// failure distinguishes "callback never ran" from "callback ran with an
/// unexpected variant".
#[test]
fn test_backend_write_error_fires_on_error() {
    let mut backend = FailingWriteBackend::new(1);
    backend.inner.connected = true;

    let info = DacInfo {
        id: "test".to_string(),
        name: "Test Device".to_string(),
        kind: DacType::Custom("FailingTest".to_string()),
        caps: backend.inner.caps().clone(),
    };

    let cfg = StreamConfig::new(30000);
    let stream = Stream::with_backend(info, Box::new(backend), cfg);

    let error_count = Arc::new(AtomicUsize::new(0));
    let got_backend_error = Arc::new(AtomicBool::new(false));
    let error_count_clone = Arc::clone(&error_count);
    let got_backend_error_clone = Arc::clone(&got_backend_error);

    let result = stream.run(
        |req, buffer| {
            // Emit at most 10 blanked points per request.
            let n = req.target_points.min(buffer.len()).min(10);
            for point in buffer.iter_mut().take(n) {
                *point = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::Filled(n)
        },
        move |err| {
            // Count first, so a recorded Backend error implies a non-zero count.
            error_count_clone.fetch_add(1, Ordering::SeqCst);
            if matches!(err, Error::Backend(_)) {
                got_backend_error_clone.store(true, Ordering::SeqCst);
            }
        },
    );

    assert_eq!(result.unwrap(), RunExit::Disconnected);
    // Previously the counter was incremented but never checked.
    assert!(
        error_count.load(Ordering::SeqCst) >= 1,
        "on_error should have been invoked at least once"
    );
    assert!(
        got_backend_error.load(Ordering::SeqCst),
        "on_error should have received the Backend error"
    );
}
3081
/// When the very first backend write fails, the stream must still exit with
/// `RunExit::Disconnected`.
#[test]
fn test_backend_write_error_immediate_fail() {
    let mut failing = FailingWriteBackend::new(0);
    failing.inner.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("FailingTest")),
        caps: failing.inner.caps().clone(),
    };

    let stream = Stream::with_backend(info, Box::new(failing), StreamConfig::new(30000));

    let exit = stream
        .run(
            |req, buffer| {
                // Emit at most 10 blanked points per request.
                let count = req.target_points.min(buffer.len()).min(10);
                for point in buffer.iter_mut().take(count) {
                    *point = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(count)
            },
            |_err| {},
        )
        .unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate write failure should exit with Disconnected"
    );
}
3115
3116 struct HeliosLikeBackend {
3133 inner: TestBackend,
3134 fail_after: usize,
3135 write_count: Arc<AtomicUsize>,
3136 disconnect_called: Arc<AtomicBool>,
3137 error_received: Arc<Mutex<Option<String>>>,
3138 }
3139
3140 impl HeliosLikeBackend {
3141 fn new(fail_after: usize) -> Self {
3142 Self {
3143 inner: TestBackend::new(),
3144 fail_after,
3145 write_count: Arc::new(AtomicUsize::new(0)),
3146 disconnect_called: Arc::new(AtomicBool::new(false)),
3147 error_received: Arc::new(Mutex::new(None)),
3148 }
3149 }
3150 }
3151
3152 impl StreamBackend for HeliosLikeBackend {
3153 fn dac_type(&self) -> DacType {
3154 DacType::Custom("HeliosLikeTest".to_string())
3155 }
3156
3157 fn caps(&self) -> &DacCapabilities {
3158 self.inner.caps()
3159 }
3160
3161 fn connect(&mut self) -> Result<()> {
3162 self.inner.connect()
3163 }
3164
3165 fn disconnect(&mut self) -> Result<()> {
3166 self.disconnect_called.store(true, Ordering::SeqCst);
3167 self.inner.disconnect()
3168 }
3169
3170 fn is_connected(&self) -> bool {
3171 self.inner.is_connected()
3172 }
3173
3174 fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
3175 let count = self.write_count.fetch_add(1, Ordering::SeqCst);
3176 if count >= self.fail_after {
3177 Err(Error::backend(std::io::Error::new(
3181 std::io::ErrorKind::TimedOut,
3182 "usb connection error: Operation timed out",
3183 )))
3184 } else {
3185 self.inner.try_write_chunk(pps, points)
3186 }
3187 }
3188
3189 fn stop(&mut self) -> Result<()> {
3190 self.inner.stop()
3191 }
3192
3193 fn set_shutter(&mut self, open: bool) -> Result<()> {
3194 self.inner.set_shutter(open)
3195 }
3196
3197 fn queued_points(&self) -> Option<u64> {
3198 None
3200 }
3201 }
3202
/// A timeout error from the Helios-like backend must end the stream with
/// `RunExit::Disconnected` and cause `disconnect()` to be called.
///
/// The backend is configured so that its fourth write fails. The stream runs
/// on its own thread, which is joined before the assertions.
#[test]
fn test_helios_status_timeout_exits_with_disconnected() {
    let mut helios = HeliosLikeBackend::new(3);
    helios.inner.connected = true;
    let disconnect_seen = Arc::clone(&helios.disconnect_called);

    let info = DacInfo {
        id: String::from("test-helios"),
        name: String::from("Test Helios"),
        kind: DacType::Custom(String::from("HeliosLikeTest")),
        caps: helios.inner.caps().clone(),
    };

    let stream = Stream::with_backend(info, Box::new(helios), StreamConfig::new(30000));

    let worker = std::thread::spawn(move || {
        stream.run(
            |req, buffer| {
                // Emit at most 10 blanked points per request.
                let count = req.target_points.min(buffer.len()).min(10);
                for point in buffer.iter_mut().take(count) {
                    *point = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(count)
            },
            |_err| {},
        )
    });

    let exit = worker.join().expect("stream thread panicked").unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Helios status timeout should cause stream to exit with Disconnected"
    );
    assert!(
        disconnect_seen.load(Ordering::SeqCst),
        "backend.disconnect() should have been called on status timeout"
    );
}
3253
/// A timeout error from the Helios-like backend must reach `on_error` as an
/// `Error::Backend` whose message mentions the timeout, and the stream must
/// exit with `RunExit::Disconnected`.
///
/// The backend is configured so that its second write fails.
#[test]
fn test_helios_status_timeout_fires_on_error_with_backend_variant() {
    let mut helios = HeliosLikeBackend::new(1);
    helios.inner.connected = true;
    // Slot that the error callback fills with the rendered error message.
    let captured_message = Arc::clone(&helios.error_received);

    let info = DacInfo {
        id: String::from("test-helios"),
        name: String::from("Test Helios"),
        kind: DacType::Custom(String::from("HeliosLikeTest")),
        caps: helios.inner.caps().clone(),
    };

    let stream = Stream::with_backend(info, Box::new(helios), StreamConfig::new(30000));

    let got_backend_error = Arc::new(AtomicBool::new(false));
    let backend_flag = Arc::clone(&got_backend_error);
    let message_sink = Arc::clone(&captured_message);

    let exit = stream
        .run(
            |req, buffer| {
                // Emit at most 10 blanked points per request.
                let count = req.target_points.min(buffer.len()).min(10);
                for point in buffer.iter_mut().take(count) {
                    *point = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(count)
            },
            move |err| {
                // Only Backend errors are recorded; anything else is ignored.
                if matches!(err, Error::Backend(_)) {
                    backend_flag.store(true, Ordering::SeqCst);
                    *message_sink.lock().unwrap() = Some(err.to_string());
                }
            },
        )
        .unwrap();

    assert_eq!(exit, RunExit::Disconnected);
    assert!(
        got_backend_error.load(Ordering::SeqCst),
        "on_error should receive Error::Backend for Helios timeout"
    );

    let msg = captured_message.lock().unwrap();
    assert!(
        msg.as_ref().unwrap().contains("Operation timed out"),
        "Error message should mention timeout, got: {:?}",
        msg
    );
}
3304
/// When the very first write to the Helios-like backend times out, the
/// stream must exit with `RunExit::Disconnected` and still call
/// `disconnect()` on the backend.
#[test]
fn test_helios_immediate_status_timeout() {
    let mut helios = HeliosLikeBackend::new(0);
    helios.inner.connected = true;
    let disconnect_seen = Arc::clone(&helios.disconnect_called);

    let info = DacInfo {
        id: String::from("test-helios"),
        name: String::from("Test Helios"),
        kind: DacType::Custom(String::from("HeliosLikeTest")),
        caps: helios.inner.caps().clone(),
    };

    let stream = Stream::with_backend(info, Box::new(helios), StreamConfig::new(30000));

    let exit = stream
        .run(
            |req, buffer| {
                // Emit at most 10 blanked points per request.
                let count = req.target_points.min(buffer.len()).min(10);
                for point in buffer.iter_mut().take(count) {
                    *point = LaserPoint::blanked(0.0, 0.0);
                }
                ChunkResult::Filled(count)
            },
            |_err| {},
        )
        .unwrap();

    assert_eq!(
        exit,
        RunExit::Disconnected,
        "Immediate status timeout should exit with Disconnected"
    );
    assert!(
        disconnect_seen.load(Ordering::SeqCst),
        "backend.disconnect() should be called even on first-write failure"
    );
}
3344
/// When the producer returns `ChunkResult::End` and the backend's queue
/// counter is zero, `run` must return `RunExit::ProducerEnded` well before
/// the configured 100 ms drain timeout (under 50 ms here).
///
/// The backend is created with an initial queue of 1000 and the shared
/// `queued` counter is reset to 0 after the stream has been constructed.
#[test]
fn test_fill_result_end_drains_with_queue_depth() {
    use std::time::Instant;

    let test_backend = TestBackend::new().with_initial_queue(1000);
    let queue_counter = Arc::clone(&test_backend.queued);

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
    let stream = Stream::with_backend(info, boxed, cfg);

    // Pretend the device has already played everything out.
    queue_counter.store(0, Ordering::SeqCst);

    let started = Instant::now();
    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(50),
        "Should return quickly when queue is empty, took {:?}",
        elapsed
    );
}
3387
/// With a backend created with an initial queue of 100000 and a 50 ms drain
/// timeout, a producer that immediately returns `ChunkResult::End` must make
/// `run` return `RunExit::ProducerEnded` after roughly the timeout
/// (accepted window: at least 40 ms and under 150 ms).
#[test]
fn test_fill_result_end_respects_drain_timeout() {
    use std::time::Instant;

    let mut boxed: Box<dyn StreamBackend> =
        Box::new(TestBackend::new().with_initial_queue(100000));
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(50));
    let stream = Stream::with_backend(info, boxed, cfg);

    let started = Instant::now();
    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);

    let lower_bound = Duration::from_millis(40);
    let upper_bound = Duration::from_millis(150);
    assert!(
        elapsed >= lower_bound && elapsed < upper_bound,
        "Should respect drain timeout (~50ms), took {:?}",
        elapsed
    );
}
3422
/// With a zero drain timeout, a producer that immediately returns
/// `ChunkResult::End` must make `run` return `RunExit::ProducerEnded` in
/// under 20 ms, even though the backend was created with an initial queue of
/// 100000.
#[test]
fn test_fill_result_end_skips_drain_with_zero_timeout() {
    use std::time::Instant;

    let mut boxed: Box<dyn StreamBackend> =
        Box::new(TestBackend::new().with_initial_queue(100000));
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::ZERO);
    let stream = Stream::with_backend(info, boxed, cfg);

    let started = Instant::now();
    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(20),
        "Should skip drain with zero timeout, took {:?}",
        elapsed
    );
}
3457
/// Using `NoQueueTestBackend` (presumably a backend without queue-depth
/// reporting; confirm against its definition), a producer that immediately
/// returns `ChunkResult::End` must make `run` return
/// `RunExit::ProducerEnded` in under 50 ms despite a 100 ms drain timeout.
#[test]
fn test_fill_result_end_drains_without_queue_depth() {
    use std::time::Instant;

    let mut no_queue = NoQueueTestBackend::new();
    no_queue.inner.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: no_queue.inner.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
    let stream = Stream::with_backend(info, Box::new(no_queue), cfg);

    let started = Instant::now();
    let outcome = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
    let elapsed = started.elapsed();

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        elapsed < Duration::from_millis(50),
        "Should return quickly with empty buffer estimate, took {:?}",
        elapsed
    );
}
3491
/// After an armed stream ends because the producer returned
/// `ChunkResult::End`, the backend's shutter flag must read as closed.
///
/// The producer writes up to 10 blanked points into the buffer before
/// signalling the end; the drain timeout is 10 ms.
#[test]
fn test_fill_result_end_closes_shutter() {
    let test_backend = TestBackend::new();
    let shutter_flag = Arc::clone(&test_backend.shutter_open);

    let mut boxed: Box<dyn StreamBackend> = Box::new(test_backend);
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(10));
    let stream = Stream::with_backend(info, boxed, cfg);

    // Arm through the control handle before running.
    stream.control().arm().unwrap();

    let outcome = stream.run(
        |req, buffer| {
            let count = req.target_points.min(buffer.len()).min(10);
            for point in buffer.iter_mut().take(count) {
                *point = LaserPoint::blanked(0.0, 0.0);
            }
            ChunkResult::End
        },
        |_e| {},
    );

    assert_eq!(outcome.unwrap(), RunExit::ProducerEnded);
    assert!(
        !shutter_flag.load(Ordering::SeqCst),
        "Shutter should be closed after drain"
    );
}
3534
/// With the default stream configuration, writing points through
/// `write_fill_points` must leave the colour delay line empty.
#[test]
fn test_color_delay_zero_is_passthrough() {
    let mut test_backend = TestBackend::new();
    test_backend.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test"),
        kind: DacType::Custom(String::from("Test")),
        caps: test_backend.caps().clone(),
    };

    let mut stream =
        Stream::with_backend(info, Box::new(test_backend), StreamConfig::new(30000));

    // Arm, let the stream consume the control message, and mark it as armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    let n = 5;
    // Red ramps 1000, 2000, ... so each point is distinguishable.
    for (i, point) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *point = LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 1000, 0, 0, 65535);
    }

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert!(stream.state.color_delay_line.is_empty());
}
3574
/// With a 300 µs colour delay at 10000 pps, the delay line must keep three
/// entries after a write and hold the colours of the last three points that
/// were written.
///
/// The test seeds the delay line with three zeroed entries, writes five
/// points with distinct colours, and expects the line to contain the colours
/// of points 3, 4 and 5.
#[test]
fn test_color_delay_shifts_colors() {
    let mut test_backend = TestBackend::new();
    test_backend.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test"),
        kind: DacType::Custom(String::from("Test")),
        caps: test_backend.caps().clone(),
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(300));
    let mut stream = Stream::with_backend(info, Box::new(test_backend), cfg);

    // Arm, let the stream consume the control message, and mark it as armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    // Start from a known delay line: three blank colour entries.
    stream.state.color_delay_line.clear();
    stream
        .state
        .color_delay_line
        .extend(std::iter::repeat((0, 0, 0, 0)).take(3));

    let n = 5;
    for (i, point) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        let step = i as u16 + 1;
        *point = LaserPoint::new(
            i as f32 * 0.1,
            0.0,
            step * 10000,
            step * 5000,
            step * 2000,
            65535,
        );
    }

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert_eq!(stream.state.color_delay_line.len(), 3);

    let expected: Vec<(u16, u16, u16, u16)> = [3u16, 4, 5]
        .iter()
        .map(|&step| (step * 10000, step * 5000, step * 2000, 65535))
        .collect();
    let actual: Vec<(u16, u16, u16, u16)> =
        stream.state.color_delay_line.iter().cloned().collect();
    assert_eq!(actual, expected);
}
3635
/// With a 200 µs colour delay at 10000 pps, `handle_shutter_transition(true)`
/// must leave two entries in the delay line (the first being all zeros),
/// `handle_shutter_transition(false)` must empty it, and a second
/// `handle_shutter_transition(true)` must restore the two entries.
#[test]
fn test_color_delay_resets_on_disarm_arm() {
    let mut test_backend = TestBackend::new();
    test_backend.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test"),
        kind: DacType::Custom(String::from("Test")),
        caps: test_backend.caps().clone(),
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
    let mut stream = Stream::with_backend(info, Box::new(test_backend), cfg);

    // First transition to "open".
    stream.handle_shutter_transition(true);
    assert_eq!(stream.state.color_delay_line.len(), 2);
    assert_eq!(
        stream.state.color_delay_line.front(),
        Some(&(0, 0, 0, 0))
    );

    // Transition to "closed".
    stream.handle_shutter_transition(false);
    assert!(stream.state.color_delay_line.is_empty());

    // Second transition to "open".
    stream.handle_shutter_transition(true);
    assert_eq!(stream.state.color_delay_line.len(), 2);
}
3666
/// Changing the colour delay through the control handle while the stream is
/// armed must resize the delay line on the next write: 500 µs at 10000 pps
/// gives five entries, and a zero delay empties the line.
#[test]
fn test_color_delay_dynamic_change() {
    let mut test_backend = TestBackend::new();
    test_backend.connected = true;

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test"),
        kind: DacType::Custom(String::from("Test")),
        caps: test_backend.caps().clone(),
    };

    let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
    let mut stream = Stream::with_backend(info, Box::new(test_backend), cfg);

    // Arm, let the stream consume the control message, and mark it as armed.
    stream.control.arm().unwrap();
    stream.process_control_messages();
    stream.state.last_armed = true;

    // Start from a known delay line: two blank colour entries.
    stream.state.color_delay_line.clear();
    stream
        .state
        .color_delay_line
        .extend(std::iter::repeat((0, 0, 0, 0)).take(2));

    let n = 3;
    let mut on_error = |_: Error| {};

    // First write with the initial 200 µs delay.
    for (i, point) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *point = LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 10000, 0, 0, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    // Second write after raising the delay to 500 µs.
    stream.control.set_color_delay(Duration::from_micros(500));
    for (i, point) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *point = LaserPoint::new(0.0, 0.0, (i as u16 + 4) * 10000, 0, 0, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert_eq!(stream.state.color_delay_line.len(), 5);

    // Third write after disabling the delay.
    stream.control.set_color_delay(Duration::ZERO);
    for point in stream.state.chunk_buffer[..n].iter_mut() {
        *point = LaserPoint::new(0.0, 0.0, 50000, 0, 0, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert!(stream.state.color_delay_line.is_empty());
}
3727
/// A 500 µs startup blank at 10000 pps must translate to five blank points,
/// be fully consumed by the first ten-point write after arming, and leave the
/// colours of a later write untouched.
#[test]
fn test_startup_blank_blanks_first_n_points() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::from_micros(500))
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    assert_eq!(stream.state.startup_blank_points, 5);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    // First write: the stream is still flagged as not-yet-armed.
    stream.state.last_armed = false;
    let n = 10;
    for (i, point) in stream.state.chunk_buffer[..n].iter_mut().enumerate() {
        *point = LaserPoint::new(i as f32 * 0.1, 0.0, 65535, 32000, 16000, 65535);
    }

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert_eq!(stream.state.startup_blank_remaining, 0);

    // Second write: the stream is flagged as already armed.
    stream.state.last_armed = true;
    for point in stream.state.chunk_buffer[..n].iter_mut() {
        *point = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
}
3788
/// Across an arm → write → disarm → arm → write sequence, the startup-blank
/// counter must be back at zero after each ten-point write (500 µs blank at
/// 10000 pps).
#[test]
fn test_startup_blank_resets_on_rearm() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::from_micros(500))
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    let n = 10;
    let mut on_error = |_: Error| {};

    // First arm cycle.
    stream.state.last_armed = false;
    stream.control.arm().unwrap();
    stream.process_control_messages();

    for point in stream.state.chunk_buffer[..n].iter_mut() {
        *point = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
    }
    stream.state.last_armed = false;
    stream.write_fill_points(n, &mut on_error).unwrap();
    assert_eq!(stream.state.startup_blank_remaining, 0);

    // Disarm, then arm again.
    stream.control.disarm().unwrap();
    stream.process_control_messages();

    stream.control.arm().unwrap();
    stream.process_control_messages();

    // Second arm cycle.
    stream.state.last_armed = false;
    for point in stream.state.chunk_buffer[..n].iter_mut() {
        *point = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
    }
    stream.write_fill_points(n, &mut on_error).unwrap();

    assert_eq!(stream.state.startup_blank_remaining, 0);
}
3840
/// With a zero startup blank, no blank points are configured, the first
/// point of the first write after arming keeps all of its colour channels,
/// and the remaining-blank counter stays at zero.
#[test]
fn test_startup_blank_zero_is_noop() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(10000)
        .with_startup_blank(Duration::ZERO)
        .with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    assert_eq!(stream.state.startup_blank_points, 0);

    stream.control.arm().unwrap();
    stream.process_control_messages();
    // The write below happens while the stream is flagged as not-yet-armed.
    stream.state.last_armed = false;

    let n = 5;
    for point in stream.state.chunk_buffer[..n].iter_mut() {
        *point = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
    }

    let mut on_error = |_: Error| {};
    stream.write_fill_points(n, &mut on_error).unwrap();

    let first = &stream.state.chunk_buffer[0];
    assert_eq!(first.r, 65535);
    assert_eq!(first.g, 32000);
    assert_eq!(first.b, 16000);
    assert_eq!(first.intensity, 65535);
    assert_eq!(stream.state.startup_blank_remaining, 0);
}
3881
/// With the `UsbFrameSwap` output model, two consecutive 50-point writes
/// must leave `scheduled_ahead` at 50 (one chunk), while the stats still
/// count two chunks and 100 points.
#[test]
fn test_usb_frame_swap_replaces_scheduled_ahead() {
    let mut boxed: Box<dyn StreamBackend> =
        Box::new(TestBackend::new().with_output_model(OutputModel::UsbFrameSwap));
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let n = 50;
    let mut on_error = |_: Error| {};
    for _round in 0..2 {
        // Fill the chunk with all-zero points, then write it.
        for point in stream.state.chunk_buffer[..n].iter_mut() {
            *point = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream.write_fill_points(n, &mut on_error).unwrap();
    }

    let one_chunk = n as u64;
    assert_eq!(stream.state.scheduled_ahead, one_chunk);
    assert_eq!(stream.state.stats.chunks_written, 2);
    assert_eq!(stream.state.stats.points_written, 2 * one_chunk);
}
3920
/// With the `UsbFrameSwap` output model on `NoQueueTestBackend`, two
/// consecutive 50-point writes must leave both `scheduled_ahead` and
/// `estimate_buffer_points()` at 50.
#[test]
fn test_usb_frame_swap_no_queue_reporting() {
    let mut boxed: Box<dyn StreamBackend> =
        Box::new(NoQueueTestBackend::new().with_output_model(OutputModel::UsbFrameSwap));
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let n = 50;
    let mut on_error = |_: Error| {};
    for _round in 0..2 {
        // Fill the chunk with all-zero points, then write it.
        for point in stream.state.chunk_buffer[..n].iter_mut() {
            *point = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream.write_fill_points(n, &mut on_error).unwrap();
    }

    let one_chunk = n as u64;
    assert_eq!(stream.state.scheduled_ahead, one_chunk);

    let estimate = stream.estimate_buffer_points();
    assert_eq!(estimate, one_chunk);
}
3959
/// With a default `TestBackend` (no output model override), two consecutive
/// 50-point writes must accumulate: `scheduled_ahead` reaches 100 and the
/// stats count two chunks and 100 points.
#[test]
fn test_network_fifo_accumulates_scheduled_ahead() {
    let mut boxed: Box<dyn StreamBackend> = Box::new(TestBackend::new());
    boxed.connect().unwrap();

    let info = DacInfo {
        id: String::from("test"),
        name: String::from("Test Device"),
        kind: DacType::Custom(String::from("Test")),
        caps: boxed.caps().clone(),
    };

    let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
    let mut stream = Stream::with_backend(info, boxed, cfg);

    stream.control.arm().unwrap();
    stream.process_control_messages();

    let n = 50;
    let mut on_error = |_: Error| {};
    for _round in 0..2 {
        // Fill the chunk with all-zero points, then write it.
        for point in stream.state.chunk_buffer[..n].iter_mut() {
            *point = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
        }
        stream.write_fill_points(n, &mut on_error).unwrap();
    }

    let two_chunks = 2 * n as u64;
    assert_eq!(stream.state.scheduled_ahead, two_chunks);
    assert_eq!(stream.state.stats.chunks_written, 2);
    assert_eq!(stream.state.stats.points_written, two_chunks);
}
3994}