1use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33
34use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
35use crate::types::{
36 ChunkRequest, DacCapabilities, DacInfo, DacType, LaserPoint, RunExit, StreamConfig,
37 StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
38};
39
/// Notification sent from a [`StreamControl`] handle to the stream that owns
/// the receiving end of the control channel.
#[derive(Clone, Copy, Debug)]
enum ControlMsg {
    /// Output was armed; the stream reacts by opening the shutter.
    Arm,
    /// Output was disarmed; the stream reacts by closing the shutter.
    Disarm,
    /// The stream should end; message processing reports this to its caller.
    Stop,
}
57
58#[derive(Clone)]
66pub struct StreamControl {
67 inner: Arc<StreamControlInner>,
68}
69
70struct StreamControlInner {
71 armed: AtomicBool,
73 stop_requested: AtomicBool,
75 control_tx: Mutex<Sender<ControlMsg>>,
78}
79
80impl StreamControl {
81 fn new(control_tx: Sender<ControlMsg>) -> Self {
82 Self {
83 inner: Arc::new(StreamControlInner {
84 armed: AtomicBool::new(false),
85 stop_requested: AtomicBool::new(false),
86 control_tx: Mutex::new(control_tx),
87 }),
88 }
89 }
90
91 pub fn arm(&self) -> Result<()> {
96 self.inner.armed.store(true, Ordering::SeqCst);
97 if let Ok(tx) = self.inner.control_tx.lock() {
99 let _ = tx.send(ControlMsg::Arm);
100 }
101 Ok(())
102 }
103
104 pub fn disarm(&self) -> Result<()> {
117 self.inner.armed.store(false, Ordering::SeqCst);
118 if let Ok(tx) = self.inner.control_tx.lock() {
120 let _ = tx.send(ControlMsg::Disarm);
121 }
122 Ok(())
123 }
124
125 pub fn is_armed(&self) -> bool {
127 self.inner.armed.load(Ordering::SeqCst)
128 }
129
130 pub fn stop(&self) -> Result<()> {
135 self.inner.stop_requested.store(true, Ordering::SeqCst);
136 if let Ok(tx) = self.inner.control_tx.lock() {
138 let _ = tx.send(ControlMsg::Stop);
139 }
140 Ok(())
141 }
142
143 pub fn is_stop_requested(&self) -> bool {
145 self.inner.stop_requested.load(Ordering::SeqCst)
146 }
147}
148
149struct StreamState {
154 current_instant: StreamInstant,
156 scheduled_ahead: u64,
158 last_chunk: Option<Vec<LaserPoint>>,
160 stats: StreamStats,
162 last_armed: bool,
164 shutter_open: bool,
166}
167
168impl StreamState {
169 fn new() -> Self {
170 Self {
171 current_instant: StreamInstant::new(0),
172 scheduled_ahead: 0,
173 last_chunk: None,
174 stats: StreamStats::default(),
175 last_armed: false,
176 shutter_open: false,
177 }
178 }
179}
180
/// An active output stream bound to one device backend.
///
/// Chunks are requested with `next_request` and delivered with `write`, or the
/// whole loop is driven by `run`. Dropping the stream stops it.
pub struct Stream {
    /// Descriptor of the device this stream writes to.
    info: DacInfo,
    /// Backend doing the actual I/O; `None` once handed back via `into_dac`.
    backend: Option<Box<dyn StreamBackend>>,
    /// Configuration the stream was started with.
    config: StreamConfig,
    /// Number of points requested per chunk.
    chunk_points: usize,
    /// Handle whose clones are given to callers by `control()`.
    control: StreamControl,
    /// Receiving end of the channel fed by `control`.
    control_rx: Receiver<ControlMsg>,
    /// Timeline, queue estimate, statistics and shutter bookkeeping.
    state: StreamState,
}
209
210impl Stream {
211 pub(crate) fn with_backend(
213 info: DacInfo,
214 backend: Box<dyn StreamBackend>,
215 config: StreamConfig,
216 chunk_points: usize,
217 ) -> Self {
218 let (control_tx, control_rx) = mpsc::channel();
219 Self {
220 info,
221 backend: Some(backend),
222 config,
223 chunk_points,
224 control: StreamControl::new(control_tx),
225 control_rx,
226 state: StreamState::new(),
227 }
228 }
229
    /// Returns the descriptor of the device this stream is attached to.
    pub fn info(&self) -> &DacInfo {
        &self.info
    }

    /// Returns the configuration the stream was created with.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }

    /// Returns a new handle sharing this stream's arm/stop state.
    pub fn control(&self) -> StreamControl {
        self.control.clone()
    }

    /// Returns the number of points each chunk request asks for.
    pub fn chunk_points(&self) -> usize {
        self.chunk_points
    }
251
252 pub fn status(&self) -> Result<StreamStatus> {
254 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
255
256 Ok(StreamStatus {
257 connected: self
258 .backend
259 .as_ref()
260 .map(|b| b.is_connected())
261 .unwrap_or(false),
262 chunk_points: self.chunk_points,
263 scheduled_ahead_points: self.state.scheduled_ahead,
264 device_queued_points,
265 stats: Some(self.state.stats.clone()),
266 })
267 }
268
269 pub fn next_request(&mut self) -> Result<ChunkRequest> {
274 if self.control.is_stop_requested() {
276 return Err(Error::Stopped);
277 }
278
279 let backend = self
281 .backend
282 .as_ref()
283 .ok_or_else(|| Error::disconnected("no backend"))?;
284
285 if !backend.is_connected() {
286 return Err(Error::disconnected("backend disconnected"));
287 }
288
289 self.wait_for_ready()?;
291
292 let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
293
294 Ok(ChunkRequest {
295 start: self.state.current_instant,
296 pps: self.config.pps,
297 n_points: self.chunk_points,
298 scheduled_ahead_points: self.state.scheduled_ahead,
299 device_queued_points,
300 })
301 }
302
303 pub fn write(&mut self, req: &ChunkRequest, points: &[LaserPoint]) -> Result<()> {
316 if points.len() != req.n_points {
318 return Err(Error::invalid_config(format!(
319 "expected {} points, got {}",
320 req.n_points,
321 points.len()
322 )));
323 }
324
325 if self.control.is_stop_requested() {
327 return Err(Error::Stopped);
328 }
329
330 let is_armed = self.control.is_armed();
331
332 self.handle_shutter_transition(is_armed);
334
335 let backend = self
337 .backend
338 .as_mut()
339 .ok_or_else(|| Error::disconnected("no backend"))?;
340
341 let outcome = if is_armed {
342 backend.try_write_chunk(self.config.pps, points)?
344 } else {
345 let blanked: Vec<LaserPoint> = points
347 .iter()
348 .map(|p| LaserPoint::blanked(p.x, p.y))
349 .collect();
350 backend.try_write_chunk(self.config.pps, &blanked)?
351 };
352
353 match outcome {
354 WriteOutcome::Written => {
355 if is_armed {
357 self.state.last_chunk = Some(points.to_vec());
358 }
359 self.state.current_instant += self.chunk_points as u64;
360 self.state.scheduled_ahead += self.chunk_points as u64;
361 self.state.stats.chunks_written += 1;
362 self.state.stats.points_written += self.chunk_points as u64;
363 Ok(())
364 }
365 WriteOutcome::WouldBlock => Err(Error::WouldBlock),
366 }
367 }
368
369 fn handle_shutter_transition(&mut self, is_armed: bool) {
371 let was_armed = self.state.last_armed;
372 self.state.last_armed = is_armed;
373
374 if was_armed && !is_armed {
375 if self.state.shutter_open {
377 if let Some(backend) = &mut self.backend {
378 let _ = backend.set_shutter(false); }
380 self.state.shutter_open = false;
381 }
382 } else if !was_armed && is_armed {
383 if !self.state.shutter_open {
385 if let Some(backend) = &mut self.backend {
386 let _ = backend.set_shutter(true); }
388 self.state.shutter_open = true;
389 }
390 }
391 }
392
393 pub fn stop(&mut self) -> Result<()> {
399 self.control.disarm()?;
401
402 self.control.stop()?;
403
404 if let Some(backend) = &mut self.backend {
406 let _ = backend.set_shutter(false);
407 backend.stop()?;
408 }
409
410 Ok(())
411 }
412
413 pub fn into_dac(mut self) -> (Dac, StreamStats) {
432 let _ = self.control.disarm();
434 let _ = self.control.stop();
435 if let Some(backend) = &mut self.backend {
436 let _ = backend.set_shutter(false);
437 let _ = backend.stop();
438 }
439
440 let backend = self.backend.take();
442 let stats = self.state.stats.clone();
443
444 let dac = Dac {
445 info: self.info.clone(),
446 backend,
447 };
448
449 (dac, stats)
450 }
451
452 pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
469 where
470 F: FnMut(ChunkRequest) -> Option<Vec<LaserPoint>> + Send + 'static,
471 E: FnMut(Error) + Send + 'static,
472 {
473 loop {
474 if self.control.is_stop_requested() {
476 return Ok(RunExit::Stopped);
477 }
478
479 let req = match self.next_request() {
481 Ok(req) => req,
482 Err(e) if e.is_stopped() => {
483 return Ok(RunExit::Stopped);
484 }
485 Err(e) if e.is_disconnected() => {
486 on_error(e);
487 return Ok(RunExit::Disconnected);
488 }
489 Err(e) => {
490 on_error(e);
492 continue;
493 }
494 };
495
496 if self.process_control_messages() {
498 return Ok(RunExit::Stopped);
499 }
500
501 match producer(req.clone()) {
503 Some(points) => {
504 loop {
506 match self.write(&req, &points) {
507 Ok(()) => break,
508 Err(e) if e.is_would_block() => {
509 std::thread::yield_now();
512
513 if self.process_control_messages() {
515 return Ok(RunExit::Stopped);
516 }
517
518 std::thread::sleep(Duration::from_micros(100));
519
520 if self.process_control_messages() {
522 return Ok(RunExit::Stopped);
523 }
524 continue;
525 }
526 Err(e) if e.is_stopped() => {
527 return Ok(RunExit::Stopped);
528 }
529 Err(e) if e.is_disconnected() => {
530 on_error(e);
531 return Ok(RunExit::Disconnected);
532 }
533 Err(e) => {
534 on_error(e);
536 if let Err(e2) = self.handle_underrun(&req) {
537 if e2.is_stopped() {
539 return Ok(RunExit::Stopped);
540 }
541 on_error(e2);
542 }
543 break;
544 }
545 }
546 }
547 }
548 None => {
549 return Ok(RunExit::ProducerEnded);
550 }
551 }
552 }
553 }
554
555 fn process_control_messages(&mut self) -> bool {
568 loop {
569 match self.control_rx.try_recv() {
570 Ok(ControlMsg::Arm) => {
571 if !self.state.shutter_open {
573 if let Some(backend) = &mut self.backend {
574 let _ = backend.set_shutter(true);
575 }
576 self.state.shutter_open = true;
577 }
578 }
579 Ok(ControlMsg::Disarm) => {
580 if self.state.shutter_open {
582 if let Some(backend) = &mut self.backend {
583 let _ = backend.set_shutter(false);
584 }
585 self.state.shutter_open = false;
586 }
587 }
588 Ok(ControlMsg::Stop) => {
589 return true;
590 }
591 Err(TryRecvError::Empty) => break,
592 Err(TryRecvError::Disconnected) => break,
593 }
594 }
595 false
596 }
597
598 fn wait_for_ready(&mut self) -> Result<()> {
602 const SLEEP_SLICE: Duration = Duration::from_millis(5);
604
605 let target = self.config.target_queue_points as u64;
606
607 let effective_queue = if self.info.caps.can_estimate_queue {
611 self.backend
612 .as_ref()
613 .and_then(|b| b.queued_points())
614 .map(|device_q| device_q.max(self.state.scheduled_ahead))
615 .unwrap_or(self.state.scheduled_ahead)
616 } else {
617 self.state.scheduled_ahead
618 };
619
620 if effective_queue < target {
621 return Ok(());
622 }
623
624 let points_to_drain = effective_queue.saturating_sub(target / 2);
625 let seconds_to_wait = points_to_drain as f64 / self.config.pps as f64;
626 let wait_duration = Duration::from_secs_f64(seconds_to_wait.min(0.1));
627
628 let mut remaining = wait_duration;
630 while remaining > Duration::ZERO {
631 let slice = remaining.min(SLEEP_SLICE);
632 std::thread::sleep(slice);
633 remaining = remaining.saturating_sub(slice);
634
635 if self.process_control_messages() {
637 return Err(Error::Stopped);
638 }
639 }
640
641 let elapsed = wait_duration.as_secs_f64();
642 let points_drained = (elapsed * self.config.pps as f64) as u64;
643 self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_drained);
644
645 Ok(())
646 }
647
648 fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
656 self.state.stats.underrun_count += 1;
657
658 let is_armed = self.control.is_armed();
659
660 self.handle_shutter_transition(is_armed);
662
663 let fill_points: Vec<LaserPoint> = if !is_armed {
665 vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
668 } else {
669 match &self.config.underrun {
670 UnderrunPolicy::RepeatLast => self
671 .state
672 .last_chunk
673 .clone()
674 .unwrap_or_else(|| vec![LaserPoint::blanked(0.0, 0.0); req.n_points]),
675 UnderrunPolicy::Blank => {
676 vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
677 }
678 UnderrunPolicy::Park { x, y } => {
679 vec![LaserPoint::blanked(*x, *y); req.n_points]
680 }
681 UnderrunPolicy::Stop => {
682 self.control.stop()?;
683 return Err(Error::Stopped);
684 }
685 }
686 };
687
688 if let Some(backend) = &mut self.backend {
689 match backend.try_write_chunk(self.config.pps, &fill_points) {
690 Ok(WriteOutcome::Written) => {
691 let n_points = fill_points.len();
693 if is_armed {
695 self.state.last_chunk = Some(fill_points);
696 }
697 self.state.current_instant += n_points as u64;
698 self.state.scheduled_ahead += n_points as u64;
699 self.state.stats.chunks_written += 1;
700 self.state.stats.points_written += n_points as u64;
701 }
702 Ok(WriteOutcome::WouldBlock) => {
703 }
705 Err(_) => {
706 }
708 }
709 }
710
711 Ok(())
712 }
713}
714
/// Stops the stream when it goes out of scope.
impl Drop for Stream {
    fn drop(&mut self) {
        // Best effort: `drop` cannot report a failure, so the result of
        // `stop` is discarded.
        let _ = self.stop();
    }
}
720
721pub struct Dac {
740 info: DacInfo,
741 backend: Option<Box<dyn StreamBackend>>,
742}
743
744impl Dac {
745 pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
747 Self {
748 info,
749 backend: Some(backend),
750 }
751 }
752
753 pub fn info(&self) -> &DacInfo {
755 &self.info
756 }
757
758 pub fn id(&self) -> &str {
760 &self.info.id
761 }
762
763 pub fn name(&self) -> &str {
765 &self.info.name
766 }
767
768 pub fn kind(&self) -> &DacType {
770 &self.info.kind
771 }
772
773 pub fn caps(&self) -> &DacCapabilities {
775 &self.info.caps
776 }
777
778 pub fn has_backend(&self) -> bool {
780 self.backend.is_some()
781 }
782
783 pub fn is_connected(&self) -> bool {
785 self.backend
786 .as_ref()
787 .map(|b| b.is_connected())
788 .unwrap_or(false)
789 }
790
791 pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
816 let mut backend = self.backend.take().ok_or_else(|| {
817 Error::invalid_config("device backend has already been used for a stream")
818 })?;
819
820 Self::validate_config(&self.info.caps, &cfg)?;
821
822 if !backend.is_connected() {
824 backend.connect()?;
825 }
826
827 let chunk_points = cfg.chunk_points.unwrap_or_else(|| {
828 Self::compute_default_chunk_size(&self.info.caps, cfg.pps, cfg.target_queue_points)
829 });
830
831 let stream = Stream::with_backend(self.info.clone(), backend, cfg, chunk_points);
832
833 Ok((stream, self.info))
834 }
835
836 fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
837 if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
838 return Err(Error::invalid_config(format!(
839 "PPS {} is outside device range [{}, {}]",
840 cfg.pps, caps.pps_min, caps.pps_max
841 )));
842 }
843
844 if let Some(chunk_points) = cfg.chunk_points {
845 if chunk_points > caps.max_points_per_chunk {
846 return Err(Error::invalid_config(format!(
847 "chunk_points {} exceeds device max {}",
848 chunk_points, caps.max_points_per_chunk
849 )));
850 }
851 if chunk_points == 0 {
852 return Err(Error::invalid_config("chunk_points cannot be 0"));
853 }
854 }
855
856 if cfg.target_queue_points == 0 {
857 return Err(Error::invalid_config("target_queue_points cannot be 0"));
858 }
859
860 Ok(())
861 }
862
863 fn compute_default_chunk_size(
864 caps: &DacCapabilities,
865 pps: u32,
866 target_queue_points: usize,
867 ) -> usize {
868 let target_chunk_ms = 10;
870 let time_based_points = (pps as usize * target_chunk_ms) / 1000;
871
872 let queue_based_max = target_queue_points / 4;
875
876 let max_points = caps.max_points_per_chunk.min(queue_based_max.max(100));
877 let min_points = 100;
878
879 time_based_points.clamp(min_points, max_points)
880 }
881}
882
/// Alias for [`Dac`].
pub type OwnedDac = Dac;
885
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::{StreamBackend, WriteOutcome};
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// In-memory backend that records what the stream does to it.
    ///
    /// The counters sit behind `Arc`s so a test can keep a handle after the
    /// backend has been moved into a stream.
    struct TestBackend {
        caps: DacCapabilities,
        connected: bool,
        /// Number of `try_write_chunk` calls, accepted or not.
        write_count: Arc<AtomicUsize>,
        /// Number of upcoming writes to answer with `WouldBlock`.
        would_block_count: Arc<AtomicUsize>,
        /// Total points accepted so far.
        queued: Arc<AtomicU64>,
        /// Last value passed to `set_shutter`.
        shutter_open: Arc<AtomicBool>,
    }

    impl TestBackend {
        /// Disconnected backend that accepts every write.
        fn new() -> Self {
            let caps = DacCapabilities {
                pps_min: 1000,
                pps_max: 100000,
                max_points_per_chunk: 1000,
                prefers_constant_pps: false,
                can_estimate_queue: true,
                output_model: crate::types::OutputModel::NetworkFifo,
            };
            Self {
                caps,
                connected: false,
                write_count: Arc::new(AtomicUsize::new(0)),
                would_block_count: Arc::new(AtomicUsize::new(0)),
                queued: Arc::new(AtomicU64::new(0)),
                shutter_open: Arc::new(AtomicBool::new(false)),
            }
        }

        /// Makes the next `count` writes report `WouldBlock`.
        fn with_would_block_count(self, count: usize) -> Self {
            Self {
                would_block_count: Arc::new(AtomicUsize::new(count)),
                ..self
            }
        }
    }

    impl StreamBackend for TestBackend {
        fn dac_type(&self) -> DacType {
            DacType::Custom("Test".to_string())
        }

        fn caps(&self) -> &DacCapabilities {
            &self.caps
        }

        fn connect(&mut self) -> Result<()> {
            self.connected = true;
            Ok(())
        }

        fn disconnect(&mut self) -> Result<()> {
            self.connected = false;
            Ok(())
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            self.write_count.fetch_add(1, Ordering::SeqCst);

            if self.would_block_count.load(Ordering::SeqCst) > 0 {
                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
                return Ok(WriteOutcome::WouldBlock);
            }

            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
            Ok(WriteOutcome::Written)
        }

        fn stop(&mut self) -> Result<()> {
            Ok(())
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.shutter_open.store(open, Ordering::SeqCst);
            Ok(())
        }

        fn queued_points(&self) -> Option<u64> {
            Some(self.queued.load(Ordering::SeqCst))
        }
    }

    /// Descriptor used by every test device.
    fn test_info(caps: &DacCapabilities) -> DacInfo {
        DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: caps.clone(),
        }
    }

    /// Connects `backend` and wraps it in a stream with 100-point chunks.
    fn connected_stream(backend: TestBackend, cfg: StreamConfig) -> Stream {
        let mut boxed: Box<dyn StreamBackend> = Box::new(backend);
        boxed.connect().unwrap();
        let info = test_info(boxed.caps());
        Stream::with_backend(info, boxed, cfg, 100)
    }

    /// A 100-point request at the start of the timeline.
    fn first_request() -> ChunkRequest {
        ChunkRequest {
            start: StreamInstant::new(0),
            pps: 30000,
            n_points: 100,
            scheduled_ahead_points: 0,
            device_queued_points: None,
        }
    }

    /// A control handle whose receiver is kept alive by the caller.
    fn detached_control() -> (StreamControl, Receiver<ControlMsg>) {
        let (tx, rx) = mpsc::channel();
        (StreamControl::new(tx), rx)
    }

    #[test]
    fn test_stream_control_arm_disarm() {
        let (control, _rx) = detached_control();
        assert!(!control.is_armed());

        control.arm().unwrap();
        assert!(control.is_armed());

        control.disarm().unwrap();
        assert!(!control.is_armed());
    }

    #[test]
    fn test_stream_control_stop() {
        let (control, _rx) = detached_control();
        assert!(!control.is_stop_requested());

        control.stop().unwrap();
        assert!(control.is_stop_requested());
    }

    #[test]
    fn test_stream_control_clone_shares_state() {
        let (first, _rx) = detached_control();
        let second = first.clone();

        first.arm().unwrap();
        assert!(second.is_armed());

        second.stop().unwrap();
        assert!(first.is_stop_requested());
    }

    #[test]
    fn test_device_start_stream_connects_backend() {
        let backend = TestBackend::new();
        let info = test_info(backend.caps());
        let device = Dac::new(info, Box::new(backend));

        // The backend starts out disconnected.
        assert!(!device.is_connected());

        let (stream, _info) = device
            .start_stream(StreamConfig::new(30000))
            .expect("start_stream should succeed");
        assert!(stream.backend.as_ref().unwrap().is_connected());
    }

    #[test]
    fn test_handle_underrun_advances_state() {
        let mut stream = connected_stream(TestBackend::new(), StreamConfig::new(30000));

        let instant_before = stream.state.current_instant;
        let scheduled_before = stream.state.scheduled_ahead;
        let chunks_before = stream.state.stats.chunks_written;
        let points_before = stream.state.stats.points_written;

        stream.handle_underrun(&first_request()).unwrap();

        assert!(stream.state.current_instant > instant_before);
        assert!(stream.state.scheduled_ahead > scheduled_before);
        assert_eq!(stream.state.stats.chunks_written, chunks_before + 1);
        assert_eq!(stream.state.stats.points_written, points_before + 100);
        assert_eq!(stream.state.stats.underrun_count, 1);
    }

    #[test]
    fn test_run_retries_on_would_block() {
        let backend = TestBackend::new().with_would_block_count(3);
        let write_count = Arc::clone(&backend.write_count);

        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
        let stream = connected_stream(backend, cfg);

        // The producer yields exactly one chunk and then ends the run.
        let calls = Arc::new(AtomicUsize::new(0));
        let exit = stream.run(
            move |_req| {
                if calls.fetch_add(1, Ordering::SeqCst) == 0 {
                    Some(vec![LaserPoint::blanked(0.0, 0.0); 100])
                } else {
                    None
                }
            },
            |_e| {},
        );

        assert_eq!(exit.unwrap(), RunExit::ProducerEnded);
        // Three rejected attempts plus the one that went through.
        assert_eq!(write_count.load(Ordering::SeqCst), 4);
    }

    #[test]
    fn test_arm_opens_shutter_disarm_closes_shutter() {
        let backend = TestBackend::new();
        let shutter = Arc::clone(&backend.shutter_open);
        let mut stream = connected_stream(backend, StreamConfig::new(30000));

        assert!(!shutter.load(Ordering::SeqCst));

        let control = stream.control();

        control.arm().unwrap();
        assert!(!stream.process_control_messages());
        assert!(shutter.load(Ordering::SeqCst));

        control.disarm().unwrap();
        assert!(!stream.process_control_messages());
        assert!(!shutter.load(Ordering::SeqCst));
    }

    #[test]
    fn test_handle_underrun_blanks_when_disarmed() {
        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
        let mut stream = connected_stream(TestBackend::new(), cfg);

        let lit = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
        stream.state.last_chunk = Some(vec![lit; 100]);

        assert!(!stream.control.is_armed());

        stream.handle_underrun(&first_request()).unwrap();

        // A disarmed underrun must not overwrite the remembered chunk.
        let remembered = stream.state.last_chunk.as_ref().unwrap();
        assert_eq!(remembered[0].r, 65535);
    }

    #[test]
    fn test_stop_closes_shutter() {
        let backend = TestBackend::new();
        let shutter = Arc::clone(&backend.shutter_open);
        let mut stream = connected_stream(backend, StreamConfig::new(30000));

        stream.control.arm().unwrap();
        stream.process_control_messages();
        assert!(shutter.load(Ordering::SeqCst));

        stream.stop().unwrap();
        assert!(!shutter.load(Ordering::SeqCst));
    }

    #[test]
    fn test_arm_disarm_arm_cycle() {
        let backend = TestBackend::new();
        let shutter = Arc::clone(&backend.shutter_open);
        let mut stream = connected_stream(backend, StreamConfig::new(30000));
        let control = stream.control();

        assert!(!control.is_armed());
        assert!(!shutter.load(Ordering::SeqCst));

        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter.load(Ordering::SeqCst));

        control.disarm().unwrap();
        stream.process_control_messages();
        assert!(!control.is_armed());
        assert!(!shutter.load(Ordering::SeqCst));

        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter.load(Ordering::SeqCst));
    }
}