Skip to main content

laser_dac/
stream.rs

1//! Stream and Dac types for point output.
2//!
3//! This module provides the `Stream` type for streaming point chunks to a DAC,
4//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
5//! connected devices that can start streaming sessions.
6//!
7//! # Armed/Disarmed Model
8//!
9//! Streams use a binary armed/disarmed safety model:
10//!
11//! - **Armed**: Content passes through to device, shutter opened (best-effort).
12//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
13//!
14//! All streams start disarmed. Call `arm()` to enable output.
15//! `arm()` and `disarm()` are the only safety controls — there is no separate
16//! shutter API. This keeps the mental model simple: armed = laser may emit.
17//!
18//! # Hardware Shutter Support
19//!
20//! Shutter control is best-effort and varies by backend:
21//! - **LaserCube USB/WiFi**: Actual hardware control
22//! - **Helios**: Hardware shutter control via USB interrupt
23//! - **Ether Dream, IDN**: No-op (safety relies on software blanking)
24//!
25//! # Disconnect Behavior
26//!
27//! No automatic reconnection. On disconnect, create a new `Dac` and `Stream`.
28//! New streams always start disarmed for safety.
29
30use std::collections::VecDeque;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35
36use crate::backend::{Error, Result, StreamBackend, WriteOutcome};
37use crate::types::{
38    ChunkRequest, ChunkResult, DacCapabilities, DacInfo, DacType, LaserPoint, OutputModel, RunExit,
39    StreamConfig, StreamInstant, StreamStats, StreamStatus, UnderrunPolicy,
40};
41
42// =============================================================================
43// Stream Control
44// =============================================================================
45
46/// Control messages sent from StreamControl to Stream.
47///
48/// These messages allow out-of-band control actions to take effect immediately,
49/// even when the stream is waiting (pacing, backpressure, etc.).
50#[derive(Debug, Clone, Copy)]
51enum ControlMsg {
52    /// Arm the output (opens hardware shutter).
53    Arm,
54    /// Disarm the output (closes hardware shutter).
55    Disarm,
56    /// Request the stream to stop.
57    Stop,
58}
59
60/// Thread-safe control handle for safety-critical actions.
61///
62/// This allows out-of-band control of the stream (arm/disarm/stop) from
63/// a different thread, e.g., for E-stop functionality.
64///
65/// Control actions take effect as soon as possible - the stream processes
66/// control messages at every opportunity (during waits, between retries, etc.).
67#[derive(Clone)]
68pub struct StreamControl {
69    inner: Arc<StreamControlInner>,
70}
71
72struct StreamControlInner {
73    /// Whether output is armed (laser can fire).
74    armed: AtomicBool,
75    /// Whether a stop has been requested.
76    stop_requested: AtomicBool,
77    /// Channel for sending control messages to the stream loop.
78    /// Wrapped in Mutex because Sender is Send but not Sync.
79    control_tx: Mutex<Sender<ControlMsg>>,
80    /// Color delay in microseconds (readable per-chunk without locking).
81    color_delay_micros: AtomicU64,
82}
83
84impl StreamControl {
85    fn new(control_tx: Sender<ControlMsg>, color_delay: Duration) -> Self {
86        Self {
87            inner: Arc::new(StreamControlInner {
88                armed: AtomicBool::new(false),
89                stop_requested: AtomicBool::new(false),
90                control_tx: Mutex::new(control_tx),
91                color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
92            }),
93        }
94    }
95
96    /// Arm the output (allow laser to fire).
97    ///
98    /// When armed, content from the producer passes through unmodified
99    /// and the hardware shutter is opened (best-effort).
100    pub fn arm(&self) -> Result<()> {
101        self.inner.armed.store(true, Ordering::SeqCst);
102        // Send message to stream for immediate shutter control
103        if let Ok(tx) = self.inner.control_tx.lock() {
104            let _ = tx.send(ControlMsg::Arm);
105        }
106        Ok(())
107    }
108
109    /// Disarm the output (force laser off). Designed for E-stop use.
110    ///
111    /// Immediately sets an atomic flag (works even if stream loop is blocked),
112    /// then sends a message to close the hardware shutter. All future points
113    /// are blanked in software. The stream stays alive outputting blanks -
114    /// use `stop()` to terminate entirely.
115    ///
116    /// **Latency**: Points already in the device buffer will still play out.
117    /// `target_buffer` bounds this latency.
118    ///
119    /// **Hardware shutter**: Best-effort. LaserCube and Helios have actual hardware
120    /// control; Ether Dream, IDN are no-ops (safety relies on software blanking).
121    pub fn disarm(&self) -> Result<()> {
122        self.inner.armed.store(false, Ordering::SeqCst);
123        // Send message to stream for immediate shutter control
124        if let Ok(tx) = self.inner.control_tx.lock() {
125            let _ = tx.send(ControlMsg::Disarm);
126        }
127        Ok(())
128    }
129
130    /// Check if the output is armed.
131    pub fn is_armed(&self) -> bool {
132        self.inner.armed.load(Ordering::SeqCst)
133    }
134
135    /// Set the color delay for scanner sync compensation.
136    ///
137    /// Takes effect within one chunk period. The delay is quantized to
138    /// whole points: `ceil(delay * pps)`.
139    pub fn set_color_delay(&self, delay: Duration) {
140        self.inner
141            .color_delay_micros
142            .store(delay.as_micros() as u64, Ordering::SeqCst);
143    }
144
145    /// Get the current color delay.
146    pub fn color_delay(&self) -> Duration {
147        Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
148    }
149
150    /// Request the stream to stop.
151    ///
152    /// Signals termination; `run()` returns `RunExit::Stopped`.
153    /// For clean shutdown with shutter close, prefer `Stream::stop()`.
154    pub fn stop(&self) -> Result<()> {
155        self.inner.stop_requested.store(true, Ordering::SeqCst);
156        // Send message to stream for immediate stop
157        if let Ok(tx) = self.inner.control_tx.lock() {
158            let _ = tx.send(ControlMsg::Stop);
159        }
160        Ok(())
161    }
162
163    /// Check if a stop has been requested.
164    pub fn is_stop_requested(&self) -> bool {
165        self.inner.stop_requested.load(Ordering::SeqCst)
166    }
167}
168
169// =============================================================================
170// Stream State
171// =============================================================================
172
173struct StreamState {
174    /// Current position in stream time (points since start).
175    current_instant: StreamInstant,
176    /// Points scheduled ahead of current_instant.
177    scheduled_ahead: u64,
178
179    // Pre-allocated buffers (no per-chunk allocation in hot path)
180    /// Buffer for callback to fill points into.
181    chunk_buffer: Vec<LaserPoint>,
182    /// Last chunk for RepeatLast underrun policy.
183    last_chunk: Vec<LaserPoint>,
184    /// Number of valid points in last_chunk.
185    last_chunk_len: usize,
186
187    /// FIFO for color delay (r, g, b, intensity per point).
188    color_delay_line: VecDeque<(u16, u16, u16, u16)>,
189
190    /// Points remaining in the startup blank window (decremented as points are written).
191    startup_blank_remaining: usize,
192    /// Total startup blank points (computed once from config).
193    startup_blank_points: usize,
194
195    /// Statistics.
196    stats: StreamStats,
197    /// Track the last armed state to detect transitions.
198    last_armed: bool,
199    /// Whether the hardware shutter is currently open.
200    shutter_open: bool,
201}
202
203impl StreamState {
204    /// Create new stream state with pre-allocated buffers.
205    ///
206    /// Buffers are sized to `max_points_per_chunk` from DAC capabilities,
207    /// ensuring we can handle any catch-up scenario without reallocation.
208    fn new(max_points_per_chunk: usize, startup_blank_points: usize) -> Self {
209        Self {
210            current_instant: StreamInstant::new(0),
211            scheduled_ahead: 0,
212            chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
213            last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
214            last_chunk_len: 0,
215            color_delay_line: VecDeque::new(),
216            startup_blank_remaining: 0,
217            startup_blank_points,
218            stats: StreamStats::default(),
219            last_armed: false,
220            shutter_open: false,
221        }
222    }
223}
224
225// =============================================================================
226// Stream
227// =============================================================================
228
229/// A streaming session for outputting points to a DAC.
230///
231/// Use [`run()`](Self::run) to stream with buffer-driven timing.
232/// The callback is invoked when the buffer needs filling, providing automatic
233/// backpressure handling and zero allocations in the hot path.
234///
235/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
236pub struct Stream {
237    /// Device info for this stream.
238    info: DacInfo,
239    /// The backend.
240    backend: Option<Box<dyn StreamBackend>>,
241    /// Stream configuration.
242    config: StreamConfig,
243    /// Thread-safe control handle.
244    control: StreamControl,
245    /// Receiver for control messages from StreamControl.
246    control_rx: Receiver<ControlMsg>,
247    /// Stream state.
248    state: StreamState,
249}
250
251impl Stream {
252    /// Create a new stream with a backend.
253    pub(crate) fn with_backend(
254        info: DacInfo,
255        backend: Box<dyn StreamBackend>,
256        config: StreamConfig,
257    ) -> Self {
258        let (control_tx, control_rx) = mpsc::channel();
259        let max_points = info.caps.max_points_per_chunk;
260        let startup_blank_points = if config.startup_blank.is_zero() {
261            0
262        } else {
263            (config.startup_blank.as_secs_f64() * config.pps as f64).ceil() as usize
264        };
265        Self {
266            info,
267            backend: Some(backend),
268            config: config.clone(),
269            control: StreamControl::new(control_tx, config.color_delay),
270            control_rx,
271            state: StreamState::new(max_points, startup_blank_points),
272        }
273    }
274
275    /// Returns the device info.
276    pub fn info(&self) -> &DacInfo {
277        &self.info
278    }
279
280    /// Returns the stream configuration.
281    pub fn config(&self) -> &StreamConfig {
282        &self.config
283    }
284
285    /// Returns a thread-safe control handle.
286    pub fn control(&self) -> StreamControl {
287        self.control.clone()
288    }
289
290    /// Returns the current stream status.
291    pub fn status(&self) -> Result<StreamStatus> {
292        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
293
294        Ok(StreamStatus {
295            connected: self
296                .backend
297                .as_ref()
298                .map(|b| b.is_connected())
299                .unwrap_or(false),
300            scheduled_ahead_points: self.state.scheduled_ahead,
301            device_queued_points,
302            stats: Some(self.state.stats.clone()),
303        })
304    }
305
306    /// Handle hardware shutter transitions based on arm state changes.
307    fn handle_shutter_transition(&mut self, is_armed: bool) {
308        let was_armed = self.state.last_armed;
309        self.state.last_armed = is_armed;
310
311        if was_armed && !is_armed {
312            // Disarmed: close the shutter for safety (best-effort)
313            self.state.color_delay_line.clear();
314            if self.state.shutter_open {
315                if let Some(backend) = &mut self.backend {
316                    let _ = backend.set_shutter(false); // Best-effort, ignore errors
317                }
318                self.state.shutter_open = false;
319            }
320        } else if !was_armed && is_armed {
321            // Armed: open the shutter (best-effort)
322            // Pre-fill color delay line with blanked colors so early points
323            // come out dark while galvos settle.
324            let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
325            let delay_points = if delay_micros == 0 {
326                0
327            } else {
328                (Duration::from_micros(delay_micros).as_secs_f64() * self.config.pps as f64).ceil()
329                    as usize
330            };
331            self.state.color_delay_line.clear();
332            for _ in 0..delay_points {
333                self.state.color_delay_line.push_back((0, 0, 0, 0));
334            }
335
336            self.state.startup_blank_remaining = self.state.startup_blank_points;
337
338            if !self.state.shutter_open {
339                if let Some(backend) = &mut self.backend {
340                    let _ = backend.set_shutter(true); // Best-effort, ignore errors
341                }
342                self.state.shutter_open = true;
343            }
344        }
345    }
346
347    /// Stop the stream and terminate output.
348    ///
349    /// Disarms the output (software blanking + hardware shutter) before stopping
350    /// the backend to prevent the "freeze on last bright point" hazard.
351    /// Use `disarm()` instead if you want to keep the stream alive but safe.
352    pub fn stop(&mut self) -> Result<()> {
353        // Disarm: sets armed flag for software blanking
354        self.control.disarm()?;
355
356        self.control.stop()?;
357
358        // Directly close shutter and stop backend (defense-in-depth)
359        if let Some(backend) = &mut self.backend {
360            let _ = backend.set_shutter(false);
361            backend.stop()?;
362        }
363
364        Ok(())
365    }
366
367    /// Consume the stream and recover the device for reuse.
368    ///
369    /// This method disarms and stops the stream (software blanking + hardware shutter),
370    /// then returns the underlying `Dac` along with the final `StreamStats`.
371    /// The device can then be used to start a new stream with different configuration.
372    ///
373    /// # Example
374    ///
375    /// ```ignore
376    /// use laser_dac::StreamConfig;
377    ///
378    /// // device: Dac, config: StreamConfig (from prior setup)
379    /// let (stream, info) = device.start_stream(config)?;
380    /// // ... stream for a while ...
381    /// let (device, stats) = stream.into_dac();
382    /// println!("Streamed {} points", stats.points_written);
383    ///
384    /// // Restart with different config
385    /// let new_config = StreamConfig::new(60_000);
386    /// let (stream2, _) = device.start_stream(new_config)?;
387    /// ```
388    pub fn into_dac(mut self) -> (Dac, StreamStats) {
389        // Disarm (software blanking) and close shutter before stopping
390        let _ = self.control.disarm();
391        let _ = self.control.stop();
392        if let Some(backend) = &mut self.backend {
393            let _ = backend.set_shutter(false);
394            let _ = backend.stop();
395        }
396
397        // Take the backend (leaves None, so Drop won't try to stop again)
398        let backend = self.backend.take();
399        let stats = self.state.stats.clone();
400
401        let dac = Dac {
402            info: self.info.clone(),
403            backend,
404        };
405
406        (dac, stats)
407    }
408
409    /// Run the stream with the zero-allocation callback API.
410    ///
411    /// This method uses **pure buffer-driven timing**:
412    /// - Callback is invoked when `buffered < target_buffer`
413    /// - Points requested varies based on buffer headroom (`min_points`, `target_points`)
414    /// - Callback fills a library-owned buffer (zero allocations in hot path)
415    ///
416    /// # Callback Contract
417    ///
418    /// The callback receives a `ChunkRequest` describing buffer state and requirements,
419    /// and a mutable slice to fill with points. It returns:
420    ///
421    /// - `ChunkResult::Filled(n)`: Wrote `n` points to the buffer
422    /// - `ChunkResult::Starved`: No data available (underrun policy applies)
423    /// - `ChunkResult::End`: Stream should end gracefully
424    ///
425    /// # Exit Conditions
426    ///
427    /// - **`RunExit::Stopped`**: Stop requested via `StreamControl::stop()`.
428    /// - **`RunExit::ProducerEnded`**: Callback returned `ChunkResult::End`.
429    /// - **`RunExit::Disconnected`**: Device disconnected.
430    ///
431    /// # Example
432    ///
433    /// ```ignore
434    /// use laser_dac::{ChunkRequest, ChunkResult, LaserPoint};
435    ///
436    /// stream.run(
437    ///     |req: &ChunkRequest, buffer: &mut [LaserPoint]| {
438    ///         let n = req.target_points;
439    ///         for i in 0..n {
440    ///             let t = req.start.as_secs_f64(req.pps) + (i as f64 / req.pps as f64);
441    ///             let angle = (t * std::f64::consts::TAU) as f32;
442    ///             buffer[i] = LaserPoint::new(angle.cos(), angle.sin(), 65535, 0, 0, 65535);
443    ///         }
444    ///         ChunkResult::Filled(n)
445    ///     },
446    ///     |err| eprintln!("Error: {}", err),
447    /// )?;
448    /// ```
449    pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
450    where
451        F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
452        E: FnMut(Error) + Send + 'static,
453    {
454        use std::time::Instant;
455
456        let pps = self.config.pps as f64;
457        let max_points = self.info.caps.max_points_per_chunk;
458
459        // Track time between iterations to decrement scheduled_ahead for backends
460        // that don't report queued_points(). This prevents stalls when buffered
461        // equals target_points exactly.
462        let mut last_iteration = Instant::now();
463
464        loop {
465            // 1. Check for stop request
466            if self.control.is_stop_requested() {
467                return Ok(RunExit::Stopped);
468            }
469
470            // Decrement scheduled_ahead based on elapsed time since last iteration.
471            // This is critical for backends without queued_points() - without this,
472            // scheduled_ahead would never decrease and target_points would stay 0.
473            let now = Instant::now();
474            let elapsed = now.duration_since(last_iteration);
475            let points_consumed = (elapsed.as_secs_f64() * pps) as u64;
476            self.state.scheduled_ahead = self.state.scheduled_ahead.saturating_sub(points_consumed);
477            last_iteration = now;
478
479            // 2. Estimate buffer state
480            let buffered = self.estimate_buffer_points();
481            let target_points = (self.config.target_buffer.as_secs_f64() * pps) as u64;
482
483            // 3. If buffer is above target, sleep until it drains to target
484            // Note: use > not >= so we call producer when exactly at target
485            if buffered > target_points {
486                let excess_points = buffered - target_points;
487                let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
488                if self.sleep_with_control_check(sleep_time)? {
489                    return Ok(RunExit::Stopped);
490                }
491                continue; // Re-check buffer after sleep
492            }
493
494            // 4. Check backend connection
495            if let Some(backend) = &self.backend {
496                if !backend.is_connected() {
497                    log::warn!("backend.is_connected() = false, exiting with Disconnected");
498                    on_error(Error::disconnected("backend disconnected"));
499                    return Ok(RunExit::Disconnected);
500                }
501            } else {
502                log::warn!("no backend, exiting with Disconnected");
503                on_error(Error::disconnected("no backend"));
504                return Ok(RunExit::Disconnected);
505            }
506
507            // 5. Process control messages before calling producer
508            if self.process_control_messages() {
509                return Ok(RunExit::Stopped);
510            }
511
512            // 6. Build fill request with buffer state
513            let req = self.build_fill_request(max_points);
514
515            // 7. Call producer with pre-allocated buffer
516            let buffer = &mut self.state.chunk_buffer[..max_points];
517            let result = producer(&req, buffer);
518
519            // 8. Handle result
520            match result {
521                ChunkResult::Filled(n) => {
522                    // Validate n doesn't exceed buffer
523                    let n = n.min(max_points);
524
525                    // Treat Filled(0) with target_points > 0 as Starved
526                    if n == 0 && req.target_points > 0 {
527                        self.handle_underrun(&req)?;
528                        continue;
529                    }
530
531                    // Write to backend if we have points
532                    if n > 0 {
533                        self.write_fill_points(n, &mut on_error)?;
534                    }
535                }
536                ChunkResult::Starved => {
537                    self.handle_underrun(&req)?;
538                }
539                ChunkResult::End => {
540                    // Graceful shutdown: let queued points drain, then blank/park
541                    self.drain_and_blank();
542                    return Ok(RunExit::ProducerEnded);
543                }
544            }
545        }
546    }
547
548    /// Sleep for the given duration while checking for control messages.
549    ///
550    /// Returns `true` if stop was requested, `false` otherwise.
551    fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
552        const SLEEP_SLICE: Duration = Duration::from_millis(2);
553        let mut remaining = duration;
554
555        while remaining > Duration::ZERO {
556            let slice = remaining.min(SLEEP_SLICE);
557            std::thread::sleep(slice);
558            remaining = remaining.saturating_sub(slice);
559
560            // Process control messages for immediate response
561            if self.process_control_messages() {
562                return Ok(true);
563            }
564        }
565
566        Ok(false)
567    }
568
569    /// Write points from chunk_buffer to the backend.
570    ///
571    /// Called by `run` after the producer fills the buffer.
572    fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
573    where
574        E: FnMut(Error),
575    {
576        let is_armed = self.control.is_armed();
577        let pps = self.config.pps;
578
579        // Handle shutter transitions
580        self.handle_shutter_transition(is_armed);
581
582        // Blank points in-place when disarmed
583        if !is_armed {
584            for p in &mut self.state.chunk_buffer[..n] {
585                *p = LaserPoint::blanked(p.x, p.y);
586            }
587        }
588
589        // Apply startup blanking: force first N points after arming to blank
590        if is_armed && self.state.startup_blank_remaining > 0 {
591            let blank_count = n.min(self.state.startup_blank_remaining);
592            for p in &mut self.state.chunk_buffer[..blank_count] {
593                p.r = 0;
594                p.g = 0;
595                p.b = 0;
596                p.intensity = 0;
597            }
598            self.state.startup_blank_remaining -= blank_count;
599        }
600
601        // Apply color delay: read current setting, resize deque, shift colors
602        let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
603        let color_delay_points = if delay_micros == 0 {
604            0
605        } else {
606            (Duration::from_micros(delay_micros).as_secs_f64() * self.config.pps as f64).ceil()
607                as usize
608        };
609
610        if color_delay_points > 0 {
611            // Resize deque to match current delay (handles dynamic changes)
612            self.state
613                .color_delay_line
614                .resize(color_delay_points, (0, 0, 0, 0));
615            for p in &mut self.state.chunk_buffer[..n] {
616                self.state
617                    .color_delay_line
618                    .push_back((p.r, p.g, p.b, p.intensity));
619                let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
620                p.r = r;
621                p.g = g;
622                p.b = b;
623                p.intensity = i;
624            }
625        } else if !self.state.color_delay_line.is_empty() {
626            // Delay was disabled at runtime — flush the line
627            self.state.color_delay_line.clear();
628        }
629
630        // Try to write with backpressure handling
631        loop {
632            // Check backend exists
633            let backend = match self.backend.as_mut() {
634                Some(b) => b,
635                None => return Err(Error::disconnected("no backend")),
636            };
637
638            match backend.try_write_chunk(pps, &self.state.chunk_buffer[..n]) {
639                Ok(WriteOutcome::Written) => {
640                    self.record_write(n, is_armed);
641                    return Ok(());
642                }
643                Ok(WriteOutcome::WouldBlock) => {
644                    // Backend buffer full - yield and retry
645                    // Borrow of backend is dropped here, so we can call process_control_messages
646                }
647                Err(e) if e.is_stopped() => {
648                    return Err(Error::Stopped);
649                }
650                Err(e) if e.is_disconnected() => {
651                    log::warn!("write got Disconnected error, exiting stream: {e}");
652                    on_error(Error::disconnected("backend disconnected"));
653                    return Err(e);
654                }
655                Err(e) => {
656                    log::warn!("write error, disconnecting backend: {e}");
657                    let _ = backend.disconnect();
658                    on_error(e);
659                    return Ok(());
660                }
661            }
662
663            // Handle WouldBlock: yield and process control messages
664            std::thread::yield_now();
665            if self.process_control_messages() {
666                return Err(Error::Stopped);
667            }
668            std::thread::sleep(Duration::from_micros(100));
669        }
670    }
671
672    /// Handle underrun for the fill API by applying the underrun policy.
673    fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
674        self.state.stats.underrun_count += 1;
675
676        let is_armed = self.control.is_armed();
677        self.handle_shutter_transition(is_armed);
678
679        // Calculate how many points we need (use target_points as the fill amount)
680        let n_points = req.target_points.max(1);
681
682        // Fill chunk_buffer with underrun content
683        let fill_start = if !is_armed {
684            // When disarmed, always output blanked points
685            for i in 0..n_points {
686                self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
687            }
688            n_points
689        } else {
690            match &self.config.underrun {
691                UnderrunPolicy::RepeatLast => {
692                    if self.state.last_chunk_len > 0 {
693                        // Repeat last chunk cyclically
694                        for i in 0..n_points {
695                            self.state.chunk_buffer[i] =
696                                self.state.last_chunk[i % self.state.last_chunk_len];
697                        }
698                        n_points
699                    } else {
700                        // No last chunk, fall back to blank
701                        for i in 0..n_points {
702                            self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
703                        }
704                        n_points
705                    }
706                }
707                UnderrunPolicy::Blank => {
708                    for i in 0..n_points {
709                        self.state.chunk_buffer[i] = LaserPoint::blanked(0.0, 0.0);
710                    }
711                    n_points
712                }
713                UnderrunPolicy::Park { x, y } => {
714                    for i in 0..n_points {
715                        self.state.chunk_buffer[i] = LaserPoint::blanked(*x, *y);
716                    }
717                    n_points
718                }
719                UnderrunPolicy::Stop => {
720                    self.control.stop()?;
721                    return Err(Error::Stopped);
722                }
723            }
724        };
725
726        // Write the fill points
727        if let Some(backend) = &mut self.backend {
728            match backend.try_write_chunk(self.config.pps, &self.state.chunk_buffer[..fill_start]) {
729                Ok(WriteOutcome::Written) => {
730                    self.record_write(fill_start, is_armed);
731                }
732                Ok(WriteOutcome::WouldBlock) => {
733                    // Backend is full - expected during underrun
734                }
735                Err(_) => {
736                    // Backend error during underrun handling - ignore
737                }
738            }
739        }
740
741        Ok(())
742    }
743
744    /// Record a successful write: update last_chunk, timebase, and stats.
745    ///
746    /// Handles `UsbFrameSwap` output model (Helios-style double-buffering) by
747    /// **replacing** `scheduled_ahead` instead of accumulating, since the device
748    /// holds at most one frame at a time.
749    fn record_write(&mut self, n: usize, is_armed: bool) {
750        if is_armed {
751            debug_assert!(
752                n <= self.state.last_chunk.len(),
753                "n ({}) exceeds last_chunk capacity ({})",
754                n,
755                self.state.last_chunk.len()
756            );
757            self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
758            self.state.last_chunk_len = n;
759        }
760        self.state.current_instant += n as u64;
761        if self.info.caps.output_model == OutputModel::UsbFrameSwap {
762            // Double-buffered devices (e.g. Helios) hold at most one frame.
763            // Replace rather than accumulate to reflect the actual queue depth.
764            self.state.scheduled_ahead = n as u64;
765        } else {
766            self.state.scheduled_ahead += n as u64;
767        }
768        self.state.stats.chunks_written += 1;
769        self.state.stats.points_written += n as u64;
770    }
771
772    // =========================================================================
773    // Internal helpers
774    // =========================================================================
775
776    /// Process any pending control messages from StreamControl.
777    ///
778    /// This method drains the control message queue and takes immediate action:
779    /// - `Arm`: Opens the shutter (best-effort)
780    /// - `Disarm`: Closes the shutter immediately
781    /// - `Stop`: Returns `true` to signal the caller to stop
782    ///
783    /// Returns `true` if stop was requested, `false` otherwise.
784    fn process_control_messages(&mut self) -> bool {
785        loop {
786            match self.control_rx.try_recv() {
787                Ok(ControlMsg::Arm) => {
788                    // Open shutter (best-effort) if not already open
789                    if !self.state.shutter_open {
790                        if let Some(backend) = &mut self.backend {
791                            let _ = backend.set_shutter(true);
792                        }
793                        self.state.shutter_open = true;
794                    }
795                }
796                Ok(ControlMsg::Disarm) => {
797                    // Close shutter immediately for safety
798                    if self.state.shutter_open {
799                        if let Some(backend) = &mut self.backend {
800                            let _ = backend.set_shutter(false);
801                        }
802                        self.state.shutter_open = false;
803                    }
804                }
805                Ok(ControlMsg::Stop) => {
806                    return true;
807                }
808                Err(TryRecvError::Empty) => break,
809                Err(TryRecvError::Disconnected) => break,
810            }
811        }
812        false
813    }
814
815    /// Estimate the current buffer level in points using conservative estimation.
816    ///
817    /// Uses `min(hardware, software)` to prevent underruns:
818    /// - If hardware reports fewer points than software estimates, hardware is truth
819    /// - Using `max` would overestimate buffer → underrequest points → underrun
820    /// - Conservative (lower) estimate is safer: we might overfill slightly, but won't underrun
821    ///
822    /// # Returns
823    ///
824    /// The estimated number of points currently buffered (points sent but not yet played).
825    fn estimate_buffer_points(&self) -> u64 {
826        let software = self.state.scheduled_ahead;
827
828        // When hardware reports queue depth, use MINIMUM of hardware and software.
829        if let Some(device_queue) = self.backend.as_ref().and_then(|b| b.queued_points()) {
830            return device_queue.min(software);
831        }
832
833        software
834    }
835
836    /// Build a ChunkRequest with calculated buffer state and point requirements.
837    ///
838    /// Calculates:
839    /// - `buffered_points`: Conservative estimate of points in buffer
840    /// - `buffered`: Buffer level as Duration
841    /// - `start`: Estimated playback time (playhead + buffered)
842    /// - `min_points`: Minimum points to avoid underrun (ceiling rounded)
843    /// - `target_points`: Ideal points to reach target buffer (clamped to max)
844    ///
845    /// # Arguments
846    ///
847    /// * `max_points` - Maximum points the callback can write (buffer length)
848    fn build_fill_request(&self, max_points: usize) -> ChunkRequest {
849        let pps = self.config.pps;
850
851        // Calculate buffer state using conservative estimation
852        let buffered_points = self.estimate_buffer_points();
853        let buffered = Duration::from_secs_f64(buffered_points as f64 / pps as f64);
854
855        // Calculate start time for this chunk (when these points will play).
856        // current_instant = total points written = playhead + buffered, so it represents
857        // the stream time at which the next generated points will be played back.
858        // This is the correct value for audio synchronization.
859        let start = self.state.current_instant;
860
861        // Calculate point requirements
862        // deficit_target = target_buffer - buffered (how much we're below target)
863        let target_buffer_secs = self.config.target_buffer.as_secs_f64();
864        let min_buffer_secs = self.config.min_buffer.as_secs_f64();
865        let buffered_secs = buffered.as_secs_f64();
866
867        // target_points: ceil((target_buffer - buffered) * pps), clamped to max_points
868        let deficit_target = (target_buffer_secs - buffered_secs).max(0.0);
869        let target_points = (deficit_target * pps as f64).ceil() as usize;
870        let target_points = target_points.min(max_points);
871
872        // min_points: ceil((min_buffer - buffered) * pps) - minimum to avoid underrun
873        let deficit_min = (min_buffer_secs - buffered_secs).max(0.0);
874        let min_points = (deficit_min * pps as f64).ceil() as usize;
875        let min_points = min_points.min(max_points);
876
877        // Get raw device queue if available
878        let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());
879
880        ChunkRequest {
881            start,
882            pps,
883            min_points,
884            target_points,
885            buffered_points,
886            buffered,
887            device_queued_points,
888        }
889    }
890
891    /// Wait for queued points to drain, then blank/park the laser.
892    ///
893    /// Called on graceful shutdown (`ChunkResult::End`) to let buffered content
894    /// play out before stopping. Uses `drain_timeout` from config to cap the wait.
895    ///
896    /// - If `queued_points()` is available: polls until queue empties or timeout
897    /// - If `queued_points()` is `None`: sleeps for estimated buffer duration, capped by timeout
898    ///
899    /// After drain (or timeout), closes shutter and outputs blank points.
900    fn drain_and_blank(&mut self) {
901        use std::time::Instant;
902
903        let timeout = self.config.drain_timeout;
904        if timeout.is_zero() {
905            // Skip drain entirely if timeout is zero
906            self.blank_and_close_shutter();
907            return;
908        }
909
910        let deadline = Instant::now() + timeout;
911        let pps = self.config.pps;
912
913        // Check if backend supports queue depth reporting
914        let has_queue_depth = self
915            .backend
916            .as_ref()
917            .and_then(|b| b.queued_points())
918            .is_some();
919
920        if has_queue_depth {
921            // Poll until queue empties or timeout
922            const POLL_INTERVAL: Duration = Duration::from_millis(5);
923            while Instant::now() < deadline {
924                if let Some(queued) = self.backend.as_ref().and_then(|b| b.queued_points()) {
925                    if queued == 0 {
926                        break;
927                    }
928                } else {
929                    // Backend disconnected or stopped reporting
930                    break;
931                }
932
933                // Process control messages during drain (allow stop to interrupt)
934                if self.process_control_messages() {
935                    break;
936                }
937
938                std::thread::sleep(POLL_INTERVAL);
939            }
940        } else {
941            // No queue depth available: sleep for estimated buffer time, capped by timeout
942            let estimated_drain =
943                Duration::from_secs_f64(self.state.scheduled_ahead as f64 / pps as f64);
944            let wait_time = estimated_drain.min(timeout);
945
946            // Sleep in slices to allow control message processing
947            const SLEEP_SLICE: Duration = Duration::from_millis(10);
948            let mut remaining = wait_time;
949            while remaining > Duration::ZERO && Instant::now() < deadline {
950                let slice = remaining.min(SLEEP_SLICE);
951                std::thread::sleep(slice);
952                remaining = remaining.saturating_sub(slice);
953
954                if self.process_control_messages() {
955                    break;
956                }
957            }
958        }
959
960        self.blank_and_close_shutter();
961    }
962
963    /// Output blank points and close the hardware shutter.
964    ///
965    /// Best-effort safety shutdown - errors are ignored since we're already
966    /// in shutdown path.
967    fn blank_and_close_shutter(&mut self) {
968        // Close shutter (best-effort)
969        if let Some(backend) = &mut self.backend {
970            let _ = backend.set_shutter(false);
971        }
972        self.state.shutter_open = false;
973
974        // Output a small blank chunk to ensure laser is off
975        // (some DACs may hold the last point otherwise)
976        if let Some(backend) = &mut self.backend {
977            let blank_point = LaserPoint::blanked(0.0, 0.0);
978            let blank_chunk = [blank_point; 16];
979            let _ = backend.try_write_chunk(self.config.pps, &blank_chunk);
980        }
981    }
982}
983
984impl Drop for Stream {
985    fn drop(&mut self) {
986        let _ = self.stop();
987    }
988}
989
990// =============================================================================
991// Device
992// =============================================================================
993
994/// A connected device that can start streaming sessions.
995///
996/// When starting a stream, the device is consumed and the backend ownership
997/// transfers to the stream. The `DacInfo` is returned alongside the stream
998/// so metadata remains accessible.
999///
1000/// # Example
1001///
1002/// ```ignore
1003/// use laser_dac::{open_device, StreamConfig};
1004///
1005/// let device = open_device("my-device")?;
1006/// let config = StreamConfig::new(30_000);
1007/// let (stream, info) = device.start_stream(config)?;
1008/// println!("Streaming to: {}", info.name);
1009/// ```
1010pub struct Dac {
1011    info: DacInfo,
1012    backend: Option<Box<dyn StreamBackend>>,
1013}
1014
1015impl Dac {
1016    /// Create a new device from a backend.
1017    pub fn new(info: DacInfo, backend: Box<dyn StreamBackend>) -> Self {
1018        Self {
1019            info,
1020            backend: Some(backend),
1021        }
1022    }
1023
1024    /// Returns the device info.
1025    pub fn info(&self) -> &DacInfo {
1026        &self.info
1027    }
1028
1029    /// Returns the device ID.
1030    pub fn id(&self) -> &str {
1031        &self.info.id
1032    }
1033
1034    /// Returns the device name.
1035    pub fn name(&self) -> &str {
1036        &self.info.name
1037    }
1038
1039    /// Returns the DAC type.
1040    pub fn kind(&self) -> &DacType {
1041        &self.info.kind
1042    }
1043
1044    /// Returns the device capabilities.
1045    pub fn caps(&self) -> &DacCapabilities {
1046        &self.info.caps
1047    }
1048
1049    /// Returns whether the device has a backend (not yet used for a stream).
1050    pub fn has_backend(&self) -> bool {
1051        self.backend.is_some()
1052    }
1053
1054    /// Returns whether the device is connected.
1055    pub fn is_connected(&self) -> bool {
1056        self.backend
1057            .as_ref()
1058            .map(|b| b.is_connected())
1059            .unwrap_or(false)
1060    }
1061
1062    /// Starts a streaming session, consuming the device.
1063    ///
1064    /// # Ownership
1065    ///
1066    /// This method consumes the `Dac` because:
1067    /// - Each device can only have one active stream at a time.
1068    /// - The backend is moved into the `Stream` to ensure exclusive access.
1069    /// - This prevents accidental reuse of a device that's already streaming.
1070    ///
1071    /// The method returns both the `Stream` and a copy of `DacInfo`, so you
1072    /// retain access to device metadata (id, name, capabilities) after starting.
1073    ///
1074    /// # Connection
1075    ///
1076    /// If the device is not already connected, this method will establish the
1077    /// connection before creating the stream. Connection failures are returned
1078    /// as errors.
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if:
1083    /// - The device backend has already been used for a stream.
1084    /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
1085    /// - The backend fails to connect.
1086    pub fn start_stream(mut self, cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1087        let mut backend = self.backend.take().ok_or_else(|| {
1088            Error::invalid_config("device backend has already been used for a stream")
1089        })?;
1090
1091        Self::validate_config(&self.info.caps, &cfg)?;
1092
1093        // Connect the backend if not already connected
1094        if !backend.is_connected() {
1095            backend.connect()?;
1096        }
1097
1098        let stream = Stream::with_backend(self.info.clone(), backend, cfg);
1099
1100        Ok((stream, self.info))
1101    }
1102
1103    fn validate_config(caps: &DacCapabilities, cfg: &StreamConfig) -> Result<()> {
1104        if cfg.pps < caps.pps_min || cfg.pps > caps.pps_max {
1105            return Err(Error::invalid_config(format!(
1106                "PPS {} is outside device range [{}, {}]",
1107                cfg.pps, caps.pps_min, caps.pps_max
1108            )));
1109        }
1110
1111        Ok(())
1112    }
1113}
1114
1115#[cfg(test)]
1116mod tests {
1117    use super::*;
1118    use crate::backend::{StreamBackend, WriteOutcome};
1119    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1120    use std::sync::{Arc, Mutex};
1121
1122    /// A test backend for unit testing stream behavior.
1123    struct TestBackend {
1124        caps: DacCapabilities,
1125        connected: bool,
1126        /// Count of write attempts
1127        write_count: Arc<AtomicUsize>,
1128        /// Number of WouldBlock responses to return before accepting writes
1129        would_block_count: Arc<AtomicUsize>,
1130        /// Simulated queue depth
1131        queued: Arc<AtomicU64>,
1132        /// Track shutter state for testing
1133        shutter_open: Arc<AtomicBool>,
1134    }
1135
1136    impl TestBackend {
1137        fn new() -> Self {
1138            Self {
1139                caps: DacCapabilities {
1140                    pps_min: 1000,
1141                    pps_max: 100000,
1142                    max_points_per_chunk: 1000,
1143                    output_model: crate::types::OutputModel::NetworkFifo,
1144                },
1145                connected: false,
1146                write_count: Arc::new(AtomicUsize::new(0)),
1147                would_block_count: Arc::new(AtomicUsize::new(0)),
1148                queued: Arc::new(AtomicU64::new(0)),
1149                shutter_open: Arc::new(AtomicBool::new(false)),
1150            }
1151        }
1152
1153        fn with_would_block_count(mut self, count: usize) -> Self {
1154            self.would_block_count = Arc::new(AtomicUsize::new(count));
1155            self
1156        }
1157
1158        fn with_output_model(mut self, model: OutputModel) -> Self {
1159            self.caps.output_model = model;
1160            self
1161        }
1162
1163        /// Set the initial queue depth for testing buffer estimation.
1164        fn with_initial_queue(mut self, queue: u64) -> Self {
1165            self.queued = Arc::new(AtomicU64::new(queue));
1166            self
1167        }
1168    }
1169
1170    /// A test backend that does NOT report hardware queue depth (`queued_points()` returns `None`).
1171    ///
1172    /// Use this backend to test software-only buffer estimation, which is the fallback path
1173    /// for real backends like `HeliosBackend` that don't implement `queued_points()`.
1174    ///
1175    /// **When to use which test backend:**
1176    /// - `TestBackend`: Simulates DACs with hardware queue reporting (e.g., Ether Dream, IDN).
1177    ///   Use for testing buffer estimation with hardware feedback.
1178    /// - `NoQueueTestBackend`: Simulates DACs without queue reporting (e.g., Helios).
1179    ///   Use for testing the time-based `scheduled_ahead` decrement logic.
1180    struct NoQueueTestBackend {
1181        inner: TestBackend,
1182    }
1183
1184    impl NoQueueTestBackend {
1185        fn new() -> Self {
1186            Self {
1187                inner: TestBackend::new(),
1188            }
1189        }
1190
1191        fn with_output_model(mut self, model: OutputModel) -> Self {
1192            self.inner = self.inner.with_output_model(model);
1193            self
1194        }
1195    }
1196
1197    impl StreamBackend for NoQueueTestBackend {
1198        fn dac_type(&self) -> DacType {
1199            self.inner.dac_type()
1200        }
1201
1202        fn caps(&self) -> &DacCapabilities {
1203            self.inner.caps()
1204        }
1205
1206        fn connect(&mut self) -> Result<()> {
1207            self.inner.connect()
1208        }
1209
1210        fn disconnect(&mut self) -> Result<()> {
1211            self.inner.disconnect()
1212        }
1213
1214        fn is_connected(&self) -> bool {
1215            self.inner.is_connected()
1216        }
1217
1218        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1219            self.inner.try_write_chunk(pps, points)
1220        }
1221
1222        fn stop(&mut self) -> Result<()> {
1223            self.inner.stop()
1224        }
1225
1226        fn set_shutter(&mut self, open: bool) -> Result<()> {
1227            self.inner.set_shutter(open)
1228        }
1229
1230        /// Returns None - simulates a DAC that cannot report queue depth
1231        fn queued_points(&self) -> Option<u64> {
1232            None
1233        }
1234    }
1235
1236    impl StreamBackend for TestBackend {
1237        fn dac_type(&self) -> DacType {
1238            DacType::Custom("Test".to_string())
1239        }
1240
1241        fn caps(&self) -> &DacCapabilities {
1242            &self.caps
1243        }
1244
1245        fn connect(&mut self) -> Result<()> {
1246            self.connected = true;
1247            Ok(())
1248        }
1249
1250        fn disconnect(&mut self) -> Result<()> {
1251            self.connected = false;
1252            Ok(())
1253        }
1254
1255        fn is_connected(&self) -> bool {
1256            self.connected
1257        }
1258
1259        fn try_write_chunk(&mut self, _pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
1260            self.write_count.fetch_add(1, Ordering::SeqCst);
1261
1262            // Return WouldBlock until count reaches 0
1263            let remaining = self.would_block_count.load(Ordering::SeqCst);
1264            if remaining > 0 {
1265                self.would_block_count.fetch_sub(1, Ordering::SeqCst);
1266                return Ok(WriteOutcome::WouldBlock);
1267            }
1268
1269            self.queued.fetch_add(points.len() as u64, Ordering::SeqCst);
1270            Ok(WriteOutcome::Written)
1271        }
1272
1273        fn stop(&mut self) -> Result<()> {
1274            Ok(())
1275        }
1276
1277        fn set_shutter(&mut self, open: bool) -> Result<()> {
1278            self.shutter_open.store(open, Ordering::SeqCst);
1279            Ok(())
1280        }
1281
1282        fn queued_points(&self) -> Option<u64> {
1283            Some(self.queued.load(Ordering::SeqCst))
1284        }
1285    }
1286
1287    #[test]
1288    fn test_stream_control_arm_disarm() {
1289        let (tx, _rx) = mpsc::channel();
1290        let control = StreamControl::new(tx, Duration::ZERO);
1291        assert!(!control.is_armed());
1292
1293        control.arm().unwrap();
1294        assert!(control.is_armed());
1295
1296        control.disarm().unwrap();
1297        assert!(!control.is_armed());
1298    }
1299
1300    #[test]
1301    fn test_stream_control_stop() {
1302        let (tx, _rx) = mpsc::channel();
1303        let control = StreamControl::new(tx, Duration::ZERO);
1304        assert!(!control.is_stop_requested());
1305
1306        control.stop().unwrap();
1307        assert!(control.is_stop_requested());
1308    }
1309
1310    #[test]
1311    fn test_stream_control_clone_shares_state() {
1312        let (tx, _rx) = mpsc::channel();
1313        let control1 = StreamControl::new(tx, Duration::ZERO);
1314        let control2 = control1.clone();
1315
1316        control1.arm().unwrap();
1317        assert!(control2.is_armed());
1318
1319        control2.stop().unwrap();
1320        assert!(control1.is_stop_requested());
1321    }
1322
1323    #[test]
1324    fn test_device_start_stream_connects_backend() {
1325        let backend = TestBackend::new();
1326        let info = DacInfo {
1327            id: "test".to_string(),
1328            name: "Test Device".to_string(),
1329            kind: DacType::Custom("Test".to_string()),
1330            caps: backend.caps().clone(),
1331        };
1332        let device = Dac::new(info, Box::new(backend));
1333
1334        // Device should not be connected initially
1335        assert!(!device.is_connected());
1336
1337        // start_stream should connect and return a usable stream
1338        let cfg = StreamConfig::new(30000);
1339        let result = device.start_stream(cfg);
1340        assert!(result.is_ok());
1341
1342        let (stream, _info) = result.unwrap();
1343        assert!(stream.backend.as_ref().unwrap().is_connected());
1344    }
1345
1346    #[test]
1347    fn test_handle_underrun_advances_state() {
1348        let mut backend = TestBackend::new();
1349        backend.connected = true;
1350        let info = DacInfo {
1351            id: "test".to_string(),
1352            name: "Test Device".to_string(),
1353            kind: DacType::Custom("Test".to_string()),
1354            caps: backend.caps().clone(),
1355        };
1356
1357        let cfg = StreamConfig::new(30000);
1358        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1359
1360        // Record initial state
1361        let initial_instant = stream.state.current_instant;
1362        let initial_scheduled = stream.state.scheduled_ahead;
1363        let initial_chunks = stream.state.stats.chunks_written;
1364        let initial_points = stream.state.stats.points_written;
1365
1366        // Trigger underrun handling with ChunkRequest
1367        let req = ChunkRequest {
1368            start: StreamInstant::new(0),
1369            pps: 30000,
1370            min_points: 100,
1371            target_points: 100,
1372            buffered_points: 0,
1373            buffered: Duration::ZERO,
1374            device_queued_points: None,
1375        };
1376        stream.handle_underrun(&req).unwrap();
1377
1378        // State should have advanced
1379        assert!(stream.state.current_instant > initial_instant);
1380        assert!(stream.state.scheduled_ahead > initial_scheduled);
1381        assert_eq!(stream.state.stats.chunks_written, initial_chunks + 1);
1382        assert_eq!(stream.state.stats.points_written, initial_points + 100);
1383        assert_eq!(stream.state.stats.underrun_count, 1);
1384    }
1385
1386    #[test]
1387    fn test_run_retries_on_would_block() {
1388        // Create a backend that returns WouldBlock 3 times before accepting
1389        let backend = TestBackend::new().with_would_block_count(3);
1390        let write_count = backend.write_count.clone();
1391
1392        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1393        backend_box.connect().unwrap();
1394
1395        let info = DacInfo {
1396            id: "test".to_string(),
1397            name: "Test Device".to_string(),
1398            kind: DacType::Custom("Test".to_string()),
1399            caps: backend_box.caps().clone(),
1400        };
1401
1402        let cfg = StreamConfig::new(30000);
1403        let stream = Stream::with_backend(info, backend_box, cfg);
1404
1405        let produced_count = Arc::new(AtomicUsize::new(0));
1406        let produced_count_clone = produced_count.clone();
1407        let result = stream.run(
1408            move |req, buffer| {
1409                let count = produced_count_clone.fetch_add(1, Ordering::SeqCst);
1410                if count < 1 {
1411                    let n = req.target_points.min(buffer.len());
1412                    for i in 0..n {
1413                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1414                    }
1415                    ChunkResult::Filled(n)
1416                } else {
1417                    ChunkResult::End
1418                }
1419            },
1420            |_e| {},
1421        );
1422
1423        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1424        // With the new API, WouldBlock retries happen internally in write_fill_points
1425        // The exact count depends on timing, but we should see multiple writes
1426        assert!(write_count.load(Ordering::SeqCst) >= 1);
1427    }
1428
1429    #[test]
1430    fn test_arm_opens_shutter_disarm_closes_shutter() {
1431        let backend = TestBackend::new();
1432        let shutter_open = backend.shutter_open.clone();
1433
1434        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1435        backend_box.connect().unwrap();
1436
1437        let info = DacInfo {
1438            id: "test".to_string(),
1439            name: "Test Device".to_string(),
1440            kind: DacType::Custom("Test".to_string()),
1441            caps: backend_box.caps().clone(),
1442        };
1443
1444        let cfg = StreamConfig::new(30000);
1445        let mut stream = Stream::with_backend(info, backend_box, cfg);
1446
1447        // Initially shutter is closed
1448        assert!(!shutter_open.load(Ordering::SeqCst));
1449
1450        // Arm via control (this sends ControlMsg::Arm)
1451        let control = stream.control();
1452        control.arm().unwrap();
1453
1454        // Process control messages - this should open the shutter
1455        let stopped = stream.process_control_messages();
1456        assert!(!stopped);
1457        assert!(shutter_open.load(Ordering::SeqCst));
1458
1459        // Disarm (this sends ControlMsg::Disarm)
1460        control.disarm().unwrap();
1461
1462        // Process control messages - this should close the shutter
1463        let stopped = stream.process_control_messages();
1464        assert!(!stopped);
1465        assert!(!shutter_open.load(Ordering::SeqCst));
1466    }
1467
1468    #[test]
1469    fn test_handle_underrun_blanks_when_disarmed() {
1470        let backend = TestBackend::new();
1471
1472        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1473        backend_box.connect().unwrap();
1474
1475        let info = DacInfo {
1476            id: "test".to_string(),
1477            name: "Test Device".to_string(),
1478            kind: DacType::Custom("Test".to_string()),
1479            caps: backend_box.caps().clone(),
1480        };
1481
1482        // Use RepeatLast policy - but when disarmed, should still blank
1483        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
1484        let mut stream = Stream::with_backend(info, backend_box, cfg);
1485
1486        // Set some last_chunk with colored points using the pre-allocated buffer
1487        let colored_point = LaserPoint::new(0.5, 0.5, 65535, 65535, 65535, 65535);
1488        for i in 0..100 {
1489            stream.state.last_chunk[i] = colored_point;
1490        }
1491        stream.state.last_chunk_len = 100;
1492
1493        // Ensure disarmed (default state)
1494        assert!(!stream.control.is_armed());
1495
1496        let req = ChunkRequest {
1497            start: StreamInstant::new(0),
1498            pps: 30000,
1499            min_points: 100,
1500            target_points: 100,
1501            buffered_points: 0,
1502            buffered: Duration::ZERO,
1503            device_queued_points: None,
1504        };
1505
1506        // Handle underrun while disarmed
1507        stream.handle_underrun(&req).unwrap();
1508
1509        // last_chunk should NOT be updated (we're disarmed)
1510        // The actual write was blanked points, but we don't update last_chunk when disarmed
1511        // because "last armed content" hasn't changed
1512        assert_eq!(stream.state.last_chunk[0].r, 65535); // Still the old colored points
1513    }
1514
1515    #[test]
1516    fn test_stop_closes_shutter() {
1517        let backend = TestBackend::new();
1518        let shutter_open = backend.shutter_open.clone();
1519
1520        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1521        backend_box.connect().unwrap();
1522
1523        let info = DacInfo {
1524            id: "test".to_string(),
1525            name: "Test Device".to_string(),
1526            kind: DacType::Custom("Test".to_string()),
1527            caps: backend_box.caps().clone(),
1528        };
1529
1530        let cfg = StreamConfig::new(30000);
1531        let mut stream = Stream::with_backend(info, backend_box, cfg);
1532
1533        // Arm first to open shutter
1534        stream.control.arm().unwrap();
1535        stream.process_control_messages();
1536        assert!(shutter_open.load(Ordering::SeqCst));
1537
1538        // Stop should close shutter
1539        stream.stop().unwrap();
1540        assert!(!shutter_open.load(Ordering::SeqCst));
1541    }
1542
1543    #[test]
1544    fn test_arm_disarm_arm_cycle() {
1545        let backend = TestBackend::new();
1546        let shutter_open = backend.shutter_open.clone();
1547
1548        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1549        backend_box.connect().unwrap();
1550
1551        let info = DacInfo {
1552            id: "test".to_string(),
1553            name: "Test Device".to_string(),
1554            kind: DacType::Custom("Test".to_string()),
1555            caps: backend_box.caps().clone(),
1556        };
1557
1558        let cfg = StreamConfig::new(30000);
1559        let mut stream = Stream::with_backend(info, backend_box, cfg);
1560        let control = stream.control();
1561
1562        // Initial state: disarmed
1563        assert!(!control.is_armed());
1564        assert!(!shutter_open.load(Ordering::SeqCst));
1565
1566        // Arm
1567        control.arm().unwrap();
1568        stream.process_control_messages();
1569        assert!(control.is_armed());
1570        assert!(shutter_open.load(Ordering::SeqCst));
1571
1572        // Disarm
1573        control.disarm().unwrap();
1574        stream.process_control_messages();
1575        assert!(!control.is_armed());
1576        assert!(!shutter_open.load(Ordering::SeqCst));
1577
1578        // Arm again
1579        control.arm().unwrap();
1580        stream.process_control_messages();
1581        assert!(control.is_armed());
1582        assert!(shutter_open.load(Ordering::SeqCst));
1583    }
1584
1585    // =========================================================================
1586    // Buffer-driven timing tests
1587    // =========================================================================
1588
1589    #[test]
1590    fn test_run_buffer_driven_behavior() {
1591        // Test that run uses buffer-driven timing
1592        // Use NoQueueTestBackend so we rely on software estimate (which decrements properly)
1593        let mut backend = NoQueueTestBackend::new();
1594        backend.inner.connected = true;
1595        let write_count = backend.inner.write_count.clone();
1596
1597        let info = DacInfo {
1598            id: "test".to_string(),
1599            name: "Test Device".to_string(),
1600            kind: DacType::Custom("Test".to_string()),
1601            caps: backend.inner.caps().clone(),
1602        };
1603
1604        // Use short target buffer for testing
1605        let cfg = StreamConfig::new(30000)
1606            .with_target_buffer(Duration::from_millis(10))
1607            .with_min_buffer(Duration::from_millis(5));
1608        let stream = Stream::with_backend(info, Box::new(backend), cfg);
1609
1610        let call_count = Arc::new(AtomicUsize::new(0));
1611        let call_count_clone = call_count.clone();
1612
1613        let result = stream.run(
1614            move |req, buffer| {
1615                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1616
1617                // Run for 5 calls then end
1618                if count >= 4 {
1619                    ChunkResult::End
1620                } else {
1621                    let n = req.target_points.min(buffer.len()).min(100);
1622                    for i in 0..n {
1623                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1624                    }
1625                    ChunkResult::Filled(n)
1626                }
1627            },
1628            |_e| {},
1629        );
1630
1631        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1632        assert!(
1633            write_count.load(Ordering::SeqCst) >= 4,
1634            "Should have written multiple chunks"
1635        );
1636    }
1637
1638    #[test]
1639    fn test_run_sleeps_when_buffer_healthy() {
1640        // Test that run sleeps when buffer is above target
1641        // Use NoQueueTestBackend so we rely on software estimate (which decrements properly)
1642        use std::time::Instant;
1643
1644        let mut backend = NoQueueTestBackend::new();
1645        backend.inner.connected = true;
1646
1647        let info = DacInfo {
1648            id: "test".to_string(),
1649            name: "Test Device".to_string(),
1650            kind: DacType::Custom("Test".to_string()),
1651            caps: backend.inner.caps().clone(),
1652        };
1653
1654        // Very small target buffer, skip drain
1655        let cfg = StreamConfig::new(30000)
1656            .with_target_buffer(Duration::from_millis(5))
1657            .with_min_buffer(Duration::from_millis(2))
1658            .with_drain_timeout(Duration::ZERO);
1659        let stream = Stream::with_backend(info, Box::new(backend), cfg);
1660
1661        let call_count = Arc::new(AtomicUsize::new(0));
1662        let call_count_clone = call_count.clone();
1663        let start_time = Instant::now();
1664
1665        let result = stream.run(
1666            move |req, buffer| {
1667                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1668
1669                // End after 3 callbacks
1670                if count >= 2 {
1671                    ChunkResult::End
1672                } else {
1673                    // Fill buffer to trigger sleep
1674                    let n = req.target_points.min(buffer.len());
1675                    for i in 0..n {
1676                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1677                    }
1678                    ChunkResult::Filled(n)
1679                }
1680            },
1681            |_e| {},
1682        );
1683
1684        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1685
1686        // Should have taken some time due to buffer-driven sleep
1687        let elapsed = start_time.elapsed();
1688        // With buffer-driven timing, we should see some elapsed time
1689        // (not instant return)
1690        assert!(
1691            elapsed.as_millis() < 100,
1692            "Elapsed time {:?} is too long for test",
1693            elapsed
1694        );
1695    }
1696
1697    #[test]
1698    fn test_run_stops_on_control_stop() {
1699        // Test that stop() via control handle terminates the loop promptly
1700        use std::thread;
1701
1702        let backend = TestBackend::new();
1703
1704        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1705        backend_box.connect().unwrap();
1706
1707        let info = DacInfo {
1708            id: "test".to_string(),
1709            name: "Test Device".to_string(),
1710            kind: DacType::Custom("Test".to_string()),
1711            caps: backend_box.caps().clone(),
1712        };
1713
1714        let cfg = StreamConfig::new(30000);
1715        let stream = Stream::with_backend(info, backend_box, cfg);
1716        let control = stream.control();
1717
1718        // Spawn a thread to stop the stream after a short delay
1719        let control_clone = control.clone();
1720        thread::spawn(move || {
1721            thread::sleep(Duration::from_millis(20));
1722            control_clone.stop().unwrap();
1723        });
1724
1725        let result = stream.run(
1726            |req, buffer| {
1727                let n = req.target_points.min(buffer.len()).min(10);
1728                for i in 0..n {
1729                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
1730                }
1731                ChunkResult::Filled(n)
1732            },
1733            |_e| {},
1734        );
1735
1736        // Should exit with Stopped, not hang forever
1737        assert_eq!(result.unwrap(), RunExit::Stopped);
1738    }
1739
1740    #[test]
1741    fn test_run_producer_ended() {
1742        // Test that ChunkResult::End terminates the stream gracefully
1743        let backend = TestBackend::new();
1744
1745        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1746        backend_box.connect().unwrap();
1747
1748        let info = DacInfo {
1749            id: "test".to_string(),
1750            name: "Test Device".to_string(),
1751            kind: DacType::Custom("Test".to_string()),
1752            caps: backend_box.caps().clone(),
1753        };
1754
1755        let cfg = StreamConfig::new(30000);
1756        let stream = Stream::with_backend(info, backend_box, cfg);
1757
1758        let call_count = Arc::new(AtomicUsize::new(0));
1759        let call_count_clone = call_count.clone();
1760
1761        let result = stream.run(
1762            move |req, buffer| {
1763                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1764
1765                if count == 0 {
1766                    // First call: return some data
1767                    let n = req.target_points.min(buffer.len()).min(100);
1768                    for i in 0..n {
1769                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
1770                    }
1771                    ChunkResult::Filled(n)
1772                } else {
1773                    // Second call: end the stream
1774                    ChunkResult::End
1775                }
1776            },
1777            |_e| {},
1778        );
1779
1780        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1781        assert_eq!(call_count.load(Ordering::SeqCst), 2);
1782    }
1783
1784    #[test]
1785    fn test_run_starved_applies_underrun_policy() {
1786        // Test that ChunkResult::Starved triggers underrun policy
1787        let backend = TestBackend::new();
1788        let queued = backend.queued.clone();
1789
1790        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1791        backend_box.connect().unwrap();
1792
1793        let info = DacInfo {
1794            id: "test".to_string(),
1795            name: "Test Device".to_string(),
1796            kind: DacType::Custom("Test".to_string()),
1797            caps: backend_box.caps().clone(),
1798        };
1799
1800        // Use Blank policy for underrun
1801        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1802        let stream = Stream::with_backend(info, backend_box, cfg);
1803
1804        let call_count = Arc::new(AtomicUsize::new(0));
1805        let call_count_clone = call_count.clone();
1806
1807        let result = stream.run(
1808            move |_req, _buffer| {
1809                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1810
1811                if count == 0 {
1812                    // First call: return Starved to trigger underrun policy
1813                    ChunkResult::Starved
1814                } else {
1815                    // Second call: end the stream
1816                    ChunkResult::End
1817                }
1818            },
1819            |_e| {},
1820        );
1821
1822        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1823
1824        // Underrun policy should have written some points
1825        assert!(
1826            queued.load(Ordering::SeqCst) > 0,
1827            "Underrun policy should have written blank points"
1828        );
1829    }
1830
1831    #[test]
1832    fn test_run_filled_zero_with_target_treated_as_starved() {
1833        // Test that Filled(0) when target_points > 0 is treated as Starved
1834        let backend = TestBackend::new();
1835        let queued = backend.queued.clone();
1836
1837        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1838        backend_box.connect().unwrap();
1839
1840        let info = DacInfo {
1841            id: "test".to_string(),
1842            name: "Test Device".to_string(),
1843            kind: DacType::Custom("Test".to_string()),
1844            caps: backend_box.caps().clone(),
1845        };
1846
1847        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Blank);
1848        let stream = Stream::with_backend(info, backend_box, cfg);
1849
1850        let call_count = Arc::new(AtomicUsize::new(0));
1851        let call_count_clone = call_count.clone();
1852
1853        let result = stream.run(
1854            move |_req, _buffer| {
1855                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
1856
1857                if count == 0 {
1858                    // Return Filled(0) when buffer needs data - should be treated as Starved
1859                    ChunkResult::Filled(0)
1860                } else {
1861                    ChunkResult::End
1862                }
1863            },
1864            |_e| {},
1865        );
1866
1867        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
1868
1869        // Filled(0) with target_points > 0 should trigger underrun policy
1870        assert!(
1871            queued.load(Ordering::SeqCst) > 0,
1872            "Filled(0) with target > 0 should trigger underrun and write blank points"
1873        );
1874    }
1875
1876    // =========================================================================
1877    // Buffer estimation tests (Task 6.3)
1878    // =========================================================================
1879
1880    #[test]
1881    fn test_estimate_buffer_uses_software_when_no_hardware() {
1882        // When hardware doesn't report queue depth, use software estimate
1883        let mut backend = NoQueueTestBackend::new();
1884        backend.inner.connected = true;
1885
1886        let info = DacInfo {
1887            id: "test".to_string(),
1888            name: "Test Device".to_string(),
1889            kind: DacType::Custom("Test".to_string()),
1890            caps: backend.inner.caps().clone(),
1891        };
1892
1893        let cfg = StreamConfig::new(30000);
1894        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
1895
1896        // Set software estimate to 500 points
1897        stream.state.scheduled_ahead = 500;
1898
1899        // Should use software estimate since hardware returns None
1900        let estimate = stream.estimate_buffer_points();
1901        assert_eq!(estimate, 500);
1902    }
1903
1904    #[test]
1905    fn test_estimate_buffer_uses_min_of_hardware_and_software() {
1906        // When hardware reports queue depth, use min(hardware, software)
1907        let backend = TestBackend::new().with_initial_queue(300);
1908        let queued = backend.queued.clone();
1909
1910        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1911        backend_box.connect().unwrap();
1912
1913        let info = DacInfo {
1914            id: "test".to_string(),
1915            name: "Test Device".to_string(),
1916            kind: DacType::Custom("Test".to_string()),
1917            caps: backend_box.caps().clone(),
1918        };
1919
1920        let cfg = StreamConfig::new(30000);
1921        let mut stream = Stream::with_backend(info, backend_box, cfg);
1922
1923        // Software says 500, hardware says 300 -> should use 300 (conservative)
1924        stream.state.scheduled_ahead = 500;
1925        let estimate = stream.estimate_buffer_points();
1926        assert_eq!(
1927            estimate, 300,
1928            "Should use hardware (300) when it's less than software (500)"
1929        );
1930
1931        // Now set hardware higher than software
1932        queued.store(800, Ordering::SeqCst);
1933        let estimate = stream.estimate_buffer_points();
1934        assert_eq!(
1935            estimate, 500,
1936            "Should use software (500) when it's less than hardware (800)"
1937        );
1938    }
1939
1940    #[test]
1941    fn test_estimate_buffer_conservative_prevents_underrun() {
1942        // Verify that conservative estimation (using min) prevents underruns
1943        // by ensuring we never overestimate the buffer
1944        let backend = TestBackend::new().with_initial_queue(100);
1945        let queued = backend.queued.clone();
1946
1947        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1948        backend_box.connect().unwrap();
1949
1950        let info = DacInfo {
1951            id: "test".to_string(),
1952            name: "Test Device".to_string(),
1953            kind: DacType::Custom("Test".to_string()),
1954            caps: backend_box.caps().clone(),
1955        };
1956
1957        let cfg = StreamConfig::new(30000);
1958        let mut stream = Stream::with_backend(info, backend_box, cfg);
1959
1960        // Simulate: software thinks 1000 points scheduled, but hardware only has 100
1961        // This can happen if hardware consumed points faster than expected
1962        stream.state.scheduled_ahead = 1000;
1963
1964        let estimate = stream.estimate_buffer_points();
1965
1966        // Should use the conservative (lower) estimate to avoid underrun
1967        assert_eq!(
1968            estimate, 100,
1969            "Should use conservative estimate (100) not optimistic (1000)"
1970        );
1971
1972        // Now simulate the opposite: hardware reports more than software
1973        // This can happen due to timing/synchronization issues
1974        queued.store(2000, Ordering::SeqCst);
1975        stream.state.scheduled_ahead = 500;
1976
1977        let estimate = stream.estimate_buffer_points();
1978        assert_eq!(
1979            estimate, 500,
1980            "Should use conservative estimate (500) not hardware (2000)"
1981        );
1982    }
1983
1984    #[test]
1985    fn test_build_fill_request_uses_conservative_estimation() {
1986        // Verify that build_fill_request uses conservative buffer estimation
1987        let backend = TestBackend::new().with_initial_queue(200);
1988
1989        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
1990        backend_box.connect().unwrap();
1991
1992        let info = DacInfo {
1993            id: "test".to_string(),
1994            name: "Test Device".to_string(),
1995            kind: DacType::Custom("Test".to_string()),
1996            caps: backend_box.caps().clone(),
1997        };
1998
1999        let cfg = StreamConfig::new(30000)
2000            .with_target_buffer(Duration::from_millis(40))
2001            .with_min_buffer(Duration::from_millis(10));
2002        let mut stream = Stream::with_backend(info, backend_box, cfg);
2003
2004        // Set software estimate higher than hardware
2005        stream.state.scheduled_ahead = 500;
2006
2007        let req = stream.build_fill_request(1000);
2008
2009        // Should use conservative estimate (hardware = 200)
2010        assert_eq!(req.buffered_points, 200);
2011        assert_eq!(req.device_queued_points, Some(200));
2012    }
2013
2014    #[test]
2015    fn test_build_fill_request_calculates_min_and_target_points() {
2016        // Verify that min_points and target_points are calculated correctly
2017        // based on buffer state. Use NoQueueTestBackend so software estimate is used directly.
2018        let mut backend = NoQueueTestBackend::new();
2019        backend.inner.connected = true;
2020
2021        let info = DacInfo {
2022            id: "test".to_string(),
2023            name: "Test Device".to_string(),
2024            kind: DacType::Custom("Test".to_string()),
2025            caps: backend.inner.caps().clone(),
2026        };
2027
2028        // 30000 PPS, target_buffer = 40ms, min_buffer = 10ms
2029        // target_buffer = 40ms * 30000 = 1200 points
2030        // min_buffer = 10ms * 30000 = 300 points
2031        let cfg = StreamConfig::new(30000)
2032            .with_target_buffer(Duration::from_millis(40))
2033            .with_min_buffer(Duration::from_millis(10));
2034        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2035
2036        // Empty buffer: need full target
2037        stream.state.scheduled_ahead = 0;
2038        let req = stream.build_fill_request(1000);
2039
2040        // target_points should be clamped to max_points (1000)
2041        assert_eq!(req.target_points, 1000);
2042        // min_points should be 300 (10ms * 30000), clamped to 1000
2043        assert_eq!(req.min_points, 300);
2044
2045        // Buffer at 500 points (16.67ms): below target (40ms), above min (10ms)
2046        stream.state.scheduled_ahead = 500;
2047        let req = stream.build_fill_request(1000);
2048
2049        // target_points = (1200 - 500) = 700
2050        assert_eq!(req.target_points, 700);
2051        // min_points = (300 - 500) = 0 (buffer above min)
2052        assert_eq!(req.min_points, 0);
2053
2054        // Buffer full at 1200 points (40ms): at target
2055        stream.state.scheduled_ahead = 1200;
2056        let req = stream.build_fill_request(1000);
2057
2058        // target_points = 0 (at target)
2059        assert_eq!(req.target_points, 0);
2060        // min_points = 0 (well above min)
2061        assert_eq!(req.min_points, 0);
2062    }
2063
2064    #[test]
2065    fn test_build_fill_request_ceiling_rounds_min_points() {
2066        // Verify that min_points uses ceiling to prevent underrun
2067        // Use NoQueueTestBackend so software estimate is used directly.
2068        let mut backend = NoQueueTestBackend::new();
2069        backend.inner.connected = true;
2070
2071        let info = DacInfo {
2072            id: "test".to_string(),
2073            name: "Test Device".to_string(),
2074            kind: DacType::Custom("Test".to_string()),
2075            caps: backend.inner.caps().clone(),
2076        };
2077
2078        // min_buffer = 10ms at 30000 PPS = 300 points exactly
2079        let cfg = StreamConfig::new(30000)
2080            .with_target_buffer(Duration::from_millis(40))
2081            .with_min_buffer(Duration::from_millis(10));
2082        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
2083
2084        // Buffer at 299 points: 1 point below min_buffer
2085        stream.state.scheduled_ahead = 299;
2086        let req = stream.build_fill_request(1000);
2087
2088        // min_points should be ceil(300 - 299) = ceil(1) = 1
2089        // Actually it's ceil((10ms - 299/30000) * 30000) = ceil(300 - 299) = 1
2090        assert!(
2091            req.min_points >= 1,
2092            "min_points should be at least 1 to reach min_buffer"
2093        );
2094    }
2095
2096    // =========================================================================
2097    // ChunkResult handling tests (Task 6.4)
2098    // =========================================================================
2099
2100    #[test]
2101    fn test_fill_result_filled_writes_points_and_updates_state() {
2102        // Test that Filled(n) writes n points to backend and updates stream state
2103        let backend = TestBackend::new();
2104        let queued = backend.queued.clone();
2105
2106        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2107        backend_box.connect().unwrap();
2108
2109        let info = DacInfo {
2110            id: "test".to_string(),
2111            name: "Test Device".to_string(),
2112            kind: DacType::Custom("Test".to_string()),
2113            caps: backend_box.caps().clone(),
2114        };
2115
2116        let cfg = StreamConfig::new(30000);
2117        let stream = Stream::with_backend(info, backend_box, cfg);
2118
2119        let points_written = Arc::new(AtomicUsize::new(0));
2120        let points_written_clone = points_written.clone();
2121        let call_count = Arc::new(AtomicUsize::new(0));
2122        let call_count_clone = call_count.clone();
2123
2124        let result = stream.run(
2125            move |req, buffer| {
2126                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2127
2128                if count < 3 {
2129                    // Fill with specific number of points
2130                    let n = req.target_points.min(50);
2131                    for i in 0..n {
2132                        buffer[i] =
2133                            LaserPoint::new(0.1 * i as f32, 0.2 * i as f32, 1000, 2000, 3000, 4000);
2134                    }
2135                    points_written_clone.fetch_add(n, Ordering::SeqCst);
2136                    ChunkResult::Filled(n)
2137                } else {
2138                    ChunkResult::End
2139                }
2140            },
2141            |_e| {},
2142        );
2143
2144        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2145
2146        // Points should have been written to backend
2147        // Note: drain adds 16 blank points at shutdown
2148        let total_queued = queued.load(Ordering::SeqCst);
2149        let total_written = points_written.load(Ordering::SeqCst);
2150        assert!(
2151            total_queued > 0,
2152            "Points should have been queued to backend"
2153        );
2154        assert!(
2155            total_queued as usize >= total_written,
2156            "Queued points ({}) should be at least written points ({})",
2157            total_queued,
2158            total_written
2159        );
2160    }
2161
2162    #[test]
2163    fn test_fill_result_filled_updates_last_chunk_when_armed() {
2164        // Test that Filled(n) updates last_chunk for RepeatLast policy when armed
2165        let backend = TestBackend::new();
2166
2167        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2168        backend_box.connect().unwrap();
2169
2170        let info = DacInfo {
2171            id: "test".to_string(),
2172            name: "Test Device".to_string(),
2173            kind: DacType::Custom("Test".to_string()),
2174            caps: backend_box.caps().clone(),
2175        };
2176
2177        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2178        let stream = Stream::with_backend(info, backend_box, cfg);
2179
2180        // Arm the stream so last_chunk gets updated
2181        let control = stream.control();
2182        control.arm().unwrap();
2183
2184        let call_count = Arc::new(AtomicUsize::new(0));
2185        let call_count_clone = call_count.clone();
2186
2187        let result = stream.run(
2188            move |req, buffer| {
2189                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2190
2191                if count == 0 {
2192                    // Write specific points that we can verify later
2193                    let n = req.target_points.min(10);
2194                    for i in 0..n {
2195                        buffer[i] = LaserPoint::new(0.5, 0.5, 10000, 20000, 30000, 40000);
2196                    }
2197                    ChunkResult::Filled(n)
2198                } else if count == 1 {
2199                    // Return Starved to trigger RepeatLast
2200                    ChunkResult::Starved
2201                } else {
2202                    ChunkResult::End
2203                }
2204            },
2205            |_e| {},
2206        );
2207
2208        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2209        // If last_chunk wasn't updated, the Starved would have outputted blanks
2210        // The test passes if no assertion fails - the RepeatLast policy used the stored chunk
2211    }
2212
2213    #[test]
2214    fn test_fill_result_starved_repeat_last_with_stored_chunk() {
2215        // Test that Starved with RepeatLast policy repeats the last chunk
2216        let backend = TestBackend::new();
2217        let queued = backend.queued.clone();
2218
2219        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2220        backend_box.connect().unwrap();
2221
2222        let info = DacInfo {
2223            id: "test".to_string(),
2224            name: "Test Device".to_string(),
2225            kind: DacType::Custom("Test".to_string()),
2226            caps: backend_box.caps().clone(),
2227        };
2228
2229        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2230        let stream = Stream::with_backend(info, backend_box, cfg);
2231
2232        // Arm the stream
2233        let control = stream.control();
2234        control.arm().unwrap();
2235
2236        let call_count = Arc::new(AtomicUsize::new(0));
2237        let call_count_clone = call_count.clone();
2238
2239        let result = stream.run(
2240            move |req, buffer| {
2241                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2242
2243                if count == 0 {
2244                    // First call: provide some data to establish last_chunk
2245                    let n = req.target_points.min(50);
2246                    for i in 0..n {
2247                        buffer[i] = LaserPoint::new(0.3, 0.3, 5000, 5000, 5000, 5000);
2248                    }
2249                    ChunkResult::Filled(n)
2250                } else if count == 1 {
2251                    // Second call: return Starved - should repeat last chunk
2252                    ChunkResult::Starved
2253                } else {
2254                    ChunkResult::End
2255                }
2256            },
2257            |_e| {},
2258        );
2259
2260        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2261
2262        // Both the initial fill and the repeated chunk should have been written
2263        let total_queued = queued.load(Ordering::SeqCst);
2264        assert!(
2265            total_queued >= 50,
2266            "Should have written initial chunk plus repeated chunk"
2267        );
2268    }
2269
2270    #[test]
2271    fn test_fill_result_starved_repeat_last_without_stored_chunk_falls_back_to_blank() {
2272        // Test that Starved with RepeatLast but no stored chunk falls back to blank
2273        let backend = TestBackend::new();
2274        let queued = backend.queued.clone();
2275
2276        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2277        backend_box.connect().unwrap();
2278
2279        let info = DacInfo {
2280            id: "test".to_string(),
2281            name: "Test Device".to_string(),
2282            kind: DacType::Custom("Test".to_string()),
2283            caps: backend_box.caps().clone(),
2284        };
2285
2286        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2287        let stream = Stream::with_backend(info, backend_box, cfg);
2288
2289        // Arm the stream
2290        let control = stream.control();
2291        control.arm().unwrap();
2292
2293        let call_count = Arc::new(AtomicUsize::new(0));
2294        let call_count_clone = call_count.clone();
2295
2296        let result = stream.run(
2297            move |_req, _buffer| {
2298                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2299
2300                if count == 0 {
2301                    // First call: return Starved with no prior chunk
2302                    ChunkResult::Starved
2303                } else {
2304                    ChunkResult::End
2305                }
2306            },
2307            |_e| {},
2308        );
2309
2310        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2311
2312        // Should have written blank points as fallback
2313        let total_queued = queued.load(Ordering::SeqCst);
2314        assert!(
2315            total_queued > 0,
2316            "Should have written blank points as fallback"
2317        );
2318    }
2319
2320    #[test]
2321    fn test_fill_result_starved_with_park_policy() {
2322        // Test that Starved with Park policy outputs blanked points at park position
2323        let backend = TestBackend::new();
2324        let queued = backend.queued.clone();
2325
2326        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2327        backend_box.connect().unwrap();
2328
2329        let info = DacInfo {
2330            id: "test".to_string(),
2331            name: "Test Device".to_string(),
2332            kind: DacType::Custom("Test".to_string()),
2333            caps: backend_box.caps().clone(),
2334        };
2335
2336        // Park at specific position
2337        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Park { x: 0.5, y: -0.5 });
2338        let stream = Stream::with_backend(info, backend_box, cfg);
2339
2340        // Arm the stream
2341        let control = stream.control();
2342        control.arm().unwrap();
2343
2344        let call_count = Arc::new(AtomicUsize::new(0));
2345        let call_count_clone = call_count.clone();
2346
2347        let result = stream.run(
2348            move |_req, _buffer| {
2349                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2350
2351                if count == 0 {
2352                    ChunkResult::Starved
2353                } else {
2354                    ChunkResult::End
2355                }
2356            },
2357            |_e| {},
2358        );
2359
2360        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2361
2362        // Should have written parked points
2363        let total_queued = queued.load(Ordering::SeqCst);
2364        assert!(total_queued > 0, "Should have written parked points");
2365    }
2366
2367    #[test]
2368    fn test_fill_result_starved_with_stop_policy() {
2369        // Test that Starved with Stop policy terminates the stream
2370        let backend = TestBackend::new();
2371
2372        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2373        backend_box.connect().unwrap();
2374
2375        let info = DacInfo {
2376            id: "test".to_string(),
2377            name: "Test Device".to_string(),
2378            kind: DacType::Custom("Test".to_string()),
2379            caps: backend_box.caps().clone(),
2380        };
2381
2382        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::Stop);
2383        let stream = Stream::with_backend(info, backend_box, cfg);
2384
2385        // Must arm the stream for underrun policy to be checked
2386        // (disarmed streams always output blanks regardless of policy)
2387        let control = stream.control();
2388        control.arm().unwrap();
2389
2390        let result = stream.run(
2391            |_req, _buffer| {
2392                // Always return Starved - Stop policy should terminate the stream
2393                ChunkResult::Starved
2394            },
2395            |_e| {},
2396        );
2397
2398        // Stream should have stopped due to underrun with Stop policy
2399        // The Stop policy returns Err(Error::Stopped) to immediately terminate
2400        assert!(result.is_err(), "Stop policy should return an error");
2401        assert!(
2402            result.unwrap_err().is_stopped(),
2403            "Error should be Stopped variant"
2404        );
2405    }
2406
2407    #[test]
2408    fn test_fill_result_end_returns_producer_ended() {
2409        // Test that End terminates the stream with ProducerEnded
2410        let backend = TestBackend::new();
2411
2412        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2413        backend_box.connect().unwrap();
2414
2415        let info = DacInfo {
2416            id: "test".to_string(),
2417            name: "Test Device".to_string(),
2418            kind: DacType::Custom("Test".to_string()),
2419            caps: backend_box.caps().clone(),
2420        };
2421
2422        let cfg = StreamConfig::new(30000);
2423        let stream = Stream::with_backend(info, backend_box, cfg);
2424
2425        let result = stream.run(
2426            |_req, _buffer| {
2427                // Immediately end
2428                ChunkResult::End
2429            },
2430            |_e| {},
2431        );
2432
2433        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2434    }
2435
2436    #[test]
2437    fn test_fill_result_filled_exceeds_buffer_clamped() {
2438        // Test that Filled(n) where n > buffer.len() is clamped
2439        let backend = TestBackend::new();
2440        let queued = backend.queued.clone();
2441
2442        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2443        backend_box.connect().unwrap();
2444
2445        let info = DacInfo {
2446            id: "test".to_string(),
2447            name: "Test Device".to_string(),
2448            kind: DacType::Custom("Test".to_string()),
2449            caps: backend_box.caps().clone(),
2450        };
2451
2452        let cfg = StreamConfig::new(30000);
2453        let stream = Stream::with_backend(info, backend_box, cfg);
2454
2455        let call_count = Arc::new(AtomicUsize::new(0));
2456        let call_count_clone = call_count.clone();
2457
2458        let result = stream.run(
2459            move |_req, buffer| {
2460                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2461
2462                if count == 0 {
2463                    // Fill some points but claim we wrote more than buffer size
2464                    for i in 0..buffer.len() {
2465                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2466                    }
2467                    // Return a value larger than buffer - should be clamped
2468                    ChunkResult::Filled(buffer.len() + 1000)
2469                } else {
2470                    ChunkResult::End
2471                }
2472            },
2473            |_e| {},
2474        );
2475
2476        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2477
2478        // Should have written clamped number of points (max_points_per_chunk)
2479        // plus 16 blank points from drain shutdown
2480        let total_queued = queued.load(Ordering::SeqCst);
2481        assert!(total_queued > 0, "Should have written some points");
2482        // The clamping should limit to max_points (1000 for TestBackend) + 16 blank drain points
2483        assert!(
2484            total_queued <= 1016,
2485            "Points should be clamped to max_points_per_chunk (+ drain)"
2486        );
2487    }
2488
2489    // =========================================================================
2490    // Integration tests (Task 6.5)
2491    // =========================================================================
2492
2493    #[test]
2494    fn test_full_stream_lifecycle_create_arm_stream_stop() {
2495        // Test the complete lifecycle: create -> arm -> stream data -> stop
2496        let backend = TestBackend::new();
2497        let queued = backend.queued.clone();
2498        let shutter_open = backend.shutter_open.clone();
2499
2500        let info = DacInfo {
2501            id: "test".to_string(),
2502            name: "Test Device".to_string(),
2503            kind: DacType::Custom("Test".to_string()),
2504            caps: backend.caps().clone(),
2505        };
2506
2507        // 1. Create device and start stream
2508        let device = Dac::new(info, Box::new(backend));
2509        assert!(!device.is_connected());
2510
2511        let cfg = StreamConfig::new(30000);
2512        let (stream, returned_info) = device.start_stream(cfg).unwrap();
2513        assert_eq!(returned_info.id, "test");
2514
2515        // 2. Get control handle and verify initial state
2516        let control = stream.control();
2517        assert!(!control.is_armed());
2518        assert!(!shutter_open.load(Ordering::SeqCst));
2519
2520        // 3. Arm the stream
2521        control.arm().unwrap();
2522        assert!(control.is_armed());
2523
2524        let call_count = Arc::new(AtomicUsize::new(0));
2525        let call_count_clone = call_count.clone();
2526
2527        // 4. Run the stream for a few iterations
2528        let result = stream.run(
2529            move |req, buffer| {
2530                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2531
2532                if count < 5 {
2533                    // Fill with data
2534                    let n = req.target_points.min(buffer.len()).min(100);
2535                    for i in 0..n {
2536                        let t = i as f32 / 100.0;
2537                        buffer[i] = LaserPoint::new(t, t, 10000, 20000, 30000, 40000);
2538                    }
2539                    ChunkResult::Filled(n)
2540                } else {
2541                    ChunkResult::End
2542                }
2543            },
2544            |_e| {},
2545        );
2546
2547        // 5. Verify stream ended properly
2548        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2549        assert!(
2550            queued.load(Ordering::SeqCst) > 0,
2551            "Should have written points"
2552        );
2553        assert!(
2554            call_count.load(Ordering::SeqCst) >= 5,
2555            "Should have called producer multiple times"
2556        );
2557    }
2558
2559    #[test]
2560    fn test_full_stream_lifecycle_with_underrun_recovery() {
2561        // Test lifecycle with underrun and recovery
2562        let backend = TestBackend::new();
2563        let queued = backend.queued.clone();
2564
2565        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2566        backend_box.connect().unwrap();
2567
2568        let info = DacInfo {
2569            id: "test".to_string(),
2570            name: "Test Device".to_string(),
2571            kind: DacType::Custom("Test".to_string()),
2572            caps: backend_box.caps().clone(),
2573        };
2574
2575        let cfg = StreamConfig::new(30000).with_underrun(UnderrunPolicy::RepeatLast);
2576        let stream = Stream::with_backend(info, backend_box, cfg);
2577
2578        // Arm the stream for underrun policy to work
2579        let control = stream.control();
2580        control.arm().unwrap();
2581
2582        let call_count = Arc::new(AtomicUsize::new(0));
2583        let call_count_clone = call_count.clone();
2584
2585        let result = stream.run(
2586            move |req, buffer| {
2587                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2588
2589                match count {
2590                    0 => {
2591                        // First call: provide data (establishes last_chunk)
2592                        let n = req.target_points.min(buffer.len()).min(50);
2593                        for i in 0..n {
2594                            buffer[i] = LaserPoint::new(0.5, 0.5, 30000, 30000, 30000, 30000);
2595                        }
2596                        ChunkResult::Filled(n)
2597                    }
2598                    1 => {
2599                        // Second call: underrun (triggers RepeatLast)
2600                        ChunkResult::Starved
2601                    }
2602                    2 => {
2603                        // Third call: recover with new data
2604                        let n = req.target_points.min(buffer.len()).min(50);
2605                        for i in 0..n {
2606                            buffer[i] = LaserPoint::new(-0.5, -0.5, 20000, 20000, 20000, 20000);
2607                        }
2608                        ChunkResult::Filled(n)
2609                    }
2610                    _ => ChunkResult::End,
2611                }
2612            },
2613            |_e| {},
2614        );
2615
2616        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2617        // Should have written: initial data + repeated chunk + recovered data
2618        let total = queued.load(Ordering::SeqCst);
2619        assert!(
2620            total >= 100,
2621            "Should have written multiple chunks including underrun recovery"
2622        );
2623    }
2624
2625    #[test]
2626    fn test_full_stream_lifecycle_external_stop() {
2627        // Test stopping stream from external control handle
2628        use std::thread;
2629
2630        let backend = TestBackend::new();
2631
2632        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2633        backend_box.connect().unwrap();
2634
2635        let info = DacInfo {
2636            id: "test".to_string(),
2637            name: "Test Device".to_string(),
2638            kind: DacType::Custom("Test".to_string()),
2639            caps: backend_box.caps().clone(),
2640        };
2641
2642        let cfg = StreamConfig::new(30000);
2643        let stream = Stream::with_backend(info, backend_box, cfg);
2644
2645        let control = stream.control();
2646        let control_clone = control.clone();
2647
2648        // Spawn thread to stop stream after delay
2649        thread::spawn(move || {
2650            thread::sleep(Duration::from_millis(30));
2651            control_clone.stop().unwrap();
2652        });
2653
2654        let result = stream.run(
2655            |req, buffer| {
2656                // Keep streaming until stopped
2657                let n = req.target_points.min(buffer.len()).min(10);
2658                for i in 0..n {
2659                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
2660                }
2661                ChunkResult::Filled(n)
2662            },
2663            |_e| {},
2664        );
2665
2666        assert_eq!(result.unwrap(), RunExit::Stopped);
2667    }
2668
2669    #[test]
2670    fn test_full_stream_lifecycle_into_dac_recovery() {
2671        // Test recovering Dac from stream for reuse
2672        let backend = TestBackend::new();
2673
2674        let info = DacInfo {
2675            id: "test".to_string(),
2676            name: "Test Device".to_string(),
2677            kind: DacType::Custom("Test".to_string()),
2678            caps: backend.caps().clone(),
2679        };
2680
2681        // First stream session
2682        let device = Dac::new(info, Box::new(backend));
2683        let cfg = StreamConfig::new(30000);
2684        let (stream, _) = device.start_stream(cfg).unwrap();
2685
2686        let call_count = Arc::new(AtomicUsize::new(0));
2687        let call_count_clone = call_count.clone();
2688
2689        let result = stream.run(
2690            move |req, buffer| {
2691                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2692                if count < 2 {
2693                    let n = req.target_points.min(buffer.len()).min(50);
2694                    for i in 0..n {
2695                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2696                    }
2697                    ChunkResult::Filled(n)
2698                } else {
2699                    ChunkResult::End
2700                }
2701            },
2702            |_e| {},
2703        );
2704
2705        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2706
2707        // Note: into_dac() would be tested here, but run consumes the stream
2708        // and doesn't return it. The into_dac pattern is for the blocking API.
2709        // This test verifies the stream lifecycle completes cleanly.
2710    }
2711
2712    #[test]
2713    fn test_stream_stats_tracking() {
2714        // Test that stream statistics are tracked correctly
2715        let backend = TestBackend::new();
2716
2717        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2718        backend_box.connect().unwrap();
2719
2720        let info = DacInfo {
2721            id: "test".to_string(),
2722            name: "Test Device".to_string(),
2723            kind: DacType::Custom("Test".to_string()),
2724            caps: backend_box.caps().clone(),
2725        };
2726
2727        let cfg = StreamConfig::new(30000);
2728        let stream = Stream::with_backend(info, backend_box, cfg);
2729
2730        // Arm the stream
2731        let control = stream.control();
2732        control.arm().unwrap();
2733
2734        let call_count = Arc::new(AtomicUsize::new(0));
2735        let call_count_clone = call_count.clone();
2736        let points_per_call = 50;
2737
2738        let result = stream.run(
2739            move |req, buffer| {
2740                let count = call_count_clone.fetch_add(1, Ordering::SeqCst);
2741                if count < 3 {
2742                    let n = req.target_points.min(buffer.len()).min(points_per_call);
2743                    for i in 0..n {
2744                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
2745                    }
2746                    ChunkResult::Filled(n)
2747                } else {
2748                    ChunkResult::End
2749                }
2750            },
2751            |_e| {},
2752        );
2753
2754        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
2755        // Stats tracking is verified by the successful completion
2756        // Detailed stats assertions would require access to stream after run
2757    }
2758
2759    #[test]
2760    fn test_stream_disarm_during_streaming() {
2761        // Test disarming stream while it's running
2762        use std::thread;
2763
2764        let backend = TestBackend::new();
2765        let shutter_open = backend.shutter_open.clone();
2766
2767        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
2768        backend_box.connect().unwrap();
2769
2770        let info = DacInfo {
2771            id: "test".to_string(),
2772            name: "Test Device".to_string(),
2773            kind: DacType::Custom("Test".to_string()),
2774            caps: backend_box.caps().clone(),
2775        };
2776
2777        let cfg = StreamConfig::new(30000);
2778        let stream = Stream::with_backend(info, backend_box, cfg);
2779
2780        let control = stream.control();
2781        let control_clone = control.clone();
2782
2783        // Arm first
2784        control.arm().unwrap();
2785        assert!(control.is_armed());
2786
2787        // Spawn thread to disarm then stop
2788        thread::spawn(move || {
2789            thread::sleep(Duration::from_millis(15));
2790            control_clone.disarm().unwrap();
2791            thread::sleep(Duration::from_millis(15));
2792            control_clone.stop().unwrap();
2793        });
2794
2795        let result = stream.run(
2796            |req, buffer| {
2797                let n = req.target_points.min(buffer.len()).min(10);
2798                for i in 0..n {
2799                    buffer[i] = LaserPoint::new(0.1, 0.1, 50000, 50000, 50000, 50000);
2800                }
2801                ChunkResult::Filled(n)
2802            },
2803            |_e| {},
2804        );
2805
2806        assert_eq!(result.unwrap(), RunExit::Stopped);
2807        // Shutter should have been closed by disarm
2808        assert!(!shutter_open.load(Ordering::SeqCst));
2809    }
2810
2811    #[test]
2812    fn test_stream_with_mock_backend_disconnect() {
2813        // Test handling of backend disconnect during streaming
2814        use std::sync::atomic::AtomicBool;
2815
2816        struct DisconnectingBackend {
2817            inner: TestBackend,
2818            disconnect_after: Arc<AtomicUsize>,
2819            call_count: Arc<AtomicUsize>,
2820        }
2821
2822        impl StreamBackend for DisconnectingBackend {
2823            fn dac_type(&self) -> DacType {
2824                self.inner.dac_type()
2825            }
2826
2827            fn caps(&self) -> &DacCapabilities {
2828                self.inner.caps()
2829            }
2830
2831            fn connect(&mut self) -> Result<()> {
2832                self.inner.connect()
2833            }
2834
2835            fn disconnect(&mut self) -> Result<()> {
2836                self.inner.disconnect()
2837            }
2838
2839            fn is_connected(&self) -> bool {
2840                let count = self.call_count.load(Ordering::SeqCst);
2841                let disconnect_after = self.disconnect_after.load(Ordering::SeqCst);
2842                if count >= disconnect_after {
2843                    return false;
2844                }
2845                self.inner.is_connected()
2846            }
2847
2848            fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2849                self.call_count.fetch_add(1, Ordering::SeqCst);
2850                self.inner.try_write_chunk(pps, points)
2851            }
2852
2853            fn stop(&mut self) -> Result<()> {
2854                self.inner.stop()
2855            }
2856
2857            fn set_shutter(&mut self, open: bool) -> Result<()> {
2858                self.inner.set_shutter(open)
2859            }
2860
2861            fn queued_points(&self) -> Option<u64> {
2862                self.inner.queued_points()
2863            }
2864        }
2865
2866        let mut backend = DisconnectingBackend {
2867            inner: TestBackend::new(),
2868            disconnect_after: Arc::new(AtomicUsize::new(3)),
2869            call_count: Arc::new(AtomicUsize::new(0)),
2870        };
2871        backend.inner.connected = true;
2872
2873        let info = DacInfo {
2874            id: "test".to_string(),
2875            name: "Test Device".to_string(),
2876            kind: DacType::Custom("Test".to_string()),
2877            caps: backend.inner.caps().clone(),
2878        };
2879
2880        let cfg = StreamConfig::new(30000);
2881        let stream = Stream::with_backend(info, Box::new(backend), cfg);
2882
2883        let error_occurred = Arc::new(AtomicBool::new(false));
2884        let error_occurred_clone = error_occurred.clone();
2885
2886        let result = stream.run(
2887            |req, buffer| {
2888                let n = req.target_points.min(buffer.len()).min(10);
2889                for i in 0..n {
2890                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
2891                }
2892                ChunkResult::Filled(n)
2893            },
2894            move |_e| {
2895                error_occurred_clone.store(true, Ordering::SeqCst);
2896            },
2897        );
2898
2899        // Should return Disconnected when backend reports disconnection
2900        assert_eq!(result.unwrap(), RunExit::Disconnected);
2901    }
2902
2903    // =========================================================================
2904    // Write error → disconnect tests
2905    //
2906    // These verify that a non-disconnected write error (Error::Backend) causes
2907    // the backend to be disconnected, so the stream exits with
2908    // RunExit::Disconnected and the device can be reconnected.
2909    //
2910    // Without the fix (backend.disconnect() call in the Err(e) branch of
2911    // write_fill_points), the backend stays "connected" and the stream loops
2912    // forever retrying writes that keep failing.
2913    // =========================================================================
2914
2915    /// A backend that returns Error::backend() after N successful writes.
2916    /// Simulates the IDN scenario where write_frame() fails with an IO error
2917    /// that maps to Error::Backend (not Error::Disconnected).
2918    struct FailingWriteBackend {
2919        inner: TestBackend,
2920        fail_after: usize,
2921        write_count: Arc<AtomicUsize>,
2922        disconnect_called: Arc<AtomicBool>,
2923    }
2924
2925    impl FailingWriteBackend {
2926        fn new(fail_after: usize) -> Self {
2927            Self {
2928                inner: TestBackend::new(),
2929                fail_after,
2930                write_count: Arc::new(AtomicUsize::new(0)),
2931                disconnect_called: Arc::new(AtomicBool::new(false)),
2932            }
2933        }
2934    }
2935
2936    impl StreamBackend for FailingWriteBackend {
2937        fn dac_type(&self) -> DacType {
2938            DacType::Custom("FailingTest".to_string())
2939        }
2940
2941        fn caps(&self) -> &DacCapabilities {
2942            self.inner.caps()
2943        }
2944
2945        fn connect(&mut self) -> Result<()> {
2946            self.inner.connect()
2947        }
2948
2949        fn disconnect(&mut self) -> Result<()> {
2950            self.disconnect_called.store(true, Ordering::SeqCst);
2951            self.inner.disconnect()
2952        }
2953
2954        fn is_connected(&self) -> bool {
2955            self.inner.is_connected()
2956        }
2957
2958        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
2959            let count = self.write_count.fetch_add(1, Ordering::SeqCst);
2960            if count >= self.fail_after {
2961                // Return a backend error (non-disconnected), like IDN's
2962                // write_frame().map_err(Error::backend) would produce.
2963                Err(Error::backend(std::io::Error::new(
2964                    std::io::ErrorKind::BrokenPipe,
2965                    "simulated write failure",
2966                )))
2967            } else {
2968                self.inner.try_write_chunk(pps, points)
2969            }
2970        }
2971
2972        fn stop(&mut self) -> Result<()> {
2973            self.inner.stop()
2974        }
2975
2976        fn set_shutter(&mut self, open: bool) -> Result<()> {
2977            self.inner.set_shutter(open)
2978        }
2979
2980        fn queued_points(&self) -> Option<u64> {
2981            self.inner.queued_points()
2982        }
2983    }
2984
2985    #[test]
2986    fn test_backend_write_error_exits_with_disconnected() {
2987        // When try_write_chunk returns a non-disconnected error (Error::Backend),
2988        // the stream should disconnect the backend and exit with RunExit::Disconnected.
2989        //
2990        // Without the fix, the stream loops forever because is_connected() stays
2991        // true. This test would hang/timeout without the backend.disconnect() call.
2992        use std::thread;
2993
2994        let mut backend = FailingWriteBackend::new(2); // fail after 2 successful writes
2995        backend.inner.connected = true;
2996        let disconnect_called = backend.disconnect_called.clone();
2997
2998        let info = DacInfo {
2999            id: "test".to_string(),
3000            name: "Test Device".to_string(),
3001            kind: DacType::Custom("FailingTest".to_string()),
3002            caps: backend.inner.caps().clone(),
3003        };
3004
3005        let cfg = StreamConfig::new(30000);
3006        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3007
3008        // Run in a thread so the "loops forever" case (without the fix) would
3009        // be caught by the test harness timeout rather than blocking forever.
3010        let handle = thread::spawn(move || {
3011            stream.run(
3012                |req, buffer| {
3013                    let n = req.target_points.min(buffer.len()).min(10);
3014                    for i in 0..n {
3015                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
3016                    }
3017                    ChunkResult::Filled(n)
3018                },
3019                |_err| {},
3020            )
3021        });
3022
3023        // If the fix is missing, the stream loops forever. Give it a generous timeout.
3024        let result = handle.join().expect("stream thread panicked");
3025
3026        // With the fix: backend.disconnect() is called, stream exits Disconnected
3027        assert_eq!(
3028            result.unwrap(),
3029            RunExit::Disconnected,
3030            "Write error should cause stream to exit with Disconnected"
3031        );
3032        assert!(
3033            disconnect_called.load(Ordering::SeqCst),
3034            "backend.disconnect() should have been called after write error"
3035        );
3036    }
3037
3038    #[test]
3039    fn test_backend_write_error_fires_on_error() {
3040        // Verify the on_error callback is invoked with the backend error.
3041        let mut backend = FailingWriteBackend::new(1); // fail after 1 write
3042        backend.inner.connected = true;
3043
3044        let info = DacInfo {
3045            id: "test".to_string(),
3046            name: "Test Device".to_string(),
3047            kind: DacType::Custom("FailingTest".to_string()),
3048            caps: backend.inner.caps().clone(),
3049        };
3050
3051        let cfg = StreamConfig::new(30000);
3052        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3053
3054        let error_count = Arc::new(AtomicUsize::new(0));
3055        let got_backend_error = Arc::new(AtomicBool::new(false));
3056        let error_count_clone = error_count.clone();
3057        let got_backend_error_clone = got_backend_error.clone();
3058
3059        let result = stream.run(
3060            |req, buffer| {
3061                let n = req.target_points.min(buffer.len()).min(10);
3062                for i in 0..n {
3063                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3064                }
3065                ChunkResult::Filled(n)
3066            },
3067            move |err| {
3068                error_count_clone.fetch_add(1, Ordering::SeqCst);
3069                if matches!(err, Error::Backend(_)) {
3070                    got_backend_error_clone.store(true, Ordering::SeqCst);
3071                }
3072            },
3073        );
3074
3075        assert_eq!(result.unwrap(), RunExit::Disconnected);
3076        assert!(
3077            got_backend_error.load(Ordering::SeqCst),
3078            "on_error should have received the Backend error"
3079        );
3080    }
3081
3082    #[test]
3083    fn test_backend_write_error_immediate_fail() {
3084        // Backend that fails on the very first write should still exit cleanly.
3085        let mut backend = FailingWriteBackend::new(0); // fail immediately
3086        backend.inner.connected = true;
3087
3088        let info = DacInfo {
3089            id: "test".to_string(),
3090            name: "Test Device".to_string(),
3091            kind: DacType::Custom("FailingTest".to_string()),
3092            caps: backend.inner.caps().clone(),
3093        };
3094
3095        let cfg = StreamConfig::new(30000);
3096        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3097
3098        let result = stream.run(
3099            |req, buffer| {
3100                let n = req.target_points.min(buffer.len()).min(10);
3101                for i in 0..n {
3102                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3103                }
3104                ChunkResult::Filled(n)
3105            },
3106            |_err| {},
3107        );
3108
3109        assert_eq!(
3110            result.unwrap(),
3111            RunExit::Disconnected,
3112            "Immediate write failure should exit with Disconnected"
3113        );
3114    }
3115
3116    // =========================================================================
3117    // Helios-style disconnect tests
3118    //
3119    // The Helios DAC disconnects via a USB timeout on the status() poll
3120    // (inside try_write_chunk), producing Error::Backend with a TimedOut
3121    // error — not Error::Disconnected. This backend simulates that exact
3122    // behavior: N successful writes, then status() times out.
3123    // =========================================================================
3124
3125    /// A backend that simulates Helios USB disconnect behavior.
3126    ///
3127    /// The Helios DAC performs a status() poll (USB interrupt read) before
3128    /// every write_frame(). On physical disconnect, the interrupt read times
3129    /// out, producing `Error::Backend("Operation timed out")`. This backend
3130    /// replicates that pattern: after `fail_after` successful writes, the
3131    /// next try_write_chunk returns a TimedOut backend error.
3132    struct HeliosLikeBackend {
3133        inner: TestBackend,
3134        fail_after: usize,
3135        write_count: Arc<AtomicUsize>,
3136        disconnect_called: Arc<AtomicBool>,
3137        error_received: Arc<Mutex<Option<String>>>,
3138    }
3139
3140    impl HeliosLikeBackend {
3141        fn new(fail_after: usize) -> Self {
3142            Self {
3143                inner: TestBackend::new(),
3144                fail_after,
3145                write_count: Arc::new(AtomicUsize::new(0)),
3146                disconnect_called: Arc::new(AtomicBool::new(false)),
3147                error_received: Arc::new(Mutex::new(None)),
3148            }
3149        }
3150    }
3151
3152    impl StreamBackend for HeliosLikeBackend {
3153        fn dac_type(&self) -> DacType {
3154            DacType::Custom("HeliosLikeTest".to_string())
3155        }
3156
3157        fn caps(&self) -> &DacCapabilities {
3158            self.inner.caps()
3159        }
3160
3161        fn connect(&mut self) -> Result<()> {
3162            self.inner.connect()
3163        }
3164
3165        fn disconnect(&mut self) -> Result<()> {
3166            self.disconnect_called.store(true, Ordering::SeqCst);
3167            self.inner.disconnect()
3168        }
3169
3170        fn is_connected(&self) -> bool {
3171            self.inner.is_connected()
3172        }
3173
3174        fn try_write_chunk(&mut self, pps: u32, points: &[LaserPoint]) -> Result<WriteOutcome> {
3175            let count = self.write_count.fetch_add(1, Ordering::SeqCst);
3176            if count >= self.fail_after {
3177                // Simulate Helios: status() interrupt read times out on USB disconnect.
3178                // Real error chain: rusb::Error::Timeout → HeliosDacError::UsbError
3179                //   → Error::backend("usb connection error: Operation timed out")
3180                Err(Error::backend(std::io::Error::new(
3181                    std::io::ErrorKind::TimedOut,
3182                    "usb connection error: Operation timed out",
3183                )))
3184            } else {
3185                self.inner.try_write_chunk(pps, points)
3186            }
3187        }
3188
3189        fn stop(&mut self) -> Result<()> {
3190            self.inner.stop()
3191        }
3192
3193        fn set_shutter(&mut self, open: bool) -> Result<()> {
3194            self.inner.set_shutter(open)
3195        }
3196
3197        fn queued_points(&self) -> Option<u64> {
3198            // Helios does not report queue depth
3199            None
3200        }
3201    }
3202
3203    #[test]
3204    fn test_helios_status_timeout_exits_with_disconnected() {
3205        // Simulates a Helios DAC being unplugged mid-stream.
3206        //
3207        // Real-world sequence observed via USB logging:
3208        //   1. status() → read_response FAILED: Timeout (32ms interrupt read)
3209        //   2. Error mapped to Error::Backend (not Disconnected)
3210        //   3. Stream calls backend.disconnect() → dac = None
3211        //   4. Next loop: is_connected() = false → RunExit::Disconnected
3212        use std::thread;
3213
3214        let mut backend = HeliosLikeBackend::new(3);
3215        backend.inner.connected = true;
3216        let disconnect_called = backend.disconnect_called.clone();
3217
3218        let info = DacInfo {
3219            id: "test-helios".to_string(),
3220            name: "Test Helios".to_string(),
3221            kind: DacType::Custom("HeliosLikeTest".to_string()),
3222            caps: backend.inner.caps().clone(),
3223        };
3224
3225        let cfg = StreamConfig::new(30000);
3226        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3227
3228        let handle = thread::spawn(move || {
3229            stream.run(
3230                |req, buffer| {
3231                    let n = req.target_points.min(buffer.len()).min(10);
3232                    for i in 0..n {
3233                        buffer[i] = LaserPoint::blanked(0.0, 0.0);
3234                    }
3235                    ChunkResult::Filled(n)
3236                },
3237                |_err| {},
3238            )
3239        });
3240
3241        let result = handle.join().expect("stream thread panicked");
3242
3243        assert_eq!(
3244            result.unwrap(),
3245            RunExit::Disconnected,
3246            "Helios status timeout should cause stream to exit with Disconnected"
3247        );
3248        assert!(
3249            disconnect_called.load(Ordering::SeqCst),
3250            "backend.disconnect() should have been called on status timeout"
3251        );
3252    }
3253
3254    #[test]
3255    fn test_helios_status_timeout_fires_on_error_with_backend_variant() {
3256        // Verify the on_error callback receives an Error::Backend (not Disconnected).
3257        // This matches real Helios behavior: the USB timeout is wrapped as Backend error.
3258        let mut backend = HeliosLikeBackend::new(1);
3259        backend.inner.connected = true;
3260        let error_received = backend.error_received.clone();
3261
3262        let info = DacInfo {
3263            id: "test-helios".to_string(),
3264            name: "Test Helios".to_string(),
3265            kind: DacType::Custom("HeliosLikeTest".to_string()),
3266            caps: backend.inner.caps().clone(),
3267        };
3268
3269        let cfg = StreamConfig::new(30000);
3270        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3271
3272        let got_backend_error = Arc::new(AtomicBool::new(false));
3273        let got_backend_error_clone = got_backend_error.clone();
3274        let error_received_clone = error_received.clone();
3275
3276        let result = stream.run(
3277            |req, buffer| {
3278                let n = req.target_points.min(buffer.len()).min(10);
3279                for i in 0..n {
3280                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3281                }
3282                ChunkResult::Filled(n)
3283            },
3284            move |err| {
3285                if matches!(err, Error::Backend(_)) {
3286                    got_backend_error_clone.store(true, Ordering::SeqCst);
3287                    *error_received_clone.lock().unwrap() = Some(err.to_string());
3288                }
3289            },
3290        );
3291
3292        assert_eq!(result.unwrap(), RunExit::Disconnected);
3293        assert!(
3294            got_backend_error.load(Ordering::SeqCst),
3295            "on_error should receive Error::Backend for Helios timeout"
3296        );
3297        let msg = error_received.lock().unwrap();
3298        assert!(
3299            msg.as_ref().unwrap().contains("Operation timed out"),
3300            "Error message should mention timeout, got: {:?}",
3301            msg
3302        );
3303    }
3304
3305    #[test]
3306    fn test_helios_immediate_status_timeout() {
3307        // Helios that fails on the very first status check (device was already
3308        // disconnected when stream started, or USB enumeration was stale).
3309        let mut backend = HeliosLikeBackend::new(0);
3310        backend.inner.connected = true;
3311        let disconnect_called = backend.disconnect_called.clone();
3312
3313        let info = DacInfo {
3314            id: "test-helios".to_string(),
3315            name: "Test Helios".to_string(),
3316            kind: DacType::Custom("HeliosLikeTest".to_string()),
3317            caps: backend.inner.caps().clone(),
3318        };
3319
3320        let cfg = StreamConfig::new(30000);
3321        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3322
3323        let result = stream.run(
3324            |req, buffer| {
3325                let n = req.target_points.min(buffer.len()).min(10);
3326                for i in 0..n {
3327                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3328                }
3329                ChunkResult::Filled(n)
3330            },
3331            |_err| {},
3332        );
3333
3334        assert_eq!(
3335            result.unwrap(),
3336            RunExit::Disconnected,
3337            "Immediate status timeout should exit with Disconnected"
3338        );
3339        assert!(
3340            disconnect_called.load(Ordering::SeqCst),
3341            "backend.disconnect() should be called even on first-write failure"
3342        );
3343    }
3344
3345    // =========================================================================
3346    // Drain wait tests
3347    // =========================================================================
3348
3349    #[test]
3350    fn test_fill_result_end_drains_with_queue_depth() {
3351        // Test that ChunkResult::End waits for queue to drain when queue depth is available
3352        use std::time::Instant;
3353
3354        let backend = TestBackend::new().with_initial_queue(1000);
3355        let queued = backend.queued.clone();
3356
3357        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3358        backend_box.connect().unwrap();
3359
3360        let info = DacInfo {
3361            id: "test".to_string(),
3362            name: "Test Device".to_string(),
3363            kind: DacType::Custom("Test".to_string()),
3364            caps: backend_box.caps().clone(),
3365        };
3366
3367        // Use short drain timeout for test
3368        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
3369        let stream = Stream::with_backend(info, backend_box, cfg);
3370
3371        // Simulate queue draining by setting it to 0 before the stream runs
3372        queued.store(0, Ordering::SeqCst);
3373
3374        let start = Instant::now();
3375        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3376
3377        let elapsed = start.elapsed();
3378
3379        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3380        // Should return quickly since queue was empty
3381        assert!(
3382            elapsed.as_millis() < 50,
3383            "Should return quickly when queue is empty, took {:?}",
3384            elapsed
3385        );
3386    }
3387
3388    #[test]
3389    fn test_fill_result_end_respects_drain_timeout() {
3390        // Test that drain respects timeout and doesn't block forever
3391        use std::time::Instant;
3392
3393        let backend = TestBackend::new().with_initial_queue(100000); // Large queue that won't drain
3394
3395        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3396        backend_box.connect().unwrap();
3397
3398        let info = DacInfo {
3399            id: "test".to_string(),
3400            name: "Test Device".to_string(),
3401            kind: DacType::Custom("Test".to_string()),
3402            caps: backend_box.caps().clone(),
3403        };
3404
3405        // Very short timeout
3406        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(50));
3407        let stream = Stream::with_backend(info, backend_box, cfg);
3408
3409        let start = Instant::now();
3410        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3411
3412        let elapsed = start.elapsed();
3413
3414        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3415        // Should timeout around 50ms, allow some margin
3416        assert!(
3417            elapsed.as_millis() >= 40 && elapsed.as_millis() < 150,
3418            "Should respect drain timeout (~50ms), took {:?}",
3419            elapsed
3420        );
3421    }
3422
3423    #[test]
3424    fn test_fill_result_end_skips_drain_with_zero_timeout() {
3425        // Test that drain is skipped when timeout is zero
3426        use std::time::Instant;
3427
3428        let backend = TestBackend::new().with_initial_queue(100000);
3429
3430        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3431        backend_box.connect().unwrap();
3432
3433        let info = DacInfo {
3434            id: "test".to_string(),
3435            name: "Test Device".to_string(),
3436            kind: DacType::Custom("Test".to_string()),
3437            caps: backend_box.caps().clone(),
3438        };
3439
3440        // Zero timeout = skip drain
3441        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::ZERO);
3442        let stream = Stream::with_backend(info, backend_box, cfg);
3443
3444        let start = Instant::now();
3445        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3446
3447        let elapsed = start.elapsed();
3448
3449        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3450        // Should return immediately
3451        assert!(
3452            elapsed.as_millis() < 20,
3453            "Should skip drain with zero timeout, took {:?}",
3454            elapsed
3455        );
3456    }
3457
3458    #[test]
3459    fn test_fill_result_end_drains_without_queue_depth() {
3460        // Test drain behavior when queued_points() returns None
3461        use std::time::Instant;
3462
3463        let mut backend = NoQueueTestBackend::new();
3464        backend.inner.connected = true;
3465
3466        let info = DacInfo {
3467            id: "test".to_string(),
3468            name: "Test Device".to_string(),
3469            kind: DacType::Custom("Test".to_string()),
3470            caps: backend.inner.caps().clone(),
3471        };
3472
3473        // Short drain timeout
3474        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(100));
3475        let stream = Stream::with_backend(info, Box::new(backend), cfg);
3476
3477        let start = Instant::now();
3478        let result = stream.run(|_req, _buffer| ChunkResult::End, |_e| {});
3479
3480        let elapsed = start.elapsed();
3481
3482        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3483        // Without queue depth, drain sleeps for estimated buffer time (0 here)
3484        // So should return quickly
3485        assert!(
3486            elapsed.as_millis() < 50,
3487            "Should return quickly with empty buffer estimate, took {:?}",
3488            elapsed
3489        );
3490    }
3491
3492    #[test]
3493    fn test_fill_result_end_closes_shutter() {
3494        // Test that shutter is closed after drain
3495        let backend = TestBackend::new();
3496        let shutter_open = backend.shutter_open.clone();
3497
3498        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3499        backend_box.connect().unwrap();
3500
3501        let info = DacInfo {
3502            id: "test".to_string(),
3503            name: "Test Device".to_string(),
3504            kind: DacType::Custom("Test".to_string()),
3505            caps: backend_box.caps().clone(),
3506        };
3507
3508        let cfg = StreamConfig::new(30000).with_drain_timeout(Duration::from_millis(10));
3509        let stream = Stream::with_backend(info, backend_box, cfg);
3510
3511        // Arm the stream first
3512        let control = stream.control();
3513        control.arm().unwrap();
3514
3515        let result = stream.run(
3516            |req, buffer| {
3517                // Fill some points then end
3518                let n = req.target_points.min(buffer.len()).min(10);
3519                for i in 0..n {
3520                    buffer[i] = LaserPoint::blanked(0.0, 0.0);
3521                }
3522                ChunkResult::End
3523            },
3524            |_e| {},
3525        );
3526
3527        assert_eq!(result.unwrap(), RunExit::ProducerEnded);
3528        // Shutter should be closed after graceful shutdown
3529        assert!(
3530            !shutter_open.load(Ordering::SeqCst),
3531            "Shutter should be closed after drain"
3532        );
3533    }
3534
3535    // =========================================================================
3536    // Color delay tests
3537    // =========================================================================
3538
3539    #[test]
3540    fn test_color_delay_zero_is_passthrough() {
3541        // With delay=0, colors should pass through unchanged
3542        let mut backend = TestBackend::new();
3543        backend.connected = true;
3544
3545        let info = DacInfo {
3546            id: "test".to_string(),
3547            name: "Test".to_string(),
3548            kind: DacType::Custom("Test".to_string()),
3549            caps: backend.caps().clone(),
3550        };
3551
3552        let cfg = StreamConfig::new(30000); // color_delay defaults to ZERO
3553        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3554
3555        // Arm the stream so points aren't blanked
3556        stream.control.arm().unwrap();
3557        stream.process_control_messages();
3558        stream.state.last_armed = true;
3559
3560        // Fill chunk_buffer with colored points
3561        let n = 5;
3562        for i in 0..n {
3563            stream.state.chunk_buffer[i] =
3564                LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 1000, 0, 0, 65535);
3565        }
3566
3567        // write_fill_points applies color delay internally
3568        let mut on_error = |_: Error| {};
3569        stream.write_fill_points(n, &mut on_error).unwrap();
3570
3571        // Delay line should remain empty
3572        assert!(stream.state.color_delay_line.is_empty());
3573    }
3574
3575    #[test]
3576    fn test_color_delay_shifts_colors() {
3577        // With delay=3 points, first 3 outputs should be blanked, rest shifted
3578        let mut backend = TestBackend::new();
3579        backend.connected = true;
3580
3581        let info = DacInfo {
3582            id: "test".to_string(),
3583            name: "Test".to_string(),
3584            kind: DacType::Custom("Test".to_string()),
3585            caps: backend.caps().clone(),
3586        };
3587
3588        // 10000 PPS, delay = 300µs → ceil(0.0003 * 10000) = 3 points
3589        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(300));
3590        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3591
3592        // Arm the stream
3593        stream.control.arm().unwrap();
3594        stream.process_control_messages();
3595        // handle_shutter_transition pre-fills the delay line on arm
3596        stream.state.last_armed = true;
3597
3598        // Pre-fill delay line as handle_shutter_transition would on arm
3599        stream.state.color_delay_line.clear();
3600        for _ in 0..3 {
3601            stream.state.color_delay_line.push_back((0, 0, 0, 0));
3602        }
3603
3604        // Fill 5 points with distinct colors
3605        let n = 5;
3606        for i in 0..n {
3607            stream.state.chunk_buffer[i] = LaserPoint::new(
3608                i as f32 * 0.1,
3609                0.0,
3610                (i as u16 + 1) * 10000,
3611                (i as u16 + 1) * 5000,
3612                (i as u16 + 1) * 2000,
3613                65535,
3614            );
3615        }
3616
3617        let mut on_error = |_: Error| {};
3618        stream.write_fill_points(n, &mut on_error).unwrap();
3619
3620        // After write, check the chunk_buffer was modified:
3621        // We can't inspect what was written to the backend directly,
3622        // but we can verify the delay line state.
3623        // After processing 5 points through a 3-point delay,
3624        // the delay line should still have 3 entries (the last 3 input colors).
3625        assert_eq!(stream.state.color_delay_line.len(), 3);
3626
3627        // The delay line should contain colors from inputs 3, 4, 5 (0-indexed: 2, 3, 4)
3628        let expected: Vec<(u16, u16, u16, u16)> = (3..=5)
3629            .map(|i| (i * 10000u16, i * 5000, i * 2000, 65535))
3630            .collect();
3631        let actual: Vec<(u16, u16, u16, u16)> =
3632            stream.state.color_delay_line.iter().copied().collect();
3633        assert_eq!(actual, expected);
3634    }
3635
3636    #[test]
3637    fn test_color_delay_resets_on_disarm_arm() {
3638        // Disarm should clear the delay line, arm should re-fill it
3639        let mut backend = TestBackend::new();
3640        backend.connected = true;
3641
3642        let info = DacInfo {
3643            id: "test".to_string(),
3644            name: "Test".to_string(),
3645            kind: DacType::Custom("Test".to_string()),
3646            caps: backend.caps().clone(),
3647        };
3648
3649        // 10000 PPS, delay = 200µs → ceil(0.0002 * 10000) = 2 points
3650        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
3651        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3652
3653        // Arm: should pre-fill delay line
3654        stream.handle_shutter_transition(true);
3655        assert_eq!(stream.state.color_delay_line.len(), 2);
3656        assert_eq!(stream.state.color_delay_line.front(), Some(&(0, 0, 0, 0)));
3657
3658        // Disarm: should clear delay line
3659        stream.handle_shutter_transition(false);
3660        assert!(stream.state.color_delay_line.is_empty());
3661
3662        // Arm again: should re-fill
3663        stream.handle_shutter_transition(true);
3664        assert_eq!(stream.state.color_delay_line.len(), 2);
3665    }
3666
3667    #[test]
3668    fn test_color_delay_dynamic_change() {
3669        // Changing delay at runtime via atomic should resize the deque
3670        let mut backend = TestBackend::new();
3671        backend.connected = true;
3672
3673        let info = DacInfo {
3674            id: "test".to_string(),
3675            name: "Test".to_string(),
3676            kind: DacType::Custom("Test".to_string()),
3677            caps: backend.caps().clone(),
3678        };
3679
3680        // Start with 200µs delay at 10000 PPS → 2 points
3681        let cfg = StreamConfig::new(10000).with_color_delay(Duration::from_micros(200));
3682        let mut stream = Stream::with_backend(info, Box::new(backend), cfg);
3683
3684        // Arm
3685        stream.control.arm().unwrap();
3686        stream.process_control_messages();
3687        stream.state.last_armed = true;
3688
3689        // Pre-fill as handle_shutter_transition would
3690        stream.state.color_delay_line.clear();
3691        for _ in 0..2 {
3692            stream.state.color_delay_line.push_back((0, 0, 0, 0));
3693        }
3694
3695        // Fill and write a chunk
3696        let n = 3;
3697        for i in 0..n {
3698            stream.state.chunk_buffer[i] =
3699                LaserPoint::new(0.0, 0.0, (i as u16 + 1) * 10000, 0, 0, 65535);
3700        }
3701        let mut on_error = |_: Error| {};
3702        stream.write_fill_points(n, &mut on_error).unwrap();
3703
3704        // Now change delay to 500µs → ceil(0.0005 * 10000) = 5 points
3705        stream.control.set_color_delay(Duration::from_micros(500));
3706
3707        // Write another chunk — delay line should resize to 5
3708        for i in 0..n {
3709            stream.state.chunk_buffer[i] =
3710                LaserPoint::new(0.0, 0.0, (i as u16 + 4) * 10000, 0, 0, 65535);
3711        }
3712        stream.write_fill_points(n, &mut on_error).unwrap();
3713
3714        assert_eq!(stream.state.color_delay_line.len(), 5);
3715
3716        // Now disable delay entirely
3717        stream.control.set_color_delay(Duration::ZERO);
3718
3719        for i in 0..n {
3720            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 50000, 0, 0, 65535);
3721        }
3722        stream.write_fill_points(n, &mut on_error).unwrap();
3723
3724        // Delay line should be cleared
3725        assert!(stream.state.color_delay_line.is_empty());
3726    }
3727
3728    // =========================================================================
3729    // Startup blanking tests
3730    // =========================================================================
3731
3732    #[test]
3733    fn test_startup_blank_blanks_first_n_points() {
3734        let backend = TestBackend::new();
3735        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3736        backend_box.connect().unwrap();
3737
3738        let info = DacInfo {
3739            id: "test".to_string(),
3740            name: "Test Device".to_string(),
3741            kind: DacType::Custom("Test".to_string()),
3742            caps: backend_box.caps().clone(),
3743        };
3744
3745        // 10000 PPS, startup_blank = 500µs → ceil(0.0005 * 10000) = 5 points
3746        // Disable color delay to isolate startup blanking
3747        let cfg = StreamConfig::new(10000)
3748            .with_startup_blank(Duration::from_micros(500))
3749            .with_color_delay(Duration::ZERO);
3750        let mut stream = Stream::with_backend(info, backend_box, cfg);
3751
3752        assert_eq!(stream.state.startup_blank_points, 5);
3753
3754        // Arm the stream (triggers handle_shutter_transition which resets counter)
3755        stream.control.arm().unwrap();
3756        stream.process_control_messages();
3757
3758        // Simulate arm transition in write path
3759        stream.state.last_armed = false; // Force transition detection
3760
3761        // Fill 10 colored points
3762        let n = 10;
3763        for i in 0..n {
3764            stream.state.chunk_buffer[i] =
3765                LaserPoint::new(i as f32 * 0.1, 0.0, 65535, 32000, 16000, 65535);
3766        }
3767
3768        let mut on_error = |_: Error| {};
3769        stream.write_fill_points(n, &mut on_error).unwrap();
3770
3771        // After write, check what was sent: we can't inspect backend directly,
3772        // but we can verify the counter decremented and the buffer was modified
3773        assert_eq!(stream.state.startup_blank_remaining, 0);
3774
3775        // Write another chunk — should NOT be blanked (counter exhausted)
3776        stream.state.last_armed = true; // No transition this time
3777        for i in 0..n {
3778            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
3779        }
3780        stream.write_fill_points(n, &mut on_error).unwrap();
3781
3782        // Verify colors pass through unmodified (no startup blanking)
3783        // The chunk_buffer is modified in-place before write, so after write
3784        // it should still have the original colors (startup blank is exhausted)
3785        assert_eq!(stream.state.chunk_buffer[0].r, 65535);
3786        assert_eq!(stream.state.chunk_buffer[0].g, 32000);
3787    }
3788
3789    #[test]
3790    fn test_startup_blank_resets_on_rearm() {
3791        let backend = TestBackend::new();
3792        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3793        backend_box.connect().unwrap();
3794
3795        let info = DacInfo {
3796            id: "test".to_string(),
3797            name: "Test Device".to_string(),
3798            kind: DacType::Custom("Test".to_string()),
3799            caps: backend_box.caps().clone(),
3800        };
3801
3802        // 10000 PPS, startup_blank = 500µs → 5 points
3803        let cfg = StreamConfig::new(10000)
3804            .with_startup_blank(Duration::from_micros(500))
3805            .with_color_delay(Duration::ZERO);
3806        let mut stream = Stream::with_backend(info, backend_box, cfg);
3807
3808        // First arm cycle: consume startup blanking
3809        stream.state.last_armed = false;
3810        stream.control.arm().unwrap();
3811        stream.process_control_messages();
3812
3813        let n = 10;
3814        for i in 0..n {
3815            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
3816        }
3817        let mut on_error = |_: Error| {};
3818        // This triggers disarmed→armed transition, which resets counter
3819        stream.state.last_armed = false;
3820        stream.write_fill_points(n, &mut on_error).unwrap();
3821        assert_eq!(stream.state.startup_blank_remaining, 0);
3822
3823        // Disarm → re-arm
3824        stream.control.disarm().unwrap();
3825        stream.process_control_messages();
3826
3827        stream.control.arm().unwrap();
3828        stream.process_control_messages();
3829
3830        // Write again — should trigger new arm transition and reset counter
3831        stream.state.last_armed = false;
3832        for i in 0..n {
3833            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 65535, 65535, 65535);
3834        }
3835        stream.write_fill_points(n, &mut on_error).unwrap();
3836
3837        // Counter should have been reset to 5 and then decremented to 0
3838        assert_eq!(stream.state.startup_blank_remaining, 0);
3839    }
3840
3841    #[test]
3842    fn test_startup_blank_zero_is_noop() {
3843        let backend = TestBackend::new();
3844        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3845        backend_box.connect().unwrap();
3846
3847        let info = DacInfo {
3848            id: "test".to_string(),
3849            name: "Test Device".to_string(),
3850            kind: DacType::Custom("Test".to_string()),
3851            caps: backend_box.caps().clone(),
3852        };
3853
3854        // Disable startup blanking
3855        let cfg = StreamConfig::new(10000)
3856            .with_startup_blank(Duration::ZERO)
3857            .with_color_delay(Duration::ZERO);
3858        let mut stream = Stream::with_backend(info, backend_box, cfg);
3859
3860        assert_eq!(stream.state.startup_blank_points, 0);
3861
3862        // Arm and write colored points
3863        stream.control.arm().unwrap();
3864        stream.process_control_messages();
3865        stream.state.last_armed = false; // Force arm transition
3866
3867        let n = 5;
3868        for i in 0..n {
3869            stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 65535, 32000, 16000, 65535);
3870        }
3871        let mut on_error = |_: Error| {};
3872        stream.write_fill_points(n, &mut on_error).unwrap();
3873
3874        // Colors should pass through unmodified — no startup blanking
3875        assert_eq!(stream.state.chunk_buffer[0].r, 65535);
3876        assert_eq!(stream.state.chunk_buffer[0].g, 32000);
3877        assert_eq!(stream.state.chunk_buffer[0].b, 16000);
3878        assert_eq!(stream.state.chunk_buffer[0].intensity, 65535);
3879        assert_eq!(stream.state.startup_blank_remaining, 0);
3880    }
3881
3882    // =========================================================================
3883    // OutputModel coverage: UsbFrameSwap vs NetworkFifo scheduled_ahead
3884    // =========================================================================
3885
3886    #[test]
3887    fn test_usb_frame_swap_replaces_scheduled_ahead() {
3888        let backend = TestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
3889        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3890        backend_box.connect().unwrap();
3891
3892        let info = DacInfo {
3893            id: "test".to_string(),
3894            name: "Test Device".to_string(),
3895            kind: DacType::Custom("Test".to_string()),
3896            caps: backend_box.caps().clone(),
3897        };
3898
3899        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3900        let mut stream = Stream::with_backend(info, backend_box, cfg);
3901
3902        // Arm and write two chunks of 50 points each
3903        stream.control.arm().unwrap();
3904        stream.process_control_messages();
3905
3906        let n = 50;
3907        for _ in 0..2 {
3908            for i in 0..n {
3909                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3910            }
3911            let mut on_error = |_: Error| {};
3912            stream.write_fill_points(n, &mut on_error).unwrap();
3913        }
3914
3915        // UsbFrameSwap: scheduled_ahead should be SET to n, not accumulated to 2n
3916        assert_eq!(stream.state.scheduled_ahead, n as u64);
3917        assert_eq!(stream.state.stats.chunks_written, 2);
3918        assert_eq!(stream.state.stats.points_written, 2 * n as u64);
3919    }
3920
3921    #[test]
3922    fn test_usb_frame_swap_no_queue_reporting() {
3923        let backend = NoQueueTestBackend::new().with_output_model(OutputModel::UsbFrameSwap);
3924        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3925        backend_box.connect().unwrap();
3926
3927        let info = DacInfo {
3928            id: "test".to_string(),
3929            name: "Test Device".to_string(),
3930            kind: DacType::Custom("Test".to_string()),
3931            caps: backend_box.caps().clone(),
3932        };
3933
3934        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3935        let mut stream = Stream::with_backend(info, backend_box, cfg);
3936
3937        // Arm and write two chunks
3938        stream.control.arm().unwrap();
3939        stream.process_control_messages();
3940
3941        let n = 50;
3942        for _ in 0..2 {
3943            for i in 0..n {
3944                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3945            }
3946            let mut on_error = |_: Error| {};
3947            stream.write_fill_points(n, &mut on_error).unwrap();
3948        }
3949
3950        // UsbFrameSwap + no queue reporting (like real Helios):
3951        // scheduled_ahead should be SET, not accumulated
3952        assert_eq!(stream.state.scheduled_ahead, n as u64);
3953
3954        // Buffer estimation should use software-only path (queued_points returns None)
3955        let est = stream.estimate_buffer_points();
3956        // Software estimate equals scheduled_ahead = n (SET, not accumulated)
3957        assert_eq!(est, n as u64);
3958    }
3959
3960    #[test]
3961    fn test_network_fifo_accumulates_scheduled_ahead() {
3962        let backend = TestBackend::new(); // default: NetworkFifo
3963        let mut backend_box: Box<dyn StreamBackend> = Box::new(backend);
3964        backend_box.connect().unwrap();
3965
3966        let info = DacInfo {
3967            id: "test".to_string(),
3968            name: "Test Device".to_string(),
3969            kind: DacType::Custom("Test".to_string()),
3970            caps: backend_box.caps().clone(),
3971        };
3972
3973        let cfg = StreamConfig::new(30000).with_color_delay(Duration::ZERO);
3974        let mut stream = Stream::with_backend(info, backend_box, cfg);
3975
3976        // Arm and write two chunks of 50 points each
3977        stream.control.arm().unwrap();
3978        stream.process_control_messages();
3979
3980        let n = 50;
3981        for _ in 0..2 {
3982            for i in 0..n {
3983                stream.state.chunk_buffer[i] = LaserPoint::new(0.0, 0.0, 0, 0, 0, 0);
3984            }
3985            let mut on_error = |_: Error| {};
3986            stream.write_fill_points(n, &mut on_error).unwrap();
3987        }
3988
3989        // NetworkFifo: scheduled_ahead should ACCUMULATE to 2n
3990        assert_eq!(stream.state.scheduled_ahead, 2 * n as u64);
3991        assert_eq!(stream.state.stats.chunks_written, 2);
3992        assert_eq!(stream.state.stats.points_written, 2 * n as u64);
3993    }
3994}