laser_dac/
stream.rs

1//! Stream and Dac types for point output.
2//!
3//! This module provides the `Stream` type for streaming point chunks to a DAC,
4//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
5//! connected devices that can start streaming sessions.
6//!
7//! # Armed/Disarmed Model
8//!
9//! Streams use a binary armed/disarmed safety model:
10//!
11//! - **Armed**: Content passes through to device, shutter opened (best-effort).
12//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
13//!
14//! All streams start disarmed. Call `arm()` to enable output.
15//! `arm()` and `disarm()` are the only safety controls — there is no separate
16//! shutter API. This keeps the mental model simple: armed = laser may emit.
17//!
18//! # Hardware Shutter Support
19//!
20//! Shutter control is best-effort and varies by backend:
21//! - **LaserCube USB/WiFi**: Actual hardware control
22//! - **Ether Dream, Helios, IDN**: No-op (safety relies on software blanking)
23//!
24//! # Disconnect Behavior
25//!
26//! No automatic reconnection. On disconnect, create a new `Dac` and `Stream`.
27//! New streams always start disarmed for safety.
28
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33
34use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
35use crate::types::{
36    ChunkRequest, DacCapabilities, DacInfo, DacType, LaserPoint, RunExit, StreamConfig,
37    StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
38};
39
40// =============================================================================
41// Stream Control
42// =============================================================================
43
/// Control messages sent from StreamControl to Stream.
///
/// These messages allow out-of-band control actions to take effect immediately,
/// even when the stream is waiting (pacing, backpressure, etc.).
///
/// The atomic flags in `StreamControlInner` remain the authoritative
/// arm/stop state; a message only wakes the stream loop so the
/// hardware-shutter side effect is applied promptly.
#[derive(Debug, Clone, Copy)]
enum ControlMsg {
    /// Arm the output (opens hardware shutter).
    Arm,
    /// Disarm the output (closes hardware shutter).
    Disarm,
    /// Request the stream to stop.
    Stop,
}
57
/// Thread-safe control handle for safety-critical actions.
///
/// This allows out-of-band control of the stream (arm/disarm/stop) from
/// a different thread, e.g., for E-stop functionality.
///
/// Cloning is cheap: every clone shares the same `Arc`-backed inner state,
/// so any clone can arm, disarm, or stop the stream.
///
/// Control actions take effect as soon as possible - the stream processes
/// control messages at every opportunity (during waits, between retries, etc.).
#[derive(Clone)]
pub struct StreamControl {
    // Shared with all clones; see `StreamControlInner` for the flags/channel.
    inner: Arc<StreamControlInner>,
}
69
/// Shared state behind all clones of a `StreamControl` handle.
struct StreamControlInner {
    /// Whether output is armed (laser can fire).
    armed: AtomicBool,
    /// Whether a stop has been requested.
    stop_requested: AtomicBool,
    /// Channel for sending control messages to the stream loop.
    /// Wrapped in Mutex because Sender is Send but not Sync.
    /// NOTE(review): `mpsc::Sender` became `Sync` in recent Rust releases,
    /// so the Mutex may be removable depending on MSRV — confirm first.
    control_tx: Mutex<Sender<ControlMsg>>,
}
79
80impl StreamControl {
81    fn new(control_tx: Sender<ControlMsg>) -> Self {
82        Self {
83            inner: Arc::new(StreamControlInner {
84                armed: AtomicBool::new(false),
85                stop_requested: AtomicBool::new(false),
86                control_tx: Mutex::new(control_tx),
87            }),
88        }
89    }
90
91    /// Arm the output (allow laser to fire).
92    ///
93    /// When armed, content from the producer passes through unmodified
94    /// and the hardware shutter is opened (best-effort).
95    pub fn arm(&self) -> Result<()> {
96        self.inner.armed.store(true, Ordering::SeqCst);
97        // Send message to stream for immediate shutter control
98        if let Ok(tx) = self.inner.control_tx.lock() {
99            let _ = tx.send(ControlMsg::Arm);
100        }
101        Ok(())
102    }
103
104    /// Disarm the output (force laser off). Designed for E-stop use.
105    ///
106    /// Immediately sets an atomic flag (works even if stream loop is blocked),
107    /// then sends a message to close the hardware shutter. All future points
108    /// are blanked in software. The stream stays alive outputting blanks -
109    /// use `stop()` to terminate entirely.
110    ///
111    /// **Latency**: Points already in the device buffer will still play out.
112    /// `target_queue_points` bounds this latency.
113    ///
114    /// **Hardware shutter**: Best-effort. LaserCube has actual hardware control;
115    /// Ether Dream, Helios, IDN are no-ops (safety relies on software blanking).
116    pub fn disarm(&self) -> Result<()> {
117        self.inner.armed.store(false, Ordering::SeqCst);
118        // Send message to stream for immediate shutter control
119        if let Ok(tx) = self.inner.control_tx.lock() {
120            let _ = tx.send(ControlMsg::Disarm);
121        }
122        Ok(())
123    }
124
125    /// Check if the output is armed.
126    pub fn is_armed(&self) -> bool {
127        self.inner.armed.load(Ordering::SeqCst)
128    }
129
130    /// Request the stream to stop.
131    ///
132    /// Signals termination; `run()` returns `RunExit::Stopped`.
133    /// For clean shutdown with shutter close, prefer `Stream::stop()`.
134    pub fn stop(&self) -> Result<()> {
135        self.inner.stop_requested.store(true, Ordering::SeqCst);
136        // Send message to stream for immediate stop
137        if let Ok(tx) = self.inner.control_tx.lock() {
138            let _ = tx.send(ControlMsg::Stop);
139        }
140        Ok(())
141    }
142
143    /// Check if a stop has been requested.
144    pub fn is_stop_requested(&self) -> bool {
145        self.inner.stop_requested.load(Ordering::SeqCst)
146    }
147}
148
149// =============================================================================
150// Stream State
151// =============================================================================
152
/// Mutable bookkeeping owned by the stream loop.
struct StreamState {
    /// Current position in stream time (points since start).
    current_instant: StreamInstant,
    /// Points scheduled ahead of current_instant.
    /// Software estimate: incremented on successful writes, decremented by
    /// `wait_for_ready` as points are assumed to drain at the configured PPS.
    scheduled_ahead: u64,
    /// Last chunk that was produced (for repeat-last underrun policy).
    /// Only updated while armed, so it never replays blanked content.
    last_chunk: Option<Vec<LaserPoint>>,
    /// Statistics.
    stats: StreamStats,
    /// Track the last armed state to detect transitions.
    last_armed: bool,
    /// Whether the hardware shutter is currently open.
    shutter_open: bool,
}
167
168impl StreamState {
169    fn new() -> Self {
170        Self {
171            current_instant: StreamInstant::new(0),
172            scheduled_ahead: 0,
173            last_chunk: None,
174            stats: StreamStats::default(),
175            last_armed: false,
176            shutter_open: false,
177        }
178    }
179}
180
181// =============================================================================
182// Stream
183// =============================================================================
184
/// A streaming session for outputting point chunks to a DAC.
///
/// The stream provides two modes of operation:
///
/// - **Blocking mode**: Call `next_request()` to get what to produce, then `write()`.
/// - **Callback mode**: Call `run()` with a producer closure.
///
/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
pub struct Stream {
    /// Device info for this stream.
    info: DacInfo,
    /// The backend. Held in `Option` so `into_dac()` can move it out,
    /// leaving `Drop` with nothing further to stop.
    backend: Option<Box<dyn StreamBackend>>,
    /// Stream configuration.
    config: StreamConfig,
    /// Resolved chunk size.
    chunk_points: usize,
    /// Thread-safe control handle.
    control: StreamControl,
    /// Receiver for control messages from StreamControl.
    control_rx: Receiver<ControlMsg>,
    /// Stream state.
    state: StreamState,
}
209
210impl Stream {
211    /// Create a new stream with a backend.
212    pub(crate) fn with_backend(
213        info: DacInfo,
214        backend: Box<dyn StreamBackend>,
215        config: StreamConfig,
216        chunk_points: usize,
217    ) -> Self {
218        let (control_tx, control_rx) = mpsc::channel();
219        Self {
220            info,
221            backend: Some(backend),
222            config,
223            chunk_points,
224            control: StreamControl::new(control_tx),
225            control_rx,
226            state: StreamState::new(),
227        }
228    }
229
    /// Returns the device info.
    pub fn info(&self) -> &DacInfo {
        &self.info
    }

    /// Returns the stream configuration.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }

    /// Returns a thread-safe control handle.
    ///
    /// The handle can be cloned and moved to other threads (e.g. an E-stop
    /// watcher); all clones share the same underlying state.
    pub fn control(&self) -> StreamControl {
        self.control.clone()
    }

    /// The resolved chunk size chosen for this stream.
    ///
    /// This is fixed for the lifetime of the stream.
    pub fn chunk_points(&self) -> usize {
        self.chunk_points
    }
251
252    /// Returns the current stream status.
253    pub fn status(&self) -> Result<StreamStatus> {
254        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
255
256        Ok(StreamStatus {
257            connected: self
258                .backend
259                .as_ref()
260                .map(|b| b.is_connected())
261                .unwrap_or(false),
262            chunk_points: self.chunk_points,
263            scheduled_ahead_points: self.state.scheduled_ahead,
264            device_queued_points,
265            stats: Some(self.state.stats.clone()),
266        })
267    }
268
269    /// Blocks until the stream wants the next chunk.
270    ///
271    /// Returns a `ChunkRequest` describing exactly what to produce.
272    /// The producer must return exactly `req.n_points` points.
273    pub fn next_request(&mut self) -> Result<ChunkRequest> {
274        // Check for stop request
275        if self.control.is_stop_requested() {
276            return Err(Error::Stopped);
277        }
278
279        // Check for backend
280        let backend = self
281            .backend
282            .as_ref()
283            .ok_or_else(|| Error::disconnected("no backend"))?;
284
285        if !backend.is_connected() {
286            return Err(Error::disconnected("backend disconnected"));
287        }
288
289        // Wait for the right time to request the next chunk.
290        self.wait_for_ready()?;
291
292        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
293
294        Ok(ChunkRequest {
295            start: self.state.current_instant,
296            pps: self.config.pps,
297            n_points: self.chunk_points,
298            scheduled_ahead_points: self.state.scheduled_ahead,
299            device_queued_points,
300        })
301    }
302
303    /// Writes exactly `req.n_points` points for the given request.
304    ///
305    /// # Contract
306    ///
307    /// - `points.len()` must equal `req.n_points`.
308    /// - The request must be the most recent one from `next_request()`.
309    ///
310    /// # Shutter Control
311    ///
312    /// This method manages the hardware shutter based on arm state transitions:
313    /// - When transitioning from armed to disarmed, the shutter is closed (best-effort).
314    /// - When transitioning from disarmed to armed, the shutter is opened (best-effort).
315    pub fn write(&mut self, req: &ChunkRequest, points: &[LaserPoint]) -> Result<()> {
316        // Validate point count
317        if points.len() != req.n_points {
318            return Err(Error::invalid_config(format!(
319                "expected {} points, got {}",
320                req.n_points,
321                points.len()
322            )));
323        }
324
325        // Check for stop request
326        if self.control.is_stop_requested() {
327            return Err(Error::Stopped);
328        }
329
330        let is_armed = self.control.is_armed();
331
332        // Handle shutter transitions
333        self.handle_shutter_transition(is_armed);
334
335        // Write to backend (optimized: no allocation when armed)
336        let backend = self
337            .backend
338            .as_mut()
339            .ok_or_else(|| Error::disconnected("no backend"))?;
340
341        let outcome = if is_armed {
342            // Armed: pass points directly to backend (zero-copy)
343            backend.try_write_chunk(self.config.pps, points)?
344        } else {
345            // Disarmed: blank all points (allocate only when needed)
346            let blanked: Vec<LaserPoint> = points
347                .iter()
348                .map(|p| LaserPoint::blanked(p.x, p.y))
349                .collect();
350            backend.try_write_chunk(self.config.pps, &blanked)?
351        };
352
353        match outcome {
354            WriteOutcome::Written => {
355                // Update state
356                if is_armed {
357                    self.state.last_chunk = Some(points.to_vec());
358                }
359                self.state.current_instant += self.chunk_points as u64;
360                self.state.scheduled_ahead += self.chunk_points as u64;
361                self.state.stats.chunks_written += 1;
362                self.state.stats.points_written += self.chunk_points as u64;
363                Ok(())
364            }
365            WriteOutcome::WouldBlock => Err(Error::WouldBlock),
366        }
367    }
368
369    /// Handle hardware shutter transitions based on arm state changes.
370    fn handle_shutter_transition(&mut self, is_armed: bool) {
371        let was_armed = self.state.last_armed;
372        self.state.last_armed = is_armed;
373
374        if was_armed && !is_armed {
375            // Disarmed: close the shutter for safety (best-effort)
376            if self.state.shutter_open {
377                if let Some(backend) = &mut self.backend {
378                    let _ = backend.set_shutter(false); // Best-effort, ignore errors
379                }
380                self.state.shutter_open = false;
381            }
382        } else if !was_armed && is_armed {
383            // Armed: open the shutter (best-effort)
384            if !self.state.shutter_open {
385                if let Some(backend) = &mut self.backend {
386                    let _ = backend.set_shutter(true); // Best-effort, ignore errors
387                }
388                self.state.shutter_open = true;
389            }
390        }
391    }
392
393    /// Stop the stream and terminate output.
394    ///
395    /// Disarms the output (software blanking + hardware shutter) before stopping
396    /// the backend to prevent the "freeze on last bright point" hazard.
397    /// Use `disarm()` instead if you want to keep the stream alive but safe.
398    pub fn stop(&mut self) -> Result<()> {
399        // Disarm: sets armed flag for software blanking
400        self.control.disarm()?;
401
402        self.control.stop()?;
403
404        // Directly close shutter and stop backend (defense-in-depth)
405        if let Some(backend) = &mut self.backend {
406            let _ = backend.set_shutter(false);
407            backend.stop()?;
408        }
409
410        Ok(())
411    }
412
413    /// Consume the stream and recover the device for reuse.
414    ///
415    /// This method disarms and stops the stream (software blanking + hardware shutter),
416    /// then returns the underlying `Dac` along with the final `StreamStats`.
417    /// The device can then be used to start a new stream with different configuration.
418    ///
419    /// # Example
420    ///
421    /// ```ignore
422    /// let (stream, info) = device.start_stream(config)?;
423    /// // ... stream for a while ...
424    /// let (device, stats) = stream.into_dac();
425    /// println!("Streamed {} points", stats.points_written);
426    ///
427    /// // Restart with different config
428    /// let new_config = StreamConfig::new(60_000);
429    /// let (stream2, _) = device.start_stream(new_config)?;
430    /// ```
431    pub fn into_dac(mut self) -> (Dac, StreamStats) {
432        // Disarm (software blanking) and close shutter before stopping
433        let _ = self.control.disarm();
434        let _ = self.control.stop();
435        if let Some(backend) = &mut self.backend {
436            let _ = backend.set_shutter(false);
437            let _ = backend.stop();
438        }
439
440        // Take the backend (leaves None, so Drop won't try to stop again)
441        let backend = self.backend.take();
442        let stats = self.state.stats.clone();
443
444        let dac = Dac {
445            info: self.info.clone(),
446            backend,
447        };
448
449        (dac, stats)
450    }
451
    /// Run the stream in callback mode.
    ///
    /// The producer is called whenever the stream needs a new chunk.
    /// Return `Some(points)` to continue, or `None` to end the stream.
    ///
    /// # Error Classification
    ///
    /// The `on_error` callback receives recoverable errors that don't terminate the stream.
    /// Terminal conditions result in returning from `run()`:
    ///
    /// - **`RunExit::Stopped`**: Stream was stopped via `StreamControl::stop()` or underrun policy.
    /// - **`RunExit::ProducerEnded`**: Producer returned `None`.
    /// - **`RunExit::Disconnected`**: Device disconnected or became unreachable.
    ///
    /// Recoverable errors (reported via `on_error`, stream continues):
    /// - Transient backend errors that don't indicate disconnection.
    ///
    /// NOTE(review): `run()` consumes `self` and executes on the caller's
    /// thread; the `Send + 'static` bounds on `F`/`E` look stricter than
    /// necessary — confirm no caller relies on them before relaxing.
    pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
    where
        F: FnMut(ChunkRequest) -> Option<Vec<LaserPoint>> + Send + 'static,
        E: FnMut(Error) + Send + 'static,
    {
        loop {
            // Check for stop request before doing any work this iteration.
            if self.control.is_stop_requested() {
                return Ok(RunExit::Stopped);
            }

            // Get next request (blocks for pacing inside next_request()).
            let req = match self.next_request() {
                Ok(req) => req,
                Err(e) if e.is_stopped() => {
                    return Ok(RunExit::Stopped);
                }
                Err(e) if e.is_disconnected() => {
                    on_error(e);
                    return Ok(RunExit::Disconnected);
                }
                Err(e) => {
                    // Recoverable error - report and retry
                    on_error(e);
                    continue;
                }
            };

            // Process control messages before calling producer, so that an
            // E-stop issued during the pacing wait is honored without
            // producing (and writing) one more chunk.
            if self.process_control_messages() {
                return Ok(RunExit::Stopped);
            }

            // Call producer
            match producer(req.clone()) {
                Some(points) => {
                    // Try to write, handling backpressure with retries
                    loop {
                        match self.write(&req, &points) {
                            Ok(()) => break,
                            Err(e) if e.is_would_block() => {
                                // Backend buffer full - yield first for low-latency scenarios,
                                // then sleep briefly if still blocked
                                std::thread::yield_now();

                                // Process control messages for immediate shutter control
                                if self.process_control_messages() {
                                    return Ok(RunExit::Stopped);
                                }

                                std::thread::sleep(Duration::from_micros(100));

                                // Check stop again after sleep
                                if self.process_control_messages() {
                                    return Ok(RunExit::Stopped);
                                }
                                continue;
                            }
                            Err(e) if e.is_stopped() => {
                                return Ok(RunExit::Stopped);
                            }
                            Err(e) if e.is_disconnected() => {
                                on_error(e);
                                return Ok(RunExit::Disconnected);
                            }
                            Err(e) => {
                                // Recoverable error - report and handle underrun
                                on_error(e);
                                if let Err(e2) = self.handle_underrun(&req) {
                                    // Underrun handling can also hit terminal conditions
                                    // (e.g. UnderrunPolicy::Stop signals via Error::Stopped).
                                    if e2.is_stopped() {
                                        return Ok(RunExit::Stopped);
                                    }
                                    on_error(e2);
                                }
                                break;
                            }
                        }
                    }
                }
                None => {
                    return Ok(RunExit::ProducerEnded);
                }
            }
        }
    }
554
555    // =========================================================================
556    // Internal helpers
557    // =========================================================================
558
559    /// Process any pending control messages from StreamControl.
560    ///
561    /// This method drains the control message queue and takes immediate action:
562    /// - `Arm`: Opens the shutter (best-effort)
563    /// - `Disarm`: Closes the shutter immediately
564    /// - `Stop`: Returns `true` to signal the caller to stop
565    ///
566    /// Returns `true` if stop was requested, `false` otherwise.
567    fn process_control_messages(&mut self) -> bool {
568        loop {
569            match self.control_rx.try_recv() {
570                Ok(ControlMsg::Arm) => {
571                    // Open shutter (best-effort) if not already open
572                    if !self.state.shutter_open {
573                        if let Some(backend) = &mut self.backend {
574                            let _ = backend.set_shutter(true);
575                        }
576                        self.state.shutter_open = true;
577                    }
578                }
579                Ok(ControlMsg::Disarm) => {
580                    // Close shutter immediately for safety
581                    if self.state.shutter_open {
582                        if let Some(backend) = &mut self.backend {
583                            let _ = backend.set_shutter(false);
584                        }
585                        self.state.shutter_open = false;
586                    }
587                }
588                Ok(ControlMsg::Stop) => {
589                    return true;
590                }
591                Err(TryRecvError::Empty) => break,
592                Err(TryRecvError::Disconnected) => break,
593            }
594        }
595        false
596    }
597
    /// Wait until we're ready for the next chunk (pacing).
    ///
    /// Sleeps in small slices to allow processing control messages promptly.
    /// Returns `Err(Error::Stopped)` if a stop arrives mid-wait.
    fn wait_for_ready(&mut self) -> Result<()> {
        // Maximum sleep slice - controls responsiveness to control messages
        const SLEEP_SLICE: Duration = Duration::from_millis(5);

        let target = self.config.target_queue_points as u64;

        // Use the more accurate queue depth when available from the device.
        // This handles cases where the device reports actual buffer state,
        // which may differ from our software-tracked scheduled_ahead.
        // Taking the max of the two is the conservative (fuller) estimate.
        let effective_queue = if self.info.caps.can_estimate_queue {
            self.backend
                .as_ref()
                .and_then(|b| b.queued_points())
                .map(|device_q| device_q.max(self.state.scheduled_ahead))
                .unwrap_or(self.state.scheduled_ahead)
        } else {
            self.state.scheduled_ahead
        };

        // Below target: room for another chunk right away, no wait needed.
        if effective_queue < target {
            return Ok(());
        }

        // Aim to drain down to half the target depth (hysteresis), but cap
        // any single wait at 100ms so we never block for long stretches.
        let points_to_drain = effective_queue.saturating_sub(target / 2);
        let seconds_to_wait = points_to_drain as f64 / self.config.pps as f64;
        let wait_duration = Duration::from_secs_f64(seconds_to_wait.min(0.1));

        // Sleep in small slices to process control messages promptly
        let mut remaining = wait_duration;
        while remaining > Duration::ZERO {
            let slice = remaining.min(SLEEP_SLICE);
            std::thread::sleep(slice);
            remaining = remaining.saturating_sub(slice);

            // Process control messages - handle shutter close immediately
            if self.process_control_messages() {
                return Err(Error::Stopped);
            }
        }

        // Account for the drain using the *planned* wait duration, not
        // measured wall time; any oversleep leaves scheduled_ahead slightly
        // high, which only makes the next wait start a bit sooner.
        let elapsed = wait_duration.as_secs_f64();
        let points_drained = (elapsed * self.config.pps as f64) as u64;
        self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_drained);

        Ok(())
    }
647
    /// Handle an underrun by applying the underrun policy.
    ///
    /// # Safety Behavior
    ///
    /// When disarmed, this always outputs blanked points regardless of the underrun
    /// policy. The `RepeatLast` policy means "repeat last armed content" - when
    /// disarmed, repeating content would be unsafe.
    ///
    /// Returns `Err(Error::Stopped)` only for `UnderrunPolicy::Stop`; backend
    /// write failures during recovery are swallowed deliberately.
    fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
        self.state.stats.underrun_count += 1;

        let is_armed = self.control.is_armed();

        // Handle shutter transitions (same safety behavior as write())
        self.handle_shutter_transition(is_armed);

        // Determine fill points based on arm state and policy
        let fill_points: Vec<LaserPoint> = if !is_armed {
            // When disarmed, always output blanked points for safety
            // RepeatLast means "repeat last armed content" - meaningless when disarmed
            vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
        } else {
            match &self.config.underrun {
                // Replay cached armed content; fall back to blanks if no
                // chunk has been written yet.
                UnderrunPolicy::RepeatLast => self
                    .state
                    .last_chunk
                    .clone()
                    .unwrap_or_else(|| vec![LaserPoint::blanked(0.0, 0.0); req.n_points]),
                UnderrunPolicy::Blank => {
                    vec![LaserPoint::blanked(0.0, 0.0); req.n_points]
                }
                // Park the beam (blanked) at a fixed position.
                UnderrunPolicy::Park { x, y } => {
                    vec![LaserPoint::blanked(*x, *y); req.n_points]
                }
                // Stop policy: signal termination instead of writing fill.
                UnderrunPolicy::Stop => {
                    self.control.stop()?;
                    return Err(Error::Stopped);
                }
            }
        };

        if let Some(backend) = &mut self.backend {
            match backend.try_write_chunk(self.config.pps, &fill_points) {
                Ok(WriteOutcome::Written) => {
                    // Update stream state to keep timebase accurate
                    let n_points = fill_points.len();
                    // Only update last_chunk when armed (it's the "last armed content")
                    if is_armed {
                        self.state.last_chunk = Some(fill_points);
                    }
                    self.state.current_instant += n_points as u64;
                    self.state.scheduled_ahead += n_points as u64;
                    self.state.stats.chunks_written += 1;
                    self.state.stats.points_written += n_points as u64;
                }
                Ok(WriteOutcome::WouldBlock) => {
                    // Backend is full, can't write fill points - this is expected
                }
                Err(_) => {
                    // Backend error during underrun handling - ignore, we're already recovering
                }
            }
        }

        Ok(())
    }
713}
714
impl Drop for Stream {
    fn drop(&mut self) {
        // Best-effort safety shutdown: disarm, close shutter, stop backend.
        // Errors are ignored because Drop cannot propagate them. This is a
        // no-op if into_dac() already took the backend.
        let _ = self.stop();
    }
}
720
721// =============================================================================
722// Device
723// =============================================================================
724
/// A connected device that can start streaming sessions.
///
/// When starting a stream, the device is consumed and the backend ownership
/// transfers to the stream. The `DacInfo` is returned alongside the stream
/// so metadata remains accessible.
///
/// # Example
///
/// ```ignore
/// let device = open_device("my-device")?;
/// let config = StreamConfig::new(30_000);
/// let (stream, info) = device.start_stream(config)?;
/// println!("Streaming to: {}", info.name);
/// ```
pub struct Dac {
    /// Device metadata (id, name, kind, capabilities).
    info: DacInfo,
    /// The backend; `None` once it has been moved into a stream.
    backend: Option<Box<dyn StreamBackend>>,
}
743
impl Dac {
    /// Create a new device from a backend.
    pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
        Self {
            info,
            backend: Some(backend),
        }
    }

    /// Returns the device info.
    pub fn info(&self) -> &DacInfo {
        &self.info
    }

    /// Returns the device ID.
    pub fn id(&self) -> &str {
        &self.info.id
    }

    /// Returns the device name.
    pub fn name(&self) -> &str {
        &self.info.name
    }

    /// Returns the DAC type.
    pub fn kind(&self) -> &DacType {
        &self.info.kind
    }

    /// Returns the device capabilities.
    pub fn caps(&self) -> &DacCapabilities {
        &self.info.caps
    }

    /// Returns whether the device has a backend (not yet used for a stream).
    pub fn has_backend(&self) -> bool {
        self.backend.is_some()
    }

    /// Returns whether the device is connected.
    ///
    /// Always `false` once the backend has been consumed by a stream.
    pub fn is_connected(&self) -> bool {
        self.backend
            .as_ref()
            .map(|b| b.is_connected())
            .unwrap_or(false)
    }
790
791    /// Starts a streaming session, consuming the device.
792    ///
793    /// # Ownership
794    ///
795    /// This method consumes the `Dac` because:
796    /// - Each device can only have one active stream at a time.
797    /// - The backend is moved into the `Stream` to ensure exclusive access.
798    /// - This prevents accidental reuse of a device that's already streaming.
799    ///
800    /// The method returns both the `Stream` and a copy of `DacInfo`, so you
801    /// retain access to device metadata (id, name, capabilities) after starting.
802    ///
803    /// # Connection
804    ///
805    /// If the device is not already connected, this method will establish the
806    /// connection before creating the stream. Connection failures are returned
807    /// as errors.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if:
812    /// - The device backend has already been used for a stream.
813    /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
814    /// - The backend fails to connect.
815    pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
816        let mut backend = self.backend.take().ok_or_else(|| {
817            Error::invalid_config("device backend has already been used for a stream")
818        })?;
819
820        Self::validate_config(&self.info.caps, &cfg)?;
821
822        // Connect the backend if not already connected
823        if !backend.is_connected() {
824            backend.connect()?;
825        }
826
827        let chunk_points = cfg.chunk_points.unwrap_or_else(|| {
828            Self::compute_default_chunk_size(&self.info.caps, cfg.pps, cfg.target_queue_points)
829        });
830
831        let stream = Stream::with_backend(self.info.clone(), backend, cfg, chunk_points);
832
833        Ok((stream, self.info))
834    }
835
836    fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
837        if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
838            return Err(Error::invalid_config(format!(
839                "PPS {} is outside device range [{}, {}]",
840                cfg.pps, caps.pps_min, caps.pps_max
841            )));
842        }
843
844        if let Some(chunk_points) = cfg.chunk_points {
845            if chunk_points > caps.max_points_per_chunk {
846                return Err(Error::invalid_config(format!(
847                    "chunk_points {} exceeds device max {}",
848                    chunk_points, caps.max_points_per_chunk
849                )));
850            }
851            if chunk_points == 0 {
852                return Err(Error::invalid_config("chunk_points cannot be 0"));
853            }
854        }
855
856        if cfg.target_queue_points == 0 {
857            return Err(Error::invalid_config("target_queue_points cannot be 0"));
858        }
859
860        Ok(())
861    }
862
863    fn compute_default_chunk_size(
864        caps: &DacCapabilities,
865        pps: u32,
866        target_queue_points: usize,
867    ) -> usize {
868        // Target ~10ms worth of points per chunk
869        let target_chunk_ms = 10;
870        let time_based_points = (pps as usize * target_chunk_ms) / 1000;
871
872        // Also bound by target queue: aim for ~¼ of target queue per chunk.
873        // This ensures we don't send huge chunks relative to our latency target.
874        let queue_based_max = target_queue_points / 4;
875
876        let max_points = caps.max_points_per_chunk.min(queue_based_max.max(100));
877        let min_points = 100;
878
879        time_based_points.clamp(min_points, max_points)
880    }
881}
882
/// Legacy alias kept so older code that refers to `OwnedDac` keeps
/// compiling; prefer [`Dac`] in new code.
pub type OwnedDac = Dac;
885
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::{StreamBackend, WriteOutcome};
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// A test backend for unit testing stream behavior.
    ///
    /// Observable state (write count, queue depth, shutter) is held behind
    /// `Arc`s so tests can clone a handle BEFORE the backend is boxed and
    /// moved into a `Stream`, then inspect the state afterwards.
    struct TestBackend {
        caps: DacCapabilities,
        connected: bool,
        /// Count of write attempts
        write_count: Arc<AtomicUsize>,
        /// Number of WouldBlock responses to return before accepting writes
        would_block_count: Arc<AtomicUsize>,
        /// Simulated queue depth
        queued: Arc<AtomicU64>,
        /// Track shutter state for testing
        shutter_open: Arc<AtomicBool>,
    }

    impl TestBackend {
        /// Creates a disconnected backend with permissive capabilities
        /// (1k–100k PPS, 1000 points per chunk) and an empty queue.
        fn new() -> Self {
            Self {
                caps: DacCapabilities {
                    pps_min: 1000,
                    pps_max: 100000,
                    max_points_per_chunk: 1000,
                    prefers_constant_pps: false,
                    can_estimate_queue: true,
                    output_model: crate::types::OutputModel::NetworkFifo,
                },
                connected: false,
                write_count: Arc::new(AtomicUsize::new(0)),
                would_block_count: Arc::new(AtomicUsize::new(0)),
                queued: Arc::new(AtomicU64::new(0)),
                shutter_open: Arc::new(AtomicBool::new(false)),
            }
        }

        /// Builder-style helper: make the next `count` writes report
        /// `WriteOutcome::WouldBlock` before writes start succeeding.
        fn with_would_block_count(mut self, count: usize) -> Self {
            self.would_block_count = Arc::new(AtomicUsize::new(count));
            self
        }
    }

    impl StreamBackend for TestBackend {
        fn dac_type(&self) -> DacType {
            DacType::Custom("Test".to_string())
        }

        fn caps(&self) -> &DacCapabilities {
            &self.caps
        }

        fn connect(&mut self) -> Result<()> {
            self.connected = true;
            Ok(())
        }

        fn disconnect(&mut self) -> Result<()> {
            self.connected = false;
            Ok(())
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        // Simulates backpressure: drains `would_block_count` one write at a
        // time, then accepts points into the simulated queue.
        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
            self.write_count.fetch_add(1, Ordering::SeqCst);

            // Return WouldBlock until count reaches 0
            let remaining = self.would_block_count.load(Ordering::SeqCst);
            if remaining > 0 {
                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
                return Ok(WriteOutcome::WouldBlock);
            }

            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
            Ok(WriteOutcome::Written)
        }

        fn stop(&mut self) -> Result<()> {
            Ok(())
        }

        fn set_shutter(&mut self, open: bool) -> Result<()> {
            self.shutter_open.store(open, Ordering::SeqCst);
            Ok(())
        }

        fn queued_points(&self) -> Option<u64> {
            Some(self.queued.load(Ordering::SeqCst))
        }
    }

    /// Arm/disarm via `StreamControl` toggles the shared armed flag.
    #[test]
    fn test_stream_control_arm_disarm() {
        let (tx, _rx) = mpsc::channel();
        let control = StreamControl::new(tx);
        assert!(!control.is_armed());

        control.arm().unwrap();
        assert!(control.is_armed());

        control.disarm().unwrap();
        assert!(!control.is_armed());
    }

    /// `stop()` latches the stop-requested flag.
    #[test]
    fn test_stream_control_stop() {
        let (tx, _rx) = mpsc::channel();
        let control = StreamControl::new(tx);
        assert!(!control.is_stop_requested());

        control.stop().unwrap();
        assert!(control.is_stop_requested());
    }

    /// Cloned controls observe each other's state changes, i.e. the flags
    /// live behind shared (`Arc`-style) state, not per-clone copies.
    #[test]
    fn test_stream_control_clone_shares_state() {
        let (tx, _rx) = mpsc::channel();
        let control1 = StreamControl::new(tx);
        let control2 = control1.clone();

        control1.arm().unwrap();
        assert!(control2.is_armed());

        control2.stop().unwrap();
        assert!(control1.is_stop_requested());
    }

    /// `Dac::start_stream` connects a not-yet-connected backend as part of
    /// starting the session.
    #[test]
    fn test_device_start_stream_connects_backend() {
        let backend = TestBackend::new();
        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend.caps().clone(),
        };
        let device = Dac::new(info, Box::new(backend));

        // Device should not be connected initially
        assert!(!device.is_connected());

        // start_stream should connect and return a usable stream
        let cfg = StreamConfig::new(30000);
        let result = device.start_stream(cfg);
        assert!(result.is_ok());

        let (stream, _info) = result.unwrap();
        assert!(stream.backend.as_ref().unwrap().is_connected());
    }

    /// Handling an underrun must still advance the stream clock, the
    /// scheduled-ahead count, and the write stats, and bump the underrun
    /// counter — otherwise pacing would stall after a starved producer.
    #[test]
    fn test_handle_underrun_advances_state() {
        let mut backend = TestBackend::new();
        backend.connected = true;
        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend.caps().clone(),
        };

        let cfg = StreamConfig::new(30000);
        let mut stream = Stream::with_backend(info, Box::new(backend), cfg, 100);

        // Record initial state
        let initial_instant = stream.state.current_instant;
        let initial_scheduled = stream.state.scheduled_ahead;
        let initial_chunks = stream.state.stats.chunks_written;
        let initial_points = stream.state.stats.points_written;

        // Trigger underrun handling
        let req = ChunkRequest {
            start: StreamInstant::new(0),
            pps: 30000,
            n_points: 100,
            scheduled_ahead_points: 0,
            device_queued_points: None,
        };
        stream.handle_underrun(&req).unwrap();

        // State should have advanced
        assert!(stream.state.current_instant > initial_instant);
        assert!(stream.state.scheduled_ahead > initial_scheduled);
        assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
        assert_eq!(stream.state.stats.points_written, initial_points + 100);
        assert_eq!(stream.state.stats.underrun_count, 1);
    }

    /// `run` retries a chunk that the backend rejects with `WouldBlock`
    /// rather than dropping it: 3 rejections + 1 success = 4 write attempts.
    #[test]
    fn test_run_retries_on_would_block() {
        // Create a backend that returns WouldBlock 3 times before accepting
        let backend = TestBackend::new().with_would_block_count(3);
        let write_count = backend.write_count.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000).with_target_queue_points(10000);
        let stream = Stream::with_backend(info, backend_box, cfg, 100);

        let produced_count = Arc::new(AtomicUsize::new(0));
        let produced_count_clone = produced_count.clone();
        let result = stream.run(
            move |_req| {
                let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
                if count < 1 {
                    Some(vec![LaserPoint::blanked(0.0, 0.0); 100])
                } else {
                    None // End after one chunk
                }
            },
            |_e| {},
        );

        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
        // Should have attempted write 4 times (3 WouldBlock + 1 success)
        assert_eq!(write_count.load(Ordering::SeqCst), 4);
    }

    /// Arm opens the hardware shutter and disarm closes it, once the
    /// control messages are processed on the stream side.
    #[test]
    fn test_arm_opens_shutter_disarm_closes_shutter() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Initially shutter is closed
        assert!(!shutter_open.load(Ordering::SeqCst));

        // Arm via control (this sends ControlMsg::Arm)
        let control = stream.control();
        control.arm().unwrap();

        // Process control messages - this should open the shutter
        let stopped = stream.process_control_messages();
        assert!(!stopped);
        assert!(shutter_open.load(Ordering::SeqCst));

        // Disarm (this sends ControlMsg::Disarm)
        control.disarm().unwrap();

        // Process control messages - this should close the shutter
        let stopped = stream.process_control_messages();
        assert!(!stopped);
        assert!(!shutter_open.load(Ordering::SeqCst));
    }

    /// While disarmed, underrun filler output is blanked even under
    /// `RepeatLast`, and `last_chunk` (the "last armed content") is left
    /// untouched.
    #[test]
    fn test_handle_underrun_blanks_when_disarmed() {
        let backend = TestBackend::new();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        // Use RepeatLast policy - but when disarmed, should still blank
        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Set some last_chunk with colored points
        stream.state.last_chunk = Some(vec![
            LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
            100
        ]);

        // Ensure disarmed (default state)
        assert!(!stream.control.is_armed());

        let req = ChunkRequest {
            start: StreamInstant::new(0),
            pps: 30000,
            n_points: 100,
            scheduled_ahead_points: 0,
            device_queued_points: None,
        };

        // Handle underrun while disarmed
        stream.handle_underrun(&req).unwrap();

        // last_chunk should NOT be updated (we're disarmed)
        // The actual write was blanked points, but we don't update last_chunk when disarmed
        // because "last armed content" hasn't changed
        let last = stream.state.last_chunk.as_ref().unwrap();
        assert_eq!(last[0].r, 65535); // Still the old colored points
    }

    /// Stopping an armed stream closes the shutter as part of shutdown.
    #[test]
    fn test_stop_closes_shutter() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);

        // Arm first to open shutter
        stream.control.arm().unwrap();
        stream.process_control_messages();
        assert!(shutter_open.load(Ordering::SeqCst));

        // Stop should close shutter
        stream.stop().unwrap();
        assert!(!shutter_open.load(Ordering::SeqCst));
    }

    /// Arm → disarm → arm round-trips cleanly: the armed flag and the
    /// hardware shutter stay in lockstep across repeated cycles.
    #[test]
    fn test_arm_disarm_arm_cycle() {
        let backend = TestBackend::new();
        let shutter_open = backend.shutter_open.clone();

        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
        backend_box.connect().unwrap();

        let info = DacInfo {
            id: "test".to_string(),
            name: "Test Device".to_string(),
            kind: DacType::Custom("Test".to_string()),
            caps: backend_box.caps().clone(),
        };

        let cfg = StreamConfig::new(30000);
        let mut stream = Stream::with_backend(info, backend_box, cfg, 100);
        let control = stream.control();

        // Initial state: disarmed
        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        // Arm
        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));

        // Disarm
        control.disarm().unwrap();
        stream.process_control_messages();
        assert!(!control.is_armed());
        assert!(!shutter_open.load(Ordering::SeqCst));

        // Arm again
        control.arm().unwrap();
        stream.process_control_messages();
        assert!(control.is_armed());
        assert!(shutter_open.load(Ordering::SeqCst));
    }
}