Skip to main content

laser_dac/
session.rs

1//! Reconnecting session wrapper for automatic reconnection.
2
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::backend::{Error, Result};
8use crate::discovery::DacDiscovery;
9use crate::stream::{Dac, StreamControl};
10use crate::types::{ChunkRequest, ChunkResult, DacInfo, LaserPoint, RunExit, StreamConfig};
11
/// Boxed user callback that receives the error reported as a disconnect.
type DisconnectCallback = Box<dyn FnMut(&Error) + Send + 'static>;
/// Boxed user callback that receives the device info of a re-established stream.
type ReconnectCallback = Box<dyn FnMut(&DacInfo) + Send + 'static>;
/// Boxed factory producing a fresh [`DacDiscovery`] each time a device has to be opened.
type DiscoveryFactory = Box<dyn Fn() -> DacDiscovery + Send + 'static>;
15
16// =============================================================================
17// Session Control
18// =============================================================================
19
/// Control handle for a [`ReconnectingSession`].
///
/// This mirrors `StreamControl`, but survives reconnections by attaching
/// to each new stream as it is created.
///
/// Cloning is cheap: every clone shares the same state through an `Arc`,
/// so a change made through one handle is visible through all of them.
#[derive(Clone)]
pub struct SessionControl {
    // Shared state; all clones of the handle point at the same allocation.
    inner: Arc<SessionControlInner>,
}
28
/// State shared by every clone of a [`SessionControl`].
///
/// The atomics hold the values requested by the user; they are replayed onto
/// each stream when it is attached.
struct SessionControlInner {
    // Requested arm state (`true` = armed).
    armed: AtomicBool,
    // Set once a stop has been requested; nothing in this type clears it again.
    stop_requested: AtomicBool,
    // Requested color delay, stored as whole microseconds.
    color_delay_micros: AtomicU64,
    // Control of the currently attached stream, `None` while no stream is attached.
    current: Mutex<Option<StreamControl>>,
}
35
36impl SessionControl {
37    fn new() -> Self {
38        Self {
39            inner: Arc::new(SessionControlInner {
40                armed: AtomicBool::new(false),
41                stop_requested: AtomicBool::new(false),
42                color_delay_micros: AtomicU64::new(0),
43                current: Mutex::new(None),
44            }),
45        }
46    }
47
48    fn attach(&self, control: StreamControl) {
49        *self.inner.current.lock().unwrap() = Some(control.clone());
50
51        if self.inner.stop_requested.load(Ordering::SeqCst) {
52            let _ = control.stop();
53            return;
54        }
55
56        if self.inner.armed.load(Ordering::SeqCst) {
57            let _ = control.arm();
58        } else {
59            let _ = control.disarm();
60        }
61
62        let delay = self.inner.color_delay_micros.load(Ordering::SeqCst);
63        control.set_color_delay(Duration::from_micros(delay));
64    }
65
66    fn detach(&self) {
67        *self.inner.current.lock().unwrap() = None;
68    }
69
70    /// Arm the output (allow laser to fire).
71    pub fn arm(&self) -> Result<()> {
72        self.inner.armed.store(true, Ordering::SeqCst);
73        if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
74            let _ = control.arm();
75        }
76        Ok(())
77    }
78
79    /// Disarm the output (force laser off).
80    pub fn disarm(&self) -> Result<()> {
81        self.inner.armed.store(false, Ordering::SeqCst);
82        if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
83            let _ = control.disarm();
84        }
85        Ok(())
86    }
87
88    /// Check if the output is armed.
89    pub fn is_armed(&self) -> bool {
90        self.inner.armed.load(Ordering::SeqCst)
91    }
92
93    /// Set the color delay for scanner sync compensation.
94    ///
95    /// Persists across reconnections — each new stream receives this value.
96    pub fn set_color_delay(&self, delay: Duration) {
97        self.inner
98            .color_delay_micros
99            .store(delay.as_micros() as u64, Ordering::SeqCst);
100        if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
101            control.set_color_delay(delay);
102        }
103    }
104
105    /// Get the current color delay.
106    pub fn color_delay(&self) -> Duration {
107        Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
108    }
109
110    /// Request the session to stop.
111    pub fn stop(&self) -> Result<()> {
112        self.inner.stop_requested.store(true, Ordering::SeqCst);
113        if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
114            let _ = control.stop();
115        }
116        Ok(())
117    }
118
119    /// Check if a stop has been requested.
120    pub fn is_stop_requested(&self) -> bool {
121        self.inner.stop_requested.load(Ordering::SeqCst)
122    }
123}
124
125// =============================================================================
126// Reconnecting Session
127// =============================================================================
128
/// A reconnecting wrapper around the streaming API.
///
/// This helper reconnects to a device by ID and restarts streaming
/// automatically when a disconnection occurs.
///
/// By default this uses `open_device()` internally. To use custom DAC
/// backends, call [`with_discovery`](Self::with_discovery) with a factory
/// function that creates a configured [`DacDiscovery`].
///
/// # Example
///
/// ```no_run
/// use laser_dac::{ChunkRequest, ChunkResult, LaserPoint, ReconnectingSession, StreamConfig};
/// use std::time::Duration;
///
/// let mut session = ReconnectingSession::new("my-device", StreamConfig::new(30_000))
///     .with_max_retries(5)
///     .with_backoff(Duration::from_secs(1))
///     .on_disconnect(|err| eprintln!("Lost connection: {}", err))
///     .on_reconnect(|info| println!("Reconnected to {}", info.name));
///
/// session.control().arm()?;
///
/// session.run(
///     |req: &ChunkRequest, buffer: &mut [LaserPoint]| {
///         let n = req.target_points;
///         for i in 0..n {
///             buffer[i] = LaserPoint::blanked(0.0, 0.0);
///         }
///         ChunkResult::Filled(n)
///     },
///     |err| eprintln!("Stream error: {}", err),
/// )?;
/// # Ok::<(), laser_dac::Error>(())
/// ```
pub struct ReconnectingSession {
    // ID handed to the device-opening call on every connection attempt.
    device_id: String,
    // Stream configuration; cloned for each `start_stream` call.
    config: StreamConfig,
    // Retry limit; `None` means no limit.
    max_retries: Option<u32>,
    // Fixed wait between connection attempts.
    backoff: Duration,
    // Shared with the stream's error callback, hence the `Arc<Mutex<..>>`.
    on_disconnect: Arc<Mutex<Option<DisconnectCallback>>>,
    // Invoked from `run` after every successful connection except the first.
    on_reconnect: Option<ReconnectCallback>,
    // Handle that is re-attached to each new stream.
    control: SessionControl,
    // Optional custom discovery; when `None`, `crate::open_device` is used.
    discovery_factory: Option<DiscoveryFactory>,
}
174
175impl ReconnectingSession {
176    /// Create a new reconnecting session for a device ID.
177    pub fn new(device_id: impl Into<String>, config: StreamConfig) -> Self {
178        Self {
179            device_id: device_id.into(),
180            config,
181            max_retries: None,
182            backoff: Duration::from_secs(1),
183            on_disconnect: Arc::new(Mutex::new(None)),
184            on_reconnect: None,
185            control: SessionControl::new(),
186            discovery_factory: None,
187        }
188    }
189
190    /// Set the maximum number of reconnect attempts.
191    ///
192    /// `None` (default) retries forever. `Some(0)` disables retries.
193    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
194        self.max_retries = Some(max_retries);
195        self
196    }
197
198    /// Set a fixed backoff duration between reconnect attempts.
199    pub fn with_backoff(mut self, backoff: Duration) -> Self {
200        self.backoff = backoff;
201        self
202    }
203
204    /// Register a callback invoked when a disconnect is detected.
205    pub fn on_disconnect<F>(self, f: F) -> Self
206    where
207        F: FnMut(&Error) + Send + 'static,
208    {
209        *self.on_disconnect.lock().unwrap() = Some(Box::new(f));
210        self
211    }
212
213    /// Register a callback invoked after a successful reconnect.
214    pub fn on_reconnect<F>(mut self, f: F) -> Self
215    where
216        F: FnMut(&DacInfo) + Send + 'static,
217    {
218        self.on_reconnect = Some(Box::new(f));
219        self
220    }
221
222    /// Use a custom discovery factory for opening devices.
223    ///
224    /// This allows using custom DAC backends by providing a factory function
225    /// that creates a [`DacDiscovery`] with external discoverers registered.
226    ///
227    /// # Example
228    ///
229    /// ```no_run
230    /// use laser_dac::{DacDiscovery, EnabledDacTypes, ReconnectingSession, StreamConfig};
231    ///
232    /// let session = ReconnectingSession::new("custom:my-device", StreamConfig::new(30_000))
233    ///     .with_discovery(|| {
234    ///         let mut discovery = DacDiscovery::new(EnabledDacTypes::all());
235    ///         // discovery.register(my_custom_discoverer);
236    ///         discovery
237    ///     });
238    /// ```
239    pub fn with_discovery<F>(mut self, factory: F) -> Self
240    where
241        F: Fn() -> DacDiscovery + Send + 'static,
242    {
243        self.discovery_factory = Some(Box::new(factory));
244        self
245    }
246
247    /// Returns a control handle for arm/disarm/stop.
248    pub fn control(&self) -> SessionControl {
249        self.control.clone()
250    }
251
252    /// Run the stream, automatically reconnecting on disconnection.
253    ///
254    /// Uses the zero-allocation callback API with buffer-driven timing.
255    pub fn run<F, E>(&mut self, producer: F, on_error: E) -> Result<RunExit>
256    where
257        F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
258        E: FnMut(Error) + Send + 'static,
259    {
260        let producer = Arc::new(Mutex::new(producer));
261        let on_error = Arc::new(Mutex::new(on_error));
262        let on_disconnect = Arc::clone(&self.on_disconnect);
263        let mut connected_once = false;
264        let mut retries = 0u32;
265
266        loop {
267            if self.control.is_stop_requested() {
268                return Ok(RunExit::Stopped);
269            }
270
271            if let Some(max) = self.max_retries {
272                if retries >= max {
273                    return Ok(RunExit::Disconnected);
274                }
275            }
276
277            log::info!(
278                "'{}' attempting open_device (retry {})",
279                self.device_id,
280                retries
281            );
282            let device = match self.open_device() {
283                Ok(device) => {
284                    log::info!("'{}' open_device succeeded", self.device_id);
285                    device
286                }
287                Err(err) => {
288                    log::warn!("'{}' open_device failed: {}", self.device_id, err);
289                    if !Self::is_retriable_connect_error(&err) {
290                        return Err(err);
291                    }
292                    {
293                        let mut handler = on_error.lock().unwrap();
294                        handler(err);
295                    }
296                    retries = retries.saturating_add(1);
297                    if let Some(max) = self.max_retries {
298                        if retries >= max {
299                            return Ok(RunExit::Disconnected);
300                        }
301                    }
302                    if self.sleep_with_stop(self.backoff) {
303                        return Ok(RunExit::Stopped);
304                    }
305                    continue;
306                }
307            };
308
309            let (stream, info) = match device.start_stream(self.config.clone()) {
310                Ok(result) => {
311                    log::info!("'{}' start_stream succeeded", self.device_id);
312                    result
313                }
314                Err(err) => {
315                    log::warn!("'{}' start_stream failed: {}", self.device_id, err);
316                    if !Self::is_retriable_connect_error(&err) {
317                        return Err(err);
318                    }
319                    {
320                        let mut handler = on_error.lock().unwrap();
321                        handler(err);
322                    }
323                    retries = retries.saturating_add(1);
324                    if let Some(max) = self.max_retries {
325                        if retries >= max {
326                            return Ok(RunExit::Disconnected);
327                        }
328                    }
329                    if self.sleep_with_stop(self.backoff) {
330                        return Ok(RunExit::Stopped);
331                    }
332                    continue;
333                }
334            };
335
336            if connected_once {
337                log::info!("'{}' reconnected, firing on_reconnect", self.device_id);
338                if let Some(cb) = self.on_reconnect.as_mut() {
339                    cb(&info);
340                }
341            } else {
342                log::info!("'{}' first connection established", self.device_id);
343            }
344            connected_once = true;
345            retries = 0;
346
347            self.control.attach(stream.control());
348
349            let producer_handle = Arc::clone(&producer);
350            let on_error_handle = Arc::clone(&on_error);
351            let on_disconnect_handle = Arc::clone(&on_disconnect);
352            let mut error_count: u64 = 0;
353            let exit = match stream.run(
354                move |req, buffer| {
355                    let mut handler = producer_handle.lock().unwrap();
356                    handler(req, buffer)
357                },
358                move |err| {
359                    error_count += 1;
360                    if error_count == 1 || error_count.is_multiple_of(10000) {
361                        log::warn!(
362                            "error callback (#{error_count}): {} (is_disconnected={})",
363                            err,
364                            err.is_disconnected()
365                        );
366                    }
367                    if err.is_disconnected() {
368                        if let Some(cb) = on_disconnect_handle.lock().unwrap().as_mut() {
369                            cb(&err);
370                        }
371                    }
372                    let mut handler = on_error_handle.lock().unwrap();
373                    handler(err)
374                },
375            ) {
376                Ok(exit) => {
377                    log::info!("'{}' stream.run() exited with: {:?}", self.device_id, exit);
378                    exit
379                }
380                Err(err) => {
381                    log::error!("'{}' stream.run() returned error: {}", self.device_id, err);
382                    self.control.detach();
383                    return Err(err);
384                }
385            };
386
387            self.control.detach();
388
389            match exit {
390                RunExit::Disconnected => {
391                    log::info!("'{}' stream disconnected, will retry", self.device_id);
392                    if let Some(max) = self.max_retries {
393                        if retries >= max {
394                            return Ok(RunExit::Disconnected);
395                        }
396                    }
397                    if self.sleep_with_stop(self.backoff) {
398                        return Ok(RunExit::Stopped);
399                    }
400                    continue;
401                }
402                other => return Ok(other),
403            }
404        }
405    }
406
407    fn open_device(&mut self) -> Result<Dac> {
408        if let Some(factory) = &self.discovery_factory {
409            let mut discovery = factory();
410            discovery.open_by_id(&self.device_id)
411        } else {
412            crate::open_device(&self.device_id)
413        }
414    }
415
416    fn is_retriable_connect_error(err: &Error) -> bool {
417        !matches!(err, Error::InvalidConfig(_) | Error::Stopped)
418    }
419
420    fn sleep_with_stop(&self, duration: Duration) -> bool {
421        const SLICE: Duration = Duration::from_millis(50);
422        let mut remaining = duration;
423        while remaining > Duration::ZERO {
424            if self.control.is_stop_requested() {
425                return true;
426            }
427            let slice = remaining.min(SLICE);
428            std::thread::sleep(slice);
429            remaining = remaining.saturating_sub(slice);
430        }
431        self.control.is_stop_requested()
432    }
433}