1use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
32use std::sync::{Arc, Mutex};
33use std::time::Duration;
34
35use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
36use crate::types::{
37 ChunkRequest, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
38 StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
39};
40
/// Message sent from a [`StreamControl`] handle to the [`Stream`] that owns
/// the receiving end of the control channel.
///
/// Messages are queued by `StreamControl::{arm, disarm, stop}` and drained by
/// `Stream::process_control_messages`.
#[derive(Debug, Clone, Copy)]
enum ControlMsg {
    /// Queued by `StreamControl::arm`; the stream opens the shutter on receipt.
    Arm,
    /// Queued by `StreamControl::disarm`; the stream closes the shutter on receipt.
    Disarm,
    /// Queued by `StreamControl::stop`; makes message processing report a stop.
    Stop,
}
58
/// Cloneable handle used to arm, disarm and stop a stream.
///
/// All clones share one `Arc`-backed state, so a change made through any
/// handle is observable through every other handle.
#[derive(Clone)]
pub struct StreamControl {
    /// Shared flags and the control-channel sender.
    inner: Arc<StreamControlInner>,
}
70
/// State shared by every clone of a [`StreamControl`].
struct StreamControlInner {
    /// `true` while output is armed; starts out `false`.
    armed: AtomicBool,
    /// Set to `true` by `StreamControl::stop`; starts out `false`.
    stop_requested: AtomicBool,
    /// Sender half of the control channel, wrapped in a mutex so it can be
    /// used through a shared reference.
    control_tx: Mutex<Sender<ControlMsg>>,
}
80
81impl StreamControl {
82 fn new(control_tx: Sender<ControlMsg>) -> Self {
83 Self {
84 inner: Arc::new(StreamControlInner {
85 armed: AtomicBool::new(false),
86 stop_requested: AtomicBool::new(false),
87 control_tx: Mutex::new(control_tx),
88 }),
89 }
90 }
91
92 pub fn arm(&self) -> Result<()> {
97 self.inner.armed.store(true, Ordering::SeqCst);
98 if let Ok(tx) = self.inner.control_tx.lock() {
100 let _ = tx.send(ControlMsg::Arm);
101 }
102 Ok(())
103 }
104
105 pub fn disarm(&self) -> Result<()> {
118 self.inner.armed.store(false, Ordering::SeqCst);
119 if let Ok(tx) = self.inner.control_tx.lock() {
121 let _ = tx.send(ControlMsg::Disarm);
122 }
123 Ok(())
124 }
125
126 pub fn is_armed(&self) -> bool {
128 self.inner.armed.load(Ordering::SeqCst)
129 }
130
131 pub fn stop(&self) -> Result<()> {
136 self.inner.stop_requested.store(true, Ordering::SeqCst);
137 if let Ok(tx) = self.inner.control_tx.lock() {
139 let _ = tx.send(ControlMsg::Stop);
140 }
141 Ok(())
142 }
143
144 pub fn is_stop_requested(&self) -> bool {
146 self.inner.stop_requested.load(Ordering::SeqCst)
147 }
148}
149
/// Mutable bookkeeping owned by a [`Stream`].
struct StreamState {
    /// Stream position; advanced after every successful chunk write.
    current_instant: StreamInstant,
    /// Software estimate of points written but not yet played: increased on
    /// write, decreased in `Stream::wait_for_ready`.
    scheduled_ahead: u64,
    /// Copy of the most recent chunk written while armed; consulted by
    /// `UnderrunPolicy::RepeatLast`.
    last_chunk: Option<Vec<LaserPoint>>,
    /// Running counters (chunks written, points written, underruns).
    stats: StreamStats,
    /// Armed flag seen by the previous `Stream::handle_shutter_transition` call.
    last_armed: bool,
    /// Shutter state this stream last requested from the backend.
    shutter_open: bool,
}
168
169impl StreamState {
170 fn new() -> Self {
171 Self {
172 current_instant: StreamInstant::new(0),
173 scheduled_ahead: 0,
174 last_chunk: None,
175 stats: StreamStats::default(),
176 last_armed: false,
177 shutter_open: false,
178 }
179 }
180}
181
/// An active output stream bound to one backend.
///
/// Dropping a `Stream` calls [`Stream::stop`].
pub struct Stream {
    /// Description of the device this stream writes to.
    info: DacInfo,
    /// The backend; `None` after `into_dac` has moved it out.
    backend: Option<Box<dyn StreamBackend>>,
    /// Configuration the stream was started with.
    config: StreamConfig,
    /// Number of points requested and written per chunk.
    chunk_points: usize,
    /// Control handle whose clones are handed out by `control()`.
    control: StreamControl,
    /// Receiving end of the control channel fed by `control`.
    control_rx: Receiver<ControlMsg>,
    /// Position, queue estimate, statistics and shutter bookkeeping.
    state: StreamState,
}
210
211impl Stream {
212 pub(crate) fn with_backend(
214 info: DacInfo,
215 backend: Box<dyn StreamBackend>,
216 config: StreamConfig,
217 chunk_points: usize,
218 ) -> Self {
219 let (control_tx, control_rx) = mpsc::channel();
220 Self {
221 info,
222 backend: Some(backend),
223 config,
224 chunk_points,
225 control: StreamControl::new(control_tx),
226 control_rx,
227 state: StreamState::new(),
228 }
229 }
230
231 pub fn info(&self) -> &DacInfo {
233 &self.info
234 }
235
236 pub fn config(&self) -> &StreamConfig {
238 &self.config
239 }
240
241 pub fn control(&self) -> StreamControl {
243 self.control.clone()
244 }
245
246 pub fn chunk_points(&self) -> usize {
250 self.chunk_points
251 }
252
253 pub fn status(&self) -> Result<StreamStatus> {
255 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
256
257 Ok(StreamStatus {
258 connected: self
259 .backend
260 .as_ref()
261 .map(|b| b.is_connected())
262 .unwrap_or(false),
263 chunk_points: self.chunk_points,
264 scheduled_ahead_points: self.state.scheduled_ahead,
265 device_queued_points,
266 stats: Some(self.state.stats.clone()),
267 })
268 }
269
270 pub fn next_request(&mut self) -> Result<ChunkRequest> {
275 if self.control.is_stop_requested() {
277 return Err(Error::Stopped);
278 }
279
280 let backend = self
282 .backend
283 .as_ref()
284 .ok_or_else(|| Error::disconnected("no backend"))?;
285
286 if !backend.is_connected() {
287 return Err(Error::disconnected("backend disconnected"));
288 }
289
290 self.wait_for_ready()?;
292
293 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
294
295 Ok(ChunkRequest {
296 start: self.state.current_instant,
297 pps: self.config.pps,
298 n_points: self.chunk_points,
299 scheduled_ahead_points: self.state.scheduled_ahead,
300 device_queued_points,
301 })
302 }
303
304 pub fn write(&mut self, req: &ChunkRequest, points: &[LaserPoint]) -> Result<()> {
321 if points.len() != req.n_points {
323 return Err(Error::invalid_config(format!(
324 "expected {} points, got {}",
325 req.n_points,
326 points.len()
327 )));
328 }
329
330 loop {
331 if self.control.is_stop_requested() {
333 return Err(Error::Stopped);
334 }
335
336 let is_armed = self.control.is_armed();
337
338 self.handle_shutter_transition(is_armed);
340
341 let backend = self
343 .backend
344 .as_mut()
345 .ok_or_else(|| Error::disconnected("no backend"))?;
346
347 let outcome = if is_armed {
348 backend.try_write_chunk(self.config.pps, points)?
350 } else {
351 let blanked: Vec<LaserPoint> = points
353 .iter()
354 .map(|p| LaserPoint::blanked(p.x, p.y))
355 .collect();
356 backend.try_write_chunk(self.config.pps, &blanked)?
357 };
358
359 match outcome {
360 WriteOutcome::Written => {
361 if is_armed {
363 self.state.last_chunk = Some(points.to_vec());
364 }
365 self.state.current_instant += self.chunk_points as u64;
366 if self.info.caps.output_model == OutputModel::UsbFrameSwap {
367 self.state.scheduled_ahead = self.chunk_points as u64;
368 } else {
369 self.state.scheduled_ahead += self.chunk_points as u64;
370 }
371 self.state.stats.chunks_written += 1;
372 self.state.stats.points_written += self.chunk_points as u64;
373 return Ok(());
374 }
375 WriteOutcome::WouldBlock => {
376 std::thread::yield_now();
378
379 if self.process_control_messages() {
380 return Err(Error::Stopped);
381 }
382
383 std::thread::sleep(Duration::from_micros(100));
384
385 if self.process_control_messages() {
386 return Err(Error::Stopped);
387 }
388 }
389 }
390 }
391 }
392
393 fn handle_shutter_transition(&mut self, is_armed: bool) {
395 let was_armed = self.state.last_armed;
396 self.state.last_armed = is_armed;
397
398 if was_armed && !is_armed {
399 if self.state.shutter_open {
401 if let Some(backend) = &mut self.backend {
402 let _ = backend.set_shutter(false); }
404 self.state.shutter_open = false;
405 }
406 } else if !was_armed && is_armed {
407 if !self.state.shutter_open {
409 if let Some(backend) = &mut self.backend {
410 let _ = backend.set_shutter(true); }
412 self.state.shutter_open = true;
413 }
414 }
415 }
416
417 pub fn stop(&mut self) -> Result<()> {
423 self.control.disarm()?;
425
426 self.control.stop()?;
427
428 if let Some(backend) = &mut self.backend {
430 let _ = backend.set_shutter(false);
431 backend.stop()?;
432 }
433
434 Ok(())
435 }
436
437 pub fn into_dac(mut self) -> (Dac, StreamStats) {
456 let _ = self.control.disarm();
458 let _ = self.control.stop();
459 if let Some(backend) = &mut self.backend {
460 let _ = backend.set_shutter(false);
461 let _ = backend.stop();
462 }
463
464 let backend = self.backend.take();
466 let stats = self.state.stats.clone();
467
468 let dac = Dac {
469 info: self.info.clone(),
470 backend,
471 };
472
473 (dac, stats)
474 }
475
476 pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
493 where
494 F: FnMut(ChunkRequest) -> Option<Vec<LaserPoint>> + Send + 'static,
495 E: FnMut(Error) + Send + 'static,
496 {
497 loop {
498 if self.control.is_stop_requested() {
500 return Ok(RunExit::Stopped);
501 }
502
503 let req = match self.next_request() {
505 Ok(req) => req,
506 Err(e) if e.is_stopped() => {
507 return Ok(RunExit::Stopped);
508 }
509 Err(e) if e.is_disconnected() => {
510 on_error(e);
511 return Ok(RunExit::Disconnected);
512 }
513 Err(e) => {
514 on_error(e);
516 continue;
517 }
518 };
519
520 if self.process_control_messages() {
522 return Ok(RunExit::Stopped);
523 }
524
525 match producer(req.clone()) {
527 Some(points) => {
528 match self.write(&req, &points) {
529 Ok(()) => {}
530 Err(e) if e.is_stopped() => {
531 return Ok(RunExit::Stopped);
532 }
533 Err(e) if e.is_disconnected() => {
534 on_error(e);
535 return Ok(RunExit::Disconnected);
536 }
537 Err(e) => {
538 on_error(e);
540 if let Err(e2) = self.handle_underrun(&req) {
541 if e2.is_stopped() {
543 return Ok(RunExit::Stopped);
544 }
545 on_error(e2);
546 }
547 }
548 }
549 }
550 None => {
551 return Ok(RunExit::ProducerEnded);
552 }
553 }
554 }
555 }
556
557 fn process_control_messages(&mut self) -> bool {
570 loop {
571 match self.control_rx.try_recv() {
572 Ok(ControlMsg::Arm) => {
573 if !self.state.shutter_open {
575 if let Some(backend) = &mut self.backend {
576 let _ = backend.set_shutter(true);
577 }
578 self.state.shutter_open = true;
579 }
580 }
581 Ok(ControlMsg::Disarm) => {
582 if self.state.shutter_open {
584 if let Some(backend) = &mut self.backend {
585 let _ = backend.set_shutter(false);
586 }
587 self.state.shutter_open = false;
588 }
589 }
590 Ok(ControlMsg::Stop) => {
591 return true;
592 }
593 Err(TryRecvError::Empty) => break,
594 Err(TryRecvError::Disconnected) => break,
595 }
596 }
597 false
598 }
599
600 fn wait_for_ready(&mut self) -> Result<()> {
604 const SLEEP_SLICE: Duration = Duration::from_millis(5);
606
607 let target = self.config.target_queue_points as u64;
608
609 let effective_queue = if self.info.caps.can_estimate_queue {
613 self.backend
614 .as_ref()
615 .and_then(|b| b.queued_points())
616 .map(|device_q| device_q.max(self.state.scheduled_ahead))
617 .unwrap_or(self.state.scheduled_ahead)
618 } else {
619 self.state.scheduled_ahead
620 };
621
622 if effective_queue < target {
623 return Ok(());
624 }
625
626 let points_to_drain = effective_queue.saturating_sub(target / 2);
627 let seconds_to_wait = points_to_drain as f64 / self.config.pps as f64;
628 let wait_duration = Duration::from_secs_f64(seconds_to_wait.min(0.1));
629
630 let mut remaining = wait_duration;
632 while remaining > Duration::ZERO {
633 let slice = remaining.min(SLEEP_SLICE);
634 std::thread::sleep(slice);
635 remaining = remaining.saturating_sub(slice);
636
637 if self.process_control_messages() {
639 return Err(Error::Stopped);
640 }
641 }
642
643 let elapsed = wait_duration.as_secs_f64();
644 let points_drained = (elapsed * self.config.pps as f64) as u64;
645 self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_drained);
646
647 Ok(())
648 }
649
650 fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
658 self.state.stats.underrun_count += 1;
659
660 let is_armed = self.control.is_armed();
661
662 self.handle_shutter_transition(is_armed);
664
665 let fill_points: Vec<LaserPoint> = if !is_armed {
667 vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
670 } else {
671 match &self.config.underrun {
672 UnderrunPolicy::RepeatLast => self
673 .state
674 .last_chunk
675 .clone()
676 .unwrap_or_else(|| vec![LaserPoint::blanked(0.0, 0.0); req.n_points]),
677 UnderrunPolicy::Blank => {
678 vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
679 }
680 UnderrunPolicy::Park { x, y } => {
681 vec![LaserPoint::blanked(*x, *y); req.n_points]
682 }
683 UnderrunPolicy::Stop => {
684 self.control.stop()?;
685 return Err(Error::Stopped);
686 }
687 }
688 };
689
690 if let Some(backend) = &mut self.backend {
691 match backend.try_write_chunk(self.config.pps, &fill_points) {
692 Ok(WriteOutcome::Written) => {
693 let n_points = fill_points.len();
695 if is_armed {
697 self.state.last_chunk = Some(fill_points);
698 }
699 self.state.current_instant += n_points as u64;
700 if self.info.caps.output_model == OutputModel::UsbFrameSwap {
701 self.state.scheduled_ahead = n_points as u64;
702 } else {
703 self.state.scheduled_ahead += n_points as u64;
704 }
705 self.state.stats.chunks_written += 1;
706 self.state.stats.points_written += n_points as u64;
707 }
708 Ok(WriteOutcome::WouldBlock) => {
709 }
711 Err(_) => {
712 }
714 }
715 }
716
717 Ok(())
718 }
719}
720
721impl Drop for Stream {
722 fn drop(&mut self) {
723 let _ = self.stop();
724 }
725}
726
/// A device that is not currently streaming.
///
/// Consumed by `start_stream` to produce a [`Stream`], and produced again by
/// `Stream::into_dac`.
pub struct Dac {
    /// Description of the device (id, name, kind, capabilities).
    info: DacInfo,
    /// The backend, or `None` when none is available.
    backend: Option<Box<dyn StreamBackend>>,
}
749
750impl Dac {
751 pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
753 Self {
754 info,
755 backend: Some(backend),
756 }
757 }
758
759 pub fn info(&self) -> &DacInfo {
761 &self.info
762 }
763
764 pub fn id(&self) -> &str {
766 &self.info.id
767 }
768
769 pub fn name(&self) -> &str {
771 &self.info.name
772 }
773
774 pub fn kind(&self) -> &DacType {
776 &self.info.kind
777 }
778
779 pub fn caps(&self) -> &DacCapabilities {
781 &self.info.caps
782 }
783
784 pub fn has_backend(&self) -> bool {
786 self.backend.is_some()
787 }
788
789 pub fn is_connected(&self) -> bool {
791 self.backend
792 .as_ref()
793 .map(|b| b.is_connected())
794 .unwrap_or(false)
795 }
796
797 pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
822 let mut backend = self.backend.take().ok_or_else(|| {
823 Error::invalid_config("device backend has already been used for a stream")
824 })?;
825
826 Self::validate_config(&self.info.caps, &cfg)?;
827
828 if !backend.is_connected() {
830 backend.connect()?;
831 }
832
833 let chunk_points = cfg.chunk_points.unwrap_or_else(|| {
834 Self::compute_default_chunk_size(&self.info.caps, cfg.pps, cfg.target_queue_points)
835 });
836
837 let stream = Stream::with_backend(self.info.clone(), backend, cfg, chunk_points);
838
839 Ok((stream, self.info))
840 }
841
842 fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
843 if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
844 return Err(Error::invalid_config(format!(
845 "PPS {} is outside device range [{}, {}]",
846 cfg.pps, caps.pps_min, caps.pps_max
847 )));
848 }
849
850 if let Some(chunk_points) = cfg.chunk_points {
851 if chunk_points > caps.max_points_per_chunk {
852 return Err(Error::invalid_config(format!(
853 "chunk_points {} exceeds device max {}",
854 chunk_points, caps.max_points_per_chunk
855 )));
856 }
857 if chunk_points == 0 {
858 return Err(Error::invalid_config("chunk_points cannot be 0"));
859 }
860 }
861
862 if cfg.target_queue_points == 0 {
863 return Err(Error::invalid_config("target_queue_points cannot be 0"));
864 }
865
866 Ok(())
867 }
868
869 fn compute_default_chunk_size(
870 caps: &DacCapabilities,
871 pps: u32,
872 target_queue_points: usize,
873 ) -> usize {
874 let target_chunk_ms = 10;
876 let time_based_points = (pps as usize * target_chunk_ms) / 1000;
877
878 let queue_based_max = target_queue_points / 4;
881
882 let max_points = caps.max_points_per_chunk.min(queue_based_max.max(100));
883 let min_points = 100;
884
885 time_based_points.clamp(min_points, max_points)
886 }
887}
888
/// Alias for [`Dac`].
///
/// NOTE(review): presumably kept so code using the older name still
/// compiles; confirm before removing.
pub type OwnedDac = Dac;
891
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::{StreamBackend, WriteOutcome};
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// In-memory backend whose counters are shared through `Arc`s so a test
    /// can keep observing them after the backend has been boxed and moved
    /// into a stream.
    struct TestBackend {
        caps: DacCapabilities,
        connected: bool,
        /// Number of `try_write_chunk` calls, including ones that would block.
        write_count: Arc<AtomicUsize>,
        /// Number of upcoming `try_write_chunk` calls that answer `WouldBlock`.
        would_block_count: Arc<AtomicUsize>,
        /// Total points accepted so far; reported by `queued_points`.
        queued: Arc<AtomicU64>,
        /// Last value passed to `set_shutter`.
        shutter_open: Arc<AtomicBool>,
    }

    impl TestBackend {
        /// Disconnected backend that accepts every write immediately.
        fn new() -> Self {
            Self {
                caps: DacCapabilities {
                    pps_min: 1000,
                    pps_max: 100000,
                    max_points_per_chunk: 1000,
                    prefers_constant_pps: false,
                    can_estimate_queue: true,
                    output_model: crate::types::OutputModel::NetworkFifo,
                },
                connected: false,
                write_count: Arc::new(AtomicUsize::new(0)),
                would_block_count: Arc::new(AtomicUsize::new(0)),
                queued: Arc::new(AtomicU64::new(0)),
                shutter_open: Arc::new(AtomicBool::new(false)),
            }
        }

        /// Makes the next `count` writes answer `WouldBlock`.
        fn with_would_block_count(mut self, count: usize) -> Self {
            self.would_block_count = Arc::new(AtomicUsize::new(count));
            self
        }
    }

    impl StreamBackend for TestBackend {
        fn dac_type(&self) -> DacType {
            DacType::Custom("Test".to_string())
        }

        fn caps(&self) -> &DacCapabilities {
            &self.caps
        }

        fn connect(&mut self) -> Result<()> {
            self.connected = true;
            Ok(())
        }

        fn disconnect(&mut self) -> Result<()> {
            self.connected = false;
            Ok(())
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            self.write_count.fetch_add(1, Ordering::SeqCst);

            let remaining = self.would_block_count.load(Ordering::SeqCst);
            if remaining > 0 {
                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
                return Ok(WriteOutcome::WouldBlock);
            }

            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
            Ok(WriteOutcome::Written)
        }

        fn stop(&mut self) -> Result<()> {
            Ok(())
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.shutter_open.store(open, Ordering::SeqCst);
            Ok(())
        }

        fn queued_points(&self) -> Option<u64> {
            Some(self.queued.load(Ordering::SeqCst))
        }
    }

    /// Chunk size used by every stream built in these tests.
    const CHUNK: usize = 100;

    /// Device description shared by all tests, carrying the given capabilities.
    fn test_info(caps: &DacCapabilities) -> DacInfo {
        DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: caps.clone(),
        }
    }

    /// Boxes and connects `backend`, then wraps it in a stream of `CHUNK`
    /// points per chunk.
    fn connected_stream(backend: TestBackend, cfg: StreamConfig) -> Stream {
        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();
        let info = test_info(backend_box.caps());
        Stream::with_backend(info, backend_box, cfg, CHUNK)
    }

    /// 30 kpps configuration with the default queue target.
    fn default_cfg() -> StreamConfig {
        StreamConfig::new(30000)
    }

    /// 30 kpps configuration with a 10 000 point queue target, so
    /// `next_request` does not wait in these tests.
    fn roomy_cfg() -> StreamConfig {
        StreamConfig::new(30000).with_target_queue_points(10000)
    }

    /// `n` blanked points at the origin.
    fn blank_chunk(n: usize) -> Vec<LaserPoint> {
        vec![LaserPoint::blanked(0.0, 0.0); n]
    }

    /// Hand-built request for `CHUNK` points at stream start.
    fn underrun_request() -> ChunkRequest {
        ChunkRequest {
            start: StreamInstant::new(0),
            pps: 30000,
            n_points: CHUNK,
            scheduled_ahead_points: 0,
            device_queued_points: None,
        }
    }

    #[test]
    fn test_stream_control_arm_disarm() {
        let (tx, _rx) = mpsc::channel();
        let control = StreamControl::new(tx);
        assert!(!control.is_armed());

        control.arm().unwrap();
        assert!(control.is_armed());

        control.disarm().unwrap();
        assert!(!control.is_armed());
    }

    #[test]
    fn test_stream_control_stop() {
        let (tx, _rx) = mpsc::channel();
        let control = StreamControl::new(tx);
        assert!(!control.is_stop_requested());

        control.stop().unwrap();
        assert!(control.is_stop_requested());
    }

    #[test]
    fn test_stream_control_clone_shares_state() {
        let (tx, _rx) = mpsc::channel();
        let control1 = StreamControl::new(tx);
        let control2 = control1.clone();

        control1.arm().unwrap();
        assert!(control2.is_armed());

        control2.stop().unwrap();
        assert!(control1.is_stop_requested());
    }

    #[test]
    fn test_device_start_stream_connects_backend() {
        let backend = TestBackend::new();
        let info = test_info(backend.caps());
        let device = Dac::new(info, Box::new(backend));

        // The backend starts out disconnected.
        assert!(!device.is_connected());

        let result = device.start_stream(default_cfg());
        assert!(result.is_ok());

        let (stream, _info) = result.unwrap();
        assert!(stream.backend.as_ref().unwrap().is_connected());
    }

    #[test]
    fn test_handle_underrun_advances_state() {
        let mut stream = connected_stream(TestBackend::new(), default_cfg());

        let initial_instant = stream.state.current_instant;
        let initial_scheduled = stream.state.scheduled_ahead;
        let initial_chunks = stream.state.stats.chunks_written;
        let initial_points = stream.state.stats.points_written;

        stream.handle_underrun(&underrun_request()).unwrap();

        assert!(stream.state.current_instant > initial_instant);
        assert!(stream.state.scheduled_ahead > initial_scheduled);
        assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
        assert_eq!(stream.state.stats.points_written, initial_points + 100);
        assert_eq!(stream.state.stats.underrun_count, 1);
    }

    #[test]
    fn test_run_retries_on_would_block() {
        let backend = TestBackend::new().with_would_block_count(3);
        let write_count = backend.write_count.clone();
        let stream = connected_stream(backend, roomy_cfg());

        let produced_count = Arc::new(AtomicUsize::new(0));
        let produced_count_clone = produced_count.clone();
        let result = stream.run(
            move |_req| {
                // Produce exactly one chunk, then end the stream.
                let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
                if count < 1 {
                    Some(blank_chunk(CHUNK))
                } else {
                    None
                }
            },
            |_e| {},
        );

        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
        // Three blocked attempts plus the one that succeeds.
        assert_eq!(write_count.load(Ordering::SeqCst), 4);
    }

    #[test]
    fn test_arm_opens_shutter_disarm_closes_shutter() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();
        let mut stream = connected_stream(backend, default_cfg());

        assert!(!shutter_open.load(Ordering::SeqCst));

        let control = stream.control();
        control.arm().unwrap();

        let stopped = stream.process_control_messages();
        assert!(!stopped);
        assert!(shutter_open.load(Ordering::SeqCst));

        control.disarm().unwrap();

        let stopped = stream.process_control_messages();
        assert!(!stopped);
        assert!(!shutter_open.load(Ordering::SeqCst));
    }

    #[test]
    fn test_handle_underrun_blanks_when_disarmed() {
        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
        let mut stream = connected_stream(TestBackend::new(), cfg);

        stream.state.last_chunk = Some(vec![
            LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
            100
        ]);

        assert!(!stream.control.is_armed());

        stream.handle_underrun(&underrun_request()).unwrap();

        // The remembered chunk must not be replaced by the blanked filler.
        let last = stream.state.last_chunk.as_ref().unwrap();
        assert_eq!(last[0].r, 65535);
    }

    #[test]
    fn test_stop_closes_shutter() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();
        let mut stream = connected_stream(backend, default_cfg());

        stream.control.arm().unwrap();
        stream.process_control_messages();
        assert!(shutter_open.load(Ordering::SeqCst));

        stream.stop().unwrap();
        assert!(!shutter_open.load(Ordering::SeqCst));
    }

    #[test]
    fn test_arm_disarm_arm_cycle() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();
        let mut stream = connected_stream(backend, default_cfg());
        let control = stream.control();

        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));

        control.disarm().unwrap();
        stream.process_control_messages();
        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));
    }

    #[test]
    fn test_blocking_write_retries_on_would_block() {
        let backend = TestBackend::new().with_would_block_count(3);
        let write_count = backend.write_count.clone();
        let mut stream = connected_stream(backend, roomy_cfg());

        let req = stream.next_request().unwrap();
        let points = blank_chunk(req.n_points);

        stream.write(&req, &points).unwrap();

        // Three blocked attempts plus the one that succeeds.
        assert_eq!(write_count.load(Ordering::SeqCst), 4);
    }

    #[test]
    fn test_write_stops_during_would_block_retry() {
        // The backend never accepts a write, so only a stop can end `write`.
        let backend = TestBackend::new().with_would_block_count(usize::MAX);
        let mut stream = connected_stream(backend, roomy_cfg());

        let req = stream.next_request().unwrap();
        let points = blank_chunk(req.n_points);

        let control = stream.control();
        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            control.stop().unwrap();
        });

        let result = stream.write(&req, &points);
        assert!(result.unwrap_err().is_stopped());
        handle.join().unwrap();
    }

    #[test]
    fn test_write_processes_disarm_during_would_block_retry() {
        // Enough blocked attempts that the disarm below lands mid-retry.
        let backend = TestBackend::new().with_would_block_count(100);
        let shutter_open = backend.shutter_open.clone();
        let mut stream = connected_stream(backend, roomy_cfg());

        stream.control.arm().unwrap();
        stream.process_control_messages();
        assert!(shutter_open.load(Ordering::SeqCst));

        let req = stream.next_request().unwrap();
        let points = blank_chunk(req.n_points);

        let control = stream.control();
        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(1));
            control.disarm().unwrap();
        });

        stream.write(&req, &points).unwrap();
        handle.join().unwrap();

        assert!(!shutter_open.load(Ordering::SeqCst));
    }

    #[test]
    fn test_write_stats_correct_after_would_block_retries() {
        let backend = TestBackend::new().with_would_block_count(3);
        let mut stream = connected_stream(backend, roomy_cfg());

        let req = stream.next_request().unwrap();
        let points = blank_chunk(req.n_points);

        stream.write(&req, &points).unwrap();

        // Retries must not be counted as extra chunks or points.
        assert_eq!(stream.state.stats.chunks_written, 1);
        assert_eq!(stream.state.stats.points_written, 100);
    }

    #[test]
    fn test_write_rejects_wrong_point_count_without_retrying() {
        let backend = TestBackend::new();
        let write_count = backend.write_count.clone();
        let mut stream = connected_stream(backend, roomy_cfg());

        let req = stream.next_request().unwrap();
        let points = blank_chunk(req.n_points + 1);

        let result = stream.write(&req, &points);
        assert!(result.is_err());

        // The length check must fail before the backend is called.
        assert_eq!(write_count.load(Ordering::SeqCst), 0);
    }
}