Skip to main content

laser_dac/
stream.rs

1//! Stream and Dac types for point output.
2//!
3//! This module provides the `Stream` type for streaming point chunks to a DAC,
4//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
5//! connected devices that can start streaming sessions.
6//!
7//! # Armed/Disarmed Model
8//!
9//! Streams use a binary armed/disarmed safety model:
10//!
11//! - **Armed**: Content passes through to device, shutter opened (best-effort).
12//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
13//!
14//! All streams start disarmed. Call `arm()` to enable output.
15//! `arm()` and `disarm()` are the only safety controls β€” there is no separate
16//! shutter API. This keeps the mental model simple: armed = laser may emit.
17//!
18//! # Hardware Shutter Support
19//!
20//! Shutter control is best-effort and varies by backend:
21//! - **LaserCube USB/WiFi**: Actual hardware control
22//! - **Helios**: Hardware shutter control via USB interrupt
23//! - **Ether Dream, IDN**: No-op (safety relies on software blanking)
24//!
25//! # Disconnect Behavior
26//!
27//! No automatic reconnection. On disconnect, create a new `Dac` and `Stream`.
28//! New streams always start disarmed for safety.
29
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
32use std::sync::{Arc, Mutex};
33use std::time::Duration;
34
35use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
36use crate::types::{
37    ChunkRequest, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
38    StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
39};
40
41// =============================================================================
42// Stream Control
43// =============================================================================
44
/// Out-of-band commands delivered from `StreamControl` to the stream loop.
///
/// Routing these through a channel lets arm/disarm/stop take effect promptly
/// even while the stream is sleeping for pacing or retrying under
/// backpressure.
#[derive(Debug, Clone, Copy)]
enum ControlMsg {
    /// Enable output; the stream opens the hardware shutter (best-effort).
    Arm,
    /// Force output off; the stream closes the hardware shutter.
    Disarm,
    /// Ask the stream loop to terminate.
    Stop,
}
58
/// Thread-safe control handle for safety-critical actions.
///
/// This allows out-of-band control of the stream (arm/disarm/stop) from
/// a different thread, e.g., for E-stop functionality.
///
/// Control actions take effect as soon as possible - the stream processes
/// control messages at every opportunity (during waits, between retries, etc.).
///
/// Cloning is cheap: every clone shares the same inner state via `Arc`.
#[derive(Clone)]
pub struct StreamControl {
    /// Shared state; clones hand out additional references to the same inner.
    inner: Arc<StreamControlInner>,
}
70
/// Shared state behind all `StreamControl` clones.
///
/// Atomics allow the armed/stop flags to be observed even when the stream
/// loop is blocked; the channel additionally wakes the loop for immediate
/// shutter action.
struct StreamControlInner {
    /// Whether output is armed (laser can fire).
    armed: AtomicBool,
    /// Whether a stop has been requested.
    stop_requested: AtomicBool,
    /// Channel for sending control messages to the stream loop.
    /// Wrapped in Mutex because Sender is Send but not Sync.
    control_tx: Mutex<Sender<ControlMsg>>,
}
80
81impl StreamControl {
82    fn new(control_tx: Sender<ControlMsg>) -> Self {
83        Self {
84            inner: Arc::new(StreamControlInner {
85                armed: AtomicBool::new(false),
86                stop_requested: AtomicBool::new(false),
87                control_tx: Mutex::new(control_tx),
88            }),
89        }
90    }
91
92    /// Arm the output (allow laser to fire).
93    ///
94    /// When armed, content from the producer passes through unmodified
95    /// and the hardware shutter is opened (best-effort).
96    pub fn arm(&self) -> Result<()> {
97        self.inner.armed.store(true, Ordering::SeqCst);
98        // Send message to stream for immediate shutter control
99        if let Ok(tx) = self.inner.control_tx.lock() {
100            let _ = tx.send(ControlMsg::Arm);
101        }
102        Ok(())
103    }
104
105    /// Disarm the output (force laser off). Designed for E-stop use.
106    ///
107    /// Immediately sets an atomic flag (works even if stream loop is blocked),
108    /// then sends a message to close the hardware shutter. All future points
109    /// are blanked in software. The stream stays alive outputting blanks -
110    /// use `stop()` to terminate entirely.
111    ///
112    /// **Latency**: Points already in the device buffer will still play out.
113    /// `target_queue_points` bounds this latency.
114    ///
115    /// **Hardware shutter**: Best-effort. LaserCube and Helios have actual hardware
116    /// control; Ether Dream, IDN are no-ops (safety relies on software blanking).
117    pub fn disarm(&self) -> Result<()> {
118        self.inner.armed.store(false, Ordering::SeqCst);
119        // Send message to stream for immediate shutter control
120        if let Ok(tx) = self.inner.control_tx.lock() {
121            let _ = tx.send(ControlMsg::Disarm);
122        }
123        Ok(())
124    }
125
126    /// Check if the output is armed.
127    pub fn is_armed(&self) -> bool {
128        self.inner.armed.load(Ordering::SeqCst)
129    }
130
131    /// Request the stream to stop.
132    ///
133    /// Signals termination; `run()` returns `RunExit::Stopped`.
134    /// For clean shutdown with shutter close, prefer `Stream::stop()`.
135    pub fn stop(&self) -> Result<()> {
136        self.inner.stop_requested.store(true, Ordering::SeqCst);
137        // Send message to stream for immediate stop
138        if let Ok(tx) = self.inner.control_tx.lock() {
139            let _ = tx.send(ControlMsg::Stop);
140        }
141        Ok(())
142    }
143
144    /// Check if a stop has been requested.
145    pub fn is_stop_requested(&self) -> bool {
146        self.inner.stop_requested.load(Ordering::SeqCst)
147    }
148}
149
150// =============================================================================
151// Stream State
152// =============================================================================
153
/// Mutable bookkeeping owned by the stream loop (not shared across threads).
struct StreamState {
    /// Current position in stream time (points since start).
    current_instant: StreamInstant,
    /// Points scheduled ahead of current_instant.
    scheduled_ahead: u64,
    /// Last chunk that was produced (for repeat-last underrun policy).
    /// Only ever updated while armed - it is "last armed content".
    last_chunk: Option<Vec<LaserPoint>>,
    /// Statistics.
    stats: StreamStats,
    /// Track the last armed state to detect transitions.
    last_armed: bool,
    /// Whether the hardware shutter is currently open.
    shutter_open: bool,
}
168
169impl StreamState {
170    fn new() -> Self {
171        Self {
172            current_instant: StreamInstant::new(0),
173            scheduled_ahead: 0,
174            last_chunk: None,
175            stats: StreamStats::default(),
176            last_armed: false,
177            shutter_open: false,
178        }
179    }
180}
181
182// =============================================================================
183// Stream
184// =============================================================================
185
/// A streaming session for outputting point chunks to a DAC.
///
/// The stream provides two modes of operation:
///
/// - **Blocking mode**: Call `next_request()` to get what to produce, then `write()`.
/// - **Callback mode**: Call `run()` with a producer closure.
///
/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
pub struct Stream {
    /// Device info for this stream.
    info: DacInfo,
    /// The backend. `None` after `into_dac` has moved it back to a `Dac`,
    /// which also prevents `Drop` from stopping it twice.
    backend: Option<Box<dyn StreamBackend>>,
    /// Stream configuration.
    config: StreamConfig,
    /// Resolved chunk size.
    chunk_points: usize,
    /// Thread-safe control handle.
    control: StreamControl,
    /// Receiver for control messages from StreamControl.
    control_rx: Receiver<ControlMsg>,
    /// Stream state.
    state: StreamState,
}
210
211impl Stream {
212    /// Create a new stream with a backend.
213    pub(crate) fn with_backend(
214        info: DacInfo,
215        backend: Box<dyn StreamBackend>,
216        config: StreamConfig,
217        chunk_points: usize,
218    ) -> Self {
219        let (control_tx, control_rx) = mpsc::channel();
220        Self {
221            info,
222            backend: Some(backend),
223            config,
224            chunk_points,
225            control: StreamControl::new(control_tx),
226            control_rx,
227            state: StreamState::new(),
228        }
229    }
230
231    /// Returns the device info.
232    pub fn info(&self) -> &DacInfo {
233        &self.info
234    }
235
236    /// Returns the stream configuration.
237    pub fn config(&self) -> &StreamConfig {
238        &self.config
239    }
240
241    /// Returns a thread-safe control handle.
242    pub fn control(&self) -> StreamControl {
243        self.control.clone()
244    }
245
246    /// The resolved chunk size chosen for this stream.
247    ///
248    /// This is fixed for the lifetime of the stream.
249    pub fn chunk_points(&self) -> usize {
250        self.chunk_points
251    }
252
253    /// Returns the current stream status.
254    pub fn status(&self) -> Result<StreamStatus> {
255        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
256
257        Ok(StreamStatus {
258            connected: self
259                .backend
260                .as_ref()
261                .map(|b| b.is_connected())
262                .unwrap_or(false),
263            chunk_points: self.chunk_points,
264            scheduled_ahead_points: self.state.scheduled_ahead,
265            device_queued_points,
266            stats: Some(self.state.stats.clone()),
267        })
268    }
269
270    /// Blocks until the stream wants the next chunk.
271    ///
272    /// Returns a `ChunkRequest` describing exactly what to produce.
273    /// The producer must return exactly `req.n_points` points.
274    pub fn next_request(&mut self) -> Result<ChunkRequest> {
275        // Check for stop request
276        if self.control.is_stop_requested() {
277            return Err(Error::Stopped);
278        }
279
280        // Check for backend
281        let backend = self
282            .backend
283            .as_ref()
284            .ok_or_else(|| Error::disconnected("no backend"))?;
285
286        if !backend.is_connected() {
287            return Err(Error::disconnected("backend disconnected"));
288        }
289
290        // Wait for the right time to request the next chunk.
291        self.wait_for_ready()?;
292
293        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
294
295        Ok(ChunkRequest {
296            start: self.state.current_instant,
297            pps: self.config.pps,
298            n_points: self.chunk_points,
299            scheduled_ahead_points: self.state.scheduled_ahead,
300            device_queued_points,
301        })
302    }
303
304    /// Writes exactly `req.n_points` points for the given request.
305    ///
306    /// If the device cannot accept data immediately (backpressure), this method
307    /// retries automatically with brief sleeps until the write succeeds or the
308    /// stream is stopped.
309    ///
310    /// # Contract
311    ///
312    /// - `points.len()` must equal `req.n_points`.
313    /// - The request must be the most recent one from `next_request()`.
314    ///
315    /// # Shutter Control
316    ///
317    /// This method manages the hardware shutter based on arm state transitions:
318    /// - When transitioning from armed to disarmed, the shutter is closed (best-effort).
319    /// - When transitioning from disarmed to armed, the shutter is opened (best-effort).
320    pub fn write(&mut self, req: &ChunkRequest, points: &[LaserPoint]) -> Result<()> {
321        // Validate point count
322        if points.len() != req.n_points {
323            return Err(Error::invalid_config(format!(
324                "expected {} points, got {}",
325                req.n_points,
326                points.len()
327            )));
328        }
329
330        loop {
331            // Check for stop request
332            if self.control.is_stop_requested() {
333                return Err(Error::Stopped);
334            }
335
336            let is_armed = self.control.is_armed();
337
338            // Handle shutter transitions
339            self.handle_shutter_transition(is_armed);
340
341            // Write to backend (optimized: no allocation when armed)
342            let backend = self
343                .backend
344                .as_mut()
345                .ok_or_else(|| Error::disconnected("no backend"))?;
346
347            let outcome = if is_armed {
348                // Armed: pass points directly to backend (zero-copy)
349                backend.try_write_chunk(self.config.pps, points)?
350            } else {
351                // Disarmed: blank all points (allocate only when needed)
352                let blanked: Vec<LaserPoint> = points
353                    .iter()
354                    .map(|p| LaserPoint::blanked(p.x, p.y))
355                    .collect();
356                backend.try_write_chunk(self.config.pps, &blanked)?
357            };
358
359            match outcome {
360                WriteOutcome::Written => {
361                    // Update state
362                    if is_armed {
363                        self.state.last_chunk = Some(points.to_vec());
364                    }
365                    self.state.current_instant += self.chunk_points as u64;
366                    if self.info.caps.output_model == OutputModel::UsbFrameSwap {
367                        self.state.scheduled_ahead = self.chunk_points as u64;
368                    } else {
369                        self.state.scheduled_ahead += self.chunk_points as u64;
370                    }
371                    self.state.stats.chunks_written += 1;
372                    self.state.stats.points_written += self.chunk_points as u64;
373                    return Ok(());
374                }
375                WriteOutcome::WouldBlock => {
376                    // Backend buffer full - yield and retry
377                    std::thread::yield_now();
378
379                    if self.process_control_messages() {
380                        return Err(Error::Stopped);
381                    }
382
383                    std::thread::sleep(Duration::from_micros(100));
384
385                    if self.process_control_messages() {
386                        return Err(Error::Stopped);
387                    }
388                }
389            }
390        }
391    }
392
393    /// Handle hardware shutter transitions based on arm state changes.
394    fn handle_shutter_transition(&mut self, is_armed: bool) {
395        let was_armed = self.state.last_armed;
396        self.state.last_armed = is_armed;
397
398        if was_armed && !is_armed {
399            // Disarmed: close the shutter for safety (best-effort)
400            if self.state.shutter_open {
401                if let Some(backend) = &mut self.backend {
402                    let _ = backend.set_shutter(false); // Best-effort, ignore errors
403                }
404                self.state.shutter_open = false;
405            }
406        } else if !was_armed && is_armed {
407            // Armed: open the shutter (best-effort)
408            if !self.state.shutter_open {
409                if let Some(backend) = &mut self.backend {
410                    let _ = backend.set_shutter(true); // Best-effort, ignore errors
411                }
412                self.state.shutter_open = true;
413            }
414        }
415    }
416
417    /// Stop the stream and terminate output.
418    ///
419    /// Disarms the output (software blanking + hardware shutter) before stopping
420    /// the backend to prevent the "freeze on last bright point" hazard.
421    /// Use `disarm()` instead if you want to keep the stream alive but safe.
422    pub fn stop(&mut self) -> Result<()> {
423        // Disarm: sets armed flag for software blanking
424        self.control.disarm()?;
425
426        self.control.stop()?;
427
428        // Directly close shutter and stop backend (defense-in-depth)
429        if let Some(backend) = &mut self.backend {
430            let _ = backend.set_shutter(false);
431            backend.stop()?;
432        }
433
434        Ok(())
435    }
436
437    /// Consume the stream and recover the device for reuse.
438    ///
439    /// This method disarms and stops the stream (software blanking + hardware shutter),
440    /// then returns the underlying `Dac` along with the final `StreamStats`.
441    /// The device can then be used to start a new stream with different configuration.
442    ///
443    /// # Example
444    ///
445    /// ```ignore
446    /// let (stream, info) = device.start_stream(config)?;
447    /// // ... stream for a while ...
448    /// let (device, stats) = stream.into_dac();
449    /// println!("Streamed {} points", stats.points_written);
450    ///
451    /// // Restart with different config
452    /// let new_config = StreamConfig::new(60_000);
453    /// let (stream2, _) = device.start_stream(new_config)?;
454    /// ```
455    pub fn into_dac(mut self) -> (Dac, StreamStats) {
456        // Disarm (software blanking) and close shutter before stopping
457        let _ = self.control.disarm();
458        let _ = self.control.stop();
459        if let Some(backend) = &mut self.backend {
460            let _ = backend.set_shutter(false);
461            let _ = backend.stop();
462        }
463
464        // Take the backend (leaves None, so Drop won't try to stop again)
465        let backend = self.backend.take();
466        let stats = self.state.stats.clone();
467
468        let dac = Dac {
469            info: self.info.clone(),
470            backend,
471        };
472
473        (dac, stats)
474    }
475
    /// Run the stream in callback mode.
    ///
    /// The producer is called whenever the stream needs a new chunk.
    /// Return `Some(points)` to continue, or `None` to end the stream.
    ///
    /// # Error Classification
    ///
    /// The `on_error` callback receives recoverable errors that don't terminate the stream.
    /// Terminal conditions result in returning from `run()`:
    ///
    /// - **`RunExit::Stopped`**: Stream was stopped via `StreamControl::stop()` or underrun policy.
    /// - **`RunExit::ProducerEnded`**: Producer returned `None`.
    /// - **`RunExit::Disconnected`**: Device disconnected or became unreachable.
    ///
    /// Recoverable errors (reported via `on_error`, stream continues):
    /// - Transient backend errors that don't indicate disconnection.
    pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
    where
        F: FnMut(ChunkRequest) -> Option<Vec<LaserPoint>> + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        loop {
            // A stop requested between chunks exits cleanly before any work.
            if self.control.is_stop_requested() {
                return Ok(RunExit::Stopped);
            }

            // Get next request. Guard order matters: stopped is checked before
            // disconnected so a stop during a disconnect still reads as Stopped.
            let req = match self.next_request() {
                Ok(req) => req,
                Err(e) if e.is_stopped() => {
                    return Ok(RunExit::Stopped);
                }
                Err(e) if e.is_disconnected() => {
                    on_error(e);
                    return Ok(RunExit::Disconnected);
                }
                Err(e) => {
                    // Recoverable error - report and retry the request.
                    on_error(e);
                    continue;
                }
            };

            // Process control messages before calling producer, so a pending
            // stop/disarm takes effect before more content is generated.
            if self.process_control_messages() {
                return Ok(RunExit::Stopped);
            }

            // Call producer; `None` is the producer's normal end-of-stream.
            match producer(req.clone()) {
                Some(points) => {
                    match self.write(&req, &points) {
                        Ok(()) => {}
                        Err(e) if e.is_stopped() => {
                            return Ok(RunExit::Stopped);
                        }
                        Err(e) if e.is_disconnected() => {
                            on_error(e);
                            return Ok(RunExit::Disconnected);
                        }
                        Err(e) => {
                            // Recoverable error - report, then keep the device
                            // fed via the configured underrun policy.
                            on_error(e);
                            if let Err(e2) = self.handle_underrun(&req) {
                                // Underrun handling can also hit terminal
                                // conditions (e.g. UnderrunPolicy::Stop).
                                if e2.is_stopped() {
                                    return Ok(RunExit::Stopped);
                                }
                                on_error(e2);
                            }
                        }
                    }
                }
                None => {
                    return Ok(RunExit::ProducerEnded);
                }
            }
        }
    }
556
557    // =========================================================================
558    // Internal helpers
559    // =========================================================================
560
561    /// Process any pending control messages from StreamControl.
562    ///
563    /// This method drains the control message queue and takes immediate action:
564    /// - `Arm`: Opens the shutter (best-effort)
565    /// - `Disarm`: Closes the shutter immediately
566    /// - `Stop`: Returns `true` to signal the caller to stop
567    ///
568    /// Returns `true` if stop was requested, `false` otherwise.
569    fn process_control_messages(&mut self) -> bool {
570        loop {
571            match self.control_rx.try_recv() {
572                Ok(ControlMsg::Arm) => {
573                    // Open shutter (best-effort) if not already open
574                    if !self.state.shutter_open {
575                        if let Some(backend) = &mut self.backend {
576                            let _ = backend.set_shutter(true);
577                        }
578                        self.state.shutter_open = true;
579                    }
580                }
581                Ok(ControlMsg::Disarm) => {
582                    // Close shutter immediately for safety
583                    if self.state.shutter_open {
584                        if let Some(backend) = &mut self.backend {
585                            let _ = backend.set_shutter(false);
586                        }
587                        self.state.shutter_open = false;
588                    }
589                }
590                Ok(ControlMsg::Stop) => {
591                    return true;
592                }
593                Err(TryRecvError::Empty) => break,
594                Err(TryRecvError::Disconnected) => break,
595            }
596        }
597        false
598    }
599
    /// Wait until we're ready for the next chunk (pacing).
    ///
    /// Sleeps in small slices to allow processing control messages promptly.
    ///
    /// Pacing uses hysteresis: once the queue reaches `target_queue_points`,
    /// we wait (capped at 100 ms per call) for it to drain to roughly half
    /// the target before requesting more content.
    fn wait_for_ready(&mut self) -> Result<()> {
        // Maximum sleep slice - controls responsiveness to control messages
        const SLEEP_SLICE: Duration = Duration::from_millis(5);

        let target = self.config.target_queue_points as u64;

        // Use the more accurate queue depth when available from the device.
        // This handles cases where the device reports actual buffer state,
        // which may differ from our software-tracked scheduled_ahead.
        // Taking the max of the two is the conservative choice: never assume
        // the queue is emptier than either source claims.
        let effective_queue = if self.info.caps.can_estimate_queue {
            self.backend
                .as_ref()
                .and_then(|b| b.queued_points())
                .map(|device_q| device_q.max(self.state.scheduled_ahead))
                .unwrap_or(self.state.scheduled_ahead)
        } else {
            self.state.scheduled_ahead
        };

        // Below target: no wait needed, request the next chunk immediately.
        if effective_queue < target {
            return Ok(());
        }

        // Wait long enough (at the configured pps) to drain down to half the
        // target, but never more than 100 ms in one call.
        let points_to_drain = effective_queue.saturating_sub(target / 2);
        let seconds_to_wait = points_to_drain as f64 / self.config.pps as f64;
        let wait_duration = Duration::from_secs_f64(seconds_to_wait.min(0.1));

        // Sleep in small slices to process control messages promptly
        let mut remaining = wait_duration;
        while remaining > Duration::ZERO {
            let slice = remaining.min(SLEEP_SLICE);
            std::thread::sleep(slice);
            remaining = remaining.saturating_sub(slice);

            // Process control messages - handle shutter close immediately
            if self.process_control_messages() {
                return Err(Error::Stopped);
            }
        }

        // Account for playout during the wait: the device consumed roughly
        // pps * elapsed points. saturating_sub guards against over-estimating.
        let elapsed = wait_duration.as_secs_f64();
        let points_drained = (elapsed * self.config.pps as f64) as u64;
        self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_drained);

        Ok(())
    }
649
    /// Handle an underrun by applying the underrun policy.
    ///
    /// # Safety Behavior
    ///
    /// When disarmed, this always outputs blanked points regardless of the underrun
    /// policy. The `RepeatLast` policy means "repeat last armed content" - when
    /// disarmed, repeating content would be unsafe.
    fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
        self.state.stats.underrun_count += 1;

        let is_armed = self.control.is_armed();

        // Handle shutter transitions (same safety behavior as write())
        self.handle_shutter_transition(is_armed);

        // Determine fill points based on arm state and policy.
        // NOTE: a RepeatLast fill reuses the last chunk's length, which may
        // differ from req.n_points; the state update below uses
        // fill_points.len() so the timebase stays accurate either way.
        let fill_points: Vec<LaserPoint> = if !is_armed {
            // When disarmed, always output blanked points for safety
            // RepeatLast means "repeat last armed content" - meaningless when disarmed
            vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
        } else {
            match &self.config.underrun {
                UnderrunPolicy::RepeatLast => self
                    .state
                    .last_chunk
                    .clone()
                    .unwrap_or_else(|| vec![LaserPoint::blanked(0.0, 0.0); req.n_points]),
                UnderrunPolicy::Blank => {
                    vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
                }
                UnderrunPolicy::Park { x, y } => {
                    // Hold the beam (blanked) at a fixed park position.
                    vec![LaserPoint::blanked(*x, *y); req.n_points]
                }
                UnderrunPolicy::Stop => {
                    // Policy says terminate: request stop and surface it.
                    self.control.stop()?;
                    return Err(Error::Stopped);
                }
            }
        };

        if let Some(backend) = &mut self.backend {
            match backend.try_write_chunk(self.config.pps, &fill_points) {
                Ok(WriteOutcome::Written) => {
                    // Update stream state to keep timebase accurate
                    let n_points = fill_points.len();
                    // Only update last_chunk when armed (it's the "last armed content")
                    if is_armed {
                        self.state.last_chunk = Some(fill_points);
                    }
                    self.state.current_instant += n_points as u64;
                    // Mirrors write(): frame-swap devices replace the buffer,
                    // queue-based devices accumulate.
                    if self.info.caps.output_model == OutputModel::UsbFrameSwap {
                        self.state.scheduled_ahead = n_points as u64;
                    } else {
                        self.state.scheduled_ahead += n_points as u64;
                    }
                    self.state.stats.chunks_written += 1;
                    self.state.stats.points_written += n_points as u64;
                }
                Ok(WriteOutcome::WouldBlock) => {
                    // Backend is full, can't write fill points - this is expected
                }
                Err(_) => {
                    // Backend error during underrun handling - ignore, we're already recovering
                }
            }
        }

        Ok(())
    }
719}
720
impl Drop for Stream {
    fn drop(&mut self) {
        // Best-effort safety shutdown (disarm + shutter close + backend stop).
        // Errors are ignored because Drop cannot propagate them. This is a
        // no-op on the backend if `into_dac` already took it.
        let _ = self.stop();
    }
}
726
727// =============================================================================
728// Device
729// =============================================================================
730
/// A connected device that can start streaming sessions.
///
/// When starting a stream, the device is consumed and the backend ownership
/// transfers to the stream. The `DacInfo` is returned alongside the stream
/// so metadata remains accessible.
///
/// # Example
///
/// ```ignore
/// let device = open_device("my-device")?;
/// let config = StreamConfig::new(30_000);
/// let (stream, info) = device.start_stream(config)?;
/// println!("Streaming to: {}", info.name);
/// ```
pub struct Dac {
    /// Device metadata (id, name, kind, capabilities).
    info: DacInfo,
    /// The backend; `None` once it has been moved into a `Stream`.
    backend: Option<Box<dyn StreamBackend>>,
}
749
750impl Dac {
751    /// Create a new device from a backend.
752    pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
753        Self {
754            info,
755            backend: Some(backend),
756        }
757    }
758
759    /// Returns the device info.
760    pub fn info(&self) -> &DacInfo {
761        &self.info
762    }
763
764    /// Returns the device ID.
765    pub fn id(&self) -> &str {
766        &self.info.id
767    }
768
769    /// Returns the device name.
770    pub fn name(&self) -> &str {
771        &self.info.name
772    }
773
774    /// Returns the DAC type.
775    pub fn kind(&self) -> &DacType {
776        &self.info.kind
777    }
778
779    /// Returns the device capabilities.
780    pub fn caps(&self) -> &DacCapabilities {
781        &self.info.caps
782    }
783
784    /// Returns whether the device has a backend (not yet used for a stream).
785    pub fn has_backend(&self) -> bool {
786        self.backend.is_some()
787    }
788
789    /// Returns whether the device is connected.
790    pub fn is_connected(&self) -> bool {
791        self.backend
792            .as_ref()
793            .map(|b| b.is_connected())
794            .unwrap_or(false)
795    }
796
797    /// Starts a streaming session, consuming the device.
798    ///
799    /// # Ownership
800    ///
801    /// This method consumes the `Dac` because:
802    /// - Each device can only have one active stream at a time.
803    /// - The backend is moved into the `Stream` to ensure exclusive access.
804    /// - This prevents accidental reuse of a device that's already streaming.
805    ///
806    /// The method returns both the `Stream` and a copy of `DacInfo`, so you
807    /// retain access to device metadata (id, name, capabilities) after starting.
808    ///
809    /// # Connection
810    ///
811    /// If the device is not already connected, this method will establish the
812    /// connection before creating the stream. Connection failures are returned
813    /// as errors.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if:
818    /// - The device backend has already been used for a stream.
819    /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
820    /// - The backend fails to connect.
821    pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
822        let mut backend = self.backend.take().ok_or_else(|| {
823            Error::invalid_config("device backend has already been used for a stream")
824        })?;
825
826        Self::validate_config(&self.info.caps, &cfg)?;
827
828        // Connect the backend if not already connected
829        if !backend.is_connected() {
830            backend.connect()?;
831        }
832
833        let chunk_points = cfg.chunk_points.unwrap_or_else(|| {
834            Self::compute_default_chunk_size(&self.info.caps, cfg.pps, cfg.target_queue_points)
835        });
836
837        let stream = Stream::with_backend(self.info.clone(), backend, cfg, chunk_points);
838
839        Ok((stream, self.info))
840    }
841
842    fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
843        if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
844            return Err(Error::invalid_config(format!(
845                "PPS {} is outside device range [{}, {}]",
846                cfg.pps, caps.pps_min, caps.pps_max
847            )));
848        }
849
850        if let Some(chunk_points) = cfg.chunk_points {
851            if chunk_points > caps.max_points_per_chunk {
852                return Err(Error::invalid_config(format!(
853                    "chunk_points {} exceeds device max {}",
854                    chunk_points, caps.max_points_per_chunk
855                )));
856            }
857            if chunk_points == 0 {
858                return Err(Error::invalid_config("chunk_points cannot be 0"));
859            }
860        }
861
862        if cfg.target_queue_points == 0 {
863            return Err(Error::invalid_config("target_queue_points cannot be 0"));
864        }
865
866        Ok(())
867    }
868
869    fn compute_default_chunk_size(
870        caps: &DacCapabilities,
871        pps: u32,
872        target_queue_points: usize,
873    ) -> usize {
874        // Target ~10ms worth of points per chunk
875        let target_chunk_ms = 10;
876        let time_based_points = (pps as usize * target_chunk_ms) / 1000;
877
878        // Also bound by target queue: aim for ~ΒΌ of target queue per chunk.
879        // This ensures we don't send huge chunks relative to our latency target.
880        let queue_based_max = target_queue_points / 4;
881
882        let max_points = caps.max_points_per_chunk.min(queue_based_max.max(100));
883        let min_points = 100;
884
885        time_based_points.clamp(min_points, max_points)
886    }
887}
888
/// Legacy alias for compatibility.
///
/// Prefer [`Dac`] in new code; this alias exists only for older call sites.
pub type OwnedDac = Dac;
891
892#[cfg(test)]
893mod tests {
894    use super::*;
895    use crate::backend::{StreamBackend, WriteOutcome};
896    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
897    use std::sync::Arc;
898
    /// A test backend for unit testing stream behavior.
    ///
    /// Counters live behind shared `Arc`s so a test can keep handles to the
    /// backend's state after the backend itself is boxed and moved into a
    /// `Stream`.
    struct TestBackend {
        /// Capabilities reported through `StreamBackend::caps`.
        caps: DacCapabilities,
        /// Tracks `connect()` / `disconnect()` calls.
        connected: bool,
        /// Count of write attempts
        write_count: Arc<AtomicUsize>,
        /// Number of WouldBlock responses to return before accepting writes
        would_block_count: Arc<AtomicUsize>,
        /// Simulated queue depth
        queued: Arc<AtomicU64>,
        /// Track shutter state for testing
        shutter_open: Arc<AtomicBool>,
    }
912
913    impl TestBackend {
914        fn new() -> Self {
915            Self {
916                caps: DacCapabilities {
917                    pps_min: 1000,
918                    pps_max: 100000,
919                    max_points_per_chunk: 1000,
920                    prefers_constant_pps: false,
921                    can_estimate_queue: true,
922                    output_model: crate::types::OutputModel::NetworkFifo,
923                },
924                connected: false,
925                write_count: Arc::new(AtomicUsize::new(0)),
926                would_block_count: Arc::new(AtomicUsize::new(0)),
927                queued: Arc::new(AtomicU64::new(0)),
928                shutter_open: Arc::new(AtomicBool::new(false)),
929            }
930        }
931
932        fn with_would_block_count(mut self, count: usize) -> Self {
933            self.would_block_count = Arc::new(AtomicUsize::new(count));
934            self
935        }
936    }
937
    // Minimal StreamBackend implementation: records calls into the shared
    // counters so tests can observe writes, queue depth, and shutter state.
    impl StreamBackend for TestBackend {
        fn dac_type(&self) -> DacType {
            DacType::Custom("Test".to_string())
        }

        fn caps(&self) -> &DacCapabilities {
            &self.caps
        }

        fn connect(&mut self) -> Result<()> {
            self.connected = true;
            Ok(())
        }

        fn disconnect(&mut self) -> Result<()> {
            self.connected = false;
            Ok(())
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        // Counts every attempt, then either consumes one unit of the
        // WouldBlock budget or accepts the chunk into the simulated queue.
        // NOTE(review): load-then-sub is not a single atomic step; assumed
        // fine because tests issue writes from a single thread.
        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            self.write_count.fetch_add(1, Ordering::SeqCst);

            // Return WouldBlock until count reaches 0
            let remaining = self.would_block_count.load(Ordering::SeqCst);
            if remaining > 0 {
                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
                return Ok(WriteOutcome::WouldBlock);
            }

            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
            Ok(WriteOutcome::Written)
        }

        fn stop(&mut self) -> Result<()> {
            Ok(())
        }

        // Records the requested shutter state so tests can assert on it.
        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.shutter_open.store(open, Ordering::SeqCst);
            Ok(())
        }

        fn queued_points(&self) -> Option<u64> {
            Some(self.queued.load(Ordering::SeqCst))
        }
    }
988
989    #[test]
990    fn test_stream_control_arm_disarm() {
991        let (tx, _rx) = mpsc::channel();
992        let control = StreamControl::new(tx);
993        assert!(!control.is_armed());
994
995        control.arm().unwrap();
996        assert!(control.is_armed());
997
998        control.disarm().unwrap();
999        assert!(!control.is_armed());
1000    }
1001
1002    #[test]
1003    fn test_stream_control_stop() {
1004        let (tx, _rx) = mpsc::channel();
1005        let control = StreamControl::new(tx);
1006        assert!(!control.is_stop_requested());
1007
1008        control.stop().unwrap();
1009        assert!(control.is_stop_requested());
1010    }
1011
1012    #[test]
1013    fn test_stream_control_clone_shares_state() {
1014        let (tx, _rx) = mpsc::channel();
1015        let control1 = StreamControl::new(tx);
1016        let control2 = control1.clone();
1017
1018        control1.arm().unwrap();
1019        assert!(control2.is_armed());
1020
1021        control2.stop().unwrap();
1022        assert!(control1.is_stop_requested());
1023    }
1024
1025    #[test]
1026    fn test_device_start_stream_connects_backend() {
1027        let backend = TestBackend::new();
1028        let info = DacInfo {
1029            id: "test".to_string(),
1030            name: "Test Device".to_string(),
1031            kind: DacType::Custom("Test".to_string()),
1032            caps: backend.caps().clone(),
1033        };
1034        let device = Dac::new(info, Box::new(backend));
1035
1036        // Device should not be connected initially
1037        assert!(!device.is_connected());
1038
1039        // start_stream should connect and return a usable stream
1040        let cfg = StreamConfig::new(30000);
1041        let result = device.start_stream(cfg);
1042        assert!(result.is_ok());
1043
1044        let (stream, _info) = result.unwrap();
1045        assert!(stream.backend.as_ref().unwrap().is_connected());
1046    }
1047
1048    #[test]
1049    fn test_handle_underrun_advances_state() {
1050        let mut backend = TestBackend::new();
1051        backend.connected = true;
1052        let info = DacInfo {
1053            id: "test".to_string(),
1054            name: "Test Device".to_string(),
1055            kind: DacType::Custom("Test".to_string()),
1056            caps: backend.caps().clone(),
1057        };
1058
1059        let cfg = StreamConfig::new(30000);
1060        let mut stream = Stream::with_backend(info, Box::new(backend), cfg, 100);
1061
1062        // Record initial state
1063        let initial_instant = stream.state.current_instant;
1064        let initial_scheduled = stream.state.scheduled_ahead;
1065        let initial_chunks = stream.state.stats.chunks_written;
1066        let initial_points = stream.state.stats.points_written;
1067
1068        // Trigger underrun handling
1069        let req = ChunkRequest {
1070            start: StreamInstant::new(0),
1071            pps: 30000,
1072            n_points: 100,
1073            scheduled_ahead_points: 0,
1074            device_queued_points: None,
1075        };
1076        stream.handle_underrun(&req).unwrap();
1077
1078        // State should have advanced
1079        assert!(stream.state.current_instant > initial_instant);
1080        assert!(stream.state.scheduled_ahead > initial_scheduled);
1081        assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
1082        assert_eq!(stream.state.stats.points_written, initial_points + 100);
1083        assert_eq!(stream.state.stats.underrun_count, 1);
1084    }
1085
1086    #[test]
1087    fn test_run_retries_on_would_block() {
1088        // Create a backend that returns WouldBlock 3 times before accepting
1089        let backend = TestBackend::new().with_would_block_count(3);
1090        let write_count = backend.write_count.clone();
1091
1092        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1093        backend_box.connect().unwrap();
1094
1095        let info = DacInfo {
1096            id: "test".to_string(),
1097            name: "Test Device".to_string(),
1098            kind: DacType::Custom("Test".to_string()),
1099            caps: backend_box.caps().clone(),
1100        };
1101
1102        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
1103        let stream = Stream::with_backend(info, backend_box, cfg, 100);
1104
1105        let produced_count = Arc::new(AtomicUsize::new(0));
1106        let produced_count_clone = produced_count.clone();
1107        let result = stream.run(
1108            move |_req| {
1109                let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
1110                if count < 1 {
1111                    Some(vec![LaserPoint::blanked(0.0, 0.0); 100])
1112                } else {
1113                    None // End after one chunk
1114                }
1115            },
1116            |_e| {},
1117        );
1118
1119        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1120        // Should have attempted write 4 times (3 WouldBlock + 1 success)
1121        assert_eq!(write_count.load(Ordering::SeqCst), 4);
1122    }
1123
1124    #[test]
1125    fn test_arm_opens_shutter_disarm_closes_shutter() {
1126        let backend = TestBackend::new();
1127        let shutter_open = backend.shutter_open.clone();
1128
1129        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1130        backend_box.connect().unwrap();
1131
1132        let info = DacInfo {
1133            id: "test".to_string(),
1134            name: "Test Device".to_string(),
1135            kind: DacType::Custom("Test".to_string()),
1136            caps: backend_box.caps().clone(),
1137        };
1138
1139        let cfg = StreamConfig::new(30000);
1140        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
1141
1142        // Initially shutter is closed
1143        assert!(!shutter_open.load(Ordering::SeqCst));
1144
1145        // Arm via control (this sends ControlMsg::Arm)
1146        let control = stream.control();
1147        control.arm().unwrap();
1148
1149        // Process control messages - this should open the shutter
1150        let stopped = stream.process_control_messages();
1151        assert!(!stopped);
1152        assert!(shutter_open.load(Ordering::SeqCst));
1153
1154        // Disarm (this sends ControlMsg::Disarm)
1155        control.disarm().unwrap();
1156
1157        // Process control messages - this should close the shutter
1158        let stopped = stream.process_control_messages();
1159        assert!(!stopped);
1160        assert!(!shutter_open.load(Ordering::SeqCst));
1161    }
1162
    // A disarmed stream must never replay colored content on underrun, even
    // when the configured policy is RepeatLast.
    #[test]
    fn test_handle_underrun_blanks_when_disarmed() {
        let backend = TestBackend::new();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        // Use RepeatLast policy - but when disarmed, should still blank
        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Set some last_chunk with colored points
        stream.state.last_chunk = Some(vec![
            LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
            100
        ]);

        // Ensure disarmed (default state)
        assert!(!stream.control.is_armed());

        let req = ChunkRequest {
            start: StreamInstant::new(0),
            pps: 30000,
            n_points: 100,
            scheduled_ahead_points: 0,
            device_queued_points: None,
        };

        // Handle underrun while disarmed
        stream.handle_underrun(&req).unwrap();

        // last_chunk should NOT be updated (we're disarmed)
        // The actual write was blanked points, but we don't update last_chunk when disarmed
        // because "last armed content" hasn't changed
        let last = stream.state.last_chunk.as_ref().unwrap();
        assert_eq!(last[0].r, 65535); // Still the old colored points
    }
1207
1208    #[test]
1209    fn test_stop_closes_shutter() {
1210        let backend = TestBackend::new();
1211        let shutter_open = backend.shutter_open.clone();
1212
1213        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1214        backend_box.connect().unwrap();
1215
1216        let info = DacInfo {
1217            id: "test".to_string(),
1218            name: "Test Device".to_string(),
1219            kind: DacType::Custom("Test".to_string()),
1220            caps: backend_box.caps().clone(),
1221        };
1222
1223        let cfg = StreamConfig::new(30000);
1224        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
1225
1226        // Arm first to open shutter
1227        stream.control.arm().unwrap();
1228        stream.process_control_messages();
1229        assert!(shutter_open.load(Ordering::SeqCst));
1230
1231        // Stop should close shutter
1232        stream.stop().unwrap();
1233        assert!(!shutter_open.load(Ordering::SeqCst));
1234    }
1235
    // The shutter must track repeated arm/disarm transitions across a full
    // arm -> disarm -> arm cycle, not just the first arm.
    #[test]
    fn test_arm_disarm_arm_cycle() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
        let control = stream.control();

        // Initial state: disarmed
        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        // Arm
        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));

        // Disarm
        control.disarm().unwrap();
        stream.process_control_messages();
        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        // Arm again
        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));
    }
1277
    #[test]
    fn test_blocking_write_retries_on_would_block() {
        // Simulates the Helios scenario: device returns WouldBlock on first
        // attempts (e.g., still busy from a previous session), then accepts.
        // The blocking write() must retry internally instead of propagating
        // WouldBlock as an error to the caller.
        let backend = TestBackend::new().with_would_block_count(3);
        let write_count = backend.write_count.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Build one full chunk of blanked points, sized by the request.
        let req = stream.next_request().unwrap();
        let points = vec![LaserPoint::blanked(0.0, 0.0); req.n_points];

        // write() must succeed despite the backend returning WouldBlock 3 times
        stream.write(&req, &points).unwrap();

        // Should have attempted 4 times (3 WouldBlock + 1 Written)
        assert_eq!(write_count.load(Ordering::SeqCst), 4);
    }
1309
    #[test]
    fn test_write_stops_during_would_block_retry() {
        // If the backend never accepts writes, write() must not loop forever.
        // A stop() from another thread must break out of the retry loop.
        let backend = TestBackend::new().with_would_block_count(usize::MAX);

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        let req = stream.next_request().unwrap();
        let points = vec![LaserPoint::blanked(0.0, 0.0); req.n_points];

        // Stop from another thread while write() is stuck retrying
        let control = stream.control();
        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            control.stop().unwrap();
        });

        // write() must surface the stop as a Stopped error, not hang or succeed.
        let result = stream.write(&req, &points);
        assert!(result.unwrap_err().is_stopped());
        handle.join().unwrap();
    }
1343
    #[test]
    fn test_write_processes_disarm_during_would_block_retry() {
        // If disarm() is called while write() is retrying on WouldBlock,
        // the shutter must be closed during the retry loop β€” not deferred
        // until the next write() call.
        let backend = TestBackend::new().with_would_block_count(100);
        let shutter_open = backend.shutter_open.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Arm first so shutter is open
        stream.control.arm().unwrap();
        stream.process_control_messages();
        assert!(shutter_open.load(Ordering::SeqCst));

        // Build one full chunk of blanked points, sized by the request.
        let req = stream.next_request().unwrap();
        let points = vec![LaserPoint::blanked(0.0, 0.0); req.n_points];

        // Disarm from another thread while write() is retrying
        let control = stream.control();
        let handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(1));
            control.disarm().unwrap();
        });

        // write() eventually succeeds after 100 WouldBlocks
        stream.write(&req, &points).unwrap();
        handle.join().unwrap();

        // Shutter must have been closed during the retry loop
        assert!(!shutter_open.load(Ordering::SeqCst));
    }
1387
1388    #[test]
1389    fn test_write_stats_correct_after_would_block_retries() {
1390        // WouldBlock retries are internal β€” only the final successful write
1391        // should be counted in stats (1 chunk, N points).
1392        let backend = TestBackend::new().with_would_block_count(3);
1393
1394        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1395        backend_box.connect().unwrap();
1396
1397        let info = DacInfo {
1398            id: "test".to_string(),
1399            name: "Test Device".to_string(),
1400            kind: DacType::Custom("Test".to_string()),
1401            caps: backend_box.caps().clone(),
1402        };
1403
1404        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
1405        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
1406
1407        let req = stream.next_request().unwrap();
1408        let points = vec![LaserPoint::blanked(0.0, 0.0); req.n_points];
1409
1410        stream.write(&req, &points).unwrap();
1411
1412        // Despite 3 retries, only 1 successful chunk should be recorded
1413        assert_eq!(stream.state.stats.chunks_written, 1);
1414        assert_eq!(stream.state.stats.points_written, 100);
1415    }
1416
1417    #[test]
1418    fn test_write_rejects_wrong_point_count_without_retrying() {
1419        // Point count validation must happen before the retry loop.
1420        // A wrong count should fail immediately without touching the backend.
1421        let backend = TestBackend::new();
1422        let write_count = backend.write_count.clone();
1423
1424        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1425        backend_box.connect().unwrap();
1426
1427        let info = DacInfo {
1428            id: "test".to_string(),
1429            name: "Test Device".to_string(),
1430            kind: DacType::Custom("Test".to_string()),
1431            caps: backend_box.caps().clone(),
1432        };
1433
1434        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
1435        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
1436
1437        let req = stream.next_request().unwrap();
1438        // Wrong number of points (one extra)
1439        let points = vec![LaserPoint::blanked(0.0, 0.0); req.n_points + 1];
1440
1441        let result = stream.write(&req, &points);
1442        assert!(result.is_err());
1443
1444        // Backend should never have been called
1445        assert_eq!(write_count.load(Ordering::SeqCst), 0);
1446    }
1447}