1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::backend::{Error, Result};
8use crate::discovery::DacDiscovery;
9use crate::stream::{Dac, StreamControl};
10use crate::types::{ChunkRequest, ChunkResult, DacInfo, LaserPoint, RunExit, StreamConfig};
11
12type DisconnectCallback = Box<dyn FnMut(&Error) + Send + 'static>;
13type ReconnectCallback = Box<dyn FnMut(&DacInfo) + Send + 'static>;
14type DiscoveryFactory = Box<dyn Fn() -> DacDiscovery + Send + 'static>;
15
16#[derive(Clone)]
25pub struct SessionControl {
26 inner: Arc<SessionControlInner>,
27}
28
29struct SessionControlInner {
30 armed: AtomicBool,
31 stop_requested: AtomicBool,
32 color_delay_micros: AtomicU64,
33 current: Mutex<Option<StreamControl>>,
34}
35
36impl SessionControl {
37 fn new() -> Self {
38 Self {
39 inner: Arc::new(SessionControlInner {
40 armed: AtomicBool::new(false),
41 stop_requested: AtomicBool::new(false),
42 color_delay_micros: AtomicU64::new(0),
43 current: Mutex::new(None),
44 }),
45 }
46 }
47
48 fn attach(&self, control: StreamControl) {
49 *self.inner.current.lock().unwrap() = Some(control.clone());
50
51 if self.inner.stop_requested.load(Ordering::SeqCst) {
52 let _ = control.stop();
53 return;
54 }
55
56 if self.inner.armed.load(Ordering::SeqCst) {
57 let _ = control.arm();
58 } else {
59 let _ = control.disarm();
60 }
61
62 let delay = self.inner.color_delay_micros.load(Ordering::SeqCst);
63 control.set_color_delay(Duration::from_micros(delay));
64 }
65
66 fn detach(&self) {
67 *self.inner.current.lock().unwrap() = None;
68 }
69
70 pub fn arm(&self) -> Result<()> {
72 self.inner.armed.store(true, Ordering::SeqCst);
73 if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
74 let _ = control.arm();
75 }
76 Ok(())
77 }
78
79 pub fn disarm(&self) -> Result<()> {
81 self.inner.armed.store(false, Ordering::SeqCst);
82 if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
83 let _ = control.disarm();
84 }
85 Ok(())
86 }
87
88 pub fn is_armed(&self) -> bool {
90 self.inner.armed.load(Ordering::SeqCst)
91 }
92
93 pub fn set_color_delay(&self, delay: Duration) {
97 self.inner
98 .color_delay_micros
99 .store(delay.as_micros() as u64, Ordering::SeqCst);
100 if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
101 control.set_color_delay(delay);
102 }
103 }
104
105 pub fn color_delay(&self) -> Duration {
107 Duration::from_micros(self.inner.color_delay_micros.load(Ordering::SeqCst))
108 }
109
110 pub fn stop(&self) -> Result<()> {
112 self.inner.stop_requested.store(true, Ordering::SeqCst);
113 if let Some(control) = self.inner.current.lock().unwrap().as_ref() {
114 let _ = control.stop();
115 }
116 Ok(())
117 }
118
119 pub fn is_stop_requested(&self) -> bool {
121 self.inner.stop_requested.load(Ordering::SeqCst)
122 }
123}
124
125pub struct ReconnectingSession {
165 device_id: String,
166 config: StreamConfig,
167 max_retries: Option<u32>,
168 backoff: Duration,
169 on_disconnect: Arc<Mutex<Option<DisconnectCallback>>>,
170 on_reconnect: Option<ReconnectCallback>,
171 control: SessionControl,
172 discovery_factory: Option<DiscoveryFactory>,
173}
174
175impl ReconnectingSession {
176 pub fn new(device_id: impl Into<String>, config: StreamConfig) -> Self {
178 Self {
179 device_id: device_id.into(),
180 config,
181 max_retries: None,
182 backoff: Duration::from_secs(1),
183 on_disconnect: Arc::new(Mutex::new(None)),
184 on_reconnect: None,
185 control: SessionControl::new(),
186 discovery_factory: None,
187 }
188 }
189
190 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
194 self.max_retries = Some(max_retries);
195 self
196 }
197
198 pub fn with_backoff(mut self, backoff: Duration) -> Self {
200 self.backoff = backoff;
201 self
202 }
203
204 pub fn on_disconnect<F>(self, f: F) -> Self
206 where
207 F: FnMut(&Error) + Send + 'static,
208 {
209 *self.on_disconnect.lock().unwrap() = Some(Box::new(f));
210 self
211 }
212
213 pub fn on_reconnect<F>(mut self, f: F) -> Self
215 where
216 F: FnMut(&DacInfo) + Send + 'static,
217 {
218 self.on_reconnect = Some(Box::new(f));
219 self
220 }
221
222 pub fn with_discovery<F>(mut self, factory: F) -> Self
240 where
241 F: Fn() -> DacDiscovery + Send + 'static,
242 {
243 self.discovery_factory = Some(Box::new(factory));
244 self
245 }
246
247 pub fn control(&self) -> SessionControl {
249 self.control.clone()
250 }
251
252 pub fn run<F, E>(&mut self, producer: F, on_error: E) -> Result<RunExit>
256 where
257 F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
258 E: FnMut(Error) + Send + 'static,
259 {
260 let producer = Arc::new(Mutex::new(producer));
261 let on_error = Arc::new(Mutex::new(on_error));
262 let on_disconnect = Arc::clone(&self.on_disconnect);
263 let mut connected_once = false;
264 let mut retries = 0u32;
265
266 loop {
267 if self.control.is_stop_requested() {
268 return Ok(RunExit::Stopped);
269 }
270
271 if let Some(max) = self.max_retries {
272 if retries >= max {
273 return Ok(RunExit::Disconnected);
274 }
275 }
276
277 log::info!(
278 "'{}' attempting open_device (retry {})",
279 self.device_id,
280 retries
281 );
282 let device = match self.open_device() {
283 Ok(device) => {
284 log::info!("'{}' open_device succeeded", self.device_id);
285 device
286 }
287 Err(err) => {
288 log::warn!("'{}' open_device failed: {}", self.device_id, err);
289 if !Self::is_retriable_connect_error(&err) {
290 return Err(err);
291 }
292 {
293 let mut handler = on_error.lock().unwrap();
294 handler(err);
295 }
296 retries = retries.saturating_add(1);
297 if let Some(max) = self.max_retries {
298 if retries >= max {
299 return Ok(RunExit::Disconnected);
300 }
301 }
302 if self.sleep_with_stop(self.backoff) {
303 return Ok(RunExit::Stopped);
304 }
305 continue;
306 }
307 };
308
309 let (stream, info) = match device.start_stream(self.config.clone()) {
310 Ok(result) => {
311 log::info!("'{}' start_stream succeeded", self.device_id);
312 result
313 }
314 Err(err) => {
315 log::warn!("'{}' start_stream failed: {}", self.device_id, err);
316 if !Self::is_retriable_connect_error(&err) {
317 return Err(err);
318 }
319 {
320 let mut handler = on_error.lock().unwrap();
321 handler(err);
322 }
323 retries = retries.saturating_add(1);
324 if let Some(max) = self.max_retries {
325 if retries >= max {
326 return Ok(RunExit::Disconnected);
327 }
328 }
329 if self.sleep_with_stop(self.backoff) {
330 return Ok(RunExit::Stopped);
331 }
332 continue;
333 }
334 };
335
336 if connected_once {
337 log::info!("'{}' reconnected, firing on_reconnect", self.device_id);
338 if let Some(cb) = self.on_reconnect.as_mut() {
339 cb(&info);
340 }
341 } else {
342 log::info!("'{}' first connection established", self.device_id);
343 }
344 connected_once = true;
345 retries = 0;
346
347 self.control.attach(stream.control());
348
349 let producer_handle = Arc::clone(&producer);
350 let on_error_handle = Arc::clone(&on_error);
351 let on_disconnect_handle = Arc::clone(&on_disconnect);
352 let mut error_count: u64 = 0;
353 let exit = match stream.run(
354 move |req, buffer| {
355 let mut handler = producer_handle.lock().unwrap();
356 handler(req, buffer)
357 },
358 move |err| {
359 error_count += 1;
360 if error_count == 1 || error_count.is_multiple_of(10000) {
361 log::warn!(
362 "error callback (#{error_count}): {} (is_disconnected={})",
363 err,
364 err.is_disconnected()
365 );
366 }
367 if err.is_disconnected() {
368 if let Some(cb) = on_disconnect_handle.lock().unwrap().as_mut() {
369 cb(&err);
370 }
371 }
372 let mut handler = on_error_handle.lock().unwrap();
373 handler(err)
374 },
375 ) {
376 Ok(exit) => {
377 log::info!("'{}' stream.run() exited with: {:?}", self.device_id, exit);
378 exit
379 }
380 Err(err) => {
381 log::error!("'{}' stream.run() returned error: {}", self.device_id, err);
382 self.control.detach();
383 return Err(err);
384 }
385 };
386
387 self.control.detach();
388
389 match exit {
390 RunExit::Disconnected => {
391 log::info!("'{}' stream disconnected, will retry", self.device_id);
392 if let Some(max) = self.max_retries {
393 if retries >= max {
394 return Ok(RunExit::Disconnected);
395 }
396 }
397 if self.sleep_with_stop(self.backoff) {
398 return Ok(RunExit::Stopped);
399 }
400 continue;
401 }
402 other => return Ok(other),
403 }
404 }
405 }
406
407 fn open_device(&mut self) -> Result<Dac> {
408 if let Some(factory) = &self.discovery_factory {
409 let mut discovery = factory();
410 discovery.open_by_id(&self.device_id)
411 } else {
412 crate::open_device(&self.device_id)
413 }
414 }
415
416 fn is_retriable_connect_error(err: &Error) -> bool {
417 !matches!(err, Error::InvalidConfig(_) | Error::Stopped)
418 }
419
420 fn sleep_with_stop(&self, duration: Duration) -> bool {
421 const SLICE: Duration = Duration::from_millis(50);
422 let mut remaining = duration;
423 while remaining > Duration::ZERO {
424 if self.control.is_stop_requested() {
425 return true;
426 }
427 let slice = remaining.min(SLICE);
428 std::thread::sleep(slice);
429 remaining = remaining.saturating_sub(slice);
430 }
431 self.control.is_stop_requested()
432 }
433}