laser_dac/stream/mod.rs
1//! Stream and Dac types for point output.
2//!
3//! This module provides the `Stream` type for streaming point chunks to a DAC,
4//! `StreamControl` for out-of-band control (arm/disarm/stop), and `Dac` for
5//! connected devices that can start streaming sessions.
6//!
7//! # Armed/Disarmed Model
8//!
9//! Streams use a binary armed/disarmed safety model:
10//!
11//! - **Armed**: Content passes through to device, shutter opened (best-effort).
12//! - **Disarmed** (default): Shutter closed, intensity/RGB forced to 0.
13//!
14//! All streams start disarmed. Call `arm()` to enable output.
15//! `arm()` and `disarm()` are the only safety controls — there is no separate
16//! shutter API. This keeps the mental model simple: armed = laser may emit.
17//!
18//! # Hardware Shutter Support
19//!
20//! Shutter control is best-effort and varies by backend:
21//! - **LaserCube USB/WiFi**: Actual hardware control
22//! - **Helios**: Hardware shutter control via USB interrupt
23//! - **Ether Dream, IDN**: No-op (safety relies on software blanking)
24//!
25//! # Disconnect Behavior
26//!
27//! By default, streams do not reconnect — on disconnect, `run()` returns
28//! `RunExit::Disconnected`. Configure reconnection via
29//! [`StreamConfig::with_reconnect`](crate::StreamConfig::with_reconnect) or
30//! [`FrameSessionConfig::with_reconnect`](crate::FrameSessionConfig::with_reconnect).
31//! New streams always start disarmed for safety.
32
33use std::collections::VecDeque;
34use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
35use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
36use std::sync::{Arc, Mutex};
37use std::time::Duration;
38
39use crate::backend::{BackendKind, Error, Result, WriteOutcome};
40use crate::config::{IdlePolicy, StreamConfig};
41use crate::device::{DacCapabilities, DacInfo, DacType, OutputModel};
42use crate::discovery::DacDiscovery;
43use crate::point::LaserPoint;
44use crate::reconnect::{reconnect_backend_with_retry, ReconnectPolicy, ReconnectTarget};
45
46pub(crate) mod chunk_producer;
47
48// =============================================================================
49// Stream protocol types
50// =============================================================================
51
52#[cfg(feature = "serde")]
53use serde::{Deserialize, Serialize};
54
55/// Represents a point in stream time, anchored to estimated playback position.
56///
57/// `StreamInstant` represents the **estimated playback time** of points, not merely
58/// "points sent so far." When used in `ChunkRequest::start`, it represents:
59///
60/// `start` = playhead + buffered
61///
62/// Where:
63/// - `playhead` = stream_epoch + estimated_consumed_points
64/// - `buffered` = points sent but not yet played
65///
66/// This allows callbacks to generate content for the exact time it will be displayed,
67/// enabling accurate audio synchronization.
68#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
69#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
70pub struct StreamInstant(pub u64);
71
72impl StreamInstant {
73 /// Create a new stream instant from a point count.
74 pub fn new(points: u64) -> Self {
75 Self(points)
76 }
77
78 /// Returns the number of points since stream start.
79 pub fn points(&self) -> u64 {
80 self.0
81 }
82
83 /// Convert this instant to seconds at the given points-per-second rate.
84 pub fn as_seconds(&self, pps: u32) -> f64 {
85 self.0 as f64 / pps as f64
86 }
87
88 /// Convert to seconds at the given PPS.
89 ///
90 /// This is an alias for `as_seconds()` for consistency with standard Rust
91 /// duration naming conventions (e.g., `Duration::as_secs_f64()`).
92 #[inline]
93 pub fn as_secs_f64(&self, pps: u32) -> f64 {
94 self.as_seconds(pps)
95 }
96
97 /// Create a stream instant from a duration in seconds at the given PPS.
98 pub fn from_seconds(seconds: f64, pps: u32) -> Self {
99 Self((seconds * pps as f64) as u64)
100 }
101
102 /// Add a number of points to this instant.
103 pub fn add_points(&self, points: u64) -> Self {
104 Self(self.0.saturating_add(points))
105 }
106
107 /// Subtract a number of points from this instant (saturating at 0).
108 pub fn sub_points(&self, points: u64) -> Self {
109 Self(self.0.saturating_sub(points))
110 }
111}
112
113impl std::ops::Add<u64> for StreamInstant {
114 type Output = Self;
115 fn add(self, rhs: u64) -> Self::Output {
116 self.add_points(rhs)
117 }
118}
119
120impl std::ops::Sub<u64> for StreamInstant {
121 type Output = Self;
122 fn sub(self, rhs: u64) -> Self::Output {
123 self.sub_points(rhs)
124 }
125}
126
127impl std::ops::AddAssign<u64> for StreamInstant {
128 fn add_assign(&mut self, rhs: u64) {
129 self.0 = self.0.saturating_add(rhs);
130 }
131}
132
133impl std::ops::SubAssign<u64> for StreamInstant {
134 fn sub_assign(&mut self, rhs: u64) {
135 self.0 = self.0.saturating_sub(rhs);
136 }
137}
138
139/// A request to fill a buffer with points for streaming.
140///
141/// The callback receives a `ChunkRequest` describing the next chunk's timing
142/// requirements and fills points into a library-owned buffer.
143///
144/// `target_points` is the ideal number of points to reach the target buffer
145/// level, clamped to the provided buffer length. Returning fewer points
146/// triggers the configured [`IdlePolicy`] for the
147/// remainder.
148#[derive(Clone, Debug)]
149pub struct ChunkRequest {
150 /// Estimated playback time when this chunk starts.
151 ///
152 /// Use this for audio synchronization.
153 pub start: StreamInstant,
154
155 /// Points per second (current value, may change via `StreamControl::set_pps`).
156 pub pps: u32,
157
158 /// Ideal number of points to reach target buffer level.
159 ///
160 /// Calculated as: `ceil((target_buffer - buffered) * pps)`, clamped to buffer length.
161 pub target_points: usize,
162}
163
164/// Result returned by the fill callback indicating how the buffer was filled.
165///
166/// This enum allows the callback to communicate three distinct states:
167/// - Successfully filled some number of points
168/// - Temporarily unable to provide data (underrun policy applies)
169/// - Stream should end gracefully
170///
171/// # `Filled(0)` Semantics
172///
173/// - If `target_points == 0`: Buffer is full, nothing needed. This is fine.
174/// - If `target_points > 0`: We needed points but got none. Treated as `Starved`.
175#[derive(Clone, Copy, Debug, PartialEq, Eq)]
176pub enum ChunkResult {
177 /// Wrote n points to the buffer.
178 ///
179 /// `n` must be <= `buffer.len()`. Partial fills (`n < target_points`) are
180 /// accepted; the remainder is filled by the configured idle policy.
181 Filled(usize),
182
183 /// No data available right now.
184 ///
185 /// Underrun policy is applied (repeat last chunk or blank).
186 /// Stream continues; callback will be called again when buffer needs filling.
187 Starved,
188
189 /// Stream is finished. Shutdown sequence:
190 /// 1. Stop calling callback
191 /// 2. Let queued points drain (play out)
192 /// 3. Blank/park the laser at last position
193 /// 4. Return from stream() with `RunExit::ProducerEnded`
194 End,
195}
196
197/// Current status of a stream.
198#[derive(Clone, Debug)]
199pub struct StreamStatus {
200 /// Whether the device is connected.
201 pub connected: bool,
202 /// Library-owned scheduled amount.
203 pub scheduled_ahead_points: u64,
204 /// Best-effort device/backend estimate.
205 pub device_queued_points: Option<u64>,
206 /// Optional statistics for diagnostics.
207 pub stats: Option<StreamStats>,
208}
209
210/// Stream statistics for diagnostics and debugging.
211#[derive(Clone, Debug, Default)]
212pub struct StreamStats {
213 /// Number of times the stream underran.
214 pub underrun_count: u64,
215 /// Number of chunks that arrived late.
216 pub late_chunk_count: u64,
217 /// Number of times the device reconnected.
218 pub reconnect_count: u64,
219 /// Total chunks written since stream start.
220 pub chunks_written: u64,
221 /// Total points written since stream start.
222 pub points_written: u64,
223}
224
225/// How a callback-mode stream run ended.
226#[derive(Clone, Debug, PartialEq, Eq)]
227pub enum RunExit {
228 /// Stream was stopped via `StreamControl::stop()`.
229 Stopped,
230 /// Producer returned `None` (graceful completion).
231 ProducerEnded,
232 /// Device disconnected. No auto-reconnect; new streams start disarmed.
233 Disconnected,
234}
235
236// =============================================================================
237// Stream Control
238// =============================================================================
239
240/// Control messages sent from StreamControl to Stream.
241///
242/// These messages allow out-of-band control actions to take effect immediately,
243/// even when the stream is waiting (pacing, backpressure, etc.).
244#[derive(Debug, Clone, Copy)]
245pub(crate) enum ControlMsg {
246 /// Arm the output (opens hardware shutter).
247 Arm,
248 /// Disarm the output (closes hardware shutter).
249 Disarm,
250 /// Request the stream to stop.
251 Stop,
252}
253
254/// Thread-safe control handle for safety-critical actions.
255///
256/// This allows out-of-band control of the stream (arm/disarm/stop) from
257/// a different thread, e.g., for E-stop functionality.
258///
259/// Control actions take effect as soon as possible - the stream processes
260/// control messages at every opportunity (during waits, between retries, etc.).
261#[derive(Clone)]
262pub struct StreamControl {
263 inner: Arc<StreamControlInner>,
264}
265
266struct StreamControlInner {
267 /// Whether output is armed (laser can fire).
268 armed: AtomicBool,
269 /// Whether a stop has been requested.
270 stop_requested: AtomicBool,
271 /// Channel for sending control messages to the stream loop.
272 /// Wrapped in Mutex because Sender is Send but not Sync.
273 control_tx: Mutex<Sender<ControlMsg>>,
274 /// Color delay in microseconds (readable per-chunk without locking).
275 color_delay_micros: AtomicU64,
276 /// Points per second (hot-swappable without session restart).
277 pps: AtomicU32,
278}
279
280impl StreamControl {
281 pub(crate) fn new(control_tx: Sender<ControlMsg>, color_delay: Duration, pps: u32) -> Self {
282 Self {
283 inner: Arc::new(StreamControlInner {
284 armed: AtomicBool::new(false),
285 stop_requested: AtomicBool::new(false),
286 control_tx: Mutex::new(control_tx),
287 color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
288 pps: AtomicU32::new(pps),
289 }),
290 }
291 }
292
293 /// Arm the output (allow laser to fire).
294 ///
295 /// When armed, content from the producer passes through unmodified
296 /// and the hardware shutter is opened (best-effort).
297 pub fn arm(&self) -> Result<()> {
298 self.inner.armed.store(true, Ordering::SeqCst);
299 // Send message to stream for immediate shutter control
300 if let Ok(tx) = self.inner.control_tx.lock() {
301 let _ = tx.send(ControlMsg::Arm);
302 }
303 Ok(())
304 }
305
306 /// Disarm the output (force laser off). Designed for E-stop use.
307 ///
308 /// Immediately sets an atomic flag (works even if stream loop is blocked),
309 /// then sends a message to close the hardware shutter. All future points
310 /// are blanked in software. The stream stays alive outputting blanks -
311 /// use `stop()` to terminate entirely.
312 ///
313 /// **Latency**: Points already in the device buffer will still play out.
314 /// `target_buffer` bounds this latency.
315 ///
316 /// **Hardware shutter**: Best-effort. LaserCube and Helios have actual hardware
317 /// control; Ether Dream, IDN are no-ops (safety relies on software blanking).
318 pub fn disarm(&self) -> Result<()> {
319 self.inner.armed.store(false, Ordering::SeqCst);
320 // Send message to stream for immediate shutter control
321 if let Ok(tx) = self.inner.control_tx.lock() {
322 let _ = tx.send(ControlMsg::Disarm);
323 }
324 Ok(())
325 }
326
327 /// Check if the output is armed.
328 pub fn is_armed(&self) -> bool {
329 self.inner.armed.load(Ordering::SeqCst)
330 }
331
332 /// Set the color delay for scanner sync compensation.
333 ///
334 /// Takes effect within one chunk period. The delay is quantized to
335 /// whole points: `ceil(delay * pps)`.
336 pub fn set_color_delay(&self, delay: Duration) {
337 self.inner
338 .color_delay_micros
339 .store(delay.as_micros() as u64, Ordering::SeqCst);
340 }
341
342 /// Get the current color delay.
343 pub fn color_delay(&self) -> Duration {
344 Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
345 }
346
347 /// Set the points per second rate.
348 ///
349 /// Takes effect within one chunk period — the stream loop reads this
350 /// atomically each iteration and recalculates timing on the fly.
351 /// No session restart required.
352 pub fn set_pps(&self, pps: u32) {
353 self.inner.pps.store(pps, Ordering::SeqCst);
354 }
355
356 /// Get the current points per second rate.
357 pub fn pps(&self) -> u32 {
358 self.inner.pps.load(Ordering::SeqCst)
359 }
360
361 /// Request the stream to stop.
362 ///
363 /// Signals termination; `run()` returns `RunExit::Stopped`.
364 /// For clean shutdown with shutter close, prefer `Stream::stop()`.
365 pub fn stop(&self) -> Result<()> {
366 self.inner.stop_requested.store(true, Ordering::SeqCst);
367 // Send message to stream for immediate stop
368 if let Ok(tx) = self.inner.control_tx.lock() {
369 let _ = tx.send(ControlMsg::Stop);
370 }
371 Ok(())
372 }
373
374 /// Check if a stop has been requested.
375 pub fn is_stop_requested(&self) -> bool {
376 self.inner.stop_requested.load(Ordering::SeqCst)
377 }
378}
379
380// =============================================================================
381// Stream State
382// =============================================================================
383
384/// Legacy state shared by the in-Stream helpers retained for tests. The
385/// production path lives entirely in [`crate::presentation::driver::run`]
386/// via [`chunk_producer::ChunkProducer`].
387#[allow(dead_code)]
388struct StreamState {
389 /// Current position in stream time (points since start).
390 current_instant: StreamInstant,
391
392 // Pre-allocated buffers (no per-chunk allocation in hot path)
393 /// Buffer for callback to fill points into.
394 chunk_buffer: Vec<LaserPoint>,
395 /// Last chunk for RepeatLast underrun policy.
396 last_chunk: Vec<LaserPoint>,
397 /// Number of valid points in last_chunk.
398 last_chunk_len: usize,
399
400 /// FIFO for color delay (r, g, b, intensity per point).
401 color_delay_line: VecDeque<(u16, u16, u16, u16)>,
402
403 /// Points remaining in the startup blank window (decremented as points are written).
404 startup_blank_remaining: usize,
405 /// Total startup blank points (computed once from config).
406 startup_blank_points: usize,
407
408 // Cached config-derived values (avoid recomputing per-iteration float math)
409 /// target_buffer as seconds (cached from config).
410 target_buffer_secs: f64,
411 /// target_buffer as points (cached from config + pps).
412 target_buffer_points: u64,
413
414 /// Statistics.
415 stats: StreamStats,
416 /// Track the last armed state to detect transitions.
417 last_armed: bool,
418 /// Whether the hardware shutter is currently open.
419 shutter_open: bool,
420}
421
422impl StreamState {
423 /// Create new stream state with pre-allocated buffers.
424 ///
425 /// Buffers are sized to `max_points_per_chunk` from DAC capabilities,
426 /// ensuring we can handle any catch-up scenario without reallocation.
427 fn new(
428 max_points_per_chunk: usize,
429 startup_blank_points: usize,
430 config: &StreamConfig,
431 ) -> Self {
432 let pps = config.pps as f64;
433 let target_buffer_secs = config.target_buffer.as_secs_f64();
434 Self {
435 current_instant: StreamInstant::new(0),
436 chunk_buffer: vec![LaserPoint::default(); max_points_per_chunk],
437 last_chunk: vec![LaserPoint::default(); max_points_per_chunk],
438 last_chunk_len: 0,
439 color_delay_line: VecDeque::new(),
440 startup_blank_remaining: 0,
441 startup_blank_points,
442 target_buffer_secs,
443 target_buffer_points: (target_buffer_secs * pps) as u64,
444 stats: StreamStats::default(),
445 last_armed: false,
446 shutter_open: false,
447 }
448 }
449}
450
451// =============================================================================
452// Stream
453// =============================================================================
454
455/// A streaming session for outputting points to a DAC.
456///
457/// Use [`run()`](Self::run) to stream with buffer-driven timing.
458/// The callback is invoked when the buffer needs filling, providing automatic
459/// backpressure handling and zero allocations in the hot path.
460///
461/// The stream owns pacing, backpressure, and the timebase (`StreamInstant`).
462pub struct Stream {
463 /// Device info for this stream.
464 info: DacInfo,
465 /// The backend.
466 backend: Option<BackendKind>,
467 /// Stream configuration.
468 config: StreamConfig,
469 /// Thread-safe control handle.
470 control: StreamControl,
471 /// Receiver for control messages from StreamControl.
472 control_rx: Receiver<ControlMsg>,
473 /// Stream state.
474 state: StreamState,
475 /// Reconnection policy (None = no reconnection).
476 pub(crate) reconnect_policy: Option<ReconnectPolicy>,
477 /// Reopen identity, preserved for `into_dac()` even without reconnect enabled.
478 pub(crate) reconnect_target: Option<ReconnectTarget>,
479}
480
481// Inherent helpers retained for the test surface (see `run_legacy`); the
482// production `Stream::run` body lives in [`crate::presentation::driver::run`].
483#[allow(dead_code)]
484impl Stream {
485 /// Convert a duration in microseconds to a point count at the given PPS, rounding up.
486 fn duration_micros_to_points(micros: u64, pps: u32) -> usize {
487 if micros == 0 {
488 0
489 } else {
490 (micros as f64 * pps as f64 / 1_000_000.0).ceil() as usize
491 }
492 }
493
494 fn udp_timed_sleep_slice(remaining: Duration) -> Option<Duration> {
495 const UDP_TIMED_SLEEP_SLICE: Duration = Duration::from_millis(1);
496 const UDP_TIMED_BUSY_WAIT_THRESHOLD: Duration = Duration::from_micros(500);
497
498 if remaining <= UDP_TIMED_BUSY_WAIT_THRESHOLD {
499 None
500 } else {
501 Some(
502 remaining
503 .saturating_sub(UDP_TIMED_BUSY_WAIT_THRESHOLD)
504 .min(UDP_TIMED_SLEEP_SLICE),
505 )
506 }
507 }
508
509 /// Create a new stream with a backend.
510 pub(crate) fn with_backend(info: DacInfo, backend: BackendKind, config: StreamConfig) -> Self {
511 let (control_tx, control_rx) = mpsc::channel();
512 let max_points = info.caps.max_points_per_chunk;
513 let startup_blank_points =
514 Self::duration_micros_to_points(config.startup_blank.as_micros() as u64, config.pps);
515 let color_delay = config.color_delay;
516 let pps = config.pps;
517 let state = StreamState::new(max_points, startup_blank_points, &config);
518 Self {
519 info,
520 backend: Some(backend),
521 config,
522 control: StreamControl::new(control_tx, color_delay, pps),
523 control_rx,
524 state,
525 reconnect_policy: None,
526 reconnect_target: None,
527 }
528 }
529
530 /// Compute the software buffer target for scheduler pacing.
531 ///
532 /// UdpTimed backends use `max_points_per_chunk` as the target to keep
533 /// the device ringbuffer continuously topped up. A lower target creates
534 /// long idle gaps between bursts, causing glitches over WiFi.
535 fn scheduler_target_buffer_points(&self) -> u64 {
536 if self.info.caps.output_model == OutputModel::UdpTimed {
537 self.info.caps.max_points_per_chunk as u64
538 } else {
539 self.state.target_buffer_points
540 }
541 }
542
543 /// Returns the device info.
544 pub fn info(&self) -> &DacInfo {
545 &self.info
546 }
547
548 /// Returns the stream configuration.
549 pub fn config(&self) -> &StreamConfig {
550 &self.config
551 }
552
553 /// Returns a thread-safe control handle.
554 pub fn control(&self) -> StreamControl {
555 self.control.clone()
556 }
557
558 /// Returns the current stream status.
559 pub fn status(&self) -> Result<StreamStatus> {
560 let buffered = self.estimate_buffer_points();
561 Ok(StreamStatus {
562 connected: self.backend.as_ref().is_some_and(|b| b.is_connected()),
563 scheduled_ahead_points: buffered,
564 device_queued_points: Some(buffered),
565 stats: Some(self.state.stats.clone()),
566 })
567 }
568
569 /// Handle hardware shutter transitions based on arm state changes.
570 fn handle_shutter_transition(&mut self, is_armed: bool) {
571 let was_armed = self.state.last_armed;
572 self.state.last_armed = is_armed;
573
574 if was_armed && !is_armed {
575 // Disarmed: close the shutter for safety (best-effort)
576 self.state.color_delay_line.clear();
577 if self.state.shutter_open {
578 if let Some(backend) = &mut self.backend {
579 let _ = backend.set_shutter(false); // Best-effort, ignore errors
580 }
581 self.state.shutter_open = false;
582 }
583 } else if !was_armed && is_armed {
584 // Armed: open the shutter (best-effort)
585 // Pre-fill color delay line with blanked colors so early points
586 // come out dark while galvos settle.
587 let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
588 let delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
589 self.state.color_delay_line.clear();
590 for _ in 0..delay_points {
591 self.state.color_delay_line.push_back((0, 0, 0, 0));
592 }
593
594 self.state.startup_blank_remaining = self.state.startup_blank_points;
595
596 if !self.state.shutter_open {
597 if let Some(backend) = &mut self.backend {
598 let _ = backend.set_shutter(true); // Best-effort, ignore errors
599 }
600 self.state.shutter_open = true;
601 }
602 }
603 }
604
605 /// Disarm, close shutter, and stop the backend (best-effort).
606 ///
607 /// Shared shutdown sequence used by `stop()`, `into_dac()`, and `Drop`.
608 /// All errors are ignored since this is a safety-critical shutdown path.
609 fn shutdown_backend(&mut self) {
610 let _ = self.control.disarm();
611 let _ = self.control.stop();
612 if let Some(b) = &mut self.backend {
613 let _ = b.set_shutter(false);
614 let _ = b.stop();
615 }
616 }
617
618 /// Stop the stream and terminate output.
619 ///
620 /// Disarms the output (software blanking + hardware shutter) before stopping
621 /// the backend to prevent the "freeze on last bright point" hazard.
622 /// Use `disarm()` instead if you want to keep the stream alive but safe.
623 pub fn stop(&mut self) -> Result<()> {
624 self.shutdown_backend();
625
626 // Disconnect is needed for protocols like IDN where stop() only blanks
627 // but the DAC keeps replaying its buffer until the session is closed.
628 if let Some(b) = &mut self.backend {
629 b.disconnect()?;
630 }
631
632 Ok(())
633 }
634
635 /// Consume the stream and recover the device for reuse.
636 ///
637 /// This method disarms and stops the stream (software blanking + hardware shutter),
638 /// then returns the underlying `Dac` along with the final `StreamStats`.
639 /// The device can then be used to start a new stream with different configuration.
640 ///
641 /// # Example
642 ///
643 /// ```ignore
644 /// use laser_dac::StreamConfig;
645 ///
646 /// // device: Dac, config: StreamConfig (from prior setup)
647 /// let (stream, info) = device.start_stream(config)?;
648 /// // ... stream for a while ...
649 /// let (device, stats) = stream.into_dac();
650 /// println!("Streamed {} points", stats.points_written);
651 ///
652 /// // Restart with different config
653 /// let new_config = StreamConfig::new(60_000);
654 /// let (stream2, _) = device.start_stream(new_config)?;
655 /// ```
656 pub fn into_dac(mut self) -> (Dac, StreamStats) {
657 self.shutdown_backend();
658
659 // Take the backend (leaves None, so Drop won't try to stop again)
660 let backend = self.backend.take();
661 let stats = self.state.stats.clone();
662 let reconnect_target = self
663 .reconnect_target
664 .take()
665 .or_else(|| self.reconnect_policy.take().map(|p| p.target));
666
667 let dac = Dac {
668 info: self.info.clone(),
669 backend,
670 reconnect_target,
671 };
672
673 (dac, stats)
674 }
675
676 /// Attempt to reconnect the backend using the reconnection policy.
677 ///
678 /// Opens a new device, connects, and swaps the backend. Resets timing
679 /// state for the new connection. Returns `Err` on non-retriable errors
680 /// or if stop is requested.
681 fn handle_reconnect(&mut self) -> std::result::Result<(), RunExit> {
682 let policy = self.reconnect_policy.as_ref().unwrap();
683
684 let (info, new_backend) = reconnect_backend_with_retry(
685 policy,
686 || self.control.is_stop_requested(),
687 |info, new_backend| {
688 if new_backend.is_frame_swap() {
689 log::error!(
690 "'{}' reconnected device is frame-swap, incompatible with streaming",
691 policy.target.device_id
692 );
693 return Err(RunExit::Disconnected);
694 }
695
696 if Dac::validate_pps(&info.caps, self.config.pps).is_err() {
697 log::error!(
698 "'{}' config invalid for new device",
699 policy.target.device_id
700 );
701 return Err(RunExit::Disconnected);
702 }
703
704 Ok(())
705 },
706 || {},
707 )?;
708
709 // Swap the backend
710 self.backend = Some(new_backend);
711 self.info = info;
712
713 // Reset all runtime state for the new connection
714 self.reset_state_for_reconnect();
715
716 // Fire on_reconnect callback
717 let policy = self.reconnect_policy.as_ref().unwrap();
718 if let Some(cb) = policy.on_reconnect.lock().unwrap().as_mut() {
719 cb(&self.info);
720 }
721
722 Ok(())
723 }
724
725 /// Reset all runtime state for a reconnected device.
726 fn reset_state_for_reconnect(&mut self) {
727 let max_points = self.info.caps.max_points_per_chunk;
728 self.state
729 .chunk_buffer
730 .resize(max_points, LaserPoint::default());
731 self.state
732 .last_chunk
733 .resize(max_points, LaserPoint::default());
734 self.state.last_chunk_len = 0;
735 self.state.shutter_open = false;
736 self.state.last_armed = false;
737 self.state.color_delay_line.clear();
738 self.state.startup_blank_remaining = 0;
739 self.state.stats.reconnect_count += 1;
740 }
741
742 /// Run the stream with the zero-allocation callback API.
743 ///
744 /// This method uses **pure buffer-driven timing**:
745 /// - Callback is invoked when `buffered <= target_buffer`
746 /// - Points requested varies based on buffer headroom (`target_points`)
747 /// - Callback fills a library-owned buffer (zero allocations in hot path)
748 ///
749 /// # Callback Contract
750 ///
751 /// The callback receives a `ChunkRequest` describing buffer state and requirements,
752 /// and a mutable slice to fill with points. It returns:
753 ///
754 /// - `ChunkResult::Filled(n)`: Wrote `n` points to the buffer
755 /// - `ChunkResult::Starved`: No data available (underrun policy applies)
756 /// - `ChunkResult::End`: Stream should end gracefully
757 ///
758 /// # Exit Conditions
759 ///
760 /// - **`RunExit::Stopped`**: Stop requested via `StreamControl::stop()`.
761 /// - **`RunExit::ProducerEnded`**: Callback returned `ChunkResult::End`.
762 /// - **`RunExit::Disconnected`**: Device disconnected.
763 ///
764 /// # Example
765 ///
766 /// ```ignore
767 /// use laser_dac::{ChunkRequest, ChunkResult, LaserPoint};
768 ///
769 /// stream.run(
770 /// |req: &ChunkRequest, buffer: &mut [LaserPoint]| {
771 /// let n = req.target_points;
772 /// for i in 0..n {
773 /// let t = req.start.as_secs_f64(req.pps) + (i as f64 / req.pps as f64);
774 /// let angle = (t * std::f64::consts::TAU) as f32;
775 /// buffer[i] = LaserPoint::new(angle.cos(), angle.sin(), 65535, 0, 0, 65535);
776 /// }
777 /// ChunkResult::Filled(n)
778 /// },
779 /// |err| eprintln!("Error: {}", err),
780 /// )?;
781 /// ```
782 pub fn run<F, E>(mut self, producer: F, on_error: E) -> Result<RunExit>
783 where
784 F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
785 E: FnMut(Error) + Send + 'static,
786 {
787 use crate::presentation::driver::{self, DriverInputs, SourceOwned};
788 use crate::presentation::FrameSessionMetrics;
789
790 let backend = self
791 .backend
792 .take()
793 .ok_or_else(|| Error::disconnected("backend already consumed"))?;
794 if backend.is_frame_swap() {
795 return Err(Error::invalid_config(
796 "Stream::run is FIFO-only; use start_frame_session for frame-swap DACs",
797 ));
798 }
799
800 let max_points = self.info.caps.max_points_per_chunk;
801 let chunk_producer = chunk_producer::ChunkProducer::new(
802 producer,
803 self.control.clone(),
804 self.config.idle_policy.clone(),
805 self.config.startup_blank,
806 max_points,
807 );
808
809 let validator = Self::build_reconnect_validator();
810 let metrics = FrameSessionMetrics::new(true);
811 // Move the control receiver out of self; the Receiver isn't Clone so we
812 // swap in a fresh dummy channel that nothing will drive.
813 let (_dummy_tx, dummy_rx) = mpsc::channel();
814 let control_rx = std::mem::replace(&mut self.control_rx, dummy_rx);
815
816 driver::run(DriverInputs {
817 backend,
818 source: SourceOwned::Fifo(Box::new(chunk_producer)),
819 control: self.control.clone(),
820 control_rx,
821 metrics,
822 reconnect_policy: self.reconnect_policy.take(),
823 validator,
824 error_sink: Box::new(on_error),
825 target_buffer: self.config.target_buffer,
826 drain_timeout: self.config.drain_timeout,
827 pending_frame: None,
828 })
829 }
830
831 fn build_reconnect_validator() -> crate::presentation::driver::ReconnectValidator {
832 Box::new(
833 move |_info: &DacInfo, new_backend: &BackendKind, pps: u32| {
834 if new_backend.is_frame_swap() {
835 log::error!("reconnected device is frame-swap, incompatible with streaming");
836 return Err(RunExit::Disconnected);
837 }
838 if Dac::validate_pps(new_backend.caps(), pps).is_err() {
839 log::error!("reconnected device PPS range incompatible with stream config");
840 return Err(RunExit::Disconnected);
841 }
842 Ok(())
843 },
844 )
845 }
846
847 /// Legacy inline scheduler retained for tests that drive the helpers
848 /// directly. Phase 4 routes the production path through the unified
849 /// driver above; this body is unreachable from `run`.
850 #[cfg(test)]
851 #[allow(dead_code)]
852 fn run_legacy<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
853 where
854 F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
855 E: FnMut(Error) + Send + 'static,
856 {
857 use std::time::Instant;
858
859 let mut max_points = self.info.caps.max_points_per_chunk;
860 let mut last_stats_log = Instant::now();
861
862 loop {
863 // 1. Check for stop request
864 if self.control.is_stop_requested() {
865 return Ok(RunExit::Stopped);
866 }
867
868 self.config.pps = self.control.pps();
869 let pps = self.config.pps as f64;
870 self.state.target_buffer_points = (self.state.target_buffer_secs * pps) as u64;
871 self.state.startup_blank_points = Self::duration_micros_to_points(
872 self.config.startup_blank.as_micros() as u64,
873 self.config.pps,
874 );
875
876 // 2. Estimate buffer state — the protocol-owned BufferEstimator is
877 // authoritative; backends update their estimator from inside
878 // try_write_points so it decays naturally without a separate
879 // scheduler-side timer.
880 let now = Instant::now();
881 let buffered = self.estimate_buffer_points();
882 let target_points = self.scheduler_target_buffer_points();
883
884 // Log scheduler state periodically (~2Hz)
885 if now.duration_since(last_stats_log) >= Duration::from_millis(500) {
886 let target_ms = target_points as f64 / pps * 1000.0;
887 log::debug!(
888 "scheduler: target={:.1}ms buffered={} target_pts={}",
889 target_ms,
890 buffered,
891 target_points,
892 );
893 last_stats_log = now;
894 }
895
896 // 3. If buffer is above target, sleep until it drains to target
897 // Note: use > not >= so we call producer when exactly at target
898 if buffered > target_points {
899 let excess_points = buffered - target_points;
900 let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
901 let stop = if self.info.caps.output_model == OutputModel::UdpTimed {
902 self.sleep_until_with_control_check(Instant::now() + sleep_time)?
903 } else {
904 self.sleep_with_control_check(sleep_time)?
905 };
906 if stop {
907 return Ok(RunExit::Stopped);
908 }
909 continue; // Re-check buffer after sleep
910 }
911
912 // 4. Check backend connection
913 let disconnected = match &self.backend {
914 Some(b) => !b.is_connected(),
915 None => true,
916 };
917 if disconnected {
918 if self.reconnect_policy.is_some() {
919 match self.handle_reconnect() {
920 Ok(()) => {
921 max_points = self.info.caps.max_points_per_chunk;
922 continue;
923 }
924 Err(exit) => return Ok(exit),
925 }
926 }
927 log::warn!("backend disconnected, exiting");
928 on_error(Error::disconnected("backend disconnected"));
929 return Ok(RunExit::Disconnected);
930 }
931
932 // 5. Process control messages before calling producer
933 if self.process_control_messages() {
934 return Ok(RunExit::Stopped);
935 }
936
937 // 6. Build fill request with buffer state (reuse cached estimate)
938 let req = self.build_fill_request(max_points, buffered);
939
940 // 7. Call producer with pre-allocated buffer
941 let buffer = &mut self.state.chunk_buffer[..max_points];
942 let result = producer(&req, buffer);
943
944 // 8. Handle result
945 match result {
946 ChunkResult::Filled(n) => {
947 // Validate n doesn't exceed buffer
948 let n = n.min(max_points);
949
950 // Treat Filled(0) with target_points > 0 as Starved
951 if n == 0 && req.target_points > 0 {
952 self.handle_underrun(&req)?;
953 continue;
954 }
955
956 // Write to backend if we have points
957 if n > 0 {
958 match self.write_fill_points(n, &mut on_error) {
959 Ok(()) => {}
960 Err(e) if e.is_disconnected() && self.reconnect_policy.is_some() => {
961 match self.handle_reconnect() {
962 Ok(()) => {
963 max_points = self.info.caps.max_points_per_chunk;
964 continue;
965 }
966 Err(exit) => return Ok(exit),
967 }
968 }
969 Err(e) => return Err(e),
970 }
971 }
972 }
973 ChunkResult::Starved => {
974 self.handle_underrun(&req)?;
975 }
976 ChunkResult::End => {
977 // Graceful shutdown: let queued points drain, then blank/park
978 self.drain_and_blank();
979 return Ok(RunExit::ProducerEnded);
980 }
981 }
982 }
983 }
984
985 /// Sleep for the given duration while checking for control messages.
986 ///
987 /// Returns `true` if stop was requested, `false` otherwise.
988 fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
989 const SLEEP_SLICE: Duration = Duration::from_millis(2);
990 let mut remaining = duration;
991
992 while remaining > Duration::ZERO {
993 let slice = remaining.min(SLEEP_SLICE);
994 std::thread::sleep(slice);
995 remaining = remaining.saturating_sub(slice);
996
997 if self.process_control_messages() {
998 return Ok(true);
999 }
1000 }
1001
1002 Ok(false)
1003 }
1004
1005 /// Sleep until a deadline with finer granularity for timed-UDP pacing.
1006 ///
1007 /// Uses coarse sleeps first, then yields near the deadline to reduce wake-up jitter.
1008 fn sleep_until_with_control_check(&mut self, deadline: std::time::Instant) -> Result<bool> {
1009 loop {
1010 let now = std::time::Instant::now();
1011 if now >= deadline {
1012 return Ok(false);
1013 }
1014
1015 let remaining = deadline.duration_since(now);
1016 if let Some(slice) = Self::udp_timed_sleep_slice(remaining) {
1017 std::thread::sleep(slice);
1018 } else {
1019 std::thread::yield_now();
1020 }
1021
1022 if self.process_control_messages() {
1023 return Ok(true);
1024 }
1025 }
1026 }
1027
1028 /// Write points from chunk_buffer to the backend.
1029 ///
1030 /// Called by `run` after the producer fills the buffer.
1031 fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
1032 where
1033 E: FnMut(Error),
1034 {
1035 let is_armed = self.control.is_armed();
1036 let pps = self.config.pps;
1037
1038 // Handle shutter transitions
1039 self.handle_shutter_transition(is_armed);
1040
1041 // When disarmed, apply idle policy (park scanners instead of tracing shapes)
1042 if !is_armed {
1043 let park = match &self.config.idle_policy {
1044 IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
1045 // RepeatLast falls back to Blank when disarmed — repeating lit
1046 // content on a disarmed stream is never correct.
1047 _ => LaserPoint::blanked(0.0, 0.0),
1048 };
1049 self.state.chunk_buffer[..n].fill(park);
1050 }
1051
1052 // Apply startup blanking: force first N points after arming to blank
1053 if is_armed && self.state.startup_blank_remaining > 0 {
1054 let blank_count = n.min(self.state.startup_blank_remaining);
1055 for p in &mut self.state.chunk_buffer[..blank_count] {
1056 p.r = 0;
1057 p.g = 0;
1058 p.b = 0;
1059 p.intensity = 0;
1060 }
1061 self.state.startup_blank_remaining -= blank_count;
1062 }
1063
1064 // Apply color delay: read current setting, resize deque, shift colors
1065 let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
1066 let color_delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
1067
1068 if color_delay_points > 0 {
1069 // Resize deque to match current delay (handles dynamic changes)
1070 self.state
1071 .color_delay_line
1072 .resize(color_delay_points, (0, 0, 0, 0));
1073 for p in &mut self.state.chunk_buffer[..n] {
1074 self.state
1075 .color_delay_line
1076 .push_back((p.r, p.g, p.b, p.intensity));
1077 let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
1078 p.r = r;
1079 p.g = g;
1080 p.b = b;
1081 p.intensity = i;
1082 }
1083 } else if !self.state.color_delay_line.is_empty() {
1084 // Delay was disabled at runtime — flush the line
1085 self.state.color_delay_line.clear();
1086 }
1087
1088 // Try to write with backpressure handling
1089 loop {
1090 // Check backend exists
1091 let backend = match self.backend.as_mut() {
1092 Some(b) => b,
1093 None => return Err(Error::disconnected("no backend")),
1094 };
1095
1096 match backend.try_write(pps, &self.state.chunk_buffer[..n]) {
1097 Ok(WriteOutcome::Written) => {
1098 self.record_write(n, is_armed);
1099 return Ok(());
1100 }
1101 Ok(WriteOutcome::WouldBlock) => {
1102 // Backend buffer full - yield and retry
1103 // Borrow of backend is dropped here, so we can call process_control_messages
1104 }
1105 Err(e) if e.is_stopped() => {
1106 return Err(Error::Stopped);
1107 }
1108 Err(e) if e.is_disconnected() => {
1109 log::warn!("write got Disconnected error, exiting stream: {e}");
1110 on_error(Error::disconnected("backend disconnected"));
1111 return Err(e);
1112 }
1113 Err(e) => {
1114 log::warn!("write error, disconnecting backend: {e}");
1115 let _ = backend.disconnect();
1116 on_error(e);
1117 return Ok(());
1118 }
1119 }
1120
1121 // Handle WouldBlock: yield and process control messages
1122 std::thread::yield_now();
1123 if self.process_control_messages() {
1124 return Err(Error::Stopped);
1125 }
1126 std::thread::sleep(Duration::from_micros(100));
1127 }
1128 }
1129
1130 /// Handle underrun by applying the idle policy.
1131 fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
1132 self.state.stats.underrun_count += 1;
1133
1134 let is_armed = self.control.is_armed();
1135 self.handle_shutter_transition(is_armed);
1136
1137 // Calculate how many points we need (use target_points as the fill amount)
1138 let n_points = req.target_points.max(1);
1139
1140 // Determine the fill point based on arm state and idle policy
1141 if is_armed {
1142 match &self.config.idle_policy {
1143 IdlePolicy::Stop => {
1144 self.control.stop()?;
1145 return Err(Error::Stopped);
1146 }
1147 IdlePolicy::RepeatLast if self.state.last_chunk_len > 0 => {
1148 for i in 0..n_points {
1149 self.state.chunk_buffer[i] =
1150 self.state.last_chunk[i % self.state.last_chunk_len];
1151 }
1152 }
1153 IdlePolicy::Park { x, y } => {
1154 let park = LaserPoint::blanked(*x, *y);
1155 self.state.chunk_buffer[..n_points].fill(park);
1156 }
1157 // Blank, or RepeatLast with no stored chunk
1158 _ => {
1159 self.state.chunk_buffer[..n_points].fill(LaserPoint::blanked(0.0, 0.0));
1160 }
1161 }
1162 } else {
1163 // When disarmed, apply idle policy for scanner parking
1164 let park = match &self.config.idle_policy {
1165 IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
1166 _ => LaserPoint::blanked(0.0, 0.0),
1167 };
1168 self.state.chunk_buffer[..n_points].fill(park);
1169 }
1170
1171 // Write the fill points
1172 if let Some(backend) = &mut self.backend {
1173 match backend.try_write(self.config.pps, &self.state.chunk_buffer[..n_points]) {
1174 Ok(WriteOutcome::Written) => {
1175 self.record_write(n_points, is_armed);
1176 }
1177 Ok(WriteOutcome::WouldBlock) => {
1178 // Backend is full - expected during underrun
1179 }
1180 Err(_) => {
1181 // Backend error during underrun handling - ignore
1182 }
1183 }
1184 }
1185
1186 Ok(())
1187 }
1188
1189 /// Record a successful write: update last_chunk, timebase, and stats.
1190 ///
1191 /// Buffer-fullness bookkeeping lives inside the backend's `BufferEstimator`
1192 /// (driven from `try_write_points`), so this method does not touch it.
1193 fn record_write(&mut self, n: usize, is_armed: bool) {
1194 if is_armed {
1195 debug_assert!(
1196 n <= self.state.last_chunk.len(),
1197 "n ({}) exceeds last_chunk capacity ({})",
1198 n,
1199 self.state.last_chunk.len()
1200 );
1201 self.state.last_chunk[..n].copy_from_slice(&self.state.chunk_buffer[..n]);
1202 self.state.last_chunk_len = n;
1203 }
1204 self.state.current_instant += n as u64;
1205 self.state.stats.chunks_written += 1;
1206 self.state.stats.points_written += n as u64;
1207 }
1208
1209 // =========================================================================
1210 // Internal helpers
1211 // =========================================================================
1212
1213 /// Process any pending control messages from StreamControl.
1214 ///
1215 /// This method drains the control message queue and takes immediate action:
1216 /// - `Arm`: Opens the shutter (best-effort)
1217 /// - `Disarm`: Closes the shutter immediately
1218 /// - `Stop`: Returns `true` to signal the caller to stop
1219 ///
1220 /// Returns `true` if stop was requested, `false` otherwise.
1221 fn process_control_messages(&mut self) -> bool {
1222 loop {
1223 match self.control_rx.try_recv() {
1224 Ok(ControlMsg::Arm) => {
1225 if !self.state.shutter_open {
1226 if let Some(backend) = &mut self.backend {
1227 let _ = backend.set_shutter(true);
1228 }
1229 self.state.shutter_open = true;
1230 }
1231 }
1232 Ok(ControlMsg::Disarm) => {
1233 if self.state.shutter_open {
1234 if let Some(backend) = &mut self.backend {
1235 let _ = backend.set_shutter(false);
1236 }
1237 self.state.shutter_open = false;
1238 }
1239 }
1240 Ok(ControlMsg::Stop) => {
1241 return true;
1242 }
1243 Err(TryRecvError::Empty) => break,
1244 Err(TryRecvError::Disconnected) => break,
1245 }
1246 }
1247 false
1248 }
1249
1250 /// Estimate the current buffer level in points via the backend's
1251 /// [`BufferEstimator`].
1252 ///
1253 /// Each FIFO backend owns a strategy (status-anchored, dual-track ACK,
1254 /// runtime-authority, or pure software) and updates it from inside its
1255 /// own protocol code. The scheduler trusts that single source of truth.
1256 fn estimate_buffer_points(&self) -> u64 {
1257 let pps = self.config.pps;
1258 let now = std::time::Instant::now();
1259 self.backend
1260 .as_ref()
1261 .and_then(|b| b.estimator())
1262 .map_or(0, |e| e.estimated_fullness(now, pps))
1263 }
1264
1265 /// Build a `ChunkRequest` with the calculated `target_points` for the
1266 /// next chunk. Retained for the legacy in-`Stream` test helpers; the
1267 /// production path lives in [`crate::presentation::driver::run`].
1268 fn build_fill_request(&self, max_points: usize, buffered_points: u64) -> ChunkRequest {
1269 let pps = self.config.pps;
1270 let pps_f64 = pps as f64;
1271 let start = self.state.current_instant;
1272
1273 let target_points = if self.info.caps.output_model == OutputModel::UdpTimed {
1274 max_points
1275 } else {
1276 let buffered_secs = buffered_points as f64 / pps_f64;
1277 let deficit_target = (self.state.target_buffer_secs - buffered_secs).max(0.0);
1278 ((deficit_target * pps_f64).ceil() as usize).min(max_points)
1279 };
1280
1281 ChunkRequest {
1282 start,
1283 pps,
1284 target_points,
1285 }
1286 }
1287
1288 /// Wait for queued points to drain, then blank/park the laser.
1289 ///
1290 /// Called on graceful shutdown (`ChunkResult::End`) to let buffered content
1291 /// play out before stopping. Uses `drain_timeout` from config to cap the wait.
1292 /// Polls the backend's [`BufferEstimator`] until the estimated fullness
1293 /// reaches zero or the timeout elapses.
1294 fn drain_and_blank(&mut self) {
1295 use std::time::Instant;
1296
1297 let timeout = self.config.drain_timeout;
1298 if timeout.is_zero() {
1299 // Skip drain entirely if timeout is zero
1300 self.blank_and_close_shutter();
1301 return;
1302 }
1303
1304 let deadline = Instant::now() + timeout;
1305 const POLL_INTERVAL: Duration = Duration::from_millis(5);
1306 while Instant::now() < deadline {
1307 if self.estimate_buffer_points() == 0 {
1308 break;
1309 }
1310
1311 // Process control messages during drain (allow stop to interrupt)
1312 if self.process_control_messages() {
1313 break;
1314 }
1315
1316 std::thread::sleep(POLL_INTERVAL);
1317 }
1318
1319 self.blank_and_close_shutter();
1320 }
1321
1322 /// Output blank points and close the hardware shutter.
1323 ///
1324 /// Best-effort safety shutdown - errors are ignored since we're already
1325 /// in shutdown path.
1326 fn blank_and_close_shutter(&mut self) {
1327 // Close shutter (best-effort)
1328 if let Some(b) = &mut self.backend {
1329 let _ = b.set_shutter(false);
1330 }
1331 self.state.shutter_open = false;
1332
1333 // Output a small blank chunk to ensure laser is off
1334 // (some DACs may hold the last point otherwise)
1335 if let Some(b) = &mut self.backend {
1336 let blank_point = LaserPoint::blanked(0.0, 0.0);
1337 let blank_chunk = [blank_point; 16];
1338 let _ = b.try_write(self.config.pps, &blank_chunk);
1339 }
1340 }
1341}
1342
1343impl Drop for Stream {
1344 fn drop(&mut self) {
1345 let _ = self.stop();
1346 }
1347}
1348
1349// =============================================================================
1350// Device
1351// =============================================================================
1352
1353/// A connected device that can start streaming sessions.
1354///
1355/// When starting a stream, the device is consumed and the backend ownership
1356/// transfers to the stream. The `DacInfo` is returned alongside the stream
1357/// so metadata remains accessible.
1358///
1359/// # Example
1360///
1361/// ```ignore
1362/// use laser_dac::{open_device, StreamConfig};
1363///
1364/// let device = open_device("my-device")?;
1365/// let config = StreamConfig::new(30_000);
1366/// let (stream, info) = device.start_stream(config)?;
1367/// println!("Streaming to: {}", info.name);
1368/// ```
1369pub struct Dac {
1370 info: DacInfo,
1371 backend: Option<BackendKind>,
1372 pub(crate) reconnect_target: Option<ReconnectTarget>,
1373}
1374
1375impl Dac {
1376 /// Create a new device from a backend.
1377 pub fn new(info: DacInfo, backend: BackendKind) -> Self {
1378 Self {
1379 info,
1380 backend: Some(backend),
1381 reconnect_target: None,
1382 }
1383 }
1384
1385 /// Set a custom discovery factory for reconnection.
1386 ///
1387 /// When reconnection is enabled (via [`crate::FrameSessionConfig::with_reconnect`] or
1388 /// [`StreamConfig::with_reconnect`]), the factory is called to create a
1389 /// [`DacDiscovery`] instance for each reconnection attempt. This is required
1390 /// for custom backends registered via [`DacDiscovery::register`] — without it,
1391 /// reconnection uses the default discovery which only finds built-in DAC types.
1392 ///
1393 /// For most cases, prefer [`open_device_with`](crate::open_device_with) which
1394 /// handles both initial discovery and reconnection in one call. Use this method
1395 /// when you build `Dac` instances yourself via `scan()` + `connect()` + `Dac::new()`.
1396 ///
1397 /// # Example
1398 ///
1399 /// ```ignore
1400 /// use laser_dac::{Dac, DacDiscovery, EnabledDacTypes, FrameSessionConfig, ReconnectConfig};
1401 ///
1402 /// // Device opened through custom discovery path
1403 /// let mut discovery = DacDiscovery::new(EnabledDacTypes::all());
1404 /// discovery.register(Box::new(MyCustomDiscoverer::new()));
1405 /// let devices = discovery.scan();
1406 /// let backend = discovery.connect(devices.into_iter().next().unwrap())?;
1407 /// let dac = Dac::new(info, backend);
1408 ///
1409 /// // Attach factory so reconnection can also find custom backends
1410 /// let dac = dac.with_discovery_factory(|| {
1411 /// let mut d = DacDiscovery::new(EnabledDacTypes::all());
1412 /// d.register(Box::new(MyCustomDiscoverer::new()));
1413 /// d
1414 /// });
1415 ///
1416 /// let config = FrameSessionConfig::new(30_000)
1417 /// .with_reconnect(ReconnectConfig::new());
1418 /// let (session, _info) = dac.start_frame_session(config)?;
1419 /// ```
1420 pub fn with_discovery_factory<F>(mut self, factory: F) -> Self
1421 where
1422 F: Fn() -> DacDiscovery + Send + 'static,
1423 {
1424 match self.reconnect_target {
1425 Some(ref mut target) => {
1426 target.discovery_factory = Some(Box::new(factory));
1427 }
1428 None => {
1429 self.reconnect_target = Some(ReconnectTarget {
1430 device_id: self.info.id.clone(),
1431 discovery_factory: Some(Box::new(factory)),
1432 });
1433 }
1434 }
1435 self
1436 }
1437
1438 /// Returns the device info.
1439 pub fn info(&self) -> &DacInfo {
1440 &self.info
1441 }
1442
1443 /// Returns the device ID.
1444 pub fn id(&self) -> &str {
1445 &self.info.id
1446 }
1447
1448 /// Returns the device name.
1449 pub fn name(&self) -> &str {
1450 &self.info.name
1451 }
1452
1453 /// Returns the DAC type.
1454 pub fn kind(&self) -> &DacType {
1455 &self.info.kind
1456 }
1457
1458 /// Returns the device capabilities.
1459 pub fn caps(&self) -> &DacCapabilities {
1460 &self.info.caps
1461 }
1462
1463 /// Returns whether the device has a backend (not yet used for a stream).
1464 pub fn has_backend(&self) -> bool {
1465 self.backend.is_some()
1466 }
1467
1468 /// Consume the Dac and return the backend, if available.
1469 pub(crate) fn into_backend(mut self) -> Option<BackendKind> {
1470 self.backend.take()
1471 }
1472
1473 /// Returns whether the device is connected.
1474 pub fn is_connected(&self) -> bool {
1475 self.backend.as_ref().is_some_and(|b| b.is_connected())
1476 }
1477
1478 /// Starts a streaming session, consuming the device.
1479 ///
1480 /// # Ownership
1481 ///
1482 /// This method consumes the `Dac` because:
1483 /// - Each device can only have one active stream at a time.
1484 /// - The backend is moved into the `Stream` to ensure exclusive access.
1485 /// - This prevents accidental reuse of a device that's already streaming.
1486 ///
1487 /// The method returns both the `Stream` and a copy of `DacInfo`, so you
1488 /// retain access to device metadata (id, name, capabilities) after starting.
1489 ///
1490 /// # Connection
1491 ///
1492 /// If the device is not already connected, this method will establish the
1493 /// connection before creating the stream. Connection failures are returned
1494 /// as errors.
1495 ///
1496 /// # Errors
1497 ///
1498 /// Returns an error if:
1499 /// - The device backend has already been used for a stream.
1500 /// - The configuration is invalid (PPS out of range, invalid chunk size, etc.).
1501 /// - The backend fails to connect.
1502 pub fn start_stream(mut self, mut cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
1503 // Extract reconnect config before consuming cfg
1504 let reconnect_config = cfg.reconnect.take();
1505
1506 let mut backend = self.backend.take().ok_or_else(|| {
1507 Error::invalid_config("device backend has already been used for a stream")
1508 })?;
1509
1510 if backend.is_frame_swap() {
1511 return Err(Error::invalid_config(
1512 "streaming is not supported on frame-swap DACs (e.g. Helios); \
1513 use start_frame_session() instead",
1514 ));
1515 }
1516
1517 let cfg = Self::apply_backend_buffer_defaults(&self.info.caps, cfg);
1518
1519 Self::validate_pps(&self.info.caps, cfg.pps)?;
1520
1521 // Connect the backend if not already connected
1522 if !backend.is_connected() {
1523 backend.connect()?;
1524 }
1525
1526 let mut stream = Stream::with_backend(self.info.clone(), backend, cfg);
1527
1528 // Always preserve the reopen target on the stream (for into_dac recovery)
1529 stream.reconnect_target = self.reconnect_target.take();
1530
1531 // Wire reconnect policy if configured
1532 if let Some(rc) = reconnect_config {
1533 let target = stream.reconnect_target.take().ok_or_else(|| {
1534 Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
1535 })?;
1536 stream.reconnect_policy = Some(ReconnectPolicy::new(rc, target));
1537 }
1538
1539 Ok((stream, self.info))
1540 }
1541
1542 fn apply_backend_buffer_defaults(
1543 caps: &DacCapabilities,
1544 mut cfg: StreamConfig,
1545 ) -> StreamConfig {
1546 if cfg.target_buffer == StreamConfig::DEFAULT_TARGET_BUFFER
1547 && matches!(
1548 caps.output_model,
1549 OutputModel::NetworkFifo | OutputModel::UdpTimed
1550 )
1551 {
1552 cfg.target_buffer = StreamConfig::NETWORK_DEFAULT_TARGET_BUFFER;
1553 }
1554
1555 cfg
1556 }
1557
1558 fn validate_pps(caps: &DacCapabilities, pps: u32) -> Result<()> {
1559 if pps < caps.pps_min || pps > caps.pps_max {
1560 return Err(Error::invalid_config(format!(
1561 "PPS {} is outside device range [{}, {}]",
1562 pps, caps.pps_min, caps.pps_max
1563 )));
1564 }
1565
1566 Ok(())
1567 }
1568
1569 /// Starts a frame-mode session, consuming the device.
1570 ///
1571 /// Similar to [`start_stream`](Self::start_stream) but uses the frame-first
1572 /// API where you submit complete [`crate::presentation::Frame`]s instead of filling
1573 /// point buffers via callback.
1574 ///
1575 /// Returns a [`crate::presentation::FrameSession`] that owns the scheduler thread and a
1576 /// [`DacInfo`] with device metadata.
1577 pub fn start_frame_session(
1578 mut self,
1579 mut config: crate::presentation::FrameSessionConfig,
1580 ) -> Result<(crate::presentation::FrameSession, DacInfo)> {
1581 let reconnect_config = config.reconnect.take();
1582
1583 let backend = self.backend.take().ok_or_else(|| {
1584 Error::invalid_config("device backend has already been used for a session")
1585 })?;
1586
1587 Self::validate_pps(backend.caps(), config.pps)?;
1588
1589 let reconnect_policy = match reconnect_config {
1590 Some(rc) => {
1591 let target = self.reconnect_target.take().ok_or_else(|| {
1592 Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
1593 })?;
1594 Some(ReconnectPolicy::new(rc, target))
1595 }
1596 None => None,
1597 };
1598
1599 let session = crate::presentation::FrameSession::start(backend, config, reconnect_policy)?;
1600 Ok((session, self.info))
1601 }
1602}
1603
1604#[cfg(test)]
1605mod tests;