use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::backend::BackendKind;
use crate::device::{DacInfo, OutputModel};
use crate::error::{Error, Result};
use crate::reconnect::ReconnectPolicy;
use crate::stream::{ControlMsg, RunExit, StreamControl};
use super::driver::{self, DriverInputs, SourceOwned};
use super::engine::PresentationEngine;
use super::slice_pipeline::SlicePipeline;
use super::{default_transition, Frame, OutputResetReason, TransitionFn};
pub struct FrameSessionConfig {
pub pps: u32,
pub transition_fn: TransitionFn,
pub startup_blank: std::time::Duration,
pub color_delay_points: usize,
pub reconnect: Option<crate::config::ReconnectConfig>,
pub idle_policy: crate::config::IdlePolicy,
pub output_filter: Option<Box<dyn super::OutputFilter>>,
}
impl FrameSessionConfig {
const DEFAULT_COLOR_DELAY: std::time::Duration = std::time::Duration::from_micros(150);
pub fn new(pps: u32) -> Self {
let color_delay_points =
(Self::DEFAULT_COLOR_DELAY.as_secs_f64() * pps as f64).ceil() as usize;
Self {
pps,
transition_fn: default_transition(pps),
startup_blank: std::time::Duration::from_millis(1),
color_delay_points,
idle_policy: crate::config::IdlePolicy::default(),
reconnect: None,
output_filter: None,
}
}
pub fn with_transition_fn(mut self, f: TransitionFn) -> Self {
self.transition_fn = f;
self
}
pub fn with_startup_blank(mut self, duration: std::time::Duration) -> Self {
self.startup_blank = duration;
self
}
pub fn with_color_delay_points(mut self, n: usize) -> Self {
self.color_delay_points = n;
self
}
pub fn with_reconnect(mut self, config: crate::config::ReconnectConfig) -> Self {
self.reconnect = Some(config);
self
}
pub fn with_idle_policy(mut self, policy: crate::config::IdlePolicy) -> Self {
self.idle_policy = policy;
self
}
pub fn with_output_filter(mut self, filter: Box<dyn super::OutputFilter>) -> Self {
self.output_filter = Some(filter);
self
}
}
#[derive(Clone)]
pub struct FrameSessionMetrics {
inner: Arc<FrameSessionMetricsInner>,
}
struct FrameSessionMetricsInner {
connected: AtomicBool,
origin: Instant,
last_loop_activity_nanos: AtomicU64,
last_write_success_nanos: AtomicU64,
}
impl FrameSessionMetrics {
pub(crate) fn new(connected: bool) -> Self {
let metrics = Self {
inner: Arc::new(FrameSessionMetricsInner {
connected: AtomicBool::new(connected),
origin: Instant::now(),
last_loop_activity_nanos: AtomicU64::new(0),
last_write_success_nanos: AtomicU64::new(0),
}),
};
metrics.mark_loop_activity();
metrics
}
pub fn connected(&self) -> bool {
self.inner.connected.load(Ordering::SeqCst)
}
pub fn last_loop_activity(&self) -> Option<Instant> {
self.instant_from_nanos(self.inner.last_loop_activity_nanos.load(Ordering::SeqCst))
}
pub fn last_write_success(&self) -> Option<Instant> {
self.instant_from_nanos(self.inner.last_write_success_nanos.load(Ordering::SeqCst))
}
fn instant_from_nanos(&self, nanos: u64) -> Option<Instant> {
if nanos == 0 {
None
} else {
self.inner.origin.checked_add(Duration::from_nanos(nanos))
}
}
fn now_nanos(&self) -> u64 {
(self.inner.origin.elapsed().as_nanos().min(u64::MAX as u128) as u64).max(1)
}
pub(super) fn mark_loop_activity(&self) {
self.inner
.last_loop_activity_nanos
.store(self.now_nanos(), Ordering::SeqCst);
}
pub(super) fn mark_write_success(&self) {
let now = self.now_nanos();
self.inner
.last_loop_activity_nanos
.store(now, Ordering::SeqCst);
self.inner
.last_write_success_nanos
.store(now, Ordering::SeqCst);
}
pub(super) fn set_connected(&self, connected: bool) {
self.inner.connected.store(connected, Ordering::SeqCst);
self.mark_loop_activity();
}
}
struct MetricsDisconnectGuard(FrameSessionMetrics);
impl Drop for MetricsDisconnectGuard {
fn drop(&mut self) {
self.0.set_connected(false);
}
}
pub struct FrameSession {
control: StreamControl,
thread: Option<JoinHandle<Result<RunExit>>>,
frame_slot: Arc<Mutex<Option<Frame>>>,
metrics: FrameSessionMetrics,
}
impl FrameSession {
pub(crate) fn start(
mut backend: BackendKind,
config: FrameSessionConfig,
reconnect_policy: Option<ReconnectPolicy>,
) -> Result<Self> {
if !backend.is_connected() {
backend.connect()?;
}
let (control_tx, control_rx) = mpsc::channel();
let initial_color_delay = if config.color_delay_points > 0 {
Duration::from_secs_f64(config.color_delay_points as f64 / config.pps as f64)
} else {
Duration::ZERO
};
let control = StreamControl::new(control_tx, initial_color_delay, config.pps);
let frame_slot: Arc<Mutex<Option<Frame>>> = Arc::new(Mutex::new(None));
let metrics = FrameSessionMetrics::new(backend.is_connected());
let control_clone = control.clone();
let slot_clone = frame_slot.clone();
let metrics_clone = metrics.clone();
let thread = std::thread::spawn(move || {
let _disconnect_guard = MetricsDisconnectGuard(metrics_clone.clone());
Self::run_loop(
backend,
config,
control_clone,
control_rx,
slot_clone,
metrics_clone,
reconnect_policy,
)
});
Ok(Self {
control,
thread: Some(thread),
frame_slot,
metrics,
})
}
pub fn control(&self) -> StreamControl {
self.control.clone()
}
pub fn send_frame(&self, frame: Frame) {
*self.frame_slot.lock().unwrap() = Some(frame);
}
pub fn is_finished(&self) -> bool {
self.thread.as_ref().is_some_and(|h| h.is_finished())
}
pub fn metrics(&self) -> FrameSessionMetrics {
self.metrics.clone()
}
pub fn join(mut self) -> Result<RunExit> {
if let Some(handle) = self.thread.take() {
handle
.join()
.unwrap_or(Err(Error::disconnected("thread panicked")))
} else {
Ok(RunExit::Stopped)
}
}
fn run_loop(
mut backend: BackendKind,
config: FrameSessionConfig,
control: StreamControl,
control_rx: mpsc::Receiver<ControlMsg>,
frame_slot: Arc<Mutex<Option<Frame>>>,
metrics: FrameSessionMetrics,
reconnect_policy: Option<ReconnectPolicy>,
) -> Result<RunExit> {
let FrameSessionConfig {
pps: _,
transition_fn,
startup_blank,
color_delay_points,
idle_policy,
output_filter,
reconnect: _,
} = config;
let mut engine = PresentationEngine::new(transition_fn);
if backend.is_frame_swap() {
engine.set_frame_capacity(backend.frame_capacity());
}
let initial_buf_capacity = match backend.caps().output_model {
OutputModel::UsbFrameSwap => backend.frame_capacity().unwrap_or(0),
OutputModel::NetworkFifo | OutputModel::UdpTimed => backend.caps().max_points_per_chunk,
};
let mut pipeline = SlicePipeline::with_startup_blank(
engine,
color_delay_points,
output_filter,
idle_policy,
initial_buf_capacity,
startup_blank,
);
pipeline.reset_output_filter(OutputResetReason::SessionStart);
let expected_frame_swap = backend.is_frame_swap();
let source: SourceOwned = if expected_frame_swap {
SourceOwned::Frame(Box::new(pipeline))
} else {
SourceOwned::Fifo(Box::new(pipeline))
};
let validator = Self::reconnect_validator(reconnect_policy.as_ref());
if !backend.is_connected() {
backend.connect()?;
}
driver::run(DriverInputs {
backend,
source,
control,
control_rx,
metrics,
reconnect_policy,
validator,
error_sink: Box::new(|_e: Error| { }),
target_buffer: Duration::from_millis(20),
drain_timeout: Duration::ZERO,
pending_frame: Some(frame_slot),
})
}
fn reconnect_validator(policy: Option<&ReconnectPolicy>) -> driver::ReconnectValidator {
let target_id = policy
.map(|p| p.target.device_id.clone())
.unwrap_or_default();
Box::new(move |info: &DacInfo, _backend: &BackendKind, pps: u32| {
if pps < info.caps.pps_min || pps > info.caps.pps_max {
log::error!(
"'{}' PPS {} outside new device range [{}, {}]",
target_id,
pps,
info.caps.pps_min,
info.caps.pps_max
);
return Err(RunExit::Disconnected);
}
Ok(())
})
}
}
impl Drop for FrameSession {
fn drop(&mut self) {
let _ = self.control.stop();
if let Some(handle) = self.thread.take() {
let _ = handle.join();
}
}
}