Skip to main content

laser_dac/presentation/
session.rs

1//! FrameSession and FrameSessionConfig — public frame-mode API.
2
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread::JoinHandle;
7use std::time::{Duration, Instant};
8
9use crate::backend::BackendKind;
10use crate::device::{DacInfo, OutputModel};
11use crate::error::{Error, Result};
12use crate::reconnect::ReconnectPolicy;
13use crate::stream::{ControlMsg, RunExit, StreamControl};
14
15use super::driver::{self, DriverInputs, SourceOwned};
16use super::engine::PresentationEngine;
17use super::slice_pipeline::SlicePipeline;
18use super::{default_transition, Frame, OutputResetReason, TransitionFn};
19
20// =============================================================================
21// FrameSessionConfig
22// =============================================================================
23
24/// Configuration for a frame-mode streaming session.
25pub struct FrameSessionConfig {
26    /// Points per second output rate.
27    pub pps: u32,
28    /// Transition function for blanking between frames.
29    pub transition_fn: TransitionFn,
30    /// Duration of forced blanking after arming (default: 1ms).
31    pub startup_blank: std::time::Duration,
32    /// Number of points to shift RGB relative to XY (0 = disabled).
33    ///
34    /// Delays color channels relative to XY coordinates, compensating for
35    /// the difference in galvo mirror and laser modulation response times.
36    /// Applied at composition time. Set to 0 to disable.
37    pub color_delay_points: usize,
38    /// Reconnection configuration (default: disabled).
39    ///
40    /// Set via [`with_reconnect`](Self::with_reconnect) to enable automatic
41    /// reconnection when the device disconnects.
42    pub reconnect: Option<crate::config::ReconnectConfig>,
43    /// Policy for what to output when the stream is idle (disarmed).
44    ///
45    /// Controls scanner behavior when disarmed. Default: [`Blank`](crate::config::IdlePolicy::Blank)
46    /// (park at origin with laser off). Use [`Park`](crate::config::IdlePolicy::Park) to park at a
47    /// specific position.
48    pub idle_policy: crate::config::IdlePolicy,
49    /// Optional hook for processing the final presented output.
50    pub output_filter: Option<Box<dyn super::OutputFilter>>,
51}
52
53impl FrameSessionConfig {
54    const DEFAULT_COLOR_DELAY: std::time::Duration = std::time::Duration::from_micros(150);
55
56    /// Create a new config with the given PPS and default transition.
57    pub fn new(pps: u32) -> Self {
58        let color_delay_points =
59            (Self::DEFAULT_COLOR_DELAY.as_secs_f64() * pps as f64).ceil() as usize;
60        Self {
61            pps,
62            transition_fn: default_transition(pps),
63            startup_blank: std::time::Duration::from_millis(1),
64            color_delay_points,
65            idle_policy: crate::config::IdlePolicy::default(),
66            reconnect: None,
67            output_filter: None,
68        }
69    }
70
71    /// Set the transition function (builder pattern).
72    pub fn with_transition_fn(mut self, f: TransitionFn) -> Self {
73        self.transition_fn = f;
74        self
75    }
76
77    /// Set the startup blank duration (builder pattern).
78    pub fn with_startup_blank(mut self, duration: std::time::Duration) -> Self {
79        self.startup_blank = duration;
80        self
81    }
82
83    /// Set the color delay in points (builder pattern).
84    pub fn with_color_delay_points(mut self, n: usize) -> Self {
85        self.color_delay_points = n;
86        self
87    }
88
89    /// Enable automatic reconnection (builder pattern).
90    ///
91    /// Requires the device to have been opened via [`open_device`](crate::open_device).
92    pub fn with_reconnect(mut self, config: crate::config::ReconnectConfig) -> Self {
93        self.reconnect = Some(config);
94        self
95    }
96
97    /// Set the idle policy (builder pattern).
98    ///
99    /// Controls scanner behavior when disarmed. See [`crate::config::IdlePolicy`].
100    pub fn with_idle_policy(mut self, policy: crate::config::IdlePolicy) -> Self {
101        self.idle_policy = policy;
102        self
103    }
104
105    /// Install a final-output filter (builder pattern).
106    pub fn with_output_filter(mut self, filter: Box<dyn super::OutputFilter>) -> Self {
107        self.output_filter = Some(filter);
108        self
109    }
110}
111
112// =============================================================================
113// FrameSession
114// =============================================================================
115
/// Read-only liveness and connectivity metrics for a [`FrameSession`].
///
/// Clones share a single set of counters through an `Arc`, so a handle can be
/// passed to a monitoring thread while the session keeps updating it.
#[derive(Clone, Debug)]
pub struct FrameSessionMetrics {
    inner: Arc<FrameSessionMetricsInner>,
}

/// Shared state behind every [`FrameSessionMetrics`] clone.
///
/// Timestamps are stored as nanoseconds elapsed since `origin`; a value of
/// `0` means the event has never been recorded.
#[derive(Debug)]
struct FrameSessionMetricsInner {
    /// Whether the session currently has a connected backend.
    connected: AtomicBool,
    /// Reference point that the nanosecond timestamps are measured from.
    origin: Instant,
    /// Nanoseconds since `origin` of the last recorded loop activity.
    last_loop_activity_nanos: AtomicU64,
    /// Nanoseconds since `origin` of the last successful backend write.
    last_write_success_nanos: AtomicU64,
}
128
129impl FrameSessionMetrics {
130    pub(crate) fn new(connected: bool) -> Self {
131        let metrics = Self {
132            inner: Arc::new(FrameSessionMetricsInner {
133                connected: AtomicBool::new(connected),
134                origin: Instant::now(),
135                last_loop_activity_nanos: AtomicU64::new(0),
136                last_write_success_nanos: AtomicU64::new(0),
137            }),
138        };
139        metrics.mark_loop_activity();
140        metrics
141    }
142
143    /// Returns whether the session currently has a connected backend.
144    pub fn connected(&self) -> bool {
145        self.inner.connected.load(Ordering::SeqCst)
146    }
147
148    /// Returns the last time the scheduler thread showed progress.
149    pub fn last_loop_activity(&self) -> Option<Instant> {
150        self.instant_from_nanos(self.inner.last_loop_activity_nanos.load(Ordering::SeqCst))
151    }
152
153    /// Returns the last time a backend write completed successfully.
154    pub fn last_write_success(&self) -> Option<Instant> {
155        self.instant_from_nanos(self.inner.last_write_success_nanos.load(Ordering::SeqCst))
156    }
157
158    fn instant_from_nanos(&self, nanos: u64) -> Option<Instant> {
159        if nanos == 0 {
160            None
161        } else {
162            self.inner.origin.checked_add(Duration::from_nanos(nanos))
163        }
164    }
165
166    fn now_nanos(&self) -> u64 {
167        (self.inner.origin.elapsed().as_nanos().min(u64::MAX as u128) as u64).max(1)
168    }
169
170    pub(super) fn mark_loop_activity(&self) {
171        self.inner
172            .last_loop_activity_nanos
173            .store(self.now_nanos(), Ordering::SeqCst);
174    }
175
176    pub(super) fn mark_write_success(&self) {
177        let now = self.now_nanos();
178        self.inner
179            .last_loop_activity_nanos
180            .store(now, Ordering::SeqCst);
181        self.inner
182            .last_write_success_nanos
183            .store(now, Ordering::SeqCst);
184    }
185
186    pub(super) fn set_connected(&self, connected: bool) {
187        self.inner.connected.store(connected, Ordering::SeqCst);
188        self.mark_loop_activity();
189    }
190}
191
192struct MetricsDisconnectGuard(FrameSessionMetrics);
193
194impl Drop for MetricsDisconnectGuard {
195    fn drop(&mut self) {
196        self.0.set_connected(false);
197    }
198}
199
200/// A frame-mode streaming session.
201///
202/// Owns a scheduler thread that reads frames from a channel and writes them
203/// to the DAC backend using the appropriate strategy (FIFO or frame-swap).
204///
205/// # Example
206///
207/// ```ignore
208/// use laser_dac::{open_device, FrameSessionConfig, Frame, LaserPoint};
209///
210/// let device = open_device("my-device")?;
211/// let config = FrameSessionConfig::new(30_000);
212/// let (session, _info) = device.start_frame_session(config)?;
213///
214/// session.control().arm()?;
215/// session.send_frame(Frame::new(vec![
216///     LaserPoint::new(0.0, 0.0, 65535, 0, 0, 65535),
217/// ]));
218/// ```
219pub struct FrameSession {
220    control: StreamControl,
221    thread: Option<JoinHandle<Result<RunExit>>>,
222    frame_slot: Arc<Mutex<Option<Frame>>>,
223    metrics: FrameSessionMetrics,
224}
225
226impl FrameSession {
227    /// Start a frame session on the given backend.
228    pub(crate) fn start(
229        mut backend: BackendKind,
230        config: FrameSessionConfig,
231        reconnect_policy: Option<ReconnectPolicy>,
232    ) -> Result<Self> {
233        if !backend.is_connected() {
234            backend.connect()?;
235        }
236
237        let (control_tx, control_rx) = mpsc::channel();
238        let initial_color_delay = if config.color_delay_points > 0 {
239            Duration::from_secs_f64(config.color_delay_points as f64 / config.pps as f64)
240        } else {
241            Duration::ZERO
242        };
243        let control = StreamControl::new(control_tx, initial_color_delay, config.pps);
244        let frame_slot: Arc<Mutex<Option<Frame>>> = Arc::new(Mutex::new(None));
245        let metrics = FrameSessionMetrics::new(backend.is_connected());
246
247        let control_clone = control.clone();
248        let slot_clone = frame_slot.clone();
249        let metrics_clone = metrics.clone();
250
251        let thread = std::thread::spawn(move || {
252            let _disconnect_guard = MetricsDisconnectGuard(metrics_clone.clone());
253            Self::run_loop(
254                backend,
255                config,
256                control_clone,
257                control_rx,
258                slot_clone,
259                metrics_clone,
260                reconnect_policy,
261            )
262        });
263
264        Ok(Self {
265            control,
266            thread: Some(thread),
267            frame_slot,
268            metrics,
269        })
270    }
271
272    /// Returns a control handle for arm/disarm/stop.
273    pub fn control(&self) -> StreamControl {
274        self.control.clone()
275    }
276
277    /// Submit a frame for display. Latest-wins: overwrites any unconsumed
278    /// pending frame immediately, with no buffering or memory growth.
279    pub fn send_frame(&self, frame: Frame) {
280        *self.frame_slot.lock().unwrap() = Some(frame);
281    }
282
283    /// Returns true if the scheduler thread has finished.
284    pub fn is_finished(&self) -> bool {
285        self.thread.as_ref().is_some_and(|h| h.is_finished())
286    }
287
288    /// Returns a metrics handle for observing scheduler liveness.
289    pub fn metrics(&self) -> FrameSessionMetrics {
290        self.metrics.clone()
291    }
292
293    /// Wait for the session thread to finish and return the exit reason.
294    pub fn join(mut self) -> Result<RunExit> {
295        if let Some(handle) = self.thread.take() {
296            handle
297                .join()
298                .unwrap_or(Err(Error::disconnected("thread panicked")))
299        } else {
300            Ok(RunExit::Stopped)
301        }
302    }
303
304    // =========================================================================
305    // Driver loop
306    // =========================================================================
307
308    fn run_loop(
309        mut backend: BackendKind,
310        config: FrameSessionConfig,
311        control: StreamControl,
312        control_rx: mpsc::Receiver<ControlMsg>,
313        frame_slot: Arc<Mutex<Option<Frame>>>,
314        metrics: FrameSessionMetrics,
315        reconnect_policy: Option<ReconnectPolicy>,
316    ) -> Result<RunExit> {
317        let FrameSessionConfig {
318            pps: _,
319            transition_fn,
320            startup_blank,
321            color_delay_points,
322            idle_policy,
323            output_filter,
324            reconnect: _,
325        } = config;
326
327        let mut engine = PresentationEngine::new(transition_fn);
328        if backend.is_frame_swap() {
329            engine.set_frame_capacity(backend.frame_capacity());
330        }
331
332        // Per-OutputModel initial buffer sizing: FIFO bounds by max_points_per_chunk;
333        // frame-swap bounds by frame_capacity (max_points_per_chunk is meaningless there).
334        let initial_buf_capacity = match backend.caps().output_model {
335            OutputModel::UsbFrameSwap => backend.frame_capacity().unwrap_or(0),
336            OutputModel::NetworkFifo | OutputModel::UdpTimed => backend.caps().max_points_per_chunk,
337        };
338        let mut pipeline = SlicePipeline::with_startup_blank(
339            engine,
340            color_delay_points,
341            output_filter,
342            idle_policy,
343            initial_buf_capacity,
344            startup_blank,
345        );
346        pipeline.reset_output_filter(OutputResetReason::SessionStart);
347
348        let expected_frame_swap = backend.is_frame_swap();
349        let source: SourceOwned = if expected_frame_swap {
350            SourceOwned::Frame(Box::new(pipeline))
351        } else {
352            SourceOwned::Fifo(Box::new(pipeline))
353        };
354
355        let validator = Self::reconnect_validator(reconnect_policy.as_ref());
356        if !backend.is_connected() {
357            backend.connect()?;
358        }
359
360        driver::run(DriverInputs {
361            backend,
362            source,
363            control,
364            control_rx,
365            metrics,
366            reconnect_policy,
367            validator,
368            error_sink: Box::new(|_e: Error| { /* frame-mode swallows non-fatal errors */ }),
369            target_buffer: Duration::from_millis(20),
370            drain_timeout: Duration::ZERO,
371            pending_frame: Some(frame_slot),
372        })
373    }
374
375    fn reconnect_validator(policy: Option<&ReconnectPolicy>) -> driver::ReconnectValidator {
376        let target_id = policy
377            .map(|p| p.target.device_id.clone())
378            .unwrap_or_default();
379        Box::new(move |info: &DacInfo, _backend: &BackendKind, pps: u32| {
380            if pps < info.caps.pps_min || pps > info.caps.pps_max {
381                log::error!(
382                    "'{}' PPS {} outside new device range [{}, {}]",
383                    target_id,
384                    pps,
385                    info.caps.pps_min,
386                    info.caps.pps_max
387                );
388                return Err(RunExit::Disconnected);
389            }
390            Ok(())
391        })
392    }
393}
394
395impl Drop for FrameSession {
396    fn drop(&mut self) {
397        let _ = self.control.stop();
398        if let Some(handle) = self.thread.take() {
399            let _ = handle.join();
400        }
401    }
402}