1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread::JoinHandle;
7use std::time::{Duration, Instant};
8
9use crate::backend::BackendKind;
10use crate::device::{DacInfo, OutputModel};
11use crate::error::{Error, Result};
12use crate::reconnect::ReconnectPolicy;
13use crate::stream::{ControlMsg, RunExit, StreamControl};
14
15use super::driver::{self, DriverInputs, SourceOwned};
16use super::engine::PresentationEngine;
17use super::slice_pipeline::SlicePipeline;
18use super::{default_transition, Frame, OutputResetReason, TransitionFn};
19
20pub struct FrameSessionConfig {
26 pub pps: u32,
28 pub transition_fn: TransitionFn,
30 pub startup_blank: std::time::Duration,
32 pub color_delay_points: usize,
38 pub reconnect: Option<crate::config::ReconnectConfig>,
43 pub idle_policy: crate::config::IdlePolicy,
49 pub output_filter: Option<Box<dyn super::OutputFilter>>,
51}
52
53impl FrameSessionConfig {
54 const DEFAULT_COLOR_DELAY: std::time::Duration = std::time::Duration::from_micros(150);
55
56 pub fn new(pps: u32) -> Self {
58 let color_delay_points =
59 (Self::DEFAULT_COLOR_DELAY.as_secs_f64() * pps as f64).ceil() as usize;
60 Self {
61 pps,
62 transition_fn: default_transition(pps),
63 startup_blank: std::time::Duration::from_millis(1),
64 color_delay_points,
65 idle_policy: crate::config::IdlePolicy::default(),
66 reconnect: None,
67 output_filter: None,
68 }
69 }
70
71 pub fn with_transition_fn(mut self, f: TransitionFn) -> Self {
73 self.transition_fn = f;
74 self
75 }
76
77 pub fn with_startup_blank(mut self, duration: std::time::Duration) -> Self {
79 self.startup_blank = duration;
80 self
81 }
82
83 pub fn with_color_delay_points(mut self, n: usize) -> Self {
85 self.color_delay_points = n;
86 self
87 }
88
89 pub fn with_reconnect(mut self, config: crate::config::ReconnectConfig) -> Self {
93 self.reconnect = Some(config);
94 self
95 }
96
97 pub fn with_idle_policy(mut self, policy: crate::config::IdlePolicy) -> Self {
101 self.idle_policy = policy;
102 self
103 }
104
105 pub fn with_output_filter(mut self, filter: Box<dyn super::OutputFilter>) -> Self {
107 self.output_filter = Some(filter);
108 self
109 }
110}
111
112#[derive(Clone)]
118pub struct FrameSessionMetrics {
119 inner: Arc<FrameSessionMetricsInner>,
120}
121
/// State shared by every clone of [`FrameSessionMetrics`].
struct FrameSessionMetricsInner {
    /// Reference point against which the nanosecond timestamps are measured.
    origin: Instant,
    /// Connection flag as last recorded via `set_connected`.
    connected: AtomicBool,
    /// Nanoseconds since `origin` of the latest loop activity; `0` means "never".
    last_loop_activity_nanos: AtomicU64,
    /// Nanoseconds since `origin` of the latest successful write; `0` means "never".
    last_write_success_nanos: AtomicU64,
}
128
129impl FrameSessionMetrics {
130 pub(crate) fn new(connected: bool) -> Self {
131 let metrics = Self {
132 inner: Arc::new(FrameSessionMetricsInner {
133 connected: AtomicBool::new(connected),
134 origin: Instant::now(),
135 last_loop_activity_nanos: AtomicU64::new(0),
136 last_write_success_nanos: AtomicU64::new(0),
137 }),
138 };
139 metrics.mark_loop_activity();
140 metrics
141 }
142
143 pub fn connected(&self) -> bool {
145 self.inner.connected.load(Ordering::SeqCst)
146 }
147
148 pub fn last_loop_activity(&self) -> Option<Instant> {
150 self.instant_from_nanos(self.inner.last_loop_activity_nanos.load(Ordering::SeqCst))
151 }
152
153 pub fn last_write_success(&self) -> Option<Instant> {
155 self.instant_from_nanos(self.inner.last_write_success_nanos.load(Ordering::SeqCst))
156 }
157
158 fn instant_from_nanos(&self, nanos: u64) -> Option<Instant> {
159 if nanos == 0 {
160 None
161 } else {
162 self.inner.origin.checked_add(Duration::from_nanos(nanos))
163 }
164 }
165
166 fn now_nanos(&self) -> u64 {
167 (self.inner.origin.elapsed().as_nanos().min(u64::MAX as u128) as u64).max(1)
168 }
169
170 pub(super) fn mark_loop_activity(&self) {
171 self.inner
172 .last_loop_activity_nanos
173 .store(self.now_nanos(), Ordering::SeqCst);
174 }
175
176 pub(super) fn mark_write_success(&self) {
177 let now = self.now_nanos();
178 self.inner
179 .last_loop_activity_nanos
180 .store(now, Ordering::SeqCst);
181 self.inner
182 .last_write_success_nanos
183 .store(now, Ordering::SeqCst);
184 }
185
186 pub(super) fn set_connected(&self, connected: bool) {
187 self.inner.connected.store(connected, Ordering::SeqCst);
188 self.mark_loop_activity();
189 }
190}
191
192struct MetricsDisconnectGuard(FrameSessionMetrics);
193
194impl Drop for MetricsDisconnectGuard {
195 fn drop(&mut self) {
196 self.0.set_connected(false);
197 }
198}
199
200pub struct FrameSession {
220 control: StreamControl,
221 thread: Option<JoinHandle<Result<RunExit>>>,
222 frame_slot: Arc<Mutex<Option<Frame>>>,
223 metrics: FrameSessionMetrics,
224}
225
226impl FrameSession {
227 pub(crate) fn start(
229 mut backend: BackendKind,
230 config: FrameSessionConfig,
231 reconnect_policy: Option<ReconnectPolicy>,
232 ) -> Result<Self> {
233 if !backend.is_connected() {
234 backend.connect()?;
235 }
236
237 let (control_tx, control_rx) = mpsc::channel();
238 let initial_color_delay = if config.color_delay_points > 0 {
239 Duration::from_secs_f64(config.color_delay_points as f64 / config.pps as f64)
240 } else {
241 Duration::ZERO
242 };
243 let control = StreamControl::new(control_tx, initial_color_delay, config.pps);
244 let frame_slot: Arc<Mutex<Option<Frame>>> = Arc::new(Mutex::new(None));
245 let metrics = FrameSessionMetrics::new(backend.is_connected());
246
247 let control_clone = control.clone();
248 let slot_clone = frame_slot.clone();
249 let metrics_clone = metrics.clone();
250
251 let thread = std::thread::spawn(move || {
252 let _disconnect_guard = MetricsDisconnectGuard(metrics_clone.clone());
253 Self::run_loop(
254 backend,
255 config,
256 control_clone,
257 control_rx,
258 slot_clone,
259 metrics_clone,
260 reconnect_policy,
261 )
262 });
263
264 Ok(Self {
265 control,
266 thread: Some(thread),
267 frame_slot,
268 metrics,
269 })
270 }
271
272 pub fn control(&self) -> StreamControl {
274 self.control.clone()
275 }
276
277 pub fn send_frame(&self, frame: Frame) {
280 *self.frame_slot.lock().unwrap() = Some(frame);
281 }
282
283 pub fn is_finished(&self) -> bool {
285 self.thread.as_ref().is_some_and(|h| h.is_finished())
286 }
287
288 pub fn metrics(&self) -> FrameSessionMetrics {
290 self.metrics.clone()
291 }
292
293 pub fn join(mut self) -> Result<RunExit> {
295 if let Some(handle) = self.thread.take() {
296 handle
297 .join()
298 .unwrap_or(Err(Error::disconnected("thread panicked")))
299 } else {
300 Ok(RunExit::Stopped)
301 }
302 }
303
304 fn run_loop(
309 mut backend: BackendKind,
310 config: FrameSessionConfig,
311 control: StreamControl,
312 control_rx: mpsc::Receiver<ControlMsg>,
313 frame_slot: Arc<Mutex<Option<Frame>>>,
314 metrics: FrameSessionMetrics,
315 reconnect_policy: Option<ReconnectPolicy>,
316 ) -> Result<RunExit> {
317 let FrameSessionConfig {
318 pps: _,
319 transition_fn,
320 startup_blank,
321 color_delay_points,
322 idle_policy,
323 output_filter,
324 reconnect: _,
325 } = config;
326
327 let mut engine = PresentationEngine::new(transition_fn);
328 if backend.is_frame_swap() {
329 engine.set_frame_capacity(backend.frame_capacity());
330 }
331
332 let initial_buf_capacity = match backend.caps().output_model {
335 OutputModel::UsbFrameSwap => backend.frame_capacity().unwrap_or(0),
336 OutputModel::NetworkFifo | OutputModel::UdpTimed => backend.caps().max_points_per_chunk,
337 };
338 let mut pipeline = SlicePipeline::with_startup_blank(
339 engine,
340 color_delay_points,
341 output_filter,
342 idle_policy,
343 initial_buf_capacity,
344 startup_blank,
345 );
346 pipeline.reset_output_filter(OutputResetReason::SessionStart);
347
348 let expected_frame_swap = backend.is_frame_swap();
349 let source: SourceOwned = if expected_frame_swap {
350 SourceOwned::Frame(Box::new(pipeline))
351 } else {
352 SourceOwned::Fifo(Box::new(pipeline))
353 };
354
355 let validator = Self::reconnect_validator(reconnect_policy.as_ref());
356 if !backend.is_connected() {
357 backend.connect()?;
358 }
359
360 driver::run(DriverInputs {
361 backend,
362 source,
363 control,
364 control_rx,
365 metrics,
366 reconnect_policy,
367 validator,
368 error_sink: Box::new(|_e: Error| { }),
369 target_buffer: Duration::from_millis(20),
370 drain_timeout: Duration::ZERO,
371 pending_frame: Some(frame_slot),
372 })
373 }
374
375 fn reconnect_validator(policy: Option<&ReconnectPolicy>) -> driver::ReconnectValidator {
376 let target_id = policy
377 .map(|p| p.target.device_id.clone())
378 .unwrap_or_default();
379 Box::new(move |info: &DacInfo, _backend: &BackendKind, pps: u32| {
380 if pps < info.caps.pps_min || pps > info.caps.pps_max {
381 log::error!(
382 "'{}' PPS {} outside new device range [{}, {}]",
383 target_id,
384 pps,
385 info.caps.pps_min,
386 info.caps.pps_max
387 );
388 return Err(RunExit::Disconnected);
389 }
390 Ok(())
391 })
392 }
393}
394
395impl Drop for FrameSession {
396 fn drop(&mut self) {
397 let _ = self.control.stop();
398 if let Some(handle) = self.thread.take() {
399 let _ = handle.join();
400 }
401 }
402}