Skip to main content

laser_dac/
stream.rs

1//! Stream and Dac types for point output.
2//!
3//! This module provides the `Stream` type for streaming point chunks to a DAC,
4//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
5//! connected devices that can start streaming sessions.
6//!
7//! # Armed/Disarmed Model
8//!
9//! Streams use a binary armed/disarmed safety model:
10//!
11//! - **Armed**: Content passes through to device, shutter opened (best-effort).
12//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
13//!
14//! All streams start disarmed. Call `arm()` to enable output.
15//! `arm()` and `disarm()` are the only safety controls — there is no separate
16//! shutter API. This keeps the mental model simple: armed = laser may emit.
17//!
18//! # Hardware Shutter Support
19//!
20//! Shutter control is best-effort and varies by backend:
21//! - **LaserCube USB/WiFi**: Actual hardware control
22//! - **Helios**: Hardware shutter control via USB interrupt
23//! - **Ether Dream, IDN**: No-op (safety relies on software blanking)
24//!
25//! # Disconnect Behavior
26//!
27//! No automatic reconnection. On disconnect, create a new `Dac` and `Stream`.
28//! New streams always start disarmed for safety.
29
30use std::collections::VecDeque;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35
36use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
37use crate::types::{
38    ChunkRequest, ChunkResult, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
39    StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
40};
41
42// =============================================================================
43// Stream Control
44// =============================================================================
45
/// Control messages sent from `StreamControl` to `Stream`.
///
/// Messages travel over an `mpsc` channel so that out-of-band control actions
/// can take effect promptly, even while the stream is waiting (pacing,
/// backpressure, etc.). The stream drains the channel whenever it processes
/// control messages.
#[derive(Debug, Clone, Copy)]
enum ControlMsg {
    /// Arm the output (the stream opens the hardware shutter, best-effort).
    Arm,
    /// Disarm the output (the stream closes the hardware shutter).
    Disarm,
    /// Request the stream to stop.
    Stop,
}
59
/// Thread-safe control handle for safety-critical actions.
///
/// This allows out-of-band control of the stream (arm/disarm/stop) from
/// a different thread, e.g., for E-stop functionality.
///
/// Control actions take effect as soon as possible - the stream processes
/// control messages at every opportunity (during waits, between retries, etc.).
///
/// The handle is cheap to clone: every clone shares the same inner state
/// through an `Arc`, so all clones observe and modify the same flags.
#[derive(Clone)]
pub struct StreamControl {
    /// Shared flags and the control-message sender.
    inner: Arc<StreamControlInner>,
}
71
/// State shared between all clones of a `StreamControl` handle.
struct StreamControlInner {
    /// Whether output is armed (laser can fire).
    armed: AtomicBool,
    /// Whether a stop has been requested.
    stop_requested: AtomicBool,
    /// Channel for sending control messages to the stream loop.
    /// Wrapped in Mutex because Sender is Send but not Sync.
    // NOTE(review): `mpsc::Sender` is `Sync` on recent toolchains (Rust 1.72+,
    // to be confirmed against the crate's MSRV); the Mutex is still required
    // on older compilers.
    control_tx: Mutex<Sender<ControlMsg>>,
    /// Color delay in microseconds (readable per-chunk without locking).
    color_delay_micros: AtomicU64,
}
83
84impl StreamControl {
85    fn new(control_tx: Sender<ControlMsg>, color_delay: Duration) -> Self {
86        Self {
87            inner: Arc::new(StreamControlInner {
88                armed: AtomicBool::new(false),
89                stop_requested: AtomicBool::new(false),
90                control_tx: Mutex::new(control_tx),
91                color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
92            }),
93        }
94    }
95
96    /// Arm the output (allow laser to fire).
97    ///
98    /// When armed, content from the producer passes through unmodified
99    /// and the hardware shutter is opened (best-effort).
100    pub fn arm(&self) -> Result<()> {
101        self.inner.armed.store(true, Ordering::SeqCst);
102        // Send message to stream for immediate shutter control
103        if let Ok(tx) = self.inner.control_tx.lock() {
104            let _ = tx.send(ControlMsg::Arm);
105        }
106        Ok(())
107    }
108
109    /// Disarm the output (force laser off). Designed for E-stop use.
110    ///
111    /// Immediately sets an atomic flag (works even if stream loop is blocked),
112    /// then sends a message to close the hardware shutter. All future points
113    /// are blanked in software. The stream stays alive outputting blanks -
114    /// use `stop()` to terminate entirely.
115    ///
116    /// **Latency**: Points already in the device buffer will still play out.
117    /// `target_buffer` bounds this latency.
118    ///
119    /// **Hardware shutter**: Best-effort. LaserCube and Helios have actual hardware
120    /// control; Ether Dream, IDN are no-ops (safety relies on software blanking).
121    pub fn disarm(&self) -> Result<()> {
122        self.inner.armed.store(false, Ordering::SeqCst);
123        // Send message to stream for immediate shutter control
124        if let Ok(tx) = self.inner.control_tx.lock() {
125            let _ = tx.send(ControlMsg::Disarm);
126        }
127        Ok(())
128    }
129
130    /// Check if the output is armed.
131    pub fn is_armed(&self) -> bool {
132        self.inner.armed.load(Ordering::SeqCst)
133    }
134
135    /// Set the color delay for scanner sync compensation.
136    ///
137    /// Takes effect within one chunk period. The delay is quantized to
138    /// whole points: `ceil(delay * pps)`.
139    pub fn set_color_delay(&self, delay: Duration) {
140        self.inner
141            .color_delay_micros
142            .store(delay.as_micros() as u64, Ordering::SeqCst);
143    }
144
145    /// Get the current color delay.
146    pub fn color_delay(&self) -> Duration {
147        Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
148    }
149
150    /// Request the stream to stop.
151    ///
152    /// Signals termination; `run()` returns `RunExit::Stopped`.
153    /// For clean shutdown with shutter close, prefer `Stream::stop()`.
154    pub fn stop(&self) -> Result<()> {
155        self.inner.stop_requested.store(true, Ordering::SeqCst);
156        // Send message to stream for immediate stop
157        if let Ok(tx) = self.inner.control_tx.lock() {
158            let _ = tx.send(ControlMsg::Stop);
159        }
160        Ok(())
161    }
162
163    /// Check if a stop has been requested.
164    pub fn is_stop_requested(&self) -> bool {
165        self.inner.stop_requested.load(Ordering::SeqCst)
166    }
167}
168
169// =============================================================================
170// Stream State
171// =============================================================================
172
/// Mutable bookkeeping owned by a `Stream`: timebase, pre-allocated point
/// buffers, color-delay FIFO, cached config values, statistics, and the
/// last observed arm/shutter state.
struct StreamState {
    /// Current position in stream time (points since start).
    current_instant: StreamInstant,
    /// Points scheduled ahead of current_instant.
    scheduled_ahead: u64,

    // Pre-allocated buffers (no per-chunk allocation in hot path)
    /// Buffer for callback to fill points into.
    chunk_buffer: Vec<LaserPoint>,
    /// Last chunk for RepeatLast underrun policy.
    last_chunk: Vec<LaserPoint>,
    /// Number of valid points in last_chunk.
    last_chunk_len: usize,

    /// FIFO for color delay (r, g, b, intensity per point).
    color_delay_line: VecDeque<(u16, u16, u16, u16)>,

    /// Points remaining in the startup blank window (decremented as points are written).
    startup_blank_remaining: usize,
    /// Total startup blank points (computed once from config).
    startup_blank_points: usize,

    // Cached config-derived values (avoid recomputing per-iteration float math)
    /// target_buffer as seconds (cached from config).
    target_buffer_secs: f64,
    /// min_buffer as seconds (cached from config).
    min_buffer_secs: f64,
    /// target_buffer as points (cached from config + pps).
    target_buffer_points: u64,

    /// Statistics.
    stats: StreamStats,
    /// Track the last armed state to detect transitions.
    last_armed: bool,
    /// Whether the hardware shutter is currently open
    /// (as last commanded by the stream; backend errors are ignored).
    shutter_open: bool,
}
210
211impl StreamState {
212    /// Create new stream state with pre-allocated buffers.
213    ///
214    /// Buffers are sized to `max_points_per_chunk` from DAC capabilities,
215    /// ensuring we can handle any catch-up scenario without reallocation.
216    fn new(
217        max_points_per_chunk: usize,
218        startup_blank_points: usize,
219        config: &StreamConfig,
220    ) -> Self {
221        let pps = config.pps as f64;
222        let target_buffer_secs = config.target_buffer.as_secs_f64();
223        let min_buffer_secs = config.min_buffer.as_secs_f64();
224        Self {
225            current_instant: StreamInstant::new(0),
226            scheduled_ahead: 0,
227            chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
228            last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
229            last_chunk_len: 0,
230            color_delay_line: VecDeque::new(),
231            startup_blank_remaining: 0,
232            startup_blank_points,
233            target_buffer_secs,
234            min_buffer_secs,
235            target_buffer_points: (target_buffer_secs * pps) as u64,
236            stats: StreamStats::default(),
237            last_armed: false,
238            shutter_open: false,
239        }
240    }
241}
242
243// =============================================================================
244// Stream
245// =============================================================================
246
/// A streaming session for outputting points to a DAC.
///
/// Use [`run()`](Self::run) to stream with buffer-driven timing.
/// The callback is invoked when the buffer needs filling, providing automatic
/// backpressure handling and zero allocations in the hot path.
///
/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
pub struct Stream {
    /// Device info for this stream.
    info: DacInfo,
    /// The backend. `None` once it has been taken out (see `into_dac`).
    backend: Option<Box<dyn StreamBackend>>,
    /// Stream configuration.
    config: StreamConfig,
    /// Thread-safe control handle.
    control: StreamControl,
    /// Receiver for control messages from StreamControl.
    control_rx: Receiver<ControlMsg>,
    /// Stream state.
    state: StreamState,
}
268
269impl Stream {
270    /// Convert a duration in microseconds to a point count at the given PPS, rounding up.
271    fn duration_micros_to_points(micros: u64, pps: u32) -> usize {
272        if micros == 0 {
273            0
274        } else {
275            (micros as f64 * pps as f64 / 1_000_000.0).ceil() as usize
276        }
277    }
278
279    /// Create a new stream with a backend.
280    pub(crate) fn with_backend(
281        info: DacInfo,
282        backend: Box<dyn StreamBackend>,
283        config: StreamConfig,
284    ) -> Self {
285        let (control_tx, control_rx) = mpsc::channel();
286        let max_points = info.caps.max_points_per_chunk;
287        let startup_blank_points =
288            Self::duration_micros_to_points(config.startup_blank.as_micros() as u64, config.pps);
289        Self {
290            info,
291            backend: Some(backend),
292            config: config.clone(),
293            control: StreamControl::new(control_tx, config.color_delay),
294            control_rx,
295            state: StreamState::new(max_points, startup_blank_points, &config),
296        }
297    }
298
299    /// Returns the device info.
300    pub fn info(&self) -> &DacInfo {
301        &self.info
302    }
303
304    /// Returns the stream configuration.
305    pub fn config(&self) -> &StreamConfig {
306        &self.config
307    }
308
309    /// Returns a thread-safe control handle.
310    pub fn control(&self) -> StreamControl {
311        self.control.clone()
312    }
313
314    /// Returns the current stream status.
315    pub fn status(&self) -> Result<StreamStatus> {
316        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
317
318        Ok(StreamStatus {
319            connected: self
320                .backend
321                .as_ref()
322                .map(|b| b.is_connected())
323                .unwrap_or(false),
324            scheduled_ahead_points: self.state.scheduled_ahead,
325            device_queued_points,
326            stats: Some(self.state.stats.clone()),
327        })
328    }
329
330    /// Handle hardware shutter transitions based on arm state changes.
331    fn handle_shutter_transition(&mut self, is_armed: bool) {
332        let was_armed = self.state.last_armed;
333        self.state.last_armed = is_armed;
334
335        if was_armed && !is_armed {
336            // Disarmed: close the shutter for safety (best-effort)
337            self.state.color_delay_line.clear();
338            if self.state.shutter_open {
339                if let Some(backend) = &mut self.backend {
340                    let _ = backend.set_shutter(false); // Best-effort, ignore errors
341                }
342                self.state.shutter_open = false;
343            }
344        } else if !was_armed && is_armed {
345            // Armed: open the shutter (best-effort)
346            // Pre-fill color delay line with blanked colors so early points
347            // come out dark while galvos settle.
348            let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
349            let delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
350            self.state.color_delay_line.clear();
351            for _ in 0..delay_points {
352                self.state.color_delay_line.push_back((0, 0, 0, 0));
353            }
354
355            self.state.startup_blank_remaining = self.state.startup_blank_points;
356
357            if !self.state.shutter_open {
358                if let Some(backend) = &mut self.backend {
359                    let _ = backend.set_shutter(true); // Best-effort, ignore errors
360                }
361                self.state.shutter_open = true;
362            }
363        }
364    }
365
366    /// Stop the stream and terminate output.
367    ///
368    /// Disarms the output (software blanking + hardware shutter) before stopping
369    /// the backend to prevent the "freeze on last bright point" hazard.
370    /// Use `disarm()` instead if you want to keep the stream alive but safe.
371    pub fn stop(&mut self) -> Result<()> {
372        // Disarm: sets armed flag for software blanking
373        self.control.disarm()?;
374
375        self.control.stop()?;
376
377        // Directly close shutter and stop backend (defense-in-depth)
378        if let Some(backend) = &mut self.backend {
379            let _ = backend.set_shutter(false);
380            backend.stop()?;
381        }
382
383        Ok(())
384    }
385
386    /// Consume the stream and recover the device for reuse.
387    ///
388    /// This method disarms and stops the stream (software blanking + hardware shutter),
389    /// then returns the underlying `Dac` along with the final `StreamStats`.
390    /// The device can then be used to start a new stream with different configuration.
391    ///
392    /// # Example
393    ///
394    /// ```ignore
395    /// use laser_dac::StreamConfig;
396    ///
397    /// // device: Dac, config: StreamConfig (from prior setup)
398    /// let (stream, info) = device.start_stream(config)?;
399    /// // ... stream for a while ...
400    /// let (device, stats) = stream.into_dac();
401    /// println!("Streamed {} points", stats.points_written);
402    ///
403    /// // Restart with different config
404    /// let new_config = StreamConfig::new(60_000);
405    /// let (stream2, _) = device.start_stream(new_config)?;
406    /// ```
407    pub fn into_dac(mut self) -> (Dac, StreamStats) {
408        // Disarm (software blanking) and close shutter before stopping
409        let _ = self.control.disarm();
410        let _ = self.control.stop();
411        if let Some(backend) = &mut self.backend {
412            let _ = backend.set_shutter(false);
413            let _ = backend.stop();
414        }
415
416        // Take the backend (leaves None, so Drop won't try to stop again)
417        let backend = self.backend.take();
418        let stats = self.state.stats.clone();
419
420        let dac = Dac {
421            info: self.info.clone(),
422            backend,
423        };
424
425        (dac, stats)
426    }
427
428    /// Run the stream with the zero-allocation callback API.
429    ///
430    /// This method uses **pure buffer-driven timing**:
431    /// - Callback is invoked when `buffered < target_buffer`
432    /// - Points requested varies based on buffer headroom (`min_points`, `target_points`)
433    /// - Callback fills a library-owned buffer (zero allocations in hot path)
434    ///
435    /// # Callback Contract
436    ///
437    /// The callback receives a `ChunkRequest` describing buffer state and requirements,
438    /// and a mutable slice to fill with points. It returns:
439    ///
440    /// - `ChunkResult::Filled(n)`: Wrote `n` points to the buffer
441    /// - `ChunkResult::Starved`: No data available (underrun policy applies)
442    /// - `ChunkResult::End`: Stream should end gracefully
443    ///
444    /// # Exit Conditions
445    ///
446    /// - **`RunExit::Stopped`**: Stop requested via `StreamControl::stop()`.
447    /// - **`RunExit::ProducerEnded`**: Callback returned `ChunkResult::End`.
448    /// - **`RunExit::Disconnected`**: Device disconnected.
449    ///
450    /// # Example
451    ///
452    /// ```ignore
453    /// use laser_dac::{ChunkRequest, ChunkResult, LaserPoint};
454    ///
455    /// stream.run(
456    ///     |req: &ChunkRequest, buffer: &mut [LaserPoint]| {
457    ///         let n = req.target_points;
458    ///         for i in 0..n {
459    ///             let t = req.start.as_secs_f64(req.pps) + (i as f64 / req.pps as f64);
460    ///             let angle = (t * std::f64::consts::TAU) as f32;
461    ///             buffer[i] = LaserPoint::new(angle.cos(), angle.sin(), 65535, 0, 0, 65535);
462    ///         }
463    ///         ChunkResult::Filled(n)
464    ///     },
465    ///     |err| eprintln!("Error: {}", err),
466    /// )?;
467    /// ```
468    pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
469    where
470        F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
471        E: FnMut(Error) + Send + 'static,
472    {
473        use std::time::Instant;
474
475        let pps = self.config.pps as f64;
476        let max_points = self.info.caps.max_points_per_chunk;
477
478        // Track time between iterations to decrement scheduled_ahead for backends
479        // that don't report queued_points(). This prevents stalls when buffered
480        // equals target_points exactly.
481        let mut last_iteration = Instant::now();
482
483        loop {
484            // 1. Check for stop request
485            if self.control.is_stop_requested() {
486                return Ok(RunExit::Stopped);
487            }
488
489            // Decrement scheduled_ahead based on elapsed time since last iteration.
490            // This is critical for backends without queued_points() - without this,
491            // scheduled_ahead would never decrease and target_points would stay 0.
492            let now = Instant::now();
493            let elapsed = now.duration_since(last_iteration);
494            let points_consumed = (elapsed.as_secs_f64() * pps) as u64;
495            self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_consumed);
496            last_iteration = now;
497
498            // 2. Estimate buffer state
499            let buffered = self.estimate_buffer_points();
500            let target_points = self.state.target_buffer_points;
501
502            // 3. If buffer is above target, sleep until it drains to target
503            // Note: use > not >= so we call producer when exactly at target
504            if buffered > target_points {
505                let excess_points = buffered - target_points;
506                let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
507                if self.sleep_with_control_check(sleep_time)? {
508                    return Ok(RunExit::Stopped);
509                }
510                continue; // Re-check buffer after sleep
511            }
512
513            // 4. Check backend connection
514            if let Some(backend) = &self.backend {
515                if !backend.is_connected() {
516                    log::warn!("backend.is_connected() = false, exiting with Disconnected");
517                    on_error(Error::disconnected("backend disconnected"));
518                    return Ok(RunExit::Disconnected);
519                }
520            } else {
521                log::warn!("no backend, exiting with Disconnected");
522                on_error(Error::disconnected("no backend"));
523                return Ok(RunExit::Disconnected);
524            }
525
526            // 5. Process control messages before calling producer
527            if self.process_control_messages() {
528                return Ok(RunExit::Stopped);
529            }
530
531            // 6. Build fill request with buffer state (reuse cached estimate)
532            let req = self.build_fill_request(max_points, buffered);
533
534            // 7. Call producer with pre-allocated buffer
535            let buffer = &mut self.state.chunk_buffer[..max_points];
536            let result = producer(&req, buffer);
537
538            // 8. Handle result
539            match result {
540                ChunkResult::Filled(n) => {
541                    // Validate n doesn't exceed buffer
542                    let n = n.min(max_points);
543
544                    // Treat Filled(0) with target_points > 0 as Starved
545                    if n == 0 && req.target_points > 0 {
546                        self.handle_underrun(&req)?;
547                        continue;
548                    }
549
550                    // Write to backend if we have points
551                    if n > 0 {
552                        self.write_fill_points(n, &mut on_error)?;
553                    }
554                }
555                ChunkResult::Starved => {
556                    self.handle_underrun(&req)?;
557                }
558                ChunkResult::End => {
559                    // Graceful shutdown: let queued points drain, then blank/park
560                    self.drain_and_blank();
561                    return Ok(RunExit::ProducerEnded);
562                }
563            }
564        }
565    }
566
567    /// Sleep for the given duration while checking for control messages.
568    ///
569    /// Returns `true` if stop was requested, `false` otherwise.
570    fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
571        const SLEEP_SLICE: Duration = Duration::from_millis(2);
572        let mut remaining = duration;
573
574        while remaining > Duration::ZERO {
575            let slice = remaining.min(SLEEP_SLICE);
576            std::thread::sleep(slice);
577            remaining = remaining.saturating_sub(slice);
578
579            // Process control messages for immediate response
580            if self.process_control_messages() {
581                return Ok(true);
582            }
583        }
584
585        Ok(false)
586    }
587
588    /// Write points from chunk_buffer to the backend.
589    ///
590    /// Called by `run` after the producer fills the buffer.
591    fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
592    where
593        E: FnMut(Error),
594    {
595        let is_armed = self.control.is_armed();
596        let pps = self.config.pps;
597
598        // Handle shutter transitions
599        self.handle_shutter_transition(is_armed);
600
601        // Blank points in-place when disarmed
602        if !is_armed {
603            for p in &mut self.state.chunk_buffer[..n] {
604                *p = LaserPoint::blanked(p.x, p.y);
605            }
606        }
607
608        // Apply startup blanking: force first N points after arming to blank
609        if is_armed && self.state.startup_blank_remaining > 0 {
610            let blank_count = n.min(self.state.startup_blank_remaining);
611            for p in &mut self.state.chunk_buffer[..blank_count] {
612                p.r = 0;
613                p.g = 0;
614                p.b = 0;
615                p.intensity = 0;
616            }
617            self.state.startup_blank_remaining -= blank_count;
618        }
619
620        // Apply color delay: read current setting, resize deque, shift colors
621        let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
622        let color_delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
623
624        if color_delay_points > 0 {
625            // Resize deque to match current delay (handles dynamic changes)
626            self.state
627                .color_delay_line
628                .resize(color_delay_points, (0, 0, 0, 0));
629            for p in &mut self.state.chunk_buffer[..n] {
630                self.state
631                    .color_delay_line
632                    .push_back((p.r, p.g, p.b, p.intensity));
633                let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
634                p.r = r;
635                p.g = g;
636                p.b = b;
637                p.intensity = i;
638            }
639        } else if !self.state.color_delay_line.is_empty() {
640            // Delay was disabled at runtime — flush the line
641            self.state.color_delay_line.clear();
642        }
643
644        // Try to write with backpressure handling
645        loop {
646            // Check backend exists
647            let backend = match self.backend.as_mut() {
648                Some(b) => b,
649                None => return Err(Error::disconnected("no backend")),
650            };
651
652            match backend.try_write_chunk(pps, &self.state.chunk_buffer[..n]) {
653                Ok(WriteOutcome::Written) => {
654                    self.record_write(n, is_armed);
655                    return Ok(());
656                }
657                Ok(WriteOutcome::WouldBlock) => {
658                    // Backend buffer full - yield and retry
659                    // Borrow of backend is dropped here, so we can call process_control_messages
660                }
661                Err(e) if e.is_stopped() => {
662                    return Err(Error::Stopped);
663                }
664                Err(e) if e.is_disconnected() => {
665                    log::warn!("write got Disconnected error, exiting stream: {e}");
666                    on_error(Error::disconnected("backend disconnected"));
667                    return Err(e);
668                }
669                Err(e) => {
670                    log::warn!("write error, disconnecting backend: {e}");
671                    let _ = backend.disconnect();
672                    on_error(e);
673                    return Ok(());
674                }
675            }
676
677            // Handle WouldBlock: yield and process control messages
678            std::thread::yield_now();
679            if self.process_control_messages() {
680                return Err(Error::Stopped);
681            }
682            std::thread::sleep(Duration::from_micros(100));
683        }
684    }
685
686    /// Handle underrun for the fill API by applying the underrun policy.
687    fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
688        self.state.stats.underrun_count += 1;
689
690        let is_armed = self.control.is_armed();
691        self.handle_shutter_transition(is_armed);
692
693        // Calculate how many points we need (use target_points as the fill amount)
694        let n_points = req.target_points.max(1);
695
696        // Fill chunk_buffer with underrun content
697        let fill_start = if !is_armed {
698            // When disarmed, always output blanked points
699            for i in 0..n_points {
700                self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
701            }
702            n_points
703        } else {
704            match &self.config.underrun {
705                UnderrunPolicy::RepeatLast => {
706                    if self.state.last_chunk_len > 0 {
707                        // Repeat last chunk cyclically
708                        for i in 0..n_points {
709                            self.state.chunk_buffer[i] =
710                                self.state.last_chunk[i % self.state.last_chunk_len];
711                        }
712                        n_points
713                    } else {
714                        // No last chunk, fall back to blank
715                        for i in 0..n_points {
716                            self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
717                        }
718                        n_points
719                    }
720                }
721                UnderrunPolicy::Blank => {
722                    for i in 0..n_points {
723                        self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
724                    }
725                    n_points
726                }
727                UnderrunPolicy::Park { x, y } => {
728                    for i in 0..n_points {
729                        self.state.chunk_buffer[i] = LaserPoint::blanked(*x, *y);
730                    }
731                    n_points
732                }
733                UnderrunPolicy::Stop => {
734                    self.control.stop()?;
735                    return Err(Error::Stopped);
736                }
737            }
738        };
739
740        // Write the fill points
741        if let Some(backend) = &mut self.backend {
742            match backend.try_write_chunk(self.config.pps, &self.state.chunk_buffer[..fill_start]) {
743                Ok(WriteOutcome::Written) => {
744                    self.record_write(fill_start, is_armed);
745                }
746                Ok(WriteOutcome::WouldBlock) => {
747                    // Backend is full - expected during underrun
748                }
749                Err(_) => {
750                    // Backend error during underrun handling - ignore
751                }
752            }
753        }
754
755        Ok(())
756    }
757
758    /// Record a successful write: update last_chunk, timebase, and stats.
759    ///
760    /// Handles `UsbFrameSwap` output model (Helios-style double-buffering) by
761    /// **replacing** `scheduled_ahead` instead of accumulating, since the device
762    /// holds at most one frame at a time.
763    fn record_write(&mut self, n: usize, is_armed: bool) {
764        if is_armed {
765            debug_assert!(
766                n <= self.state.last_chunk.len(),
767                "n ({}) exceeds last_chunk capacity ({})",
768                n,
769                self.state.last_chunk.len()
770            );
771            self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
772            self.state.last_chunk_len = n;
773        }
774        self.state.current_instant += n as u64;
775        if self.info.caps.output_model == OutputModel::UsbFrameSwap {
776            // Double-buffered devices (e.g. Helios) hold at most one frame.
777            // Replace rather than accumulate to reflect the actual queue depth.
778            self.state.scheduled_ahead = n as u64;
779        } else {
780            self.state.scheduled_ahead += n as u64;
781        }
782        self.state.stats.chunks_written += 1;
783        self.state.stats.points_written += n as u64;
784    }
785
786    // =========================================================================
787    // Internal helpers
788    // =========================================================================
789
790    /// Process any pending control messages from StreamControl.
791    ///
792    /// This method drains the control message queue and takes immediate action:
793    /// - `Arm`: Opens the shutter (best-effort)
794    /// - `Disarm`: Closes the shutter immediately
795    /// - `Stop`: Returns `true` to signal the caller to stop
796    ///
797    /// Returns `true` if stop was requested, `false` otherwise.
798    fn process_control_messages(&mut self) -> bool {
799        loop {
800            match self.control_rx.try_recv() {
801                Ok(ControlMsg::Arm) => {
802                    // Open shutter (best-effort) if not already open
803                    if !self.state.shutter_open {
804                        if let Some(backend) = &mut self.backend {
805                            let _ = backend.set_shutter(true);
806                        }
807                        self.state.shutter_open = true;
808                    }
809                }
810                Ok(ControlMsg::Disarm) => {
811                    // Close shutter immediately for safety
812                    if self.state.shutter_open {
813                        if let Some(backend) = &mut self.backend {
814                            let _ = backend.set_shutter(false);
815                        }
816                        self.state.shutter_open = false;
817                    }
818                }
819                Ok(ControlMsg::Stop) => {
820                    return true;
821                }
822                Err(TryRecvError::Empty) => break,
823                Err(TryRecvError::Disconnected) => break,
824            }
825        }
826        false
827    }
828
829    /// Estimate the current buffer level in points using conservative estimation.
830    ///
831    /// Uses `min(hardware, software)` to prevent underruns:
832    /// - If hardware reports fewer points than software estimates, hardware is truth
833    /// - Using `max` would overestimate buffer → underrequest points → underrun
834    /// - Conservative (lower) estimate is safer: we might overfill slightly, but won't underrun
835    ///
836    /// # Returns
837    ///
838    /// The estimated number of points currently buffered (points sent but not yet played).
839    fn estimate_buffer_points(&self) -> u64 {
840        let software = self.state.scheduled_ahead;
841
842        // When hardware reports queue depth, use MINIMUM of hardware and software.
843        if let Some(device_queue) = self.backend.as_ref().and_then(|b| b.queued_points()) {
844            return device_queue.min(software);
845        }
846
847        software
848    }
849
850    /// Build a ChunkRequest with calculated buffer state and point requirements.
851    ///
852    /// Calculates:
853    /// - `buffered_points`: Conservative estimate of points in buffer
854    /// - `buffered`: Buffer level as Duration
855    /// - `start`: Estimated playback time (playhead + buffered)
856    /// - `min_points`: Minimum points to avoid underrun (ceiling rounded)
857    /// - `target_points`: Ideal points to reach target buffer (clamped to max)
858    ///
859    /// # Arguments
860    ///
861    /// * `max_points` - Maximum points the callback can write (buffer length)
862    /// * `buffered_points` - Pre-computed buffer estimate from `estimate_buffer_points()`
863    fn build_fill_request(&self, max_points: usize, buffered_points: u64) -> ChunkRequest {
864        let pps = self.config.pps;
865        let pps_f64 = pps as f64;
866
867        // Compute buffered duration directly (avoid Duration intermediary)
868        let buffered_secs = buffered_points as f64 / pps_f64;
869        let buffered = Duration::from_secs_f64(buffered_secs);
870
871        // Calculate start time for this chunk (when these points will play).
872        let start = self.state.current_instant;
873
874        // Use cached config values (avoid per-iteration as_secs_f64() calls)
875        let target_buffer_secs = self.state.target_buffer_secs;
876        let min_buffer_secs = self.state.min_buffer_secs;
877
878        // target_points: ceil((target_buffer - buffered) * pps), clamped to max_points
879        let deficit_target = (target_buffer_secs - buffered_secs).max(0.0);
880        let target_points = (deficit_target * pps_f64).ceil() as usize;
881        let target_points = target_points.min(max_points);
882
883        // min_points: ceil((min_buffer - buffered) * pps) - minimum to avoid underrun
884        let deficit_min = (min_buffer_secs - buffered_secs).max(0.0);
885        let min_points = (deficit_min * pps_f64).ceil() as usize;
886        let min_points = min_points.min(max_points);
887
888        // Use buffered_points as the device queue estimate (already computed by
889        // estimate_buffer_points, which includes hardware feedback when available).
890        // This avoids a redundant second call to queued_points().
891        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
892
893        ChunkRequest {
894            start,
895            pps,
896            min_points,
897            target_points,
898            buffered_points,
899            buffered,
900            device_queued_points,
901        }
902    }
903
904    /// Wait for queued points to drain, then blank/park the laser.
905    ///
906    /// Called on graceful shutdown (`ChunkResult::End`) to let buffered content
907    /// play out before stopping. Uses `drain_timeout` from config to cap the wait.
908    ///
909    /// - If `queued_points()` is available: polls until queue empties or timeout
910    /// - If `queued_points()` is `None`: sleeps for estimated buffer duration, capped by timeout
911    ///
912    /// After drain (or timeout), closes shutter and outputs blank points.
913    fn drain_and_blank(&mut self) {
914        use std::time::Instant;
915
916        let timeout = self.config.drain_timeout;
917        if timeout.is_zero() {
918            // Skip drain entirely if timeout is zero
919            self.blank_and_close_shutter();
920            return;
921        }
922
923        let deadline = Instant::now() + timeout;
924        let pps = self.config.pps;
925
926        // Check if backend supports queue depth reporting
927        let has_queue_depth = self
928            .backend
929            .as_ref()
930            .and_then(|b| b.queued_points())
931            .is_some();
932
933        if has_queue_depth {
934            // Poll until queue empties or timeout
935            const POLL_INTERVAL: Duration = Duration::from_millis(5);
936            while Instant::now() < deadline {
937                if let Some(queued) = self.backend.as_ref().and_then(|b| b.queued_points()) {
938                    if queued == 0 {
939                        break;
940                    }
941                } else {
942                    // Backend disconnected or stopped reporting
943                    break;
944                }
945
946                // Process control messages during drain (allow stop to interrupt)
947                if self.process_control_messages() {
948                    break;
949                }
950
951                std::thread::sleep(POLL_INTERVAL);
952            }
953        } else {
954            // No queue depth available: sleep for estimated buffer time, capped by timeout
955            let estimated_drain =
956                Duration::from_secs_f64(self.state.scheduled_ahead as f64 / pps as f64);
957            let wait_time = estimated_drain.min(timeout);
958
959            // Sleep in slices to allow control message processing
960            const SLEEP_SLICE: Duration = Duration::from_millis(10);
961            let mut remaining = wait_time;
962            while remaining > Duration::ZERO && Instant::now() < deadline {
963                let slice = remaining.min(SLEEP_SLICE);
964                std::thread::sleep(slice);
965                remaining = remaining.saturating_sub(slice);
966
967                if self.process_control_messages() {
968                    break;
969                }
970            }
971        }
972
973        self.blank_and_close_shutter();
974    }
975
976    /// Output blank points and close the hardware shutter.
977    ///
978    /// Best-effort safety shutdown - errors are ignored since we're already
979    /// in shutdown path.
980    fn blank_and_close_shutter(&mut self) {
981        // Close shutter (best-effort)
982        if let Some(backend) = &mut self.backend {
983            let _ = backend.set_shutter(false);
984        }
985        self.state.shutter_open = false;
986
987        // Output a small blank chunk to ensure laser is off
988        // (some DACs may hold the last point otherwise)
989        if let Some(backend) = &mut self.backend {
990            let blank_point = LaserPoint::blanked(0.0, 0.0);
991            let blank_chunk = [blank_point; 16];
992            let _ = backend.try_write_chunk(self.config.pps, &blank_chunk);
993        }
994    }
995}
996
997impl Drop for Stream {
998    fn drop(&mut self) {
999        let _ = self.stop();
1000    }
1001}
1002
1003// =============================================================================
1004// Device
1005// =============================================================================
1006
/// A connected device that can start streaming sessions.
///
/// When starting a stream, the device is consumed and the backend ownership
/// transfers to the stream. The `DacInfo` is returned alongside the stream
/// so metadata remains accessible.
///
/// # Example
///
/// ```ignore
/// use laser_dac::{open_device, StreamConfig};
///
/// let device = open_device("my-device")?;
/// let config = StreamConfig::new(30_000);
/// let (stream, info) = device.start_stream(config)?;
/// println!("Streaming to: {}", info.name);
/// ```
pub struct Dac {
    /// Device metadata (id, name, kind, capabilities) exposed by the accessors.
    info: DacInfo,
    /// The backend, held as `Some` from construction until `start_stream`
    /// takes it out and hands it to the new `Stream`.
    backend: Option<Box<dyn StreamBackend>>,
}
1027
1028impl Dac {
1029    /// Create a new device from a backend.
1030    pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
1031        Self {
1032            info,
1033            backend: Some(backend),
1034        }
1035    }
1036
1037    /// Returns the device info.
1038    pub fn info(&self) -> &DacInfo {
1039        &self.info
1040    }
1041
1042    /// Returns the device ID.
1043    pub fn id(&self) -> &str {
1044        &self.info.id
1045    }
1046
1047    /// Returns the device name.
1048    pub fn name(&self) -> &str {
1049        &self.info.name
1050    }
1051
1052    /// Returns the DAC type.
1053    pub fn kind(&self) -> &DacType {
1054        &self.info.kind
1055    }
1056
1057    /// Returns the device capabilities.
1058    pub fn caps(&self) -> &DacCapabilities {
1059        &self.info.caps
1060    }
1061
1062    /// Returns whether the device has a backend (not yet used for a stream).
1063    pub fn has_backend(&self) -> bool {
1064        self.backend.is_some()
1065    }
1066
1067    /// Returns whether the device is connected.
1068    pub fn is_connected(&self) -> bool {
1069        self.backend
1070            .as_ref()
1071            .map(|b| b.is_connected())
1072            .unwrap_or(false)
1073    }
1074
1075    /// Starts a streaming session, consuming the device.
1076    ///
1077    /// # Ownership
1078    ///
1079    /// This method consumes the `Dac` because:
1080    /// - Each device can only have one active stream at a time.
1081    /// - The backend is moved into the `Stream` to ensure exclusive access.
1082    /// - This prevents accidental reuse of a device that's already streaming.
1083    ///
1084    /// The method returns both the `Stream` and a copy of `DacInfo`, so you
1085    /// retain access to device metadata (id, name, capabilities) after starting.
1086    ///
1087    /// # Connection
1088    ///
1089    /// If the device is not already connected, this method will establish the
1090    /// connection before creating the stream. Connection failures are returned
1091    /// as errors.
1092    ///
1093    /// # Errors
1094    ///
1095    /// Returns an error if:
1096    /// - The device backend has already been used for a stream.
1097    /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
1098    /// - The backend fails to connect.
1099    pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1100        let mut backend = self.backend.take().ok_or_else(|| {
1101            Error::invalid_config("device backend has already been used for a stream")
1102        })?;
1103
1104        Self::validate_config(&self.info.caps, &cfg)?;
1105
1106        // Connect the backend if not already connected
1107        if !backend.is_connected() {
1108            backend.connect()?;
1109        }
1110
1111        let stream = Stream::with_backend(self.info.clone(), backend, cfg);
1112
1113        Ok((stream, self.info))
1114    }
1115
1116    fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
1117        if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
1118            return Err(Error::invalid_config(format!(
1119                "PPS {} is outside device range [{}, {}]",
1120                cfg.pps, caps.pps_min, caps.pps_max
1121            )));
1122        }
1123
1124        Ok(())
1125    }
1126}
1127
1128#[cfg(test)]
1129mod tests {
1130    use super::*;
1131    use crate::backend::{StreamBackend, WriteOutcome};
1132    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1133    use std::sync::{Arc, Mutex};
1134
    /// A test backend for unit testing stream behavior.
    ///
    /// The counters and flags are `Arc`-wrapped so a test can clone a handle
    /// before the backend is boxed and moved into a `Stream`, then inspect it
    /// afterwards.
    struct TestBackend {
        /// Capabilities returned by `caps()`.
        caps: DacCapabilities,
        /// Connection flag, set by `connect()` and cleared by `disconnect()`.
        connected: bool,
        /// Count of write attempts (incremented on every `try_write_chunk`
        /// call, including those answered with `WouldBlock`)
        write_count: Arc<AtomicUsize>,
        /// Number of WouldBlock responses to return before accepting writes
        would_block_count: Arc<AtomicUsize>,
        /// Simulated queue depth (grows by the chunk length on each accepted
        /// write; reported by `queued_points()`)
        queued: Arc<AtomicU64>,
        /// Track shutter state for testing (last value passed to `set_shutter`)
        shutter_open: Arc<AtomicBool>,
    }
1148
1149    impl TestBackend {
1150        fn new() -> Self {
1151            Self {
1152                caps: DacCapabilities {
1153                    pps_min: 1000,
1154                    pps_max: 100000,
1155                    max_points_per_chunk: 1000,
1156                    output_model: crate::types::OutputModel::NetworkFifo,
1157                },
1158                connected: false,
1159                write_count: Arc::new(AtomicUsize::new(0)),
1160                would_block_count: Arc::new(AtomicUsize::new(0)),
1161                queued: Arc::new(AtomicU64::new(0)),
1162                shutter_open: Arc::new(AtomicBool::new(false)),
1163            }
1164        }
1165
1166        fn with_would_block_count(mut self, count: usize) -> Self {
1167            self.would_block_count = Arc::new(AtomicUsize::new(count));
1168            self
1169        }
1170
1171        fn with_output_model(mut self, model: OutputModel) -> Self {
1172            self.caps.output_model = model;
1173            self
1174        }
1175
1176        /// Set the initial queue depth for testing buffer estimation.
1177        fn with_initial_queue(mut self, queue: u64) -> Self {
1178            self.queued = Arc::new(AtomicU64::new(queue));
1179            self
1180        }
1181    }
1182
    /// A test backend that does NOT report hardware queue depth (`queued_points()` returns `None`).
    ///
    /// Use this backend to test software-only buffer estimation, which is the fallback path
    /// for real backends like `HeliosBackend` that don't implement `queued_points()`.
    ///
    /// **When to use which test backend:**
    /// - `TestBackend`: Simulates DACs with hardware queue reporting (e.g., Ether Dream, IDN).
    ///   Use for testing buffer estimation with hardware feedback.
    /// - `NoQueueTestBackend`: Simulates DACs without queue reporting (e.g., Helios).
    ///   Use for testing the time-based `scheduled_ahead` decrement logic.
    struct NoQueueTestBackend {
        /// Wrapped backend that handles every call except `queued_points()`.
        inner: TestBackend,
    }
1196
1197    impl NoQueueTestBackend {
1198        fn new() -> Self {
1199            Self {
1200                inner: TestBackend::new(),
1201            }
1202        }
1203
1204        fn with_output_model(mut self, model: OutputModel) -> Self {
1205            self.inner = self.inner.with_output_model(model);
1206            self
1207        }
1208    }
1209
1210    impl StreamBackend for NoQueueTestBackend {
1211        fn dac_type(&self) -> DacType {
1212            self.inner.dac_type()
1213        }
1214
1215        fn caps(&self) -> &DacCapabilities {
1216            self.inner.caps()
1217        }
1218
1219        fn connect(&mut self) -> Result<()> {
1220            self.inner.connect()
1221        }
1222
1223        fn disconnect(&mut self) -> Result<()> {
1224            self.inner.disconnect()
1225        }
1226
1227        fn is_connected(&self) -> bool {
1228            self.inner.is_connected()
1229        }
1230
1231        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1232            self.inner.try_write_chunk(pps, points)
1233        }
1234
1235        fn stop(&mut self) -> Result<()> {
1236            self.inner.stop()
1237        }
1238
1239        fn set_shutter(&mut self, open: bool) -> Result<()> {
1240            self.inner.set_shutter(open)
1241        }
1242
1243        /// Returns None - simulates a DAC that cannot report queue depth
1244        fn queued_points(&self) -> Option<u64> {
1245            None
1246        }
1247    }
1248
1249    impl StreamBackend for TestBackend {
1250        fn dac_type(&self) -> DacType {
1251            DacType::Custom("Test".to_string())
1252        }
1253
1254        fn caps(&self) -> &DacCapabilities {
1255            &self.caps
1256        }
1257
1258        fn connect(&mut self) -> Result<()> {
1259            self.connected = true;
1260            Ok(())
1261        }
1262
1263        fn disconnect(&mut self) -> Result<()> {
1264            self.connected = false;
1265            Ok(())
1266        }
1267
1268        fn is_connected(&self) -> bool {
1269            self.connected
1270        }
1271
1272        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1273            self.write_count.fetch_add(1, Ordering::SeqCst);
1274
1275            // Return WouldBlock until count reaches 0
1276            let remaining = self.would_block_count.load(Ordering::SeqCst);
1277            if remaining > 0 {
1278                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
1279                return Ok(WriteOutcome::WouldBlock);
1280            }
1281
1282            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
1283            Ok(WriteOutcome::Written)
1284        }
1285
1286        fn stop(&mut self) -> Result<()> {
1287            Ok(())
1288        }
1289
1290        fn set_shutter(&mut self, open: bool) -> Result<()> {
1291            self.shutter_open.store(open, Ordering::SeqCst);
1292            Ok(())
1293        }
1294
1295        fn queued_points(&self) -> Option<u64> {
1296            Some(self.queued.load(Ordering::SeqCst))
1297        }
1298    }
1299
1300    #[test]
1301    fn test_stream_control_arm_disarm() {
1302        let (tx, _rx) = mpsc::channel();
1303        let control = StreamControl::new(tx, Duration::ZERO);
1304        assert!(!control.is_armed());
1305
1306        control.arm().unwrap();
1307        assert!(control.is_armed());
1308
1309        control.disarm().unwrap();
1310        assert!(!control.is_armed());
1311    }
1312
1313    #[test]
1314    fn test_stream_control_stop() {
1315        let (tx, _rx) = mpsc::channel();
1316        let control = StreamControl::new(tx, Duration::ZERO);
1317        assert!(!control.is_stop_requested());
1318
1319        control.stop().unwrap();
1320        assert!(control.is_stop_requested());
1321    }
1322
1323    #[test]
1324    fn test_stream_control_clone_shares_state() {
1325        let (tx, _rx) = mpsc::channel();
1326        let control1 = StreamControl::new(tx, Duration::ZERO);
1327        let control2 = control1.clone();
1328
1329        control1.arm().unwrap();
1330        assert!(control2.is_armed());
1331
1332        control2.stop().unwrap();
1333        assert!(control1.is_stop_requested());
1334    }
1335
1336    #[test]
1337    fn test_device_start_stream_connects_backend() {
1338        let backend = TestBackend::new();
1339        let info = DacInfo {
1340            id: "test".to_string(),
1341            name: "Test Device".to_string(),
1342            kind: DacType::Custom("Test".to_string()),
1343            caps: backend.caps().clone(),
1344        };
1345        let device = Dac::new(info, Box::new(backend));
1346
1347        // Device should not be connected initially
1348        assert!(!device.is_connected());
1349
1350        // start_stream should connect and return a usable stream
1351        let cfg = StreamConfig::new(30000);
1352        let result = device.start_stream(cfg);
1353        assert!(result.is_ok());
1354
1355        let (stream, _info) = result.unwrap();
1356        assert!(stream.backend.as_ref().unwrap().is_connected());
1357    }
1358
1359    #[test]
1360    fn test_handle_underrun_advances_state() {
1361        let mut backend = TestBackend::new();
1362        backend.connected = true;
1363        let info = DacInfo {
1364            id: "test".to_string(),
1365            name: "Test Device".to_string(),
1366            kind: DacType::Custom("Test".to_string()),
1367            caps: backend.caps().clone(),
1368        };
1369
1370        let cfg = StreamConfig::new(30000);
1371        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1372
1373        // Record initial state
1374        let initial_instant = stream.state.current_instant;
1375        let initial_scheduled = stream.state.scheduled_ahead;
1376        let initial_chunks = stream.state.stats.chunks_written;
1377        let initial_points = stream.state.stats.points_written;
1378
1379        // Trigger underrun handling with ChunkRequest
1380        let req = ChunkRequest {
1381            start: StreamInstant::new(0),
1382            pps: 30000,
1383            min_points: 100,
1384            target_points: 100,
1385            buffered_points: 0,
1386            buffered: Duration::ZERO,
1387            device_queued_points: None,
1388        };
1389        stream.handle_underrun(&req).unwrap();
1390
1391        // State should have advanced
1392        assert!(stream.state.current_instant > initial_instant);
1393        assert!(stream.state.scheduled_ahead > initial_scheduled);
1394        assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
1395        assert_eq!(stream.state.stats.points_written, initial_points + 100);
1396        assert_eq!(stream.state.stats.underrun_count, 1);
1397    }
1398
1399    #[test]
1400    fn test_run_retries_on_would_block() {
1401        // Create a backend that returns WouldBlock 3 times before accepting
1402        let backend = TestBackend::new().with_would_block_count(3);
1403        let write_count = backend.write_count.clone();
1404
1405        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1406        backend_box.connect().unwrap();
1407
1408        let info = DacInfo {
1409            id: "test".to_string(),
1410            name: "Test Device".to_string(),
1411            kind: DacType::Custom("Test".to_string()),
1412            caps: backend_box.caps().clone(),
1413        };
1414
1415        let cfg = StreamConfig::new(30000);
1416        let stream = Stream::with_backend(info, backend_box, cfg);
1417
1418        let produced_count = Arc::new(AtomicUsize::new(0));
1419        let produced_count_clone = produced_count.clone();
1420        let result = stream.run(
1421            move |req, buffer| {
1422                let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
1423                if count < 1 {
1424                    let n = req.target_points.min(buffer.len());
1425                    for i in 0..n {
1426                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1427                    }
1428                    ChunkResult::Filled(n)
1429                } else {
1430                    ChunkResult::End
1431                }
1432            },
1433            |_e| {},
1434        );
1435
1436        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1437        // With the new API, WouldBlock retries happen internally in write_fill_points
1438        // The exact count depends on timing, but we should see multiple writes
1439        assert!(write_count.load(Ordering::SeqCst) >= 1);
1440    }
1441
1442    #[test]
1443    fn test_arm_opens_shutter_disarm_closes_shutter() {
1444        let backend = TestBackend::new();
1445        let shutter_open = backend.shutter_open.clone();
1446
1447        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1448        backend_box.connect().unwrap();
1449
1450        let info = DacInfo {
1451            id: "test".to_string(),
1452            name: "Test Device".to_string(),
1453            kind: DacType::Custom("Test".to_string()),
1454            caps: backend_box.caps().clone(),
1455        };
1456
1457        let cfg = StreamConfig::new(30000);
1458        let mut stream = Stream::with_backend(info, backend_box, cfg);
1459
1460        // Initially shutter is closed
1461        assert!(!shutter_open.load(Ordering::SeqCst));
1462
1463        // Arm via control (this sends ControlMsg::Arm)
1464        let control = stream.control();
1465        control.arm().unwrap();
1466
1467        // Process control messages - this should open the shutter
1468        let stopped = stream.process_control_messages();
1469        assert!(!stopped);
1470        assert!(shutter_open.load(Ordering::SeqCst));
1471
1472        // Disarm (this sends ControlMsg::Disarm)
1473        control.disarm().unwrap();
1474
1475        // Process control messages - this should close the shutter
1476        let stopped = stream.process_control_messages();
1477        assert!(!stopped);
1478        assert!(!shutter_open.load(Ordering::SeqCst));
1479    }
1480
1481    #[test]
1482    fn test_handle_underrun_blanks_when_disarmed() {
1483        let backend = TestBackend::new();
1484
1485        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1486        backend_box.connect().unwrap();
1487
1488        let info = DacInfo {
1489            id: "test".to_string(),
1490            name: "Test Device".to_string(),
1491            kind: DacType::Custom("Test".to_string()),
1492            caps: backend_box.caps().clone(),
1493        };
1494
1495        // Use RepeatLast policy - but when disarmed, should still blank
1496        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
1497        let mut stream = Stream::with_backend(info, backend_box, cfg);
1498
1499        // Set some last_chunk with colored points using the pre-allocated buffer
1500        let colored_point = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
1501        for i in 0..100 {
1502            stream.state.last_chunk[i] = colored_point;
1503        }
1504        stream.state.last_chunk_len = 100;
1505
1506        // Ensure disarmed (default state)
1507        assert!(!stream.control.is_armed());
1508
1509        let req = ChunkRequest {
1510            start: StreamInstant::new(0),
1511            pps: 30000,
1512            min_points: 100,
1513            target_points: 100,
1514            buffered_points: 0,
1515            buffered: Duration::ZERO,
1516            device_queued_points: None,
1517        };
1518
1519        // Handle underrun while disarmed
1520        stream.handle_underrun(&req).unwrap();
1521
1522        // last_chunk should NOT be updated (we're disarmed)
1523        // The actual write was blanked points, but we don't update last_chunk when disarmed
1524        // because "last armed content" hasn't changed
1525        assert_eq!(stream.state.last_chunk[0].r, 65535); // Still the old colored points
1526    }
1527
1528    #[test]
1529    fn test_stop_closes_shutter() {
1530        let backend = TestBackend::new();
1531        let shutter_open = backend.shutter_open.clone();
1532
1533        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1534        backend_box.connect().unwrap();
1535
1536        let info = DacInfo {
1537            id: "test".to_string(),
1538            name: "Test Device".to_string(),
1539            kind: DacType::Custom("Test".to_string()),
1540            caps: backend_box.caps().clone(),
1541        };
1542
1543        let cfg = StreamConfig::new(30000);
1544        let mut stream = Stream::with_backend(info, backend_box, cfg);
1545
1546        // Arm first to open shutter
1547        stream.control.arm().unwrap();
1548        stream.process_control_messages();
1549        assert!(shutter_open.load(Ordering::SeqCst));
1550
1551        // Stop should close shutter
1552        stream.stop().unwrap();
1553        assert!(!shutter_open.load(Ordering::SeqCst));
1554    }
1555
1556    #[test]
1557    fn test_arm_disarm_arm_cycle() {
1558        let backend = TestBackend::new();
1559        let shutter_open = backend.shutter_open.clone();
1560
1561        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1562        backend_box.connect().unwrap();
1563
1564        let info = DacInfo {
1565            id: "test".to_string(),
1566            name: "Test Device".to_string(),
1567            kind: DacType::Custom("Test".to_string()),
1568            caps: backend_box.caps().clone(),
1569        };
1570
1571        let cfg = StreamConfig::new(30000);
1572        let mut stream = Stream::with_backend(info, backend_box, cfg);
1573        let control = stream.control();
1574
1575        // Initial state: disarmed
1576        assert!(!control.is_armed());
1577        assert!(!shutter_open.load(Ordering::SeqCst));
1578
1579        // Arm
1580        control.arm().unwrap();
1581        stream.process_control_messages();
1582        assert!(control.is_armed());
1583        assert!(shutter_open.load(Ordering::SeqCst));
1584
1585        // Disarm
1586        control.disarm().unwrap();
1587        stream.process_control_messages();
1588        assert!(!control.is_armed());
1589        assert!(!shutter_open.load(Ordering::SeqCst));
1590
1591        // Arm again
1592        control.arm().unwrap();
1593        stream.process_control_messages();
1594        assert!(control.is_armed());
1595        assert!(shutter_open.load(Ordering::SeqCst));
1596    }
1597
1598    // =========================================================================
1599    // Buffer-driven timing tests
1600    // =========================================================================
1601
1602    #[test]
1603    fn test_run_buffer_driven_behavior() {
1604        // Test that run uses buffer-driven timing
1605        // Use NoQueueTestBackend so we rely on software estimate (which decrements properly)
1606        let mut backend = NoQueueTestBackend::new();
1607        backend.inner.connected = true;
1608        let write_count = backend.inner.write_count.clone();
1609
1610        let info = DacInfo {
1611            id: "test".to_string(),
1612            name: "Test Device".to_string(),
1613            kind: DacType::Custom("Test".to_string()),
1614            caps: backend.inner.caps().clone(),
1615        };
1616
1617        // Use short target buffer for testing
1618        let cfg = StreamConfig::new(30000)
1619            .with_target_buffer(Duration::from_millis(10))
1620            .with_min_buffer(Duration::from_millis(5));
1621        let stream = Stream::with_backend(info, Box::new(backend), cfg);
1622
1623        let call_count = Arc::new(AtomicUsize::new(0));
1624        let call_count_clone = call_count.clone();
1625
1626        let result = stream.run(
1627            move |req, buffer| {
1628                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1629
1630                // Run for 5 calls then end
1631                if count >= 4 {
1632                    ChunkResult::End
1633                } else {
1634                    let n = req.target_points.min(buffer.len()).min(100);
1635                    for i in 0..n {
1636                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1637                    }
1638                    ChunkResult::Filled(n)
1639                }
1640            },
1641            |_e| {},
1642        );
1643
1644        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1645        assert!(
1646            write_count.load(Ordering::SeqCst) >= 4,
1647            "Should have written multiple chunks"
1648        );
1649    }
1650
1651    #[test]
1652    fn test_run_sleeps_when_buffer_healthy() {
1653        // Test that run sleeps when buffer is above target
1654        // Use NoQueueTestBackend so we rely on software estimate (which decrements properly)
1655        use std::time::Instant;
1656
1657        let mut backend = NoQueueTestBackend::new();
1658        backend.inner.connected = true;
1659
1660        let info = DacInfo {
1661            id: "test".to_string(),
1662            name: "Test Device".to_string(),
1663            kind: DacType::Custom("Test".to_string()),
1664            caps: backend.inner.caps().clone(),
1665        };
1666
1667        // Very small target buffer, skip drain
1668        let cfg = StreamConfig::new(30000)
1669            .with_target_buffer(Duration::from_millis(5))
1670            .with_min_buffer(Duration::from_millis(2))
1671            .with_drain_timeout(Duration::ZERO);
1672        let stream = Stream::with_backend(info, Box::new(backend), cfg);
1673
1674        let call_count = Arc::new(AtomicUsize::new(0));
1675        let call_count_clone = call_count.clone();
1676        let start_time = Instant::now();
1677
1678        let result = stream.run(
1679            move |req, buffer| {
1680                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1681
1682                // End after 3 callbacks
1683                if count >= 2 {
1684                    ChunkResult::End
1685                } else {
1686                    // Fill buffer to trigger sleep
1687                    let n = req.target_points.min(buffer.len());
1688                    for i in 0..n {
1689                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1690                    }
1691                    ChunkResult::Filled(n)
1692                }
1693            },
1694            |_e| {},
1695        );
1696
1697        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1698
1699        // Should have taken some time due to buffer-driven sleep
1700        let elapsed = start_time.elapsed();
1701        // With buffer-driven timing, we should see some elapsed time
1702        // (not instant return)
1703        assert!(
1704            elapsed.as_millis() < 100,
1705            "Elapsed time {:?} is too long for test",
1706            elapsed
1707        );
1708    }
1709
1710    #[test]
1711    fn test_run_stops_on_control_stop() {
1712        // Test that stop() via control handle terminates the loop promptly
1713        use std::thread;
1714
1715        let backend = TestBackend::new();
1716
1717        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1718        backend_box.connect().unwrap();
1719
1720        let info = DacInfo {
1721            id: "test".to_string(),
1722            name: "Test Device".to_string(),
1723            kind: DacType::Custom("Test".to_string()),
1724            caps: backend_box.caps().clone(),
1725        };
1726
1727        let cfg = StreamConfig::new(30000);
1728        let stream = Stream::with_backend(info, backend_box, cfg);
1729        let control = stream.control();
1730
1731        // Spawn a thread to stop the stream after a short delay
1732        let control_clone = control.clone();
1733        thread::spawn(move || {
1734            thread::sleep(Duration::from_millis(20));
1735            control_clone.stop().unwrap();
1736        });
1737
1738        let result = stream.run(
1739            |req, buffer| {
1740                let n = req.target_points.min(buffer.len()).min(10);
1741                for i in 0..n {
1742                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
1743                }
1744                ChunkResult::Filled(n)
1745            },
1746            |_e| {},
1747        );
1748
1749        // Should exit with Stopped, not hang forever
1750        assert_eq!(result.unwrap(), RunExit::Stopped);
1751    }
1752
1753    #[test]
1754    fn test_run_producer_ended() {
1755        // Test that ChunkResult::End terminates the stream gracefully
1756        let backend = TestBackend::new();
1757
1758        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1759        backend_box.connect().unwrap();
1760
1761        let info = DacInfo {
1762            id: "test".to_string(),
1763            name: "Test Device".to_string(),
1764            kind: DacType::Custom("Test".to_string()),
1765            caps: backend_box.caps().clone(),
1766        };
1767
1768        let cfg = StreamConfig::new(30000);
1769        let stream = Stream::with_backend(info, backend_box, cfg);
1770
1771        let call_count = Arc::new(AtomicUsize::new(0));
1772        let call_count_clone = call_count.clone();
1773
1774        let result = stream.run(
1775            move |req, buffer| {
1776                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1777
1778                if count == 0 {
1779                    // First call: return some data
1780                    let n = req.target_points.min(buffer.len()).min(100);
1781                    for i in 0..n {
1782                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1783                    }
1784                    ChunkResult::Filled(n)
1785                } else {
1786                    // Second call: end the stream
1787                    ChunkResult::End
1788                }
1789            },
1790            |_e| {},
1791        );
1792
1793        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1794        assert_eq!(call_count.load(Ordering::SeqCst), 2);
1795    }
1796
1797    #[test]
1798    fn test_run_starved_applies_underrun_policy() {
1799        // Test that ChunkResult::Starved triggers underrun policy
1800        let backend = TestBackend::new();
1801        let queued = backend.queued.clone();
1802
1803        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1804        backend_box.connect().unwrap();
1805
1806        let info = DacInfo {
1807            id: "test".to_string(),
1808            name: "Test Device".to_string(),
1809            kind: DacType::Custom("Test".to_string()),
1810            caps: backend_box.caps().clone(),
1811        };
1812
1813        // Use Blank policy for underrun
1814        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1815        let stream = Stream::with_backend(info, backend_box, cfg);
1816
1817        let call_count = Arc::new(AtomicUsize::new(0));
1818        let call_count_clone = call_count.clone();
1819
1820        let result = stream.run(
1821            move |_req, _buffer| {
1822                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1823
1824                if count == 0 {
1825                    // First call: return Starved to trigger underrun policy
1826                    ChunkResult::Starved
1827                } else {
1828                    // Second call: end the stream
1829                    ChunkResult::End
1830                }
1831            },
1832            |_e| {},
1833        );
1834
1835        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1836
1837        // Underrun policy should have written some points
1838        assert!(
1839            queued.load(Ordering::SeqCst) > 0,
1840            "Underrun policy should have written blank points"
1841        );
1842    }
1843
1844    #[test]
1845    fn test_run_filled_zero_with_target_treated_as_starved() {
1846        // Test that Filled(0) when target_points > 0 is treated as Starved
1847        let backend = TestBackend::new();
1848        let queued = backend.queued.clone();
1849
1850        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1851        backend_box.connect().unwrap();
1852
1853        let info = DacInfo {
1854            id: "test".to_string(),
1855            name: "Test Device".to_string(),
1856            kind: DacType::Custom("Test".to_string()),
1857            caps: backend_box.caps().clone(),
1858        };
1859
1860        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1861        let stream = Stream::with_backend(info, backend_box, cfg);
1862
1863        let call_count = Arc::new(AtomicUsize::new(0));
1864        let call_count_clone = call_count.clone();
1865
1866        let result = stream.run(
1867            move |_req, _buffer| {
1868                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1869
1870                if count == 0 {
1871                    // Return Filled(0) when buffer needs data - should be treated as Starved
1872                    ChunkResult::Filled(0)
1873                } else {
1874                    ChunkResult::End
1875                }
1876            },
1877            |_e| {},
1878        );
1879
1880        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1881
1882        // Filled(0) with target_points > 0 should trigger underrun policy
1883        assert!(
1884            queued.load(Ordering::SeqCst) > 0,
1885            "Filled(0) with target > 0 should trigger underrun and write blank points"
1886        );
1887    }
1888
1889    // =========================================================================
1890    // Buffer estimation tests (Task 6.3)
1891    // =========================================================================
1892
1893    #[test]
1894    fn test_estimate_buffer_uses_software_when_no_hardware() {
1895        // When hardware doesn't report queue depth, use software estimate
1896        let mut backend = NoQueueTestBackend::new();
1897        backend.inner.connected = true;
1898
1899        let info = DacInfo {
1900            id: "test".to_string(),
1901            name: "Test Device".to_string(),
1902            kind: DacType::Custom("Test".to_string()),
1903            caps: backend.inner.caps().clone(),
1904        };
1905
1906        let cfg = StreamConfig::new(30000);
1907        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1908
1909        // Set software estimate to 500 points
1910        stream.state.scheduled_ahead = 500;
1911
1912        // Should use software estimate since hardware returns None
1913        let estimate = stream.estimate_buffer_points();
1914        assert_eq!(estimate, 500);
1915    }
1916
1917    #[test]
1918    fn test_estimate_buffer_uses_min_of_hardware_and_software() {
1919        // When hardware reports queue depth, use min(hardware, software)
1920        let backend = TestBackend::new().with_initial_queue(300);
1921        let queued = backend.queued.clone();
1922
1923        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1924        backend_box.connect().unwrap();
1925
1926        let info = DacInfo {
1927            id: "test".to_string(),
1928            name: "Test Device".to_string(),
1929            kind: DacType::Custom("Test".to_string()),
1930            caps: backend_box.caps().clone(),
1931        };
1932
1933        let cfg = StreamConfig::new(30000);
1934        let mut stream = Stream::with_backend(info, backend_box, cfg);
1935
1936        // Software says 500, hardware says 300 -> should use 300 (conservative)
1937        stream.state.scheduled_ahead = 500;
1938        let estimate = stream.estimate_buffer_points();
1939        assert_eq!(
1940            estimate, 300,
1941            "Should use hardware (300) when it's less than software (500)"
1942        );
1943
1944        // Now set hardware higher than software
1945        queued.store(800, Ordering::SeqCst);
1946        let estimate = stream.estimate_buffer_points();
1947        assert_eq!(
1948            estimate, 500,
1949            "Should use software (500) when it's less than hardware (800)"
1950        );
1951    }
1952
1953    #[test]
1954    fn test_estimate_buffer_conservative_prevents_underrun() {
1955        // Verify that conservative estimation (using min) prevents underruns
1956        // by ensuring we never overestimate the buffer
1957        let backend = TestBackend::new().with_initial_queue(100);
1958        let queued = backend.queued.clone();
1959
1960        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1961        backend_box.connect().unwrap();
1962
1963        let info = DacInfo {
1964            id: "test".to_string(),
1965            name: "Test Device".to_string(),
1966            kind: DacType::Custom("Test".to_string()),
1967            caps: backend_box.caps().clone(),
1968        };
1969
1970        let cfg = StreamConfig::new(30000);
1971        let mut stream = Stream::with_backend(info, backend_box, cfg);
1972
1973        // Simulate: software thinks 1000 points scheduled, but hardware only has 100
1974        // This can happen if hardware consumed points faster than expected
1975        stream.state.scheduled_ahead = 1000;
1976
1977        let estimate = stream.estimate_buffer_points();
1978
1979        // Should use the conservative (lower) estimate to avoid underrun
1980        assert_eq!(
1981            estimate, 100,
1982            "Should use conservative estimate (100) not optimistic (1000)"
1983        );
1984
1985        // Now simulate the opposite: hardware reports more than software
1986        // This can happen due to timing/synchronization issues
1987        queued.store(2000, Ordering::SeqCst);
1988        stream.state.scheduled_ahead = 500;
1989
1990        let estimate = stream.estimate_buffer_points();
1991        assert_eq!(
1992            estimate, 500,
1993            "Should use conservative estimate (500) not hardware (2000)"
1994        );
1995    }
1996
1997    #[test]
1998    fn test_build_fill_request_uses_conservative_estimation() {
1999        // Verify that build_fill_request uses conservative buffer estimation
2000        let backend = TestBackend::new().with_initial_queue(200);
2001
2002        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2003        backend_box.connect().unwrap();
2004
2005        let info = DacInfo {
2006            id: "test".to_string(),
2007            name: "Test Device".to_string(),
2008            kind: DacType::Custom("Test".to_string()),
2009            caps: backend_box.caps().clone(),
2010        };
2011
2012        let cfg = StreamConfig::new(30000)
2013            .with_target_buffer(Duration::from_millis(40))
2014            .with_min_buffer(Duration::from_millis(10));
2015        let mut stream = Stream::with_backend(info, backend_box, cfg);
2016
2017        // Set software estimate higher than hardware
2018        stream.state.scheduled_ahead = 500;
2019
2020        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2021
2022        // Should use conservative estimate (hardware = 200)
2023        assert_eq!(req.buffered_points, 200);
2024        assert_eq!(req.device_queued_points, Some(200));
2025    }
2026
2027    #[test]
2028    fn test_build_fill_request_calculates_min_and_target_points() {
2029        // Verify that min_points and target_points are calculated correctly
2030        // based on buffer state. Use NoQueueTestBackend so software estimate is used directly.
2031        let mut backend = NoQueueTestBackend::new();
2032        backend.inner.connected = true;
2033
2034        let info = DacInfo {
2035            id: "test".to_string(),
2036            name: "Test Device".to_string(),
2037            kind: DacType::Custom("Test".to_string()),
2038            caps: backend.inner.caps().clone(),
2039        };
2040
2041        // 30000 PPS, target_buffer = 40ms, min_buffer = 10ms
2042        // target_buffer = 40ms * 30000 = 1200 points
2043        // min_buffer = 10ms * 30000 = 300 points
2044        let cfg = StreamConfig::new(30000)
2045            .with_target_buffer(Duration::from_millis(40))
2046            .with_min_buffer(Duration::from_millis(10));
2047        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2048
2049        // Empty buffer: need full target
2050        stream.state.scheduled_ahead = 0;
2051        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2052
2053        // target_points should be clamped to max_points (1000)
2054        assert_eq!(req.target_points, 1000);
2055        // min_points should be 300 (10ms * 30000), clamped to 1000
2056        assert_eq!(req.min_points, 300);
2057
2058        // Buffer at 500 points (16.67ms): below target (40ms), above min (10ms)
2059        stream.state.scheduled_ahead = 500;
2060        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2061
2062        // target_points = (1200 - 500) = 700
2063        assert_eq!(req.target_points, 700);
2064        // min_points = (300 - 500) = 0 (buffer above min)
2065        assert_eq!(req.min_points, 0);
2066
2067        // Buffer full at 1200 points (40ms): at target
2068        stream.state.scheduled_ahead = 1200;
2069        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2070
2071        // target_points = 0 (at target)
2072        assert_eq!(req.target_points, 0);
2073        // min_points = 0 (well above min)
2074        assert_eq!(req.min_points, 0);
2075    }
2076
2077    #[test]
2078    fn test_build_fill_request_ceiling_rounds_min_points() {
2079        // Verify that min_points uses ceiling to prevent underrun
2080        // Use NoQueueTestBackend so software estimate is used directly.
2081        let mut backend = NoQueueTestBackend::new();
2082        backend.inner.connected = true;
2083
2084        let info = DacInfo {
2085            id: "test".to_string(),
2086            name: "Test Device".to_string(),
2087            kind: DacType::Custom("Test".to_string()),
2088            caps: backend.inner.caps().clone(),
2089        };
2090
2091        // min_buffer = 10ms at 30000 PPS = 300 points exactly
2092        let cfg = StreamConfig::new(30000)
2093            .with_target_buffer(Duration::from_millis(40))
2094            .with_min_buffer(Duration::from_millis(10));
2095        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2096
2097        // Buffer at 299 points: 1 point below min_buffer
2098        stream.state.scheduled_ahead = 299;
2099        let req = stream.build_fill_request(1000, stream.estimate_buffer_points());
2100
2101        // min_points should be ceil(300 - 299) = ceil(1) = 1
2102        // Actually it's ceil((10ms - 299/30000) * 30000) = ceil(300 - 299) = 1
2103        assert!(
2104            req.min_points >= 1,
2105            "min_points should be at least 1 to reach min_buffer"
2106        );
2107    }
2108
2109    // =========================================================================
2110    // ChunkResult handling tests (Task 6.4)
2111    // =========================================================================
2112
2113    #[test]
2114    fn test_fill_result_filled_writes_points_and_updates_state() {
2115        // Test that Filled(n) writes n points to backend and updates stream state
2116        let backend = TestBackend::new();
2117        let queued = backend.queued.clone();
2118
2119        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2120        backend_box.connect().unwrap();
2121
2122        let info = DacInfo {
2123            id: "test".to_string(),
2124            name: "Test Device".to_string(),
2125            kind: DacType::Custom("Test".to_string()),
2126            caps: backend_box.caps().clone(),
2127        };
2128
2129        let cfg = StreamConfig::new(30000);
2130        let stream = Stream::with_backend(info, backend_box, cfg);
2131
2132        let points_written = Arc::new(AtomicUsize::new(0));
2133        let points_written_clone = points_written.clone();
2134        let call_count = Arc::new(AtomicUsize::new(0));
2135        let call_count_clone = call_count.clone();
2136
2137        let result = stream.run(
2138            move |req, buffer| {
2139                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2140
2141                if count < 3 {
2142                    // Fill with specific number of points
2143                    let n = req.target_points.min(50);
2144                    for i in 0..n {
2145                        buffer[i] =
2146                            LaserPoint::new(0.1 * i as f32, 0.2 * i as f32, 1000, 2000, 3000, 4000);
2147                    }
2148                    points_written_clone.fetch_add(n, Ordering::SeqCst);
2149                    ChunkResult::Filled(n)
2150                } else {
2151                    ChunkResult::End
2152                }
2153            },
2154            |_e| {},
2155        );
2156
2157        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2158
2159        // Points should have been written to backend
2160        // Note: drain adds 16 blank points at shutdown
2161        let total_queued = queued.load(Ordering::SeqCst);
2162        let total_written = points_written.load(Ordering::SeqCst);
2163        assert!(
2164            total_queued > 0,
2165            "Points should have been queued to backend"
2166        );
2167        assert!(
2168            total_queued as usize >= total_written,
2169            "Queued points ({}) should be at least written points ({})",
2170            total_queued,
2171            total_written
2172        );
2173    }
2174
2175    #[test]
2176    fn test_fill_result_filled_updates_last_chunk_when_armed() {
2177        // Test that Filled(n) updates last_chunk for RepeatLast policy when armed
2178        let backend = TestBackend::new();
2179
2180        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2181        backend_box.connect().unwrap();
2182
2183        let info = DacInfo {
2184            id: "test".to_string(),
2185            name: "Test Device".to_string(),
2186            kind: DacType::Custom("Test".to_string()),
2187            caps: backend_box.caps().clone(),
2188        };
2189
2190        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2191        let stream = Stream::with_backend(info, backend_box, cfg);
2192
2193        // Arm the stream so last_chunk gets updated
2194        let control = stream.control();
2195        control.arm().unwrap();
2196
2197        let call_count = Arc::new(AtomicUsize::new(0));
2198        let call_count_clone = call_count.clone();
2199
2200        let result = stream.run(
2201            move |req, buffer| {
2202                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2203
2204                if count == 0 {
2205                    // Write specific points that we can verify later
2206                    let n = req.target_points.min(10);
2207                    for i in 0..n {
2208                        buffer[i] = LaserPoint::new(0.5, 0.5, 10000, 20000, 30000, 40000);
2209                    }
2210                    ChunkResult::Filled(n)
2211                } else if count == 1 {
2212                    // Return Starved to trigger RepeatLast
2213                    ChunkResult::Starved
2214                } else {
2215                    ChunkResult::End
2216                }
2217            },
2218            |_e| {},
2219        );
2220
2221        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2222        // If last_chunk wasn't updated, the Starved would have outputted blanks
2223        // The test passes if no assertion fails - the RepeatLast policy used the stored chunk
2224    }
2225
2226    #[test]
2227    fn test_fill_result_starved_repeat_last_with_stored_chunk() {
2228        // Test that Starved with RepeatLast policy repeats the last chunk
2229        let backend = TestBackend::new();
2230        let queued = backend.queued.clone();
2231
2232        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2233        backend_box.connect().unwrap();
2234
2235        let info = DacInfo {
2236            id: "test".to_string(),
2237            name: "Test Device".to_string(),
2238            kind: DacType::Custom("Test".to_string()),
2239            caps: backend_box.caps().clone(),
2240        };
2241
2242        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2243        let stream = Stream::with_backend(info, backend_box, cfg);
2244
2245        // Arm the stream
2246        let control = stream.control();
2247        control.arm().unwrap();
2248
2249        let call_count = Arc::new(AtomicUsize::new(0));
2250        let call_count_clone = call_count.clone();
2251
2252        let result = stream.run(
2253            move |req, buffer| {
2254                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2255
2256                if count == 0 {
2257                    // First call: provide some data to establish last_chunk
2258                    let n = req.target_points.min(50);
2259                    for i in 0..n {
2260                        buffer[i] = LaserPoint::new(0.3, 0.3, 5000, 5000, 5000, 5000);
2261                    }
2262                    ChunkResult::Filled(n)
2263                } else if count == 1 {
2264                    // Second call: return Starved - should repeat last chunk
2265                    ChunkResult::Starved
2266                } else {
2267                    ChunkResult::End
2268                }
2269            },
2270            |_e| {},
2271        );
2272
2273        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2274
2275        // Both the initial fill and the repeated chunk should have been written
2276        let total_queued = queued.load(Ordering::SeqCst);
2277        assert!(
2278            total_queued >= 50,
2279            "Should have written initial chunk plus repeated chunk"
2280        );
2281    }
2282
2283    #[test]
2284    fn test_fill_result_starved_repeat_last_without_stored_chunk_falls_back_to_blank() {
2285        // Test that Starved with RepeatLast but no stored chunk falls back to blank
2286        let backend = TestBackend::new();
2287        let queued = backend.queued.clone();
2288
2289        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2290        backend_box.connect().unwrap();
2291
2292        let info = DacInfo {
2293            id: "test".to_string(),
2294            name: "Test Device".to_string(),
2295            kind: DacType::Custom("Test".to_string()),
2296            caps: backend_box.caps().clone(),
2297        };
2298
2299        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2300        let stream = Stream::with_backend(info, backend_box, cfg);
2301
2302        // Arm the stream
2303        let control = stream.control();
2304        control.arm().unwrap();
2305
2306        let call_count = Arc::new(AtomicUsize::new(0));
2307        let call_count_clone = call_count.clone();
2308
2309        let result = stream.run(
2310            move |_req, _buffer| {
2311                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2312
2313                if count == 0 {
2314                    // First call: return Starved with no prior chunk
2315                    ChunkResult::Starved
2316                } else {
2317                    ChunkResult::End
2318                }
2319            },
2320            |_e| {},
2321        );
2322
2323        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2324
2325        // Should have written blank points as fallback
2326        let total_queued = queued.load(Ordering::SeqCst);
2327        assert!(
2328            total_queued > 0,
2329            "Should have written blank points as fallback"
2330        );
2331    }
2332
2333    #[test]
2334    fn test_fill_result_starved_with_park_policy() {
2335        // Test that Starved with Park policy outputs blanked points at park position
2336        let backend = TestBackend::new();
2337        let queued = backend.queued.clone();
2338
2339        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2340        backend_box.connect().unwrap();
2341
2342        let info = DacInfo {
2343            id: "test".to_string(),
2344            name: "Test Device".to_string(),
2345            kind: DacType::Custom("Test".to_string()),
2346            caps: backend_box.caps().clone(),
2347        };
2348
2349        // Park at specific position
2350        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Park { x: 0.5, y: -0.5 });
2351        let stream = Stream::with_backend(info, backend_box, cfg);
2352
2353        // Arm the stream
2354        let control = stream.control();
2355        control.arm().unwrap();
2356
2357        let call_count = Arc::new(AtomicUsize::new(0));
2358        let call_count_clone = call_count.clone();
2359
2360        let result = stream.run(
2361            move |_req, _buffer| {
2362                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2363
2364                if count == 0 {
2365                    ChunkResult::Starved
2366                } else {
2367                    ChunkResult::End
2368                }
2369            },
2370            |_e| {},
2371        );
2372
2373        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2374
2375        // Should have written parked points
2376        let total_queued = queued.load(Ordering::SeqCst);
2377        assert!(total_queued > 0, "Should have written parked points");
2378    }
2379
2380    #[test]
2381    fn test_fill_result_starved_with_stop_policy() {
2382        // Test that Starved with Stop policy terminates the stream
2383        let backend = TestBackend::new();
2384
2385        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2386        backend_box.connect().unwrap();
2387
2388        let info = DacInfo {
2389            id: "test".to_string(),
2390            name: "Test Device".to_string(),
2391            kind: DacType::Custom("Test".to_string()),
2392            caps: backend_box.caps().clone(),
2393        };
2394
2395        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Stop);
2396        let stream = Stream::with_backend(info, backend_box, cfg);
2397
2398        // Must arm the stream for underrun policy to be checked
2399        // (disarmed streams always output blanks regardless of policy)
2400        let control = stream.control();
2401        control.arm().unwrap();
2402
2403        let result = stream.run(
2404            |_req, _buffer| {
2405                // Always return Starved - Stop policy should terminate the stream
2406                ChunkResult::Starved
2407            },
2408            |_e| {},
2409        );
2410
2411        // Stream should have stopped due to underrun with Stop policy
2412        // The Stop policy returns Err(Error::Stopped) to immediately terminate
2413        assert!(result.is_err(), "Stop policy should return an error");
2414        assert!(
2415            result.unwrap_err().is_stopped(),
2416            "Error should be Stopped variant"
2417        );
2418    }
2419
2420    #[test]
2421    fn test_fill_result_end_returns_producer_ended() {
2422        // Test that End terminates the stream with ProducerEnded
2423        let backend = TestBackend::new();
2424
2425        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2426        backend_box.connect().unwrap();
2427
2428        let info = DacInfo {
2429            id: "test".to_string(),
2430            name: "Test Device".to_string(),
2431            kind: DacType::Custom("Test".to_string()),
2432            caps: backend_box.caps().clone(),
2433        };
2434
2435        let cfg = StreamConfig::new(30000);
2436        let stream = Stream::with_backend(info, backend_box, cfg);
2437
2438        let result = stream.run(
2439            |_req, _buffer| {
2440                // Immediately end
2441                ChunkResult::End
2442            },
2443            |_e| {},
2444        );
2445
2446        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2447    }
2448
2449    #[test]
2450    fn test_fill_result_filled_exceeds_buffer_clamped() {
2451        // Test that Filled(n) where n > buffer.len() is clamped
2452        let backend = TestBackend::new();
2453        let queued = backend.queued.clone();
2454
2455        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2456        backend_box.connect().unwrap();
2457
2458        let info = DacInfo {
2459            id: "test".to_string(),
2460            name: "Test Device".to_string(),
2461            kind: DacType::Custom("Test".to_string()),
2462            caps: backend_box.caps().clone(),
2463        };
2464
2465        let cfg = StreamConfig::new(30000);
2466        let stream = Stream::with_backend(info, backend_box, cfg);
2467
2468        let call_count = Arc::new(AtomicUsize::new(0));
2469        let call_count_clone = call_count.clone();
2470
2471        let result = stream.run(
2472            move |_req, buffer| {
2473                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2474
2475                if count == 0 {
2476                    // Fill some points but claim we wrote more than buffer size
2477                    for i in 0..buffer.len() {
2478                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2479                    }
2480                    // Return a value larger than buffer - should be clamped
2481                    ChunkResult::Filled(buffer.len() + 1000)
2482                } else {
2483                    ChunkResult::End
2484                }
2485            },
2486            |_e| {},
2487        );
2488
2489        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2490
2491        // Should have written clamped number of points (max_points_per_chunk)
2492        // plus 16 blank points from drain shutdown
2493        let total_queued = queued.load(Ordering::SeqCst);
2494        assert!(total_queued > 0, "Should have written some points");
2495        // The clamping should limit to max_points (1000 for TestBackend) + 16 blank drain points
2496        assert!(
2497            total_queued <= 1016,
2498            "Points should be clamped to max_points_per_chunk (+ drain)"
2499        );
2500    }
2501
2502    // =========================================================================
2503    // Integration tests (Task 6.5)
2504    // =========================================================================
2505
2506    #[test]
2507    fn test_full_stream_lifecycle_create_arm_stream_stop() {
2508        // Test the complete lifecycle: create -> arm -> stream data -> stop
2509        let backend = TestBackend::new();
2510        let queued = backend.queued.clone();
2511        let shutter_open = backend.shutter_open.clone();
2512
2513        let info = DacInfo {
2514            id: "test".to_string(),
2515            name: "Test Device".to_string(),
2516            kind: DacType::Custom("Test".to_string()),
2517            caps: backend.caps().clone(),
2518        };
2519
2520        // 1. Create device and start stream
2521        let device = Dac::new(info, Box::new(backend));
2522        assert!(!device.is_connected());
2523
2524        let cfg = StreamConfig::new(30000);
2525        let (stream, returned_info) = device.start_stream(cfg).unwrap();
2526        assert_eq!(returned_info.id, "test");
2527
2528        // 2. Get control handle and verify initial state
2529        let control = stream.control();
2530        assert!(!control.is_armed());
2531        assert!(!shutter_open.load(Ordering::SeqCst));
2532
2533        // 3. Arm the stream
2534        control.arm().unwrap();
2535        assert!(control.is_armed());
2536
2537        let call_count = Arc::new(AtomicUsize::new(0));
2538        let call_count_clone = call_count.clone();
2539
2540        // 4. Run the stream for a few iterations
2541        let result = stream.run(
2542            move |req, buffer| {
2543                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2544
2545                if count < 5 {
2546                    // Fill with data
2547                    let n = req.target_points.min(buffer.len()).min(100);
2548                    for i in 0..n {
2549                        let t = i as f32 / 100.0;
2550                        buffer[i] = LaserPoint::new(t, t, 10000, 20000, 30000, 40000);
2551                    }
2552                    ChunkResult::Filled(n)
2553                } else {
2554                    ChunkResult::End
2555                }
2556            },
2557            |_e| {},
2558        );
2559
2560        // 5. Verify stream ended properly
2561        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2562        assert!(
2563            queued.load(Ordering::SeqCst) > 0,
2564            "Should have written points"
2565        );
2566        assert!(
2567            call_count.load(Ordering::SeqCst) >= 5,
2568            "Should have called producer multiple times"
2569        );
2570    }
2571
2572    #[test]
2573    fn test_full_stream_lifecycle_with_underrun_recovery() {
2574        // Test lifecycle with underrun and recovery
2575        let backend = TestBackend::new();
2576        let queued = backend.queued.clone();
2577
2578        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2579        backend_box.connect().unwrap();
2580
2581        let info = DacInfo {
2582            id: "test".to_string(),
2583            name: "Test Device".to_string(),
2584            kind: DacType::Custom("Test".to_string()),
2585            caps: backend_box.caps().clone(),
2586        };
2587
2588        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2589        let stream = Stream::with_backend(info, backend_box, cfg);
2590
2591        // Arm the stream for underrun policy to work
2592        let control = stream.control();
2593        control.arm().unwrap();
2594
2595        let call_count = Arc::new(AtomicUsize::new(0));
2596        let call_count_clone = call_count.clone();
2597
2598        let result = stream.run(
2599            move |req, buffer| {
2600                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2601
2602                match count {
2603                    0 => {
2604                        // First call: provide data (establishes last_chunk)
2605                        let n = req.target_points.min(buffer.len()).min(50);
2606                        for i in 0..n {
2607                            buffer[i] = LaserPoint::new(0.5, 0.5, 30000, 30000, 30000, 30000);
2608                        }
2609                        ChunkResult::Filled(n)
2610                    }
2611                    1 => {
2612                        // Second call: underrun (triggers RepeatLast)
2613                        ChunkResult::Starved
2614                    }
2615                    2 => {
2616                        // Third call: recover with new data
2617                        let n = req.target_points.min(buffer.len()).min(50);
2618                        for i in 0..n {
2619                            buffer[i] = LaserPoint::new(-0.5, -0.5, 20000, 20000, 20000, 20000);
2620                        }
2621                        ChunkResult::Filled(n)
2622                    }
2623                    _ => ChunkResult::End,
2624                }
2625            },
2626            |_e| {},
2627        );
2628
2629        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2630        // Should have written: initial data + repeated chunk + recovered data
2631        let total = queued.load(Ordering::SeqCst);
2632        assert!(
2633            total >= 100,
2634            "Should have written multiple chunks including underrun recovery"
2635        );
2636    }
2637
2638    #[test]
2639    fn test_full_stream_lifecycle_external_stop() {
2640        // Test stopping stream from external control handle
2641        use std::thread;
2642
2643        let backend = TestBackend::new();
2644
2645        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2646        backend_box.connect().unwrap();
2647
2648        let info = DacInfo {
2649            id: "test".to_string(),
2650            name: "Test Device".to_string(),
2651            kind: DacType::Custom("Test".to_string()),
2652            caps: backend_box.caps().clone(),
2653        };
2654
2655        let cfg = StreamConfig::new(30000);
2656        let stream = Stream::with_backend(info, backend_box, cfg);
2657
2658        let control = stream.control();
2659        let control_clone = control.clone();
2660
2661        // Spawn thread to stop stream after delay
2662        thread::spawn(move || {
2663            thread::sleep(Duration::from_millis(30));
2664            control_clone.stop().unwrap();
2665        });
2666
2667        let result = stream.run(
2668            |req, buffer| {
2669                // Keep streaming until stopped
2670                let n = req.target_points.min(buffer.len()).min(10);
2671                for i in 0..n {
2672                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
2673                }
2674                ChunkResult::Filled(n)
2675            },
2676            |_e| {},
2677        );
2678
2679        assert_eq!(result.unwrap(), RunExit::Stopped);
2680    }
2681
2682    #[test]
2683    fn test_full_stream_lifecycle_into_dac_recovery() {
2684        // Test recovering Dac from stream for reuse
2685        let backend = TestBackend::new();
2686
2687        let info = DacInfo {
2688            id: "test".to_string(),
2689            name: "Test Device".to_string(),
2690            kind: DacType::Custom("Test".to_string()),
2691            caps: backend.caps().clone(),
2692        };
2693
2694        // First stream session
2695        let device = Dac::new(info, Box::new(backend));
2696        let cfg = StreamConfig::new(30000);
2697        let (stream, _) = device.start_stream(cfg).unwrap();
2698
2699        let call_count = Arc::new(AtomicUsize::new(0));
2700        let call_count_clone = call_count.clone();
2701
2702        let result = stream.run(
2703            move |req, buffer| {
2704                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2705                if count < 2 {
2706                    let n = req.target_points.min(buffer.len()).min(50);
2707                    for i in 0..n {
2708                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2709                    }
2710                    ChunkResult::Filled(n)
2711                } else {
2712                    ChunkResult::End
2713                }
2714            },
2715            |_e| {},
2716        );
2717
2718        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2719
2720        // Note: into_dac() would be tested here, but run consumes the stream
2721        // and doesn't return it. The into_dac pattern is for the blocking API.
2722        // This test verifies the stream lifecycle completes cleanly.
2723    }
2724
2725    #[test]
2726    fn test_stream_stats_tracking() {
2727        // Test that stream statistics are tracked correctly
2728        let backend = TestBackend::new();
2729
2730        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2731        backend_box.connect().unwrap();
2732
2733        let info = DacInfo {
2734            id: "test".to_string(),
2735            name: "Test Device".to_string(),
2736            kind: DacType::Custom("Test".to_string()),
2737            caps: backend_box.caps().clone(),
2738        };
2739
2740        let cfg = StreamConfig::new(30000);
2741        let stream = Stream::with_backend(info, backend_box, cfg);
2742
2743        // Arm the stream
2744        let control = stream.control();
2745        control.arm().unwrap();
2746
2747        let call_count = Arc::new(AtomicUsize::new(0));
2748        let call_count_clone = call_count.clone();
2749        let points_per_call = 50;
2750
2751        let result = stream.run(
2752            move |req, buffer| {
2753                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2754                if count < 3 {
2755                    let n = req.target_points.min(buffer.len()).min(points_per_call);
2756                    for i in 0..n {
2757                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2758                    }
2759                    ChunkResult::Filled(n)
2760                } else {
2761                    ChunkResult::End
2762                }
2763            },
2764            |_e| {},
2765        );
2766
2767        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2768        // Stats tracking is verified by the successful completion
2769        // Detailed stats assertions would require access to stream after run
2770    }
2771
2772    #[test]
2773    fn test_stream_disarm_during_streaming() {
2774        // Test disarming stream while it's running
2775        use std::thread;
2776
2777        let backend = TestBackend::new();
2778        let shutter_open = backend.shutter_open.clone();
2779
2780        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2781        backend_box.connect().unwrap();
2782
2783        let info = DacInfo {
2784            id: "test".to_string(),
2785            name: "Test Device".to_string(),
2786            kind: DacType::Custom("Test".to_string()),
2787            caps: backend_box.caps().clone(),
2788        };
2789
2790        let cfg = StreamConfig::new(30000);
2791        let stream = Stream::with_backend(info, backend_box, cfg);
2792
2793        let control = stream.control();
2794        let control_clone = control.clone();
2795
2796        // Arm first
2797        control.arm().unwrap();
2798        assert!(control.is_armed());
2799
2800        // Spawn thread to disarm then stop
2801        thread::spawn(move || {
2802            thread::sleep(Duration::from_millis(15));
2803            control_clone.disarm().unwrap();
2804            thread::sleep(Duration::from_millis(15));
2805            control_clone.stop().unwrap();
2806        });
2807
2808        let result = stream.run(
2809            |req, buffer| {
2810                let n = req.target_points.min(buffer.len()).min(10);
2811                for i in 0..n {
2812                    buffer[i] = LaserPoint::new(0.1, 0.1, 50000, 50000, 50000, 50000);
2813                }
2814                ChunkResult::Filled(n)
2815            },
2816            |_e| {},
2817        );
2818
2819        assert_eq!(result.unwrap(), RunExit::Stopped);
2820        // Shutter should have been closed by disarm
2821        assert!(!shutter_open.load(Ordering::SeqCst));
2822    }
2823
2824    #[test]
2825    fn test_stream_with_mock_backend_disconnect() {
2826        // Test handling of backend disconnect during streaming
2827        use std::sync::atomic::AtomicBool;
2828
2829        struct DisconnectingBackend {
2830            inner: TestBackend,
2831            disconnect_after: Arc<AtomicUsize>,
2832            call_count: Arc<AtomicUsize>,
2833        }
2834
2835        impl StreamBackend for DisconnectingBackend {
2836            fn dac_type(&self) -> DacType {
2837                self.inner.dac_type()
2838            }
2839
2840            fn caps(&self) -> &DacCapabilities {
2841                self.inner.caps()
2842            }
2843
2844            fn connect(&mut self) -> Result<()> {
2845                self.inner.connect()
2846            }
2847
2848            fn disconnect(&mut self) -> Result<()> {
2849                self.inner.disconnect()
2850            }
2851
2852            fn is_connected(&self) -> bool {
2853                let count = self.call_count.load(Ordering::SeqCst);
2854                let disconnect_after = self.disconnect_after.load(Ordering::SeqCst);
2855                if count >= disconnect_after {
2856                    return false;
2857                }
2858                self.inner.is_connected()
2859            }
2860
2861            fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2862                self.call_count.fetch_add(1, Ordering::SeqCst);
2863                self.inner.try_write_chunk(pps, points)
2864            }
2865
2866            fn stop(&mut self) -> Result<()> {
2867                self.inner.stop()
2868            }
2869
2870            fn set_shutter(&mut self, open: bool) -> Result<()> {
2871                self.inner.set_shutter(open)
2872            }
2873
2874            fn queued_points(&self) -> Option<u64> {
2875                self.inner.queued_points()
2876            }
2877        }
2878
2879        let mut backend = DisconnectingBackend {
2880            inner: TestBackend::new(),
2881            disconnect_after: Arc::new(AtomicUsize::new(3)),
2882            call_count: Arc::new(AtomicUsize::new(0)),
2883        };
2884        backend.inner.connected = true;
2885
2886        let info = DacInfo {
2887            id: "test".to_string(),
2888            name: "Test Device".to_string(),
2889            kind: DacType::Custom("Test".to_string()),
2890            caps: backend.inner.caps().clone(),
2891        };
2892
2893        let cfg = StreamConfig::new(30000);
2894        let stream = Stream::with_backend(info, Box::new(backend), cfg);
2895
2896        let error_occurred = Arc::new(AtomicBool::new(false));
2897        let error_occurred_clone = error_occurred.clone();
2898
2899        let result = stream.run(
2900            |req, buffer| {
2901                let n = req.target_points.min(buffer.len()).min(10);
2902                for i in 0..n {
2903                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
2904                }
2905                ChunkResult::Filled(n)
2906            },
2907            move |_e| {
2908                error_occurred_clone.store(true, Ordering::SeqCst);
2909            },
2910        );
2911
2912        // Should return Disconnected when backend reports disconnection
2913        assert_eq!(result.unwrap(), RunExit::Disconnected);
2914    }
2915
2916    // =========================================================================
2917    // Write error → disconnect tests
2918    //
2919    // These verify that a non-disconnected write error (Error::Backend) causes
2920    // the backend to be disconnected, so the stream exits with
2921    // RunExit::Disconnected and the device can be reconnected.
2922    //
2923    // Without the fix (backend.disconnect() call in the Err(e) branch of
2924    // write_fill_points), the backend stays "connected" and the stream loops
2925    // forever retrying writes that keep failing.
2926    // =========================================================================
2927
    /// A backend that returns `Error::backend()` after N successful writes.
    ///
    /// Configurable error kind and message to simulate different DAC failure
    /// modes: IDN write errors (BrokenPipe), Helios USB timeouts (TimedOut), etc.
    ///
    /// Everything except writes, disconnect tracking and queue-depth reporting
    /// is delegated to the wrapped `TestBackend`.
    struct FailingWriteBackend {
        /// Wrapped backend that handles all non-failing operations.
        inner: TestBackend,
        /// Write attempts with a zero-based index >= this value fail.
        fail_after: usize,
        /// Number of `try_write_chunk` calls so far (failed ones included).
        write_count: Arc<AtomicUsize>,
        /// Set to `true` the first time `disconnect()` is called.
        disconnect_called: Arc<AtomicBool>,
        /// `io::ErrorKind` of the simulated failure.
        error_kind: std::io::ErrorKind,
        /// Message carried by the simulated failure.
        error_message: &'static str,
        /// When `false`, `queued_points()` returns `None`.
        report_queue_depth: bool,
    }
2941
2942    impl FailingWriteBackend {
2943        /// Create a backend that fails with `BrokenPipe` after `fail_after` writes.
2944        fn new(fail_after: usize) -> Self {
2945            Self {
2946                inner: TestBackend::new(),
2947                fail_after,
2948                write_count: Arc::new(AtomicUsize::new(0)),
2949                disconnect_called: Arc::new(AtomicBool::new(false)),
2950                error_kind: std::io::ErrorKind::BrokenPipe,
2951                error_message: "simulated write failure",
2952                report_queue_depth: true,
2953            }
2954        }
2955
2956        /// Configure the error kind and message returned on failure.
2957        fn with_error(mut self, kind: std::io::ErrorKind, message: &'static str) -> Self {
2958            self.error_kind = kind;
2959            self.error_message = message;
2960            self
2961        }
2962
2963        /// Don't report queue depth (Helios-like behavior).
2964        fn without_queue_depth(mut self) -> Self {
2965            self.report_queue_depth = false;
2966            self
2967        }
2968    }
2969
2970    impl StreamBackend for FailingWriteBackend {
2971        fn dac_type(&self) -> DacType {
2972            DacType::Custom("FailingTest".to_string())
2973        }
2974
2975        fn caps(&self) -> &DacCapabilities {
2976            self.inner.caps()
2977        }
2978
2979        fn connect(&mut self) -> Result<()> {
2980            self.inner.connect()
2981        }
2982
2983        fn disconnect(&mut self) -> Result<()> {
2984            self.disconnect_called.store(true, Ordering::SeqCst);
2985            self.inner.disconnect()
2986        }
2987
2988        fn is_connected(&self) -> bool {
2989            self.inner.is_connected()
2990        }
2991
2992        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2993            let count = self.write_count.fetch_add(1, Ordering::SeqCst);
2994            if count >= self.fail_after {
2995                Err(Error::backend(std::io::Error::new(
2996                    self.error_kind,
2997                    self.error_message,
2998                )))
2999            } else {
3000                self.inner.try_write_chunk(pps, points)
3001            }
3002        }
3003
3004        fn stop(&mut self) -> Result<()> {
3005            self.inner.stop()
3006        }
3007
3008        fn set_shutter(&mut self, open: bool) -> Result<()> {
3009            self.inner.set_shutter(open)
3010        }
3011
3012        fn queued_points(&self) -> Option<u64> {
3013            if self.report_queue_depth {
3014                self.inner.queued_points()
3015            } else {
3016                None
3017            }
3018        }
3019    }
3020
3021    /// Producer that fills up to 10 blanked points per call (shared by write-error tests).
3022    fn blank_producer(req: &ChunkRequest, buffer: &mut [LaserPoint]) -> ChunkResult {
3023        let n = req.target_points.min(buffer.len()).min(10);
3024        for i in 0..n {
3025            buffer[i] = LaserPoint::blanked(0.0, 0.0);
3026        }
3027        ChunkResult::Filled(n)
3028    }
3029
3030    /// Build a Stream from a backend with default test config (30 kpps).
3031    fn make_test_stream(mut backend: impl StreamBackend + 'static) -> Stream {
3032        backend.connect().unwrap();
3033        let info = DacInfo {
3034            id: "test".to_string(),
3035            name: "Test Device".to_string(),
3036            kind: backend.dac_type(),
3037            caps: backend.caps().clone(),
3038        };
3039        Stream::with_backend(info, Box::new(backend), StreamConfig::new(30000))
3040    }
3041
3042    #[test]
3043    fn test_backend_write_error_exits_with_disconnected() {
3044        // When try_write_chunk returns a non-disconnected error (Error::Backend),
3045        // the stream should disconnect the backend and exit with RunExit::Disconnected.
3046        //
3047        // Without the fix, the stream loops forever because is_connected() stays
3048        // true. This test would hang/timeout without the backend.disconnect() call.
3049        use std::thread;
3050
3051        let backend = FailingWriteBackend::new(2);
3052        let disconnect_called = backend.disconnect_called.clone();
3053        let stream = make_test_stream(backend);
3054
3055        let handle = thread::spawn(move || stream.run(blank_producer, |_err| {}));
3056        let result = handle.join().expect("stream thread panicked");
3057
3058        assert_eq!(
3059            result.unwrap(),
3060            RunExit::Disconnected,
3061            "Write error should cause stream to exit with Disconnected"
3062        );
3063        assert!(
3064            disconnect_called.load(Ordering::SeqCst),
3065            "backend.disconnect() should have been called after write error"
3066        );
3067    }
3068
3069    #[test]
3070    fn test_backend_write_error_fires_on_error() {
3071        // Verify the on_error callback is invoked with the backend error.
3072        let backend = FailingWriteBackend::new(1);
3073        let stream = make_test_stream(backend);
3074
3075        let got_backend_error = Arc::new(AtomicBool::new(false));
3076        let got_backend_error_clone = got_backend_error.clone();
3077
3078        let result = stream.run(blank_producer, move |err| {
3079            if matches!(err, Error::Backend(_)) {
3080                got_backend_error_clone.store(true, Ordering::SeqCst);
3081            }
3082        });
3083
3084        assert_eq!(result.unwrap(), RunExit::Disconnected);
3085        assert!(
3086            got_backend_error.load(Ordering::SeqCst),
3087            "on_error should have received the Backend error"
3088        );
3089    }
3090
3091    #[test]
3092    fn test_backend_write_error_immediate_fail() {
3093        // Backend that fails on the very first write should still exit cleanly.
3094        let stream = make_test_stream(FailingWriteBackend::new(0));
3095
3096        let result = stream.run(blank_producer, |_err| {});
3097
3098        assert_eq!(
3099            result.unwrap(),
3100            RunExit::Disconnected,
3101            "Immediate write failure should exit with Disconnected"
3102        );
3103    }
3104
3105    // =========================================================================
3106    // Helios-style disconnect tests
3107    //
3108    // The Helios DAC disconnects via a USB timeout on the status() poll
3109    // (inside try_write_chunk), producing Error::Backend with a TimedOut
3110    // error — not Error::Disconnected. Uses FailingWriteBackend configured
3111    // with TimedOut error kind and no queue depth reporting.
3112    // =========================================================================
3113
3114    /// Create a Helios-like backend: TimedOut error, no queue depth.
3115    fn helios_like_backend(fail_after: usize) -> FailingWriteBackend {
3116        FailingWriteBackend::new(fail_after)
3117            .with_error(
3118                std::io::ErrorKind::TimedOut,
3119                "usb connection error: Operation timed out",
3120            )
3121            .without_queue_depth()
3122    }
3123
3124    #[test]
3125    fn test_helios_status_timeout_exits_with_disconnected() {
3126        // Simulates a Helios DAC being unplugged mid-stream.
3127        //
3128        // Real-world sequence observed via USB logging:
3129        //   1. status() → read_response FAILED: Timeout (32ms interrupt read)
3130        //   2. Error mapped to Error::Backend (not Disconnected)
3131        //   3. Stream calls backend.disconnect() → dac = None
3132        //   4. Next loop: is_connected() = false → RunExit::Disconnected
3133        use std::thread;
3134
3135        let backend = helios_like_backend(3);
3136        let disconnect_called = backend.disconnect_called.clone();
3137        let stream = make_test_stream(backend);
3138
3139        let handle = thread::spawn(move || stream.run(blank_producer, |_err| {}));
3140        let result = handle.join().expect("stream thread panicked");
3141
3142        assert_eq!(
3143            result.unwrap(),
3144            RunExit::Disconnected,
3145            "Helios status timeout should cause stream to exit with Disconnected"
3146        );
3147        assert!(
3148            disconnect_called.load(Ordering::SeqCst),
3149            "backend.disconnect() should have been called on status timeout"
3150        );
3151    }
3152
3153    #[test]
3154    fn test_helios_status_timeout_fires_on_error_with_backend_variant() {
3155        // Verify the on_error callback receives an Error::Backend (not Disconnected).
3156        // This matches real Helios behavior: the USB timeout is wrapped as Backend error.
3157        let stream = make_test_stream(helios_like_backend(1));
3158
3159        let got_backend_error = Arc::new(AtomicBool::new(false));
3160        let error_received: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
3161        let got_backend_error_clone = got_backend_error.clone();
3162        let error_received_clone = error_received.clone();
3163
3164        let result = stream.run(blank_producer, move |err| {
3165            if matches!(err, Error::Backend(_)) {
3166                got_backend_error_clone.store(true, Ordering::SeqCst);
3167                *error_received_clone.lock().unwrap() = Some(err.to_string());
3168            }
3169        });
3170
3171        assert_eq!(result.unwrap(), RunExit::Disconnected);
3172        assert!(
3173            got_backend_error.load(Ordering::SeqCst),
3174            "on_error should receive Error::Backend for Helios timeout"
3175        );
3176        let msg = error_received.lock().unwrap();
3177        assert!(
3178            msg.as_ref().unwrap().contains("Operation timed out"),
3179            "Error message should mention timeout, got: {:?}",
3180            msg
3181        );
3182    }
3183
3184    #[test]
3185    fn test_helios_immediate_status_timeout() {
3186        // Helios that fails on the very first status check (device was already
3187        // disconnected when stream started, or USB enumeration was stale).
3188        let backend = helios_like_backend(0);
3189        let disconnect_called = backend.disconnect_called.clone();
3190        let stream = make_test_stream(backend);
3191
3192        let result = stream.run(blank_producer, |_err| {});
3193
3194        assert_eq!(
3195            result.unwrap(),
3196            RunExit::Disconnected,
3197            "Immediate status timeout should exit with Disconnected"
3198        );
3199        assert!(
3200            disconnect_called.load(Ordering::SeqCst),
3201            "backend.disconnect() should be called even on first-write failure"
3202        );
3203    }
3204
3205    // =========================================================================
3206    // Drain wait tests
3207    // =========================================================================
3208
3209    #[test]
3210    fn test_fill_result_end_drains_with_queue_depth() {
3211        // Test that ChunkResult::End waits for queue to drain when queue depth is available
3212        use std::time::Instant;
3213
3214        let backend = TestBackend::new().with_initial_queue(1000);
3215        let queued = backend.queued.clone();
3216
3217        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3218        backend_box.connect().unwrap();
3219
3220        let info = DacInfo {
3221            id: "test".to_string(),
3222            name: "Test Device".to_string(),
3223            kind: DacType::Custom("Test".to_string()),
3224            caps: backend_box.caps().clone(),
3225        };
3226
3227        // Use short drain timeout for test
3228        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
3229        let stream = Stream::with_backend(info, backend_box, cfg);
3230
3231        // Simulate queue draining by setting it to 0 before the stream runs
3232        queued.store(0, Ordering::SeqCst);
3233
3234        let start = Instant::now();
3235        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3236
3237        let elapsed = start.elapsed();
3238
3239        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3240        // Should return quickly since queue was empty
3241        assert!(
3242            elapsed.as_millis() < 50,
3243            "Should return quickly when queue is empty, took {:?}",
3244            elapsed
3245        );
3246    }
3247
3248    #[test]
3249    fn test_fill_result_end_respects_drain_timeout() {
3250        // Test that drain respects timeout and doesn't block forever
3251        use std::time::Instant;
3252
3253        let backend = TestBackend::new().with_initial_queue(100000); // Large queue that won't drain
3254
3255        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3256        backend_box.connect().unwrap();
3257
3258        let info = DacInfo {
3259            id: "test".to_string(),
3260            name: "Test Device".to_string(),
3261            kind: DacType::Custom("Test".to_string()),
3262            caps: backend_box.caps().clone(),
3263        };
3264
3265        // Very short timeout
3266        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(50));
3267        let stream = Stream::with_backend(info, backend_box, cfg);
3268
3269        let start = Instant::now();
3270        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3271
3272        let elapsed = start.elapsed();
3273
3274        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3275        // Should timeout around 50ms, allow some margin
3276        assert!(
3277            elapsed.as_millis() >= 40 && elapsed.as_millis() < 150,
3278            "Should respect drain timeout (~50ms), took {:?}",
3279            elapsed
3280        );
3281    }
3282
3283    #[test]
3284    fn test_fill_result_end_skips_drain_with_zero_timeout() {
3285        // Test that drain is skipped when timeout is zero
3286        use std::time::Instant;
3287
3288        let backend = TestBackend::new().with_initial_queue(100000);
3289
3290        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3291        backend_box.connect().unwrap();
3292
3293        let info = DacInfo {
3294            id: "test".to_string(),
3295            name: "Test Device".to_string(),
3296            kind: DacType::Custom("Test".to_string()),
3297            caps: backend_box.caps().clone(),
3298        };
3299
3300        // Zero timeout = skip drain
3301        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::ZERO);
3302        let stream = Stream::with_backend(info, backend_box, cfg);
3303
3304        let start = Instant::now();
3305        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3306
3307        let elapsed = start.elapsed();
3308
3309        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3310        // Should return immediately
3311        assert!(
3312            elapsed.as_millis() < 20,
3313            "Should skip drain with zero timeout, took {:?}",
3314            elapsed
3315        );
3316    }
3317
3318    #[test]
3319    fn test_fill_result_end_drains_without_queue_depth() {
3320        // Test drain behavior when queued_points() returns None
3321        use std::time::Instant;
3322
3323        let mut backend = NoQueueTestBackend::new();
3324        backend.inner.connected = true;
3325
3326        let info = DacInfo {
3327            id: "test".to_string(),
3328            name: "Test Device".to_string(),
3329            kind: DacType::Custom("Test".to_string()),
3330            caps: backend.inner.caps().clone(),
3331        };
3332
3333        // Short drain timeout
3334        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
3335        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3336
3337        let start = Instant::now();
3338        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3339
3340        let elapsed = start.elapsed();
3341
3342        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3343        // Without queue depth, drain sleeps for estimated buffer time (0 here)
3344        // So should return quickly
3345        assert!(
3346            elapsed.as_millis() < 50,
3347            "Should return quickly with empty buffer estimate, took {:?}",
3348            elapsed
3349        );
3350    }
3351
3352    #[test]
3353    fn test_fill_result_end_closes_shutter() {
3354        // Test that shutter is closed after drain
3355        let backend = TestBackend::new();
3356        let shutter_open = backend.shutter_open.clone();
3357
3358        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3359        backend_box.connect().unwrap();
3360
3361        let info = DacInfo {
3362            id: "test".to_string(),
3363            name: "Test Device".to_string(),
3364            kind: DacType::Custom("Test".to_string()),
3365            caps: backend_box.caps().clone(),
3366        };
3367
3368        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(10));
3369        let stream = Stream::with_backend(info, backend_box, cfg);
3370
3371        // Arm the stream first
3372        let control = stream.control();
3373        control.arm().unwrap();
3374
3375        let result = stream.run(
3376            |req, buffer| {
3377                // Fill some points then end
3378                let n = req.target_points.min(buffer.len()).min(10);
3379                for i in 0..n {
3380                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3381                }
3382                ChunkResult::End
3383            },
3384            |_e| {},
3385        );
3386
3387        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3388        // Shutter should be closed after graceful shutdown
3389        assert!(
3390            !shutter_open.load(Ordering::SeqCst),
3391            "Shutter should be closed after drain"
3392        );
3393    }
3394
3395    // =========================================================================
3396    // Color delay tests
3397    // =========================================================================
3398
3399    #[test]
3400    fn test_color_delay_zero_is_passthrough() {
3401        // With delay=0, colors should pass through unchanged
3402        let mut backend = TestBackend::new();
3403        backend.connected = true;
3404
3405        let info = DacInfo {
3406            id: "test".to_string(),
3407            name: "Test".to_string(),
3408            kind: DacType::Custom("Test".to_string()),
3409            caps: backend.caps().clone(),
3410        };
3411
3412        let cfg = StreamConfig::new(30000); // color_delay defaults to ZERO
3413        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3414
3415        // Arm the stream so points aren't blanked
3416        stream.control.arm().unwrap();
3417        stream.process_control_messages();
3418        stream.state.last_armed = true;
3419
3420        // Fill chunk_buffer with colored points
3421        let n = 5;
3422        for i in 0..n {
3423            stream.state.chunk_buffer[i] =
3424                LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 1000, 0, 0, 65535);
3425        }
3426
3427        // write_fill_points applies color delay internally
3428        let mut on_error = |_: Error| {};
3429        stream.write_fill_points(n, &mut on_error).unwrap();
3430
3431        // Delay line should remain empty
3432        assert!(stream.state.color_delay_line.is_empty());
3433    }
3434
3435    #[test]
3436    fn test_color_delay_shifts_colors() {
3437        // With delay=3 points, first 3 outputs should be blanked, rest shifted
3438        let mut backend = TestBackend::new();
3439        backend.connected = true;
3440
3441        let info = DacInfo {
3442            id: "test".to_string(),
3443            name: "Test".to_string(),
3444            kind: DacType::Custom("Test".to_string()),
3445            caps: backend.caps().clone(),
3446        };
3447
3448        // 10000 PPS, delay = 300µs → ceil(0.0003 * 10000) = 3 points
3449        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(300));
3450        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3451
3452        // Arm the stream
3453        stream.control.arm().unwrap();
3454        stream.process_control_messages();
3455        // handle_shutter_transition pre-fills the delay line on arm
3456        stream.state.last_armed = true;
3457
3458        // Pre-fill delay line as handle_shutter_transition would on arm
3459        stream.state.color_delay_line.clear();
3460        for _ in 0..3 {
3461            stream.state.color_delay_line.push_back((0, 0, 0, 0));
3462        }
3463
3464        // Fill 5 points with distinct colors
3465        let n = 5;
3466        for i in 0..n {
3467            stream.state.chunk_buffer[i] = LaserPoint::new(
3468                i as f32 * 0.1,
3469                0.0,
3470                (i as u16 + 1) * 10000,
3471                (i as u16 + 1) * 5000,
3472                (i as u16 + 1) * 2000,
3473                65535,
3474            );
3475        }
3476
3477        let mut on_error = |_: Error| {};
3478        stream.write_fill_points(n, &mut on_error).unwrap();
3479
3480        // After write, check the chunk_buffer was modified:
3481        // We can't inspect what was written to the backend directly,
3482        // but we can verify the delay line state.
3483        // After processing 5 points through a 3-point delay,
3484        // the delay line should still have 3 entries (the last 3 input colors).
3485        assert_eq!(stream.state.color_delay_line.len(), 3);
3486
3487        // The delay line should contain colors from inputs 3, 4, 5 (0-indexed: 2, 3, 4)
3488        let expected: Vec<(u16, u16, u16, u16)> = (3..=5)
3489            .map(|i| (i * 10000u16, i * 5000, i * 2000, 65535))
3490            .collect();
3491        let actual: Vec<(u16, u16, u16, u16)> =
3492            stream.state.color_delay_line.iter().copied().collect();
3493        assert_eq!(actual, expected);
3494    }
3495
3496    #[test]
3497    fn test_color_delay_resets_on_disarm_arm() {
3498        // Disarm should clear the delay line, arm should re-fill it
3499        let mut backend = TestBackend::new();
3500        backend.connected = true;
3501
3502        let info = DacInfo {
3503            id: "test".to_string(),
3504            name: "Test".to_string(),
3505            kind: DacType::Custom("Test".to_string()),
3506            caps: backend.caps().clone(),
3507        };
3508
3509        // 10000 PPS, delay = 200µs → ceil(0.0002 * 10000) = 2 points
3510        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
3511        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3512
3513        // Arm: should pre-fill delay line
3514        stream.handle_shutter_transition(true);
3515        assert_eq!(stream.state.color_delay_line.len(), 2);
3516        assert_eq!(stream.state.color_delay_line.front(), Some(&(0, 0, 0, 0)));
3517
3518        // Disarm: should clear delay line
3519        stream.handle_shutter_transition(false);
3520        assert!(stream.state.color_delay_line.is_empty());
3521
3522        // Arm again: should re-fill
3523        stream.handle_shutter_transition(true);
3524        assert_eq!(stream.state.color_delay_line.len(), 2);
3525    }
3526
3527    #[test]
3528    fn test_color_delay_dynamic_change() {
3529        // Changing delay at runtime via atomic should resize the deque
3530        let mut backend = TestBackend::new();
3531        backend.connected = true;
3532
3533        let info = DacInfo {
3534            id: "test".to_string(),
3535            name: "Test".to_string(),
3536            kind: DacType::Custom("Test".to_string()),
3537            caps: backend.caps().clone(),
3538        };
3539
3540        // Start with 200µs delay at 10000 PPS → 2 points
3541        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
3542        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3543
3544        // Arm
3545        stream.control.arm().unwrap();
3546        stream.process_control_messages();
3547        stream.state.last_armed = true;
3548
3549        // Pre-fill as handle_shutter_transition would
3550        stream.state.color_delay_line.clear();
3551        for _ in 0..2 {
3552            stream.state.color_delay_line.push_back((0, 0, 0, 0));
3553        }
3554
3555        // Fill and write a chunk
3556        let n = 3;
3557        for i in 0..n {
3558            stream.state.chunk_buffer[i] =
3559                LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 10000, 0, 0, 65535);
3560        }
3561        let mut on_error = |_: Error| {};
3562        stream.write_fill_points(n, &mut on_error).unwrap();
3563
3564        // Now change delay to 500µs → ceil(0.0005 * 10000) = 5 points
3565        stream.control.set_color_delay(Duration::from_micros(500));
3566
3567        // Write another chunk — delay line should resize to 5
3568        for i in 0..n {
3569            stream.state.chunk_buffer[i] =
3570                LaserPoint::new(0.0, 0.0, (i as u16 + 4) * 10000, 0, 0, 65535);
3571        }
3572        stream.write_fill_points(n, &mut on_error).unwrap();
3573
3574        assert_eq!(stream.state.color_delay_line.len(), 5);
3575
3576        // Now disable delay entirely
3577        stream.control.set_color_delay(Duration::ZERO);
3578
3579        for i in 0..n {
3580            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 50000, 0, 0, 65535);
3581        }
3582        stream.write_fill_points(n, &mut on_error).unwrap();
3583
3584        // Delay line should be cleared
3585        assert!(stream.state.color_delay_line.is_empty());
3586    }
3587
3588    // =========================================================================
3589    // Startup blanking tests
3590    // =========================================================================
3591
3592    #[test]
3593    fn test_startup_blank_blanks_first_n_points() {
3594        let backend = TestBackend::new();
3595        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3596        backend_box.connect().unwrap();
3597
3598        let info = DacInfo {
3599            id: "test".to_string(),
3600            name: "Test Device".to_string(),
3601            kind: DacType::Custom("Test".to_string()),
3602            caps: backend_box.caps().clone(),
3603        };
3604
3605        // 10000 PPS, startup_blank = 500µs → ceil(0.0005 * 10000) = 5 points
3606        // Disable color delay to isolate startup blanking
3607        let cfg = StreamConfig::new(10000)
3608            .with_startup_blank(Duration::from_micros(500))
3609            .with_color_delay(Duration::ZERO);
3610        let mut stream = Stream::with_backend(info, backend_box, cfg);
3611
3612        assert_eq!(stream.state.startup_blank_points, 5);
3613
3614        // Arm the stream (triggers handle_shutter_transition which resets counter)
3615        stream.control.arm().unwrap();
3616        stream.process_control_messages();
3617
3618        // Simulate arm transition in write path
3619        stream.state.last_armed = false; // Force transition detection
3620
3621        // Fill 10 colored points
3622        let n = 10;
3623        for i in 0..n {
3624            stream.state.chunk_buffer[i] =
3625                LaserPoint::new(i as f32 * 0.1, 0.0, 65535, 32000, 16000, 65535);
3626        }
3627
3628        let mut on_error = |_: Error| {};
3629        stream.write_fill_points(n, &mut on_error).unwrap();
3630
3631        // After write, check what was sent: we can't inspect backend directly,
3632        // but we can verify the counter decremented and the buffer was modified
3633        assert_eq!(stream.state.startup_blank_remaining, 0);
3634
3635        // Write another chunk — should NOT be blanked (counter exhausted)
3636        stream.state.last_armed = true; // No transition this time
3637        for i in 0..n {
3638            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
3639        }
3640        stream.write_fill_points(n, &mut on_error).unwrap();
3641
3642        // Verify colors pass through unmodified (no startup blanking)
3643        // The chunk_buffer is modified in-place before write, so after write
3644        // it should still have the original colors (startup blank is exhausted)
3645        assert_eq!(stream.state.chunk_buffer[0].r, 65535);
3646        assert_eq!(stream.state.chunk_buffer[0].g, 32000);
3647    }
3648
3649    #[test]
3650    fn test_startup_blank_resets_on_rearm() {
3651        let backend = TestBackend::new();
3652        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3653        backend_box.connect().unwrap();
3654
3655        let info = DacInfo {
3656            id: "test".to_string(),
3657            name: "Test Device".to_string(),
3658            kind: DacType::Custom("Test".to_string()),
3659            caps: backend_box.caps().clone(),
3660        };
3661
3662        // 10000 PPS, startup_blank = 500µs → 5 points
3663        let cfg = StreamConfig::new(10000)
3664            .with_startup_blank(Duration::from_micros(500))
3665            .with_color_delay(Duration::ZERO);
3666        let mut stream = Stream::with_backend(info, backend_box, cfg);
3667
3668        // First arm cycle: consume startup blanking
3669        stream.state.last_armed = false;
3670        stream.control.arm().unwrap();
3671        stream.process_control_messages();
3672
3673        let n = 10;
3674        for i in 0..n {
3675            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
3676        }
3677        let mut on_error = |_: Error| {};
3678        // This triggers disarmed→armed transition, which resets counter
3679        stream.state.last_armed = false;
3680        stream.write_fill_points(n, &mut on_error).unwrap();
3681        assert_eq!(stream.state.startup_blank_remaining, 0);
3682
3683        // Disarm → re-arm
3684        stream.control.disarm().unwrap();
3685        stream.process_control_messages();
3686
3687        stream.control.arm().unwrap();
3688        stream.process_control_messages();
3689
3690        // Write again — should trigger new arm transition and reset counter
3691        stream.state.last_armed = false;
3692        for i in 0..n {
3693            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
3694        }
3695        stream.write_fill_points(n, &mut on_error).unwrap();
3696
3697        // Counter should have been reset to 5 and then decremented to 0
3698        assert_eq!(stream.state.startup_blank_remaining, 0);
3699    }
3700
3701    #[test]
3702    fn test_startup_blank_zero_is_noop() {
3703        let backend = TestBackend::new();
3704        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3705        backend_box.connect().unwrap();
3706
3707        let info = DacInfo {
3708            id: "test".to_string(),
3709            name: "Test Device".to_string(),
3710            kind: DacType::Custom("Test".to_string()),
3711            caps: backend_box.caps().clone(),
3712        };
3713
3714        // Disable startup blanking
3715        let cfg = StreamConfig::new(10000)
3716            .with_startup_blank(Duration::ZERO)
3717            .with_color_delay(Duration::ZERO);
3718        let mut stream = Stream::with_backend(info, backend_box, cfg);
3719
3720        assert_eq!(stream.state.startup_blank_points, 0);
3721
3722        // Arm and write colored points
3723        stream.control.arm().unwrap();
3724        stream.process_control_messages();
3725        stream.state.last_armed = false; // Force arm transition
3726
3727        let n = 5;
3728        for i in 0..n {
3729            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
3730        }
3731        let mut on_error = |_: Error| {};
3732        stream.write_fill_points(n, &mut on_error).unwrap();
3733
3734        // Colors should pass through unmodified — no startup blanking
3735        assert_eq!(stream.state.chunk_buffer[0].r, 65535);
3736        assert_eq!(stream.state.chunk_buffer[0].g, 32000);
3737        assert_eq!(stream.state.chunk_buffer[0].b, 16000);
3738        assert_eq!(stream.state.chunk_buffer[0].intensity, 65535);
3739        assert_eq!(stream.state.startup_blank_remaining, 0);
3740    }
3741
3742    // =========================================================================
3743    // OutputModel coverage: UsbFrameSwap vs NetworkFifo scheduled_ahead
3744    // =========================================================================
3745
3746    #[test]
3747    fn test_usb_frame_swap_replaces_scheduled_ahead() {
3748        let backend = TestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
3749        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3750        backend_box.connect().unwrap();
3751
3752        let info = DacInfo {
3753            id: "test".to_string(),
3754            name: "Test Device".to_string(),
3755            kind: DacType::Custom("Test".to_string()),
3756            caps: backend_box.caps().clone(),
3757        };
3758
3759        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3760        let mut stream = Stream::with_backend(info, backend_box, cfg);
3761
3762        // Arm and write two chunks of 50 points each
3763        stream.control.arm().unwrap();
3764        stream.process_control_messages();
3765
3766        let n = 50;
3767        for _ in 0..2 {
3768            for i in 0..n {
3769                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3770            }
3771            let mut on_error = |_: Error| {};
3772            stream.write_fill_points(n, &mut on_error).unwrap();
3773        }
3774
3775        // UsbFrameSwap: scheduled_ahead should be SET to n, not accumulated to 2n
3776        assert_eq!(stream.state.scheduled_ahead, n as u64);
3777        assert_eq!(stream.state.stats.chunks_written, 2);
3778        assert_eq!(stream.state.stats.points_written, 2 * n as u64);
3779    }
3780
3781    #[test]
3782    fn test_usb_frame_swap_no_queue_reporting() {
3783        let backend = NoQueueTestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
3784        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3785        backend_box.connect().unwrap();
3786
3787        let info = DacInfo {
3788            id: "test".to_string(),
3789            name: "Test Device".to_string(),
3790            kind: DacType::Custom("Test".to_string()),
3791            caps: backend_box.caps().clone(),
3792        };
3793
3794        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3795        let mut stream = Stream::with_backend(info, backend_box, cfg);
3796
3797        // Arm and write two chunks
3798        stream.control.arm().unwrap();
3799        stream.process_control_messages();
3800
3801        let n = 50;
3802        for _ in 0..2 {
3803            for i in 0..n {
3804                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3805            }
3806            let mut on_error = |_: Error| {};
3807            stream.write_fill_points(n, &mut on_error).unwrap();
3808        }
3809
3810        // UsbFrameSwap + no queue reporting (like real Helios):
3811        // scheduled_ahead should be SET, not accumulated
3812        assert_eq!(stream.state.scheduled_ahead, n as u64);
3813
3814        // Buffer estimation should use software-only path (queued_points returns None)
3815        let est = stream.estimate_buffer_points();
3816        // Software estimate equals scheduled_ahead = n (SET, not accumulated)
3817        assert_eq!(est, n as u64);
3818    }
3819
3820    #[test]
3821    fn test_network_fifo_accumulates_scheduled_ahead() {
3822        let backend = TestBackend::new(); // default: NetworkFifo
3823        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3824        backend_box.connect().unwrap();
3825
3826        let info = DacInfo {
3827            id: "test".to_string(),
3828            name: "Test Device".to_string(),
3829            kind: DacType::Custom("Test".to_string()),
3830            caps: backend_box.caps().clone(),
3831        };
3832
3833        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3834        let mut stream = Stream::with_backend(info, backend_box, cfg);
3835
3836        // Arm and write two chunks of 50 points each
3837        stream.control.arm().unwrap();
3838        stream.process_control_messages();
3839
3840        let n = 50;
3841        for _ in 0..2 {
3842            for i in 0..n {
3843                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3844            }
3845            let mut on_error = |_: Error| {};
3846            stream.write_fill_points(n, &mut on_error).unwrap();
3847        }
3848
3849        // NetworkFifo: scheduled_ahead should ACCUMULATE to 2n
3850        assert_eq!(stream.state.scheduled_ahead, 2 * n as u64);
3851        assert_eq!(stream.state.stats.chunks_written, 2);
3852        assert_eq!(stream.state.stats.points_written, 2 * n as u64);
3853    }
3854
3855    #[test]
3856    fn test_validate_config_rejects_pps_below_min() {
3857        // Helios-like caps: pps_min = 7
3858        let caps = DacCapabilities {
3859            pps_min: 7,
3860            pps_max: 65535,
3861            max_points_per_chunk: 1000,
3862            output_model: OutputModel::NetworkFifo,
3863        };
3864        let cfg = StreamConfig::new(5);
3865        let result = Dac::validate_config(&caps, &cfg);
3866        assert!(result.is_err());
3867        let msg = result.unwrap_err().to_string();
3868        assert!(msg.contains("PPS 5"), "expected PPS 5 in error: {msg}");
3869    }
3870
3871    #[test]
3872    fn test_validate_config_rejects_pps_above_max() {
3873        // LaserCube-like caps: pps_max = 35000
3874        let caps = DacCapabilities {
3875            pps_min: 1,
3876            pps_max: 35_000,
3877            max_points_per_chunk: 1000,
3878            output_model: OutputModel::NetworkFifo,
3879        };
3880        let cfg = StreamConfig::new(50_000);
3881        let result = Dac::validate_config(&caps, &cfg);
3882        assert!(result.is_err());
3883        let msg = result.unwrap_err().to_string();
3884        assert!(
3885            msg.contains("PPS 50000"),
3886            "expected PPS 50000 in error: {msg}"
3887        );
3888    }
3889
3890    #[test]
3891    fn test_validate_config_avb_accepts_standard_pps() {
3892        // AVB caps: wide range due to resampling
3893        let caps = DacCapabilities {
3894            pps_min: 1,
3895            pps_max: 100_000,
3896            max_points_per_chunk: 4096,
3897            output_model: OutputModel::NetworkFifo,
3898        };
3899        let cfg = StreamConfig::new(30_000);
3900        assert!(Dac::validate_config(&caps, &cfg).is_ok());
3901    }
3902}