// laser_dac/stream/mod.rs

//! Stream and Dac types for point output.
//!
//! This module provides the `Stream` type for streaming point chunks to a DAC,
//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
//! connected devices that can start streaming sessions.
//!
//! # Armed/Disarmed Model
//!
//! Streams use a binary armed/disarmed safety model:
//!
//! - **Armed**: Content passes through to device, shutter opened (best-effort).
//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
//!
//! All streams start disarmed. Call `arm()` to enable output.
//! `arm()` and `disarm()` are the only safety controls — there is no separate
//! shutter API. This keeps the mental model simple: armed = laser may emit.
//!
//! # Hardware Shutter Support
//!
//! Shutter control is best-effort and varies by backend:
//! - **LaserCube USB/WiFi**: Actual hardware control
//! - **Helios**: Hardware shutter control via USB interrupt
//! - **Ether Dream, IDN**: No-op (safety relies on software blanking)
//!
//! # Disconnect Behavior
//!
//! By default, streams do not reconnect — on disconnect, `run()` returns
//! `RunExit::Disconnected`. Configure reconnection via
//! [`StreamConfig::with_reconnect`](crate::StreamConfig::with_reconnect) or
//! [`FrameSessionConfig::with_reconnect`](crate::FrameSessionConfig::with_reconnect).
//! New streams always start disarmed for safety.
32
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;

use crate::backend::{BackendKind, Error, Result, WriteOutcome};
use crate::config::{IdlePolicy, StreamConfig};
use crate::device::{DacCapabilities, DacInfo, DacType, OutputModel};
use crate::discovery::DacDiscovery;
use crate::point::LaserPoint;
use crate::reconnect::{reconnect_backend_with_retry, ReconnectPolicy, ReconnectTarget};

pub(crate) mod chunk_producer;

// =============================================================================
// Stream protocol types
// =============================================================================

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
54
/// Represents a point in stream time, anchored to estimated playback position.
///
/// `StreamInstant` represents the **estimated playback time** of points, not merely
/// "points sent so far." When used in `ChunkRequest::start`, it represents:
///
/// `start` = playhead + buffered
///
/// Where:
/// - `playhead` = stream_epoch + estimated_consumed_points
/// - `buffered` = points sent but not yet played
///
/// This allows callbacks to generate content for the exact time it will be displayed,
/// enabling accurate audio synchronization.
///
/// The wrapped value is a point count; all arithmetic on it saturates instead of
/// wrapping or panicking.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct StreamInstant(pub u64);

impl StreamInstant {
    /// Create a new stream instant from a point count.
    pub const fn new(points: u64) -> Self {
        Self(points)
    }

    /// Returns the number of points since stream start.
    pub const fn points(&self) -> u64 {
        self.0
    }

    /// Convert this instant to seconds at the given points-per-second rate.
    ///
    /// The division is performed in `f64`, so `pps == 0` does not panic; it
    /// yields `inf` (or `NaN` when the instant is also zero).
    pub fn as_seconds(&self, pps: u32) -> f64 {
        self.0 as f64 / pps as f64
    }

    /// Convert to seconds at the given PPS.
    ///
    /// This is an alias for `as_seconds()` for consistency with standard Rust
    /// duration naming conventions (e.g., `Duration::as_secs_f64()`).
    #[inline]
    pub fn as_secs_f64(&self, pps: u32) -> f64 {
        self.as_seconds(pps)
    }

    /// Create a stream instant from a duration in seconds at the given PPS.
    ///
    /// The fractional part of `seconds * pps` is truncated. Because the
    /// float-to-integer cast saturates, negative and `NaN` inputs map to
    /// instant 0 and overly large inputs map to `u64::MAX`.
    pub fn from_seconds(seconds: f64, pps: u32) -> Self {
        Self((seconds * pps as f64) as u64)
    }

    /// Add a number of points to this instant (saturating at `u64::MAX`).
    pub const fn add_points(&self, points: u64) -> Self {
        Self(self.0.saturating_add(points))
    }

    /// Subtract a number of points from this instant (saturating at 0).
    pub const fn sub_points(&self, points: u64) -> Self {
        Self(self.0.saturating_sub(points))
    }
}

/// `instant + n` is shorthand for [`StreamInstant::add_points`] (saturating).
impl std::ops::Add<u64> for StreamInstant {
    type Output = Self;
    fn add(self, rhs: u64) -> Self::Output {
        self.add_points(rhs)
    }
}

/// `instant - n` is shorthand for [`StreamInstant::sub_points`] (saturating).
impl std::ops::Sub<u64> for StreamInstant {
    type Output = Self;
    fn sub(self, rhs: u64) -> Self::Output {
        self.sub_points(rhs)
    }
}

/// `instant += n` advances in place with the same saturating semantics as `+`.
impl std::ops::AddAssign<u64> for StreamInstant {
    fn add_assign(&mut self, rhs: u64) {
        *self = self.add_points(rhs);
    }
}

/// `instant -= n` rewinds in place with the same saturating semantics as `-`.
impl std::ops::SubAssign<u64> for StreamInstant {
    fn sub_assign(&mut self, rhs: u64) {
        *self = self.sub_points(rhs);
    }
}
138
139/// A request to fill a buffer with points for streaming.
140///
141/// The callback receives a `ChunkRequest` describing the next chunk's timing
142/// requirements and fills points into a library-owned buffer.
143///
144/// `target_points` is the ideal number of points to reach the target buffer
145/// level, clamped to the provided buffer length. Returning fewer points
146/// triggers the configured [`IdlePolicy`] for the
147/// remainder.
148#[derive(Clone, Debug)]
149pub struct ChunkRequest {
150    /// Estimated playback time when this chunk starts.
151    ///
152    /// Use this for audio synchronization.
153    pub start: StreamInstant,
154
155    /// Points per second (current value, may change via `StreamControl::set_pps`).
156    pub pps: u32,
157
158    /// Ideal number of points to reach target buffer level.
159    ///
160    /// Calculated as: `ceil((target_buffer - buffered) * pps)`, clamped to buffer length.
161    pub target_points: usize,
162}
163
/// Result returned by the fill callback indicating how the buffer was filled.
///
/// This enum allows the callback to communicate three distinct states:
/// - Successfully filled some number of points
/// - Temporarily unable to provide data (underrun policy applies)
/// - Stream should end gracefully
///
/// # `Filled(0)` Semantics
///
/// - If `target_points == 0`: Buffer is full, nothing needed. This is fine.
/// - If `target_points > 0`: We needed points but got none. Treated as `Starved`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChunkResult {
    /// Wrote n points to the buffer.
    ///
    /// `n` must be <= `buffer.len()`. Partial fills (`n < target_points`) are
    /// accepted; the remainder is filled by the configured idle policy.
    Filled(usize),

    /// No data available right now.
    ///
    /// Underrun policy is applied (repeat last chunk or blank).
    /// Stream continues; callback will be called again when buffer needs filling.
    Starved,

    /// Stream is finished. Shutdown sequence:
    /// 1. Stop calling callback
    /// 2. Let queued points drain (play out)
    /// 3. Blank/park the laser at last position
    /// 4. Return from `run()` with `RunExit::ProducerEnded`
    End,
}
196
197/// Current status of a stream.
198#[derive(Clone, Debug)]
199pub struct StreamStatus {
200    /// Whether the device is connected.
201    pub connected: bool,
202    /// Library-owned scheduled amount.
203    pub scheduled_ahead_points: u64,
204    /// Best-effort device/backend estimate.
205    pub device_queued_points: Option<u64>,
206    /// Optional statistics for diagnostics.
207    pub stats: Option<StreamStats>,
208}
209
/// Stream statistics for diagnostics and debugging.
///
/// All counters start at zero (`Default`).
#[derive(Clone, Debug, Default)]
pub struct StreamStats {
    /// Number of times the stream underran.
    pub underrun_count: u64,
    /// Number of chunks that arrived late.
    pub late_chunk_count: u64,
    /// Number of times the device reconnected.
    pub reconnect_count: u64,
    /// Total chunks written since stream start.
    pub chunks_written: u64,
    /// Total points written since stream start.
    pub points_written: u64,
}
224
/// How a callback-mode stream run ended.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RunExit {
    /// Stream was stopped via `StreamControl::stop()`.
    Stopped,
    /// Producer callback returned `ChunkResult::End` (graceful completion).
    ProducerEnded,
    /// Device disconnected. Streams do not reconnect unless reconnection is
    /// configured; new streams start disarmed.
    Disconnected,
}
235
// =============================================================================
// Stream Control
// =============================================================================
239
/// Control messages sent from StreamControl to Stream.
///
/// These messages allow out-of-band control actions to take effect immediately,
/// even when the stream is waiting (pacing, backpressure, etc.).
#[derive(Debug, Clone, Copy)]
pub(crate) enum ControlMsg {
    /// Arm the output (opens hardware shutter).
    Arm,
    /// Disarm the output (closes hardware shutter).
    Disarm,
    /// Request the stream to stop.
    Stop,
}
253
254/// Thread-safe control handle for safety-critical actions.
255///
256/// This allows out-of-band control of the stream (arm/disarm/stop) from
257/// a different thread, e.g., for E-stop functionality.
258///
259/// Control actions take effect as soon as possible - the stream processes
260/// control messages at every opportunity (during waits, between retries, etc.).
261#[derive(Clone)]
262pub struct StreamControl {
263    inner: Arc<StreamControlInner>,
264}
265
266struct StreamControlInner {
267    /// Whether output is armed (laser can fire).
268    armed: AtomicBool,
269    /// Whether a stop has been requested.
270    stop_requested: AtomicBool,
271    /// Channel for sending control messages to the stream loop.
272    /// Wrapped in Mutex because Sender is Send but not Sync.
273    control_tx: Mutex<Sender<ControlMsg>>,
274    /// Color delay in microseconds (readable per-chunk without locking).
275    color_delay_micros: AtomicU64,
276    /// Points per second (hot-swappable without session restart).
277    pps: AtomicU32,
278}
279
280impl StreamControl {
281    pub(crate) fn new(control_tx: Sender<ControlMsg>, color_delay: Duration, pps: u32) -> Self {
282        Self {
283            inner: Arc::new(StreamControlInner {
284                armed: AtomicBool::new(false),
285                stop_requested: AtomicBool::new(false),
286                control_tx: Mutex::new(control_tx),
287                color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
288                pps: AtomicU32::new(pps),
289            }),
290        }
291    }
292
293    /// Arm the output (allow laser to fire).
294    ///
295    /// When armed, content from the producer passes through unmodified
296    /// and the hardware shutter is opened (best-effort).
297    pub fn arm(&self) -> Result<()> {
298        self.inner.armed.store(true, Ordering::SeqCst);
299        // Send message to stream for immediate shutter control
300        if let Ok(tx) = self.inner.control_tx.lock() {
301            let _ = tx.send(ControlMsg::Arm);
302        }
303        Ok(())
304    }
305
306    /// Disarm the output (force laser off). Designed for E-stop use.
307    ///
308    /// Immediately sets an atomic flag (works even if stream loop is blocked),
309    /// then sends a message to close the hardware shutter. All future points
310    /// are blanked in software. The stream stays alive outputting blanks -
311    /// use `stop()` to terminate entirely.
312    ///
313    /// **Latency**: Points already in the device buffer will still play out.
314    /// `target_buffer` bounds this latency.
315    ///
316    /// **Hardware shutter**: Best-effort. LaserCube and Helios have actual hardware
317    /// control; Ether Dream, IDN are no-ops (safety relies on software blanking).
318    pub fn disarm(&self) -> Result<()> {
319        self.inner.armed.store(false, Ordering::SeqCst);
320        // Send message to stream for immediate shutter control
321        if let Ok(tx) = self.inner.control_tx.lock() {
322            let _ = tx.send(ControlMsg::Disarm);
323        }
324        Ok(())
325    }
326
327    /// Check if the output is armed.
328    pub fn is_armed(&self) -> bool {
329        self.inner.armed.load(Ordering::SeqCst)
330    }
331
332    /// Set the color delay for scanner sync compensation.
333    ///
334    /// Takes effect within one chunk period. The delay is quantized to
335    /// whole points: `ceil(delay * pps)`.
336    pub fn set_color_delay(&self, delay: Duration) {
337        self.inner
338            .color_delay_micros
339            .store(delay.as_micros() as u64, Ordering::SeqCst);
340    }
341
342    /// Get the current color delay.
343    pub fn color_delay(&self) -> Duration {
344        Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
345    }
346
347    /// Set the points per second rate.
348    ///
349    /// Takes effect within one chunk period — the stream loop reads this
350    /// atomically each iteration and recalculates timing on the fly.
351    /// No session restart required.
352    pub fn set_pps(&self, pps: u32) {
353        self.inner.pps.store(pps, Ordering::SeqCst);
354    }
355
356    /// Get the current points per second rate.
357    pub fn pps(&self) -> u32 {
358        self.inner.pps.load(Ordering::SeqCst)
359    }
360
361    /// Request the stream to stop.
362    ///
363    /// Signals termination; `run()` returns `RunExit::Stopped`.
364    /// For clean shutdown with shutter close, prefer `Stream::stop()`.
365    pub fn stop(&self) -> Result<()> {
366        self.inner.stop_requested.store(true, Ordering::SeqCst);
367        // Send message to stream for immediate stop
368        if let Ok(tx) = self.inner.control_tx.lock() {
369            let _ = tx.send(ControlMsg::Stop);
370        }
371        Ok(())
372    }
373
374    /// Check if a stop has been requested.
375    pub fn is_stop_requested(&self) -> bool {
376        self.inner.stop_requested.load(Ordering::SeqCst)
377    }
378}
379
// =============================================================================
// Stream State
// =============================================================================
383
384/// Legacy state shared by the in-Stream helpers retained for tests. The
385/// production path lives entirely in [`crate::presentation::driver::run`]
386/// via [`chunk_producer::ChunkProducer`].
387#[allow(dead_code)]
388struct StreamState {
389    /// Current position in stream time (points since start).
390    current_instant: StreamInstant,
391
392    // Pre-allocated buffers (no per-chunk allocation in hot path)
393    /// Buffer for callback to fill points into.
394    chunk_buffer: Vec<LaserPoint>,
395    /// Last chunk for RepeatLast underrun policy.
396    last_chunk: Vec<LaserPoint>,
397    /// Number of valid points in last_chunk.
398    last_chunk_len: usize,
399
400    /// FIFO for color delay (r, g, b, intensity per point).
401    color_delay_line: VecDeque<(u16, u16, u16, u16)>,
402
403    /// Points remaining in the startup blank window (decremented as points are written).
404    startup_blank_remaining: usize,
405    /// Total startup blank points (computed once from config).
406    startup_blank_points: usize,
407
408    // Cached config-derived values (avoid recomputing per-iteration float math)
409    /// target_buffer as seconds (cached from config).
410    target_buffer_secs: f64,
411    /// target_buffer as points (cached from config + pps).
412    target_buffer_points: u64,
413
414    /// Statistics.
415    stats: StreamStats,
416    /// Track the last armed state to detect transitions.
417    last_armed: bool,
418    /// Whether the hardware shutter is currently open.
419    shutter_open: bool,
420}
421
422impl StreamState {
423    /// Create new stream state with pre-allocated buffers.
424    ///
425    /// Buffers are sized to `max_points_per_chunk` from DAC capabilities,
426    /// ensuring we can handle any catch-up scenario without reallocation.
427    fn new(
428        max_points_per_chunk: usize,
429        startup_blank_points: usize,
430        config: &StreamConfig,
431    ) -> Self {
432        let pps = config.pps as f64;
433        let target_buffer_secs = config.target_buffer.as_secs_f64();
434        Self {
435            current_instant: StreamInstant::new(0),
436            chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
437            last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
438            last_chunk_len: 0,
439            color_delay_line: VecDeque::new(),
440            startup_blank_remaining: 0,
441            startup_blank_points,
442            target_buffer_secs,
443            target_buffer_points: (target_buffer_secs * pps) as u64,
444            stats: StreamStats::default(),
445            last_armed: false,
446            shutter_open: false,
447        }
448    }
449}
450
// =============================================================================
// Stream
// =============================================================================
454
455/// A streaming session for outputting points to a DAC.
456///
457/// Use [`run()`](Self::run) to stream with buffer-driven timing.
458/// The callback is invoked when the buffer needs filling, providing automatic
459/// backpressure handling and zero allocations in the hot path.
460///
461/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
462pub struct Stream {
463    /// Device info for this stream.
464    info: DacInfo,
465    /// The backend.
466    backend: Option<BackendKind>,
467    /// Stream configuration.
468    config: StreamConfig,
469    /// Thread-safe control handle.
470    control: StreamControl,
471    /// Receiver for control messages from StreamControl.
472    control_rx: Receiver<ControlMsg>,
473    /// Stream state.
474    state: StreamState,
475    /// Reconnection policy (None = no reconnection).
476    pub(crate) reconnect_policy: Option<ReconnectPolicy>,
477    /// Reopen identity, preserved for `into_dac()` even without reconnect enabled.
478    pub(crate) reconnect_target: Option<ReconnectTarget>,
479}
480
// Inherent helpers retained for the test surface (see `run_legacy`); the
// production `Stream::run` body lives in [`crate::presentation::driver::run`].
483#[allow(dead_code)]
484impl Stream {
485    /// Convert a duration in microseconds to a point count at the given PPS, rounding up.
486    fn duration_micros_to_points(micros: u64, pps: u32) -> usize {
487        if micros == 0 {
488            0
489        } else {
490            (micros as f64 * pps as f64 / 1_000_000.0).ceil() as usize
491        }
492    }
493
494    fn udp_timed_sleep_slice(remaining: Duration) -> Option<Duration> {
495        const UDP_TIMED_SLEEP_SLICE: Duration = Duration::from_millis(1);
496        const UDP_TIMED_BUSY_WAIT_THRESHOLD: Duration = Duration::from_micros(500);
497
498        if remaining <= UDP_TIMED_BUSY_WAIT_THRESHOLD {
499            None
500        } else {
501            Some(
502                remaining
503                    .saturating_sub(UDP_TIMED_BUSY_WAIT_THRESHOLD)
504                    .min(UDP_TIMED_SLEEP_SLICE),
505            )
506        }
507    }
508
509    /// Create a new stream with a backend.
510    pub(crate) fn with_backend(info: DacInfo, backend: BackendKind, config: StreamConfig) -> Self {
511        let (control_tx, control_rx) = mpsc::channel();
512        let max_points = info.caps.max_points_per_chunk;
513        let startup_blank_points =
514            Self::duration_micros_to_points(config.startup_blank.as_micros() as u64, config.pps);
515        let color_delay = config.color_delay;
516        let pps = config.pps;
517        let state = StreamState::new(max_points, startup_blank_points, &config);
518        Self {
519            info,
520            backend: Some(backend),
521            config,
522            control: StreamControl::new(control_tx, color_delay, pps),
523            control_rx,
524            state,
525            reconnect_policy: None,
526            reconnect_target: None,
527        }
528    }
529
530    /// Compute the software buffer target for scheduler pacing.
531    ///
532    /// UdpTimed backends use `max_points_per_chunk` as the target to keep
533    /// the device ringbuffer continuously topped up. A lower target creates
534    /// long idle gaps between bursts, causing glitches over WiFi.
535    fn scheduler_target_buffer_points(&self) -> u64 {
536        if self.info.caps.output_model == OutputModel::UdpTimed {
537            self.info.caps.max_points_per_chunk as u64
538        } else {
539            self.state.target_buffer_points
540        }
541    }
542
543    /// Returns the device info.
544    pub fn info(&self) -> &DacInfo {
545        &self.info
546    }
547
548    /// Returns the stream configuration.
549    pub fn config(&self) -> &StreamConfig {
550        &self.config
551    }
552
553    /// Returns a thread-safe control handle.
554    pub fn control(&self) -> StreamControl {
555        self.control.clone()
556    }
557
558    /// Returns the current stream status.
559    pub fn status(&self) -> Result<StreamStatus> {
560        let buffered = self.estimate_buffer_points();
561        Ok(StreamStatus {
562            connected: self.backend.as_ref().is_some_and(|b| b.is_connected()),
563            scheduled_ahead_points: buffered,
564            device_queued_points: Some(buffered),
565            stats: Some(self.state.stats.clone()),
566        })
567    }
568
569    /// Handle hardware shutter transitions based on arm state changes.
570    fn handle_shutter_transition(&mut self, is_armed: bool) {
571        let was_armed = self.state.last_armed;
572        self.state.last_armed = is_armed;
573
574        if was_armed && !is_armed {
575            // Disarmed: close the shutter for safety (best-effort)
576            self.state.color_delay_line.clear();
577            if self.state.shutter_open {
578                if let Some(backend) = &mut self.backend {
579                    let _ = backend.set_shutter(false); // Best-effort, ignore errors
580                }
581                self.state.shutter_open = false;
582            }
583        } else if !was_armed && is_armed {
584            // Armed: open the shutter (best-effort)
585            // Pre-fill color delay line with blanked colors so early points
586            // come out dark while galvos settle.
587            let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
588            let delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
589            self.state.color_delay_line.clear();
590            for _ in 0..delay_points {
591                self.state.color_delay_line.push_back((0, 0, 0, 0));
592            }
593
594            self.state.startup_blank_remaining = self.state.startup_blank_points;
595
596            if !self.state.shutter_open {
597                if let Some(backend) = &mut self.backend {
598                    let _ = backend.set_shutter(true); // Best-effort, ignore errors
599                }
600                self.state.shutter_open = true;
601            }
602        }
603    }
604
605    /// Disarm, close shutter, and stop the backend (best-effort).
606    ///
607    /// Shared shutdown sequence used by `stop()`, `into_dac()`, and `Drop`.
608    /// All errors are ignored since this is a safety-critical shutdown path.
609    fn shutdown_backend(&mut self) {
610        let _ = self.control.disarm();
611        let _ = self.control.stop();
612        if let Some(b) = &mut self.backend {
613            let _ = b.set_shutter(false);
614            let _ = b.stop();
615        }
616    }
617
618    /// Stop the stream and terminate output.
619    ///
620    /// Disarms the output (software blanking + hardware shutter) before stopping
621    /// the backend to prevent the "freeze on last bright point" hazard.
622    /// Use `disarm()` instead if you want to keep the stream alive but safe.
623    pub fn stop(&mut self) -> Result<()> {
624        self.shutdown_backend();
625
626        // Disconnect is needed for protocols like IDN where stop() only blanks
627        // but the DAC keeps replaying its buffer until the session is closed.
628        if let Some(b) = &mut self.backend {
629            b.disconnect()?;
630        }
631
632        Ok(())
633    }
634
635    /// Consume the stream and recover the device for reuse.
636    ///
637    /// This method disarms and stops the stream (software blanking + hardware shutter),
638    /// then returns the underlying `Dac` along with the final `StreamStats`.
639    /// The device can then be used to start a new stream with different configuration.
640    ///
641    /// # Example
642    ///
643    /// ```ignore
644    /// use laser_dac::StreamConfig;
645    ///
646    /// // device: Dac, config: StreamConfig (from prior setup)
647    /// let (stream, info) = device.start_stream(config)?;
648    /// // ... stream for a while ...
649    /// let (device, stats) = stream.into_dac();
650    /// println!("Streamed {} points", stats.points_written);
651    ///
652    /// // Restart with different config
653    /// let new_config = StreamConfig::new(60_000);
654    /// let (stream2, _) = device.start_stream(new_config)?;
655    /// ```
656    pub fn into_dac(mut self) -> (Dac, StreamStats) {
657        self.shutdown_backend();
658
659        // Take the backend (leaves None, so Drop won't try to stop again)
660        let backend = self.backend.take();
661        let stats = self.state.stats.clone();
662        let reconnect_target = self
663            .reconnect_target
664            .take()
665            .or_else(|| self.reconnect_policy.take().map(|p| p.target));
666
667        let dac = Dac {
668            info: self.info.clone(),
669            backend,
670            reconnect_target,
671        };
672
673        (dac, stats)
674    }
675
676    /// Attempt to reconnect the backend using the reconnection policy.
677    ///
678    /// Opens a new device, connects, and swaps the backend. Resets timing
679    /// state for the new connection. Returns `Err` on non-retriable errors
680    /// or if stop is requested.
681    fn handle_reconnect(&mut self) -> std::result::Result<(), RunExit> {
682        let policy = self.reconnect_policy.as_ref().unwrap();
683
684        let (info, new_backend) = reconnect_backend_with_retry(
685            policy,
686            || self.control.is_stop_requested(),
687            |info, new_backend| {
688                if new_backend.is_frame_swap() {
689                    log::error!(
690                        "'{}' reconnected device is frame-swap, incompatible with streaming",
691                        policy.target.device_id
692                    );
693                    return Err(RunExit::Disconnected);
694                }
695
696                if Dac::validate_pps(&info.caps, self.config.pps).is_err() {
697                    log::error!(
698                        "'{}' config invalid for new device",
699                        policy.target.device_id
700                    );
701                    return Err(RunExit::Disconnected);
702                }
703
704                Ok(())
705            },
706            || {},
707        )?;
708
709        // Swap the backend
710        self.backend = Some(new_backend);
711        self.info = info;
712
713        // Reset all runtime state for the new connection
714        self.reset_state_for_reconnect();
715
716        // Fire on_reconnect callback
717        let policy = self.reconnect_policy.as_ref().unwrap();
718        if let Some(cb) = policy.on_reconnect.lock().unwrap().as_mut() {
719            cb(&self.info);
720        }
721
722        Ok(())
723    }
724
725    /// Reset all runtime state for a reconnected device.
726    fn reset_state_for_reconnect(&mut self) {
727        let max_points = self.info.caps.max_points_per_chunk;
728        self.state
729            .chunk_buffer
730            .resize(max_points, LaserPoint::default());
731        self.state
732            .last_chunk
733            .resize(max_points, LaserPoint::default());
734        self.state.last_chunk_len = 0;
735        self.state.shutter_open = false;
736        self.state.last_armed = false;
737        self.state.color_delay_line.clear();
738        self.state.startup_blank_remaining = 0;
739        self.state.stats.reconnect_count += 1;
740    }
741
742    /// Run the stream with the zero-allocation callback API.
743    ///
744    /// This method uses **pure buffer-driven timing**:
745    /// - Callback is invoked when `buffered <= target_buffer`
746    /// - Points requested varies based on buffer headroom (`target_points`)
747    /// - Callback fills a library-owned buffer (zero allocations in hot path)
748    ///
749    /// # Callback Contract
750    ///
751    /// The callback receives a `ChunkRequest` describing buffer state and requirements,
752    /// and a mutable slice to fill with points. It returns:
753    ///
754    /// - `ChunkResult::Filled(n)`: Wrote `n` points to the buffer
755    /// - `ChunkResult::Starved`: No data available (underrun policy applies)
756    /// - `ChunkResult::End`: Stream should end gracefully
757    ///
758    /// # Exit Conditions
759    ///
760    /// - **`RunExit::Stopped`**: Stop requested via `StreamControl::stop()`.
761    /// - **`RunExit::ProducerEnded`**: Callback returned `ChunkResult::End`.
762    /// - **`RunExit::Disconnected`**: Device disconnected.
763    ///
764    /// # Example
765    ///
766    /// ```ignore
767    /// use laser_dac::{ChunkRequest, ChunkResult, LaserPoint};
768    ///
769    /// stream.run(
770    ///     |req: &ChunkRequest, buffer: &mut [LaserPoint]| {
771    ///         let n = req.target_points;
772    ///         for i in 0..n {
773    ///             let t = req.start.as_secs_f64(req.pps) + (i as f64 / req.pps as f64);
774    ///             let angle = (t * std::f64::consts::TAU) as f32;
775    ///             buffer[i] = LaserPoint::new(angle.cos(), angle.sin(), 65535, 0, 0, 65535);
776    ///         }
777    ///         ChunkResult::Filled(n)
778    ///     },
779    ///     |err| eprintln!("Error: {}", err),
780    /// )?;
781    /// ```
782    pub fn run<F, E>(mut self, producer: F, on_error: E) -> Result<RunExit>
783    where
784        F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
785        E: FnMut(Error) + Send + 'static,
786    {
787        use crate::presentation::driver::{self, DriverInputs, SourceOwned};
788        use crate::presentation::FrameSessionMetrics;
789
790        let backend = self
791            .backend
792            .take()
793            .ok_or_else(|| Error::disconnected("backend already consumed"))?;
794        if backend.is_frame_swap() {
795            return Err(Error::invalid_config(
796                "Stream::run is FIFO-only; use start_frame_session for frame-swap DACs",
797            ));
798        }
799
800        let max_points = self.info.caps.max_points_per_chunk;
801        let chunk_producer = chunk_producer::ChunkProducer::new(
802            producer,
803            self.control.clone(),
804            self.config.idle_policy.clone(),
805            self.config.startup_blank,
806            max_points,
807        );
808
809        let validator = Self::build_reconnect_validator();
810        let metrics = FrameSessionMetrics::new(true);
811        // Move the control receiver out of self; the Receiver isn't Clone so we
812        // swap in a fresh dummy channel that nothing will drive.
813        let (_dummy_tx, dummy_rx) = mpsc::channel();
814        let control_rx = std::mem::replace(&mut self.control_rx, dummy_rx);
815
816        driver::run(DriverInputs {
817            backend,
818            source: SourceOwned::Fifo(Box::new(chunk_producer)),
819            control: self.control.clone(),
820            control_rx,
821            metrics,
822            reconnect_policy: self.reconnect_policy.take(),
823            validator,
824            error_sink: Box::new(on_error),
825            target_buffer: self.config.target_buffer,
826            drain_timeout: self.config.drain_timeout,
827            pending_frame: None,
828        })
829    }
830
831    fn build_reconnect_validator() -> crate::presentation::driver::ReconnectValidator {
832        Box::new(
833            move |_info: &DacInfo, new_backend: &BackendKind, pps: u32| {
834                if new_backend.is_frame_swap() {
835                    log::error!("reconnected device is frame-swap, incompatible with streaming");
836                    return Err(RunExit::Disconnected);
837                }
838                if Dac::validate_pps(new_backend.caps(), pps).is_err() {
839                    log::error!("reconnected device PPS range incompatible with stream config");
840                    return Err(RunExit::Disconnected);
841                }
842                Ok(())
843            },
844        )
845    }
846
847    /// Legacy inline scheduler retained for tests that drive the helpers
848    /// directly. Phase 4 routes the production path through the unified
849    /// driver above; this body is unreachable from `run`.
850    #[cfg(test)]
851    #[allow(dead_code)]
852    fn run_legacy<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
853    where
854        F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
855        E: FnMut(Error) + Send + 'static,
856    {
857        use std::time::Instant;
858
859        let mut max_points = self.info.caps.max_points_per_chunk;
860        let mut last_stats_log = Instant::now();
861
862        loop {
863            // 1. Check for stop request
864            if self.control.is_stop_requested() {
865                return Ok(RunExit::Stopped);
866            }
867
868            self.config.pps = self.control.pps();
869            let pps = self.config.pps as f64;
870            self.state.target_buffer_points = (self.state.target_buffer_secs * pps) as u64;
871            self.state.startup_blank_points = Self::duration_micros_to_points(
872                self.config.startup_blank.as_micros() as u64,
873                self.config.pps,
874            );
875
876            // 2. Estimate buffer state — the protocol-owned BufferEstimator is
877            // authoritative; backends update their estimator from inside
878            // try_write_points so it decays naturally without a separate
879            // scheduler-side timer.
880            let now = Instant::now();
881            let buffered = self.estimate_buffer_points();
882            let target_points = self.scheduler_target_buffer_points();
883
884            // Log scheduler state periodically (~2Hz)
885            if now.duration_since(last_stats_log) >= Duration::from_millis(500) {
886                let target_ms = target_points as f64 / pps * 1000.0;
887                log::debug!(
888                    "scheduler: target={:.1}ms buffered={} target_pts={}",
889                    target_ms,
890                    buffered,
891                    target_points,
892                );
893                last_stats_log = now;
894            }
895
896            // 3. If buffer is above target, sleep until it drains to target
897            // Note: use > not >= so we call producer when exactly at target
898            if buffered > target_points {
899                let excess_points = buffered - target_points;
900                let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
901                let stop = if self.info.caps.output_model == OutputModel::UdpTimed {
902                    self.sleep_until_with_control_check(Instant::now() + sleep_time)?
903                } else {
904                    self.sleep_with_control_check(sleep_time)?
905                };
906                if stop {
907                    return Ok(RunExit::Stopped);
908                }
909                continue; // Re-check buffer after sleep
910            }
911
912            // 4. Check backend connection
913            let disconnected = match &self.backend {
914                Some(b) => !b.is_connected(),
915                None => true,
916            };
917            if disconnected {
918                if self.reconnect_policy.is_some() {
919                    match self.handle_reconnect() {
920                        Ok(()) => {
921                            max_points = self.info.caps.max_points_per_chunk;
922                            continue;
923                        }
924                        Err(exit) => return Ok(exit),
925                    }
926                }
927                log::warn!("backend disconnected, exiting");
928                on_error(Error::disconnected("backend disconnected"));
929                return Ok(RunExit::Disconnected);
930            }
931
932            // 5. Process control messages before calling producer
933            if self.process_control_messages() {
934                return Ok(RunExit::Stopped);
935            }
936
937            // 6. Build fill request with buffer state (reuse cached estimate)
938            let req = self.build_fill_request(max_points, buffered);
939
940            // 7. Call producer with pre-allocated buffer
941            let buffer = &mut self.state.chunk_buffer[..max_points];
942            let result = producer(&req, buffer);
943
944            // 8. Handle result
945            match result {
946                ChunkResult::Filled(n) => {
947                    // Validate n doesn't exceed buffer
948                    let n = n.min(max_points);
949
950                    // Treat Filled(0) with target_points > 0 as Starved
951                    if n == 0 && req.target_points > 0 {
952                        self.handle_underrun(&req)?;
953                        continue;
954                    }
955
956                    // Write to backend if we have points
957                    if n > 0 {
958                        match self.write_fill_points(n, &mut on_error) {
959                            Ok(()) => {}
960                            Err(e) if e.is_disconnected() && self.reconnect_policy.is_some() => {
961                                match self.handle_reconnect() {
962                                    Ok(()) => {
963                                        max_points = self.info.caps.max_points_per_chunk;
964                                        continue;
965                                    }
966                                    Err(exit) => return Ok(exit),
967                                }
968                            }
969                            Err(e) => return Err(e),
970                        }
971                    }
972                }
973                ChunkResult::Starved => {
974                    self.handle_underrun(&req)?;
975                }
976                ChunkResult::End => {
977                    // Graceful shutdown: let queued points drain, then blank/park
978                    self.drain_and_blank();
979                    return Ok(RunExit::ProducerEnded);
980                }
981            }
982        }
983    }
984
985    /// Sleep for the given duration while checking for control messages.
986    ///
987    /// Returns `true` if stop was requested, `false` otherwise.
988    fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
989        const SLEEP_SLICE: Duration = Duration::from_millis(2);
990        let mut remaining = duration;
991
992        while remaining > Duration::ZERO {
993            let slice = remaining.min(SLEEP_SLICE);
994            std::thread::sleep(slice);
995            remaining = remaining.saturating_sub(slice);
996
997            if self.process_control_messages() {
998                return Ok(true);
999            }
1000        }
1001
1002        Ok(false)
1003    }
1004
1005    /// Sleep until a deadline with finer granularity for timed-UDP pacing.
1006    ///
1007    /// Uses coarse sleeps first, then yields near the deadline to reduce wake-up jitter.
1008    fn sleep_until_with_control_check(&mut self, deadline: std::time::Instant) -> Result<bool> {
1009        loop {
1010            let now = std::time::Instant::now();
1011            if now >= deadline {
1012                return Ok(false);
1013            }
1014
1015            let remaining = deadline.duration_since(now);
1016            if let Some(slice) = Self::udp_timed_sleep_slice(remaining) {
1017                std::thread::sleep(slice);
1018            } else {
1019                std::thread::yield_now();
1020            }
1021
1022            if self.process_control_messages() {
1023                return Ok(true);
1024            }
1025        }
1026    }
1027
1028    /// Write points from chunk_buffer to the backend.
1029    ///
1030    /// Called by `run` after the producer fills the buffer.
1031    fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
1032    where
1033        E: FnMut(Error),
1034    {
1035        let is_armed = self.control.is_armed();
1036        let pps = self.config.pps;
1037
1038        // Handle shutter transitions
1039        self.handle_shutter_transition(is_armed);
1040
1041        // When disarmed, apply idle policy (park scanners instead of tracing shapes)
1042        if !is_armed {
1043            let park = match &self.config.idle_policy {
1044                IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
1045                // RepeatLast falls back to Blank when disarmed — repeating lit
1046                // content on a disarmed stream is never correct.
1047                _ => LaserPoint::blanked(0.0, 0.0),
1048            };
1049            self.state.chunk_buffer[..n].fill(park);
1050        }
1051
1052        // Apply startup blanking: force first N points after arming to blank
1053        if is_armed && self.state.startup_blank_remaining > 0 {
1054            let blank_count = n.min(self.state.startup_blank_remaining);
1055            for p in &mut self.state.chunk_buffer[..blank_count] {
1056                p.r = 0;
1057                p.g = 0;
1058                p.b = 0;
1059                p.intensity = 0;
1060            }
1061            self.state.startup_blank_remaining -= blank_count;
1062        }
1063
1064        // Apply color delay: read current setting, resize deque, shift colors
1065        let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
1066        let color_delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
1067
1068        if color_delay_points > 0 {
1069            // Resize deque to match current delay (handles dynamic changes)
1070            self.state
1071                .color_delay_line
1072                .resize(color_delay_points, (0, 0, 0, 0));
1073            for p in &mut self.state.chunk_buffer[..n] {
1074                self.state
1075                    .color_delay_line
1076                    .push_back((p.r, p.g, p.b, p.intensity));
1077                let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
1078                p.r = r;
1079                p.g = g;
1080                p.b = b;
1081                p.intensity = i;
1082            }
1083        } else if !self.state.color_delay_line.is_empty() {
1084            // Delay was disabled at runtime — flush the line
1085            self.state.color_delay_line.clear();
1086        }
1087
1088        // Try to write with backpressure handling
1089        loop {
1090            // Check backend exists
1091            let backend = match self.backend.as_mut() {
1092                Some(b) => b,
1093                None => return Err(Error::disconnected("no backend")),
1094            };
1095
1096            match backend.try_write(pps, &self.state.chunk_buffer[..n]) {
1097                Ok(WriteOutcome::Written) => {
1098                    self.record_write(n, is_armed);
1099                    return Ok(());
1100                }
1101                Ok(WriteOutcome::WouldBlock) => {
1102                    // Backend buffer full - yield and retry
1103                    // Borrow of backend is dropped here, so we can call process_control_messages
1104                }
1105                Err(e) if e.is_stopped() => {
1106                    return Err(Error::Stopped);
1107                }
1108                Err(e) if e.is_disconnected() => {
1109                    log::warn!("write got Disconnected error, exiting stream: {e}");
1110                    on_error(Error::disconnected("backend disconnected"));
1111                    return Err(e);
1112                }
1113                Err(e) => {
1114                    log::warn!("write error, disconnecting backend: {e}");
1115                    let _ = backend.disconnect();
1116                    on_error(e);
1117                    return Ok(());
1118                }
1119            }
1120
1121            // Handle WouldBlock: yield and process control messages
1122            std::thread::yield_now();
1123            if self.process_control_messages() {
1124                return Err(Error::Stopped);
1125            }
1126            std::thread::sleep(Duration::from_micros(100));
1127        }
1128    }
1129
1130    /// Handle underrun by applying the idle policy.
1131    fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
1132        self.state.stats.underrun_count += 1;
1133
1134        let is_armed = self.control.is_armed();
1135        self.handle_shutter_transition(is_armed);
1136
1137        // Calculate how many points we need (use target_points as the fill amount)
1138        let n_points = req.target_points.max(1);
1139
1140        // Determine the fill point based on arm state and idle policy
1141        if is_armed {
1142            match &self.config.idle_policy {
1143                IdlePolicy::Stop => {
1144                    self.control.stop()?;
1145                    return Err(Error::Stopped);
1146                }
1147                IdlePolicy::RepeatLast if self.state.last_chunk_len > 0 => {
1148                    for i in 0..n_points {
1149                        self.state.chunk_buffer[i] =
1150                            self.state.last_chunk[i % self.state.last_chunk_len];
1151                    }
1152                }
1153                IdlePolicy::Park { x, y } => {
1154                    let park = LaserPoint::blanked(*x, *y);
1155                    self.state.chunk_buffer[..n_points].fill(park);
1156                }
1157                // Blank, or RepeatLast with no stored chunk
1158                _ => {
1159                    self.state.chunk_buffer[..n_points].fill(LaserPoint::blanked(0.0, 0.0));
1160                }
1161            }
1162        } else {
1163            // When disarmed, apply idle policy for scanner parking
1164            let park = match &self.config.idle_policy {
1165                IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
1166                _ => LaserPoint::blanked(0.0, 0.0),
1167            };
1168            self.state.chunk_buffer[..n_points].fill(park);
1169        }
1170
1171        // Write the fill points
1172        if let Some(backend) = &mut self.backend {
1173            match backend.try_write(self.config.pps, &self.state.chunk_buffer[..n_points]) {
1174                Ok(WriteOutcome::Written) => {
1175                    self.record_write(n_points, is_armed);
1176                }
1177                Ok(WriteOutcome::WouldBlock) => {
1178                    // Backend is full - expected during underrun
1179                }
1180                Err(_) => {
1181                    // Backend error during underrun handling - ignore
1182                }
1183            }
1184        }
1185
1186        Ok(())
1187    }
1188
1189    /// Record a successful write: update last_chunk, timebase, and stats.
1190    ///
1191    /// Buffer-fullness bookkeeping lives inside the backend's `BufferEstimator`
1192    /// (driven from `try_write_points`), so this method does not touch it.
1193    fn record_write(&mut self, n: usize, is_armed: bool) {
1194        if is_armed {
1195            debug_assert!(
1196                n <= self.state.last_chunk.len(),
1197                "n ({}) exceeds last_chunk capacity ({})",
1198                n,
1199                self.state.last_chunk.len()
1200            );
1201            self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
1202            self.state.last_chunk_len = n;
1203        }
1204        self.state.current_instant += n as u64;
1205        self.state.stats.chunks_written += 1;
1206        self.state.stats.points_written += n as u64;
1207    }
1208
1209    // =========================================================================
1210    // Internal helpers
1211    // =========================================================================
1212
1213    /// Process any pending control messages from StreamControl.
1214    ///
1215    /// This method drains the control message queue and takes immediate action:
1216    /// - `Arm`: Opens the shutter (best-effort)
1217    /// - `Disarm`: Closes the shutter immediately
1218    /// - `Stop`: Returns `true` to signal the caller to stop
1219    ///
1220    /// Returns `true` if stop was requested, `false` otherwise.
1221    fn process_control_messages(&mut self) -> bool {
1222        loop {
1223            match self.control_rx.try_recv() {
1224                Ok(ControlMsg::Arm) => {
1225                    if !self.state.shutter_open {
1226                        if let Some(backend) = &mut self.backend {
1227                            let _ = backend.set_shutter(true);
1228                        }
1229                        self.state.shutter_open = true;
1230                    }
1231                }
1232                Ok(ControlMsg::Disarm) => {
1233                    if self.state.shutter_open {
1234                        if let Some(backend) = &mut self.backend {
1235                            let _ = backend.set_shutter(false);
1236                        }
1237                        self.state.shutter_open = false;
1238                    }
1239                }
1240                Ok(ControlMsg::Stop) => {
1241                    return true;
1242                }
1243                Err(TryRecvError::Empty) => break,
1244                Err(TryRecvError::Disconnected) => break,
1245            }
1246        }
1247        false
1248    }
1249
1250    /// Estimate the current buffer level in points via the backend's
1251    /// [`BufferEstimator`].
1252    ///
1253    /// Each FIFO backend owns a strategy (status-anchored, dual-track ACK,
1254    /// runtime-authority, or pure software) and updates it from inside its
1255    /// own protocol code. The scheduler trusts that single source of truth.
1256    fn estimate_buffer_points(&self) -> u64 {
1257        let pps = self.config.pps;
1258        let now = std::time::Instant::now();
1259        self.backend
1260            .as_ref()
1261            .and_then(|b| b.estimator())
1262            .map_or(0, |e| e.estimated_fullness(now, pps))
1263    }
1264
1265    /// Build a `ChunkRequest` with the calculated `target_points` for the
1266    /// next chunk. Retained for the legacy in-`Stream` test helpers; the
1267    /// production path lives in [`crate::presentation::driver::run`].
1268    fn build_fill_request(&self, max_points: usize, buffered_points: u64) -> ChunkRequest {
1269        let pps = self.config.pps;
1270        let pps_f64 = pps as f64;
1271        let start = self.state.current_instant;
1272
1273        let target_points = if self.info.caps.output_model == OutputModel::UdpTimed {
1274            max_points
1275        } else {
1276            let buffered_secs = buffered_points as f64 / pps_f64;
1277            let deficit_target = (self.state.target_buffer_secs - buffered_secs).max(0.0);
1278            ((deficit_target * pps_f64).ceil() as usize).min(max_points)
1279        };
1280
1281        ChunkRequest {
1282            start,
1283            pps,
1284            target_points,
1285        }
1286    }
1287
1288    /// Wait for queued points to drain, then blank/park the laser.
1289    ///
1290    /// Called on graceful shutdown (`ChunkResult::End`) to let buffered content
1291    /// play out before stopping. Uses `drain_timeout` from config to cap the wait.
1292    /// Polls the backend's [`BufferEstimator`] until the estimated fullness
1293    /// reaches zero or the timeout elapses.
1294    fn drain_and_blank(&mut self) {
1295        use std::time::Instant;
1296
1297        let timeout = self.config.drain_timeout;
1298        if timeout.is_zero() {
1299            // Skip drain entirely if timeout is zero
1300            self.blank_and_close_shutter();
1301            return;
1302        }
1303
1304        let deadline = Instant::now() + timeout;
1305        const POLL_INTERVAL: Duration = Duration::from_millis(5);
1306        while Instant::now() < deadline {
1307            if self.estimate_buffer_points() == 0 {
1308                break;
1309            }
1310
1311            // Process control messages during drain (allow stop to interrupt)
1312            if self.process_control_messages() {
1313                break;
1314            }
1315
1316            std::thread::sleep(POLL_INTERVAL);
1317        }
1318
1319        self.blank_and_close_shutter();
1320    }
1321
1322    /// Output blank points and close the hardware shutter.
1323    ///
1324    /// Best-effort safety shutdown - errors are ignored since we're already
1325    /// in shutdown path.
1326    fn blank_and_close_shutter(&mut self) {
1327        // Close shutter (best-effort)
1328        if let Some(b) = &mut self.backend {
1329            let _ = b.set_shutter(false);
1330        }
1331        self.state.shutter_open = false;
1332
1333        // Output a small blank chunk to ensure laser is off
1334        // (some DACs may hold the last point otherwise)
1335        if let Some(b) = &mut self.backend {
1336            let blank_point = LaserPoint::blanked(0.0, 0.0);
1337            let blank_chunk = [blank_point; 16];
1338            let _ = b.try_write(self.config.pps, &blank_chunk);
1339        }
1340    }
1341}
1342
1343impl Drop for Stream {
1344    fn drop(&mut self) {
1345        let _ = self.stop();
1346    }
1347}
1348
1349// =============================================================================
1350// Device
1351// =============================================================================
1352
/// A connected device that can start streaming sessions.
///
/// When starting a stream, the device is consumed and the backend ownership
/// transfers to the stream. The `DacInfo` is returned alongside the stream
/// so metadata remains accessible.
///
/// # Example
///
/// ```ignore
/// use laser_dac::{open_device, StreamConfig};
///
/// let device = open_device("my-device")?;
/// let config = StreamConfig::new(30_000);
/// let (stream, info) = device.start_stream(config)?;
/// println!("Streaming to: {}", info.name);
/// ```
pub struct Dac {
    // Device metadata (id, name, kind, capabilities) exposed by the accessors.
    info: DacInfo,
    // The backend, present until it is taken by `start_stream`,
    // `start_frame_session` or `into_backend`.
    backend: Option<BackendKind>,
    // Where to look for the device again on reconnect; set by
    // `with_discovery_factory` and consumed when a session is started.
    pub(crate) reconnect_target: Option<ReconnectTarget>,
}
1374
1375impl Dac {
1376    /// Create a new device from a backend.
1377    pub fn new(info: DacInfo, backend: BackendKind) -> Self {
1378        Self {
1379            info,
1380            backend: Some(backend),
1381            reconnect_target: None,
1382        }
1383    }
1384
1385    /// Set a custom discovery factory for reconnection.
1386    ///
1387    /// When reconnection is enabled (via [`crate::FrameSessionConfig::with_reconnect`] or
1388    /// [`StreamConfig::with_reconnect`]), the factory is called to create a
1389    /// [`DacDiscovery`] instance for each reconnection attempt. This is required
1390    /// for custom backends registered via [`DacDiscovery::register`] — without it,
1391    /// reconnection uses the default discovery which only finds built-in DAC types.
1392    ///
1393    /// For most cases, prefer [`open_device_with`](crate::open_device_with) which
1394    /// handles both initial discovery and reconnection in one call. Use this method
1395    /// when you build `Dac` instances yourself via `scan()` + `connect()` + `Dac::new()`.
1396    ///
1397    /// # Example
1398    ///
1399    /// ```ignore
1400    /// use laser_dac::{Dac, DacDiscovery, EnabledDacTypes, FrameSessionConfig, ReconnectConfig};
1401    ///
1402    /// // Device opened through custom discovery path
1403    /// let mut discovery = DacDiscovery::new(EnabledDacTypes::all());
1404    /// discovery.register(Box::new(MyCustomDiscoverer::new()));
1405    /// let devices = discovery.scan();
1406    /// let backend = discovery.connect(devices.into_iter().next().unwrap())?;
1407    /// let dac = Dac::new(info, backend);
1408    ///
1409    /// // Attach factory so reconnection can also find custom backends
1410    /// let dac = dac.with_discovery_factory(|| {
1411    ///     let mut d = DacDiscovery::new(EnabledDacTypes::all());
1412    ///     d.register(Box::new(MyCustomDiscoverer::new()));
1413    ///     d
1414    /// });
1415    ///
1416    /// let config = FrameSessionConfig::new(30_000)
1417    ///     .with_reconnect(ReconnectConfig::new());
1418    /// let (session, _info) = dac.start_frame_session(config)?;
1419    /// ```
1420    pub fn with_discovery_factory<F>(mut self, factory: F) -> Self
1421    where
1422        F: Fn() -> DacDiscovery + Send + 'static,
1423    {
1424        match self.reconnect_target {
1425            Some(ref mut target) => {
1426                target.discovery_factory = Some(Box::new(factory));
1427            }
1428            None => {
1429                self.reconnect_target = Some(ReconnectTarget {
1430                    device_id: self.info.id.clone(),
1431                    discovery_factory: Some(Box::new(factory)),
1432                });
1433            }
1434        }
1435        self
1436    }
1437
1438    /// Returns the device info.
1439    pub fn info(&self) -> &DacInfo {
1440        &self.info
1441    }
1442
1443    /// Returns the device ID.
1444    pub fn id(&self) -> &str {
1445        &self.info.id
1446    }
1447
1448    /// Returns the device name.
1449    pub fn name(&self) -> &str {
1450        &self.info.name
1451    }
1452
1453    /// Returns the DAC type.
1454    pub fn kind(&self) -> &DacType {
1455        &self.info.kind
1456    }
1457
1458    /// Returns the device capabilities.
1459    pub fn caps(&self) -> &DacCapabilities {
1460        &self.info.caps
1461    }
1462
1463    /// Returns whether the device has a backend (not yet used for a stream).
1464    pub fn has_backend(&self) -> bool {
1465        self.backend.is_some()
1466    }
1467
1468    /// Consume the Dac and return the backend, if available.
1469    pub(crate) fn into_backend(mut self) -> Option<BackendKind> {
1470        self.backend.take()
1471    }
1472
1473    /// Returns whether the device is connected.
1474    pub fn is_connected(&self) -> bool {
1475        self.backend.as_ref().is_some_and(|b| b.is_connected())
1476    }
1477
1478    /// Starts a streaming session, consuming the device.
1479    ///
1480    /// # Ownership
1481    ///
1482    /// This method consumes the `Dac` because:
1483    /// - Each device can only have one active stream at a time.
1484    /// - The backend is moved into the `Stream` to ensure exclusive access.
1485    /// - This prevents accidental reuse of a device that's already streaming.
1486    ///
1487    /// The method returns both the `Stream` and a copy of `DacInfo`, so you
1488    /// retain access to device metadata (id, name, capabilities) after starting.
1489    ///
1490    /// # Connection
1491    ///
1492    /// If the device is not already connected, this method will establish the
1493    /// connection before creating the stream. Connection failures are returned
1494    /// as errors.
1495    ///
1496    /// # Errors
1497    ///
1498    /// Returns an error if:
1499    /// - The device backend has already been used for a stream.
1500    /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
1501    /// - The backend fails to connect.
1502    pub fn start_stream(mut self, mut cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1503        // Extract reconnect config before consuming cfg
1504        let reconnect_config = cfg.reconnect.take();
1505
1506        let mut backend = self.backend.take().ok_or_else(|| {
1507            Error::invalid_config("device backend has already been used for a stream")
1508        })?;
1509
1510        if backend.is_frame_swap() {
1511            return Err(Error::invalid_config(
1512                "streaming is not supported on frame-swap DACs (e.g. Helios); \
1513                 use start_frame_session() instead",
1514            ));
1515        }
1516
1517        let cfg = Self::apply_backend_buffer_defaults(&self.info.caps, cfg);
1518
1519        Self::validate_pps(&self.info.caps, cfg.pps)?;
1520
1521        // Connect the backend if not already connected
1522        if !backend.is_connected() {
1523            backend.connect()?;
1524        }
1525
1526        let mut stream = Stream::with_backend(self.info.clone(), backend, cfg);
1527
1528        // Always preserve the reopen target on the stream (for into_dac recovery)
1529        stream.reconnect_target = self.reconnect_target.take();
1530
1531        // Wire reconnect policy if configured
1532        if let Some(rc) = reconnect_config {
1533            let target = stream.reconnect_target.take().ok_or_else(|| {
1534                Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
1535            })?;
1536            stream.reconnect_policy = Some(ReconnectPolicy::new(rc, target));
1537        }
1538
1539        Ok((stream, self.info))
1540    }
1541
1542    fn apply_backend_buffer_defaults(
1543        caps: &DacCapabilities,
1544        mut cfg: StreamConfig,
1545    ) -> StreamConfig {
1546        if cfg.target_buffer == StreamConfig::DEFAULT_TARGET_BUFFER
1547            && matches!(
1548                caps.output_model,
1549                OutputModel::NetworkFifo | OutputModel::UdpTimed
1550            )
1551        {
1552            cfg.target_buffer = StreamConfig::NETWORK_DEFAULT_TARGET_BUFFER;
1553        }
1554
1555        cfg
1556    }
1557
1558    fn validate_pps(caps: &DacCapabilities, pps: u32) -> Result<()> {
1559        if pps < caps.pps_min || pps > caps.pps_max {
1560            return Err(Error::invalid_config(format!(
1561                "PPS {} is outside device range [{}, {}]",
1562                pps, caps.pps_min, caps.pps_max
1563            )));
1564        }
1565
1566        Ok(())
1567    }
1568
1569    /// Starts a frame-mode session, consuming the device.
1570    ///
1571    /// Similar to [`start_stream`](Self::start_stream) but uses the frame-first
1572    /// API where you submit complete [`crate::presentation::Frame`]s instead of filling
1573    /// point buffers via callback.
1574    ///
1575    /// Returns a [`crate::presentation::FrameSession`] that owns the scheduler thread and a
1576    /// [`DacInfo`] with device metadata.
1577    pub fn start_frame_session(
1578        mut self,
1579        mut config: crate::presentation::FrameSessionConfig,
1580    ) -> Result<(crate::presentation::FrameSession, DacInfo)> {
1581        let reconnect_config = config.reconnect.take();
1582
1583        let backend = self.backend.take().ok_or_else(|| {
1584            Error::invalid_config("device backend has already been used for a session")
1585        })?;
1586
1587        Self::validate_pps(backend.caps(), config.pps)?;
1588
1589        let reconnect_policy = match reconnect_config {
1590            Some(rc) => {
1591                let target = self.reconnect_target.take().ok_or_else(|| {
1592                    Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
1593                })?;
1594                Some(ReconnectPolicy::new(rc, target))
1595            }
1596            None => None,
1597        };
1598
1599        let session = crate::presentation::FrameSession::start(backend, config, reconnect_policy)?;
1600        Ok((session, self.info))
1601    }
1602}
1603
1604#[cfg(test)]
1605mod tests;