use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::backend::{BackendKind, Error, Result, WriteOutcome};
use crate::discovery::DacDiscovery;
use crate::reconnect::{reconnect_backend_with_retry, ReconnectPolicy, ReconnectTarget};
use crate::scheduler;
use crate::types::{
ChunkRequest, ChunkResult, DacCapabilities, DacInfo, DacType, IdlePolicy, LaserPoint,
OutputModel, RunExit, StreamConfig, StreamInstant, StreamStats, StreamStatus,
};
/// Messages sent over the control channel from a `StreamControl` handle to
/// the `Stream` that owns the receiving end.
#[derive(Debug, Clone, Copy)]
pub(crate) enum ControlMsg {
/// Sent by `StreamControl::arm`; the stream opens the shutter when it is processed.
Arm,
/// Sent by `StreamControl::disarm`; the stream closes the shutter when it is processed.
Disarm,
/// Sent by `StreamControl::stop`; processing it makes the stream loop exit.
Stop,
}
/// Cloneable handle for controlling a stream.
///
/// All clones share the same `Arc`-held inner state, so a change made through
/// one clone is visible through every other clone.
#[derive(Clone)]
pub struct StreamControl {
// Shared flags, settings and the control-channel sender.
inner: Arc<StreamControlInner>,
}
/// State shared between all clones of a `StreamControl`.
struct StreamControlInner {
// Set by `arm`, cleared by `disarm`.
armed: AtomicBool,
// Set by `stop`; never cleared in this file.
stop_requested: AtomicBool,
// Sender half of the control channel, behind a mutex so the handle can be shared.
control_tx: Mutex<Sender<ControlMsg>>,
// Color delay stored as whole microseconds.
color_delay_micros: AtomicU64,
// Requested point rate in points per second.
pps: AtomicU32,
}
impl StreamControl {
    /// Creates a handle that starts disarmed with no stop requested.
    ///
    /// `color_delay` is stored as whole microseconds and `pps` as given.
    pub(crate) fn new(control_tx: Sender<ControlMsg>, color_delay: Duration, pps: u32) -> Self {
        let inner = StreamControlInner {
            armed: AtomicBool::new(false),
            stop_requested: AtomicBool::new(false),
            control_tx: Mutex::new(control_tx),
            color_delay_micros: AtomicU64::new(color_delay.as_micros() as u64),
            pps: AtomicU32::new(pps),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Best-effort delivery of `msg` to the stream: a poisoned sender lock or
    /// a closed channel is silently ignored.
    fn notify(&self, msg: ControlMsg) {
        if let Ok(sender) = self.inner.control_tx.lock() {
            let _ = sender.send(msg);
        }
    }

    /// Sets the armed flag and sends `ControlMsg::Arm`. Always returns `Ok(())`.
    pub fn arm(&self) -> Result<()> {
        self.inner.armed.store(true, Ordering::SeqCst);
        self.notify(ControlMsg::Arm);
        Ok(())
    }

    /// Clears the armed flag and sends `ControlMsg::Disarm`. Always returns `Ok(())`.
    pub fn disarm(&self) -> Result<()> {
        self.inner.armed.store(false, Ordering::SeqCst);
        self.notify(ControlMsg::Disarm);
        Ok(())
    }

    /// Returns the current value of the armed flag.
    pub fn is_armed(&self) -> bool {
        self.inner.armed.load(Ordering::SeqCst)
    }

    /// Stores a new color delay, kept as whole microseconds.
    pub fn set_color_delay(&self, delay: Duration) {
        let micros = delay.as_micros() as u64;
        self.inner.color_delay_micros.store(micros, Ordering::SeqCst);
    }

    /// Returns the stored color delay.
    pub fn color_delay(&self) -> Duration {
        let micros = self.inner.color_delay_micros.load(Ordering::SeqCst);
        Duration::from_micros(micros)
    }

    /// Stores a new requested point rate; the stream's run loop reads this
    /// value at the top of each iteration.
    pub fn set_pps(&self, pps: u32) {
        self.inner.pps.store(pps, Ordering::SeqCst);
    }

    /// Returns the stored point rate.
    pub fn pps(&self) -> u32 {
        self.inner.pps.load(Ordering::SeqCst)
    }

    /// Sets the stop flag and sends `ControlMsg::Stop`. Always returns `Ok(())`.
    pub fn stop(&self) -> Result<()> {
        self.inner.stop_requested.store(true, Ordering::SeqCst);
        self.notify(ControlMsg::Stop);
        Ok(())
    }

    /// Returns `true` once `stop` has been called on any clone of this handle.
    pub fn is_stop_requested(&self) -> bool {
        self.inner.stop_requested.load(Ordering::SeqCst)
    }
}
/// Mutable bookkeeping owned by a `Stream` while it runs.
struct StreamState {
// Advanced by the number of points on every recorded write; used as the
// `start` of each `ChunkRequest`.
current_instant: StreamInstant,
// Points written but not yet accounted as consumed; incremented on write and
// updated by `scheduler::advance_scheduled_ahead`.
scheduled_ahead: u64,
// Carry value handed to `scheduler::advance_scheduled_ahead`
// (presumably the sub-point remainder between iterations; confirm in scheduler).
fractional_consumed: f64,
// Scratch buffer handed to the producer and then written to the backend.
chunk_buffer: Vec<LaserPoint>,
// Copy of the most recent chunk written while armed (used by `IdlePolicy::RepeatLast`).
last_chunk: Vec<LaserPoint>,
// Number of valid points at the front of `last_chunk`.
last_chunk_len: usize,
// FIFO of (r, g, b, intensity) tuples used to delay color relative to position.
color_delay_line: VecDeque<(u16, u16, u16, u16)>,
// Points still to be forced dark after the most recent arm.
startup_blank_remaining: usize,
// Number of points to force dark after each arm.
startup_blank_points: usize,
// `StreamConfig::target_buffer` in seconds.
target_buffer_secs: f64,
// `StreamConfig::min_buffer` in seconds.
min_buffer_secs: f64,
// `target_buffer_secs` multiplied by the current point rate.
target_buffer_points: u64,
// Running counters (chunks/points written, underruns, reconnects).
stats: StreamStats,
// Armed state observed by the previous `handle_shutter_transition` call.
last_armed: bool,
// Whether the stream last commanded the shutter open.
shutter_open: bool,
}
impl StreamState {
    /// Builds the initial state for a stream.
    ///
    /// Both point buffers are allocated with `max_points_per_chunk` default
    /// points, and the buffer targets are taken from `config`.
    fn new(
        max_points_per_chunk: usize,
        startup_blank_points: usize,
        config: &StreamConfig,
    ) -> Self {
        let target_buffer_secs = config.target_buffer.as_secs_f64();
        let target_buffer_points = (target_buffer_secs * f64::from(config.pps)) as u64;
        let empty_chunk = vec![LaserPoint::default(); max_points_per_chunk];

        Self {
            current_instant: StreamInstant::new(0),
            scheduled_ahead: 0,
            fractional_consumed: 0.0,
            chunk_buffer: empty_chunk.clone(),
            last_chunk: empty_chunk,
            last_chunk_len: 0,
            color_delay_line: VecDeque::new(),
            startup_blank_remaining: 0,
            startup_blank_points,
            target_buffer_secs,
            min_buffer_secs: config.min_buffer.as_secs_f64(),
            target_buffer_points,
            stats: StreamStats::default(),
            last_armed: false,
            shutter_open: false,
        }
    }
}
/// A streaming session bound to one DAC backend.
///
/// Consumed by `run`, which drives the producer callback and writes points to
/// the backend until stopped, disconnected, or the producer ends.
pub struct Stream {
// Descriptor of the current device; replaced after a successful reconnect.
info: DacInfo,
// Active backend; `None` after `into_dac` has taken it.
backend: Option<BackendKind>,
// Stream configuration; `pps` is refreshed from `control` inside `run`.
config: StreamConfig,
// Control handle whose clones are handed out by `control()`.
control: StreamControl,
// Receiving end of the control channel.
control_rx: Receiver<ControlMsg>,
// Scheduler and buffer bookkeeping.
state: StreamState,
// When set, `run` attempts to reconnect instead of exiting on disconnect.
pub(crate) reconnect_policy: Option<ReconnectPolicy>,
// Reconnect target carried along so it can be returned by `into_dac`.
pub(crate) reconnect_target: Option<ReconnectTarget>,
}
impl Stream {
/// Converts a duration in microseconds into a point count at `pps` points
/// per second, rounding any fractional point up.
fn duration_micros_to_points(micros: u64, pps: u32) -> usize {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    match micros {
        0 => 0,
        us => (us as f64 * f64::from(pps) / MICROS_PER_SEC).ceil() as usize,
    }
}
/// Chooses how long to sleep when `remaining` time is left before a deadline.
///
/// Returns `None` when `remaining` is at or below the 500 µs busy-wait
/// threshold. Otherwise returns the time above that threshold, capped at 1 ms.
fn udp_timed_sleep_slice(remaining: Duration) -> Option<Duration> {
    const UDP_TIMED_SLEEP_SLICE: Duration = Duration::from_millis(1);
    const UDP_TIMED_BUSY_WAIT_THRESHOLD: Duration = Duration::from_micros(500);

    remaining
        .checked_sub(UDP_TIMED_BUSY_WAIT_THRESHOLD)
        // Exactly at the threshold there is no headroom left to sleep.
        .filter(|headroom| !headroom.is_zero())
        .map(|headroom| headroom.min(UDP_TIMED_SLEEP_SLICE))
}
/// Assembles a stream around an existing backend.
///
/// Creates the control channel, sizes the point buffers from the device's
/// `max_points_per_chunk`, and seeds the control handle with the configured
/// color delay and point rate. No reconnect policy or target is set.
pub(crate) fn with_backend(info: DacInfo, backend: BackendKind, config: StreamConfig) -> Self {
    let (control_tx, control_rx) = mpsc::channel();

    let startup_blank_micros = config.startup_blank.as_micros() as u64;
    let startup_blank_points = Self::duration_micros_to_points(startup_blank_micros, config.pps);
    let state = StreamState::new(
        info.caps.max_points_per_chunk,
        startup_blank_points,
        &config,
    );
    let control = StreamControl::new(control_tx, config.color_delay, config.pps);

    Self {
        info,
        backend: Some(backend),
        config,
        control,
        control_rx,
        state,
        reconnect_policy: None,
        reconnect_target: None,
    }
}
/// Buffer level, in points, above which the run loop sleeps.
///
/// For `OutputModel::UdpTimed` devices this is one full chunk; for every
/// other output model it is the configured target buffer in points.
fn scheduler_target_buffer_points(&self) -> u64 {
    let caps = &self.info.caps;
    if caps.output_model != OutputModel::UdpTimed {
        return self.state.target_buffer_points;
    }
    caps.max_points_per_chunk as u64
}
/// Returns the `DacInfo` currently held by this stream.
pub fn info(&self) -> &DacInfo {
&self.info
}
/// Returns the stream's configuration.
pub fn config(&self) -> &StreamConfig {
&self.config
}
/// Returns a clone of the control handle; clones share state with the stream.
pub fn control(&self) -> StreamControl {
self.control.clone()
}
/// Takes a snapshot of the stream's connection state, scheduling lead,
/// device queue depth (when the backend reports one) and statistics.
///
/// Always returns `Ok`.
pub fn status(&self) -> Result<StreamStatus> {
    let backend = self.backend.as_ref();
    let device_queued_points = backend.and_then(|b| b.queued_points());
    let connected = backend.is_some_and(|b| b.is_connected());

    let snapshot = StreamStatus {
        connected,
        scheduled_ahead_points: self.state.scheduled_ahead,
        device_queued_points,
        stats: Some(self.state.stats.clone()),
    };
    Ok(snapshot)
}
/// Reacts to a change of the armed flag since the previous call.
///
/// * armed → disarmed: clears the color delay line and closes the shutter if
///   it is open.
/// * disarmed → armed: pre-fills the color delay line with dark entries for
///   the current color delay, restarts the startup blanking countdown, and
///   opens the shutter if it is closed.
///
/// Shutter errors from the backend are ignored.
fn handle_shutter_transition(&mut self, is_armed: bool) {
    let was_armed = std::mem::replace(&mut self.state.last_armed, is_armed);

    match (was_armed, is_armed) {
        (true, false) => {
            self.state.color_delay_line.clear();
            if self.state.shutter_open {
                if let Some(backend) = self.backend.as_mut() {
                    let _ = backend.set_shutter(false);
                }
                self.state.shutter_open = false;
            }
        }
        (false, true) => {
            let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
            let delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);

            let delay_line = &mut self.state.color_delay_line;
            delay_line.clear();
            delay_line.extend(std::iter::repeat((0, 0, 0, 0)).take(delay_points));

            self.state.startup_blank_remaining = self.state.startup_blank_points;

            if !self.state.shutter_open {
                if let Some(backend) = self.backend.as_mut() {
                    let _ = backend.set_shutter(true);
                }
                self.state.shutter_open = true;
            }
        }
        // No change in armed state: nothing to do.
        _ => {}
    }
}
/// Disarms, flags the stream as stopped, then closes the shutter and stops
/// the backend if one is present. Every failure along the way is ignored.
fn shutdown_backend(&mut self) {
    let _ = self.control.disarm();
    let _ = self.control.stop();

    let Some(backend) = self.backend.as_mut() else {
        return;
    };
    let _ = backend.set_shutter(false);
    let _ = backend.stop();
}
/// Shuts the backend down and disconnects it.
///
/// # Errors
/// Returns the backend's error if disconnecting fails.
pub fn stop(&mut self) -> Result<()> {
    self.shutdown_backend();
    match self.backend.as_mut() {
        Some(backend) => {
            backend.disconnect()?;
            Ok(())
        }
        None => Ok(()),
    }
}
/// Shuts the stream down and hands the backend back as a `Dac`, together
/// with the statistics collected so far.
///
/// The returned `Dac` carries the stream's reconnect target, or the target
/// inside the reconnect policy when no separate target is stored.
pub fn into_dac(mut self) -> (Dac, StreamStats) {
    self.shutdown_backend();

    let reconnect_target = match self.reconnect_target.take() {
        Some(target) => Some(target),
        None => self.reconnect_policy.take().map(|policy| policy.target),
    };
    let stats = self.state.stats.clone();
    let dac = Dac {
        info: self.info.clone(),
        backend: self.backend.take(),
        reconnect_target,
    };

    (dac, stats)
}
/// Re-establishes the backend using `reconnect_policy`, then resets the
/// scheduler state for the new connection.
///
/// Delegates to `reconnect_backend_with_retry`, passing a stop check that
/// reads the stop flag, a validation callback, and a no-op closure. The
/// validation callback rejects frame-swap backends and devices whose
/// capabilities do not admit the current PPS. On success the new backend and
/// `DacInfo` replace the old ones and the policy's `on_reconnect` callback,
/// if one is set, is invoked with the new info.
///
/// # Errors
/// Propagates the `RunExit` returned by `reconnect_backend_with_retry`; the
/// validation callback yields `RunExit::Disconnected` on rejection.
///
/// # Panics
/// Panics if `reconnect_policy` is `None` or the `on_reconnect` mutex is poisoned.
fn handle_reconnect(
&mut self,
last_iteration: &mut std::time::Instant,
) -> std::result::Result<(), RunExit> {
// Callers only invoke this after checking `reconnect_policy.is_some()`.
let policy = self.reconnect_policy.as_ref().unwrap();
let (info, new_backend) = reconnect_backend_with_retry(
policy,
|| self.control.is_stop_requested(),
|info, new_backend| {
// A frame-swap device cannot be driven by the streaming loop.
if new_backend.is_frame_swap() {
log::error!(
"'{}' reconnected device is frame-swap, incompatible with streaming",
policy.target.device_id
);
return Err(RunExit::Disconnected);
}
// The current point rate must fit the new device's capabilities.
if Dac::validate_pps(&info.caps, self.config.pps).is_err() {
log::error!(
"'{}' config invalid for new device",
policy.target.device_id
);
return Err(RunExit::Disconnected);
}
Ok(())
},
|| {},
)?;
self.backend = Some(new_backend);
self.info = info;
self.reset_state_for_reconnect(last_iteration);
// Re-borrow the policy: the earlier borrow ended before `self` was mutated.
let policy = self.reconnect_policy.as_ref().unwrap();
if let Some(cb) = policy.on_reconnect.lock().unwrap().as_mut() {
cb(&self.info);
}
Ok(())
}
/// Resets per-connection bookkeeping after the backend has been replaced.
///
/// Resizes both point buffers to the new device's chunk size, clears the
/// scheduling lead, shutter/armed tracking, color delay line and startup
/// blanking, bumps the reconnect counter, and restarts the iteration clock.
fn reset_state_for_reconnect(&mut self, last_iteration: &mut std::time::Instant) {
    let chunk_capacity = self.info.caps.max_points_per_chunk;
    let state = &mut self.state;

    state.chunk_buffer.resize(chunk_capacity, LaserPoint::default());
    state.last_chunk.resize(chunk_capacity, LaserPoint::default());
    state.last_chunk_len = 0;

    state.scheduled_ahead = 0;
    state.fractional_consumed = 0.0;

    state.shutter_open = false;
    state.last_armed = false;
    state.color_delay_line.clear();
    state.startup_blank_remaining = 0;

    state.stats.reconnect_count += 1;
    *last_iteration = std::time::Instant::now();
}
pub fn run<F, E>(mut self, mut producer: F, mut on_error: E) -> Result<RunExit>
where
F: FnMut(&ChunkRequest, &mut [LaserPoint]) -> ChunkResult + Send + 'static,
E: FnMut(Error) + Send + 'static,
{
use std::time::Instant;
let mut max_points = self.info.caps.max_points_per_chunk;
let mut last_iteration = Instant::now();
let mut last_stats_log = Instant::now();
loop {
if self.control.is_stop_requested() {
return Ok(RunExit::Stopped);
}
self.config.pps = self.control.pps();
let pps = self.config.pps as f64;
self.state.target_buffer_points = (self.state.target_buffer_secs * pps) as u64;
self.state.startup_blank_points = Self::duration_micros_to_points(
self.config.startup_blank.as_micros() as u64,
self.config.pps,
);
let now = Instant::now();
scheduler::advance_scheduled_ahead(
&mut self.state.scheduled_ahead,
&mut self.state.fractional_consumed,
&mut last_iteration,
now,
pps,
);
let buffered = self.estimate_buffer_points();
let target_points = self.scheduler_target_buffer_points();
if now.duration_since(last_stats_log) >= Duration::from_millis(500) {
let lead_ms = self.state.scheduled_ahead as f64 / pps * 1000.0;
let target_ms = target_points as f64 / pps * 1000.0;
log::debug!(
"scheduler: lead={:.1}ms target={:.1}ms ahead={} buffered={} target_pts={}",
lead_ms,
target_ms,
self.state.scheduled_ahead,
buffered,
target_points,
);
last_stats_log = now;
}
if buffered > target_points {
let excess_points = buffered - target_points;
let sleep_time = Duration::from_secs_f64(excess_points as f64 / pps);
let stop = if self.info.caps.output_model == OutputModel::UdpTimed {
self.sleep_until_with_control_check(Instant::now() + sleep_time)?
} else {
self.sleep_with_control_check(sleep_time)?
};
if stop {
return Ok(RunExit::Stopped);
}
continue; }
let disconnected = match &self.backend {
Some(b) => !b.is_connected(),
None => true,
};
if disconnected {
if self.reconnect_policy.is_some() {
match self.handle_reconnect(&mut last_iteration) {
Ok(()) => {
max_points = self.info.caps.max_points_per_chunk;
continue;
}
Err(exit) => return Ok(exit),
}
}
log::warn!("backend disconnected, exiting");
on_error(Error::disconnected("backend disconnected"));
return Ok(RunExit::Disconnected);
}
if self.process_control_messages() {
return Ok(RunExit::Stopped);
}
let req = self.build_fill_request(max_points, buffered);
let buffer = &mut self.state.chunk_buffer[..max_points];
let result = producer(&req, buffer);
match result {
ChunkResult::Filled(n) => {
let n = n.min(max_points);
if n == 0 && req.target_points > 0 {
self.handle_underrun(&req)?;
continue;
}
if n > 0 {
match self.write_fill_points(n, &mut on_error) {
Ok(()) => {}
Err(e) if e.is_disconnected() && self.reconnect_policy.is_some() => {
match self.handle_reconnect(&mut last_iteration) {
Ok(()) => {
max_points = self.info.caps.max_points_per_chunk;
continue;
}
Err(exit) => return Ok(exit),
}
}
Err(e) => return Err(e),
}
}
}
ChunkResult::Starved => {
self.handle_underrun(&req)?;
}
ChunkResult::End => {
self.drain_and_blank();
return Ok(RunExit::ProducerEnded);
}
}
}
}
/// Sleeps for `duration` in slices of at most 2 ms, processing control
/// messages after each slice.
///
/// Returns `Ok(true)` as soon as a stop message is processed, `Ok(false)`
/// once the whole duration has been slept.
fn sleep_with_control_check(&mut self, duration: Duration) -> Result<bool> {
    const SLEEP_SLICE: Duration = Duration::from_millis(2);

    let mut left = duration;
    loop {
        if left.is_zero() {
            return Ok(false);
        }
        let nap = left.min(SLEEP_SLICE);
        std::thread::sleep(nap);
        // `nap` never exceeds `left`, so this cannot underflow.
        left -= nap;
        if self.process_control_messages() {
            return Ok(true);
        }
    }
}
/// Waits until `deadline`, processing control messages between waits.
///
/// While more than the busy-wait threshold remains the thread sleeps in short
/// slices (see `udp_timed_sleep_slice`); closer to the deadline it only yields.
/// Returns `Ok(true)` if a stop message is processed, `Ok(false)` once the
/// deadline has been reached.
fn sleep_until_with_control_check(&mut self, deadline: std::time::Instant) -> Result<bool> {
    loop {
        let remaining = match deadline.checked_duration_since(std::time::Instant::now()) {
            Some(left) if !left.is_zero() => left,
            // Deadline reached or already passed.
            _ => return Ok(false),
        };

        match Self::udp_timed_sleep_slice(remaining) {
            Some(slice) => std::thread::sleep(slice),
            None => std::thread::yield_now(),
        }

        if self.process_control_messages() {
            return Ok(true);
        }
    }
}
/// Post-processes the first `n` points of `chunk_buffer` and writes them to
/// the backend, retrying while the backend reports `WouldBlock`.
///
/// Processing order: shutter transition handling, park-point substitution
/// when disarmed, startup blanking when armed, then the color delay line.
///
/// # Errors
/// * `Error::Stopped` if the backend reports a stopped error or a stop message
///   is processed while waiting to retry.
/// * The backend's error if it reports a disconnect (after calling `on_error`),
///   or a disconnected error if there is no backend.
///
/// Any other write error disconnects the backend, is passed to `on_error`,
/// and this function then returns `Ok(())`.
fn write_fill_points<E>(&mut self, n: usize, on_error: &mut E) -> Result<()>
where
E: FnMut(Error),
{
let is_armed = self.control.is_armed();
let pps = self.config.pps;
self.handle_shutter_transition(is_armed);
// Disarmed: replace the producer's output with blanked points at the park
// position (or the origin when the idle policy is not `Park`).
if !is_armed {
let park = match &self.config.idle_policy {
IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
_ => LaserPoint::blanked(0.0, 0.0),
};
self.state.chunk_buffer[..n].fill(park);
}
// Just armed: force the leading points dark until the startup blank
// countdown runs out, keeping their positions.
if is_armed && self.state.startup_blank_remaining > 0 {
let blank_count = n.min(self.state.startup_blank_remaining);
for p in &mut self.state.chunk_buffer[..blank_count] {
p.r = 0;
p.g = 0;
p.b = 0;
p.intensity = 0;
}
self.state.startup_blank_remaining -= blank_count;
}
// Color delay: each point takes the color pushed `color_delay_points`
// points earlier; positions are untouched.
let delay_micros = self.control.inner.color_delay_micros.load(Ordering::SeqCst);
let color_delay_points = Self::duration_micros_to_points(delay_micros, self.config.pps);
if color_delay_points > 0 {
// Track changes to the delay or point rate by resizing the line.
self.state
.color_delay_line
.resize(color_delay_points, (0, 0, 0, 0));
for p in &mut self.state.chunk_buffer[..n] {
self.state
.color_delay_line
.push_back((p.r, p.g, p.b, p.intensity));
// The line is non-empty here because an entry was just pushed.
let (r, g, b, i) = self.state.color_delay_line.pop_front().unwrap();
p.r = r;
p.g = g;
p.b = b;
p.intensity = i;
}
} else if !self.state.color_delay_line.is_empty() {
self.state.color_delay_line.clear();
}
// Write with retry: loop until the backend accepts the chunk or fails.
loop {
let backend = match self.backend.as_mut() {
Some(b) => b,
None => return Err(Error::disconnected("no backend")),
};
match backend.try_write(pps, &self.state.chunk_buffer[..n]) {
Ok(WriteOutcome::Written) => {
self.record_write(n, is_armed);
return Ok(());
}
Ok(WriteOutcome::WouldBlock) => {
// Backend cannot accept the chunk yet; fall through to the retry path.
}
Err(e) if e.is_stopped() => {
return Err(Error::Stopped);
}
Err(e) if e.is_disconnected() => {
log::warn!("write got Disconnected error, exiting stream: {e}");
on_error(Error::disconnected("backend disconnected"));
return Err(e);
}
Err(e) => {
// Returning Ok leaves the backend disconnected for the caller to
// notice on its next connection check.
log::warn!("write error, disconnecting backend: {e}");
let _ = backend.disconnect();
on_error(e);
return Ok(());
}
}
// Retry path: give up the time slice, honor a stop request, then back
// off briefly before trying again.
std::thread::yield_now();
if self.process_control_messages() {
return Err(Error::Stopped);
}
std::thread::sleep(Duration::from_micros(100));
}
}
/// Handles a producer underrun by writing fill-in points chosen by the idle
/// policy.
///
/// Counts the underrun and applies any shutter transition. While armed:
/// `IdlePolicy::Stop` requests a stop and fails with `Error::Stopped`;
/// `IdlePolicy::RepeatLast` repeats the last written chunk cyclically when
/// one exists. In every other case the fill is blanked points, at the park
/// position for `IdlePolicy::Park` and at the origin otherwise.
///
/// The fill-in write is best effort: `WouldBlock` and write errors are ignored.
fn handle_underrun(&mut self, req: &ChunkRequest) -> Result<()> {
    self.state.stats.underrun_count += 1;

    let is_armed = self.control.is_armed();
    self.handle_shutter_transition(is_armed);

    // Always emit at least one point.
    let n_points = req.target_points.max(1);

    if is_armed && matches!(self.config.idle_policy, IdlePolicy::Stop) {
        self.control.stop()?;
        return Err(Error::Stopped);
    }

    let repeat_last = is_armed
        && matches!(self.config.idle_policy, IdlePolicy::RepeatLast)
        && self.state.last_chunk_len > 0;

    if repeat_last {
        let state = &mut self.state;
        let recorded = state.last_chunk[..state.last_chunk_len].iter().cycle();
        for (slot, point) in state.chunk_buffer[..n_points].iter_mut().zip(recorded) {
            *slot = *point;
        }
    } else {
        let fill = match &self.config.idle_policy {
            IdlePolicy::Park { x, y } => LaserPoint::blanked(*x, *y),
            _ => LaserPoint::blanked(0.0, 0.0),
        };
        self.state.chunk_buffer[..n_points].fill(fill);
    }

    let pps = self.config.pps;
    let written = match self.backend.as_mut() {
        Some(backend) => matches!(
            backend.try_write(pps, &self.state.chunk_buffer[..n_points]),
            Ok(WriteOutcome::Written)
        ),
        None => false,
    };
    if written {
        self.record_write(n_points, is_armed);
    }

    Ok(())
}
/// Updates bookkeeping after `n` points from `chunk_buffer` were written.
///
/// While armed, the written points are also saved as the last chunk. The
/// stream instant, scheduling lead and write statistics are always advanced.
fn record_write(&mut self, n: usize, is_armed: bool) {
    let state = &mut self.state;

    if is_armed {
        debug_assert!(
            n <= state.last_chunk.len(),
            "n ({}) exceeds last_chunk capacity ({})",
            n,
            state.last_chunk.len()
        );
        state.last_chunk[..n].copy_from_slice(&state.chunk_buffer[..n]);
        state.last_chunk_len = n;
    }

    let written = n as u64;
    state.current_instant += written;
    state.scheduled_ahead += written;
    state.stats.chunks_written += 1;
    state.stats.points_written += written;
}
fn process_control_messages(&mut self) -> bool {
loop {
match self.control_rx.try_recv() {
Ok(ControlMsg::Arm) => {
if !self.state.shutter_open {
if let Some(backend) = &mut self.backend {
let _ = backend.set_shutter(true);
}
self.state.shutter_open = true;
}
}
Ok(ControlMsg::Disarm) => {
if self.state.shutter_open {
if let Some(backend) = &mut self.backend {
let _ = backend.set_shutter(false);
}
self.state.shutter_open = false;
}
}
Ok(ControlMsg::Stop) => {
return true;
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => break,
}
}
false
}
/// Estimates how many points are currently buffered, combining the software
/// scheduling lead with the backend's reported queue depth (when available)
/// via `scheduler::conservative_buffered_points`.
fn estimate_buffer_points(&self) -> u64 {
    let scheduled_ahead = self.state.scheduled_ahead;
    let device_queue = self.backend.as_ref().and_then(|b| b.queued_points());
    scheduler::conservative_buffered_points(scheduled_ahead, device_queue)
}
/// Builds the request handed to the producer for the next chunk.
///
/// For `OutputModel::UdpTimed` devices both `min_points` and `target_points`
/// are a full chunk. Otherwise each is the number of points needed to bring
/// the buffer up to the minimum / target level, rounded up and capped at
/// `max_points`.
fn build_fill_request(&self, max_points: usize, buffered_points: u64) -> ChunkRequest {
    let pps = self.config.pps;
    let rate = pps as f64;
    let buffered_secs = buffered_points as f64 / rate;
    let buffered = Duration::from_secs_f64(buffered_secs);
    let device_queued_points = self.backend.as_ref().and_then(|b| b.queued_points());

    // Points required to raise the buffer to `level_secs`, clamped to one chunk.
    let points_to_reach = |level_secs: f64| -> usize {
        let shortfall_secs = (level_secs - buffered_secs).max(0.0);
        ((shortfall_secs * rate).ceil() as usize).min(max_points)
    };

    let (min_points, target_points) = if self.info.caps.output_model == OutputModel::UdpTimed {
        (max_points, max_points)
    } else {
        (
            points_to_reach(self.state.min_buffer_secs),
            points_to_reach(self.state.target_buffer_secs),
        )
    };

    ChunkRequest {
        start: self.state.current_instant,
        pps,
        min_points,
        target_points,
        buffered_points,
        buffered,
        device_queued_points,
    }
}
fn drain_and_blank(&mut self) {
use std::time::Instant;
let timeout = self.config.drain_timeout;
if timeout.is_zero() {
self.blank_and_close_shutter();
return;
}
let deadline = Instant::now() + timeout;
let pps = self.config.pps;
let has_queue_depth = self
.backend
.as_ref()
.and_then(|b| b.queued_points())
.is_some();
if has_queue_depth {
const POLL_INTERVAL: Duration = Duration::from_millis(5);
while Instant::now() < deadline {
if let Some(queued) = self.backend.as_ref().and_then(|b| b.queued_points()) {
if queued == 0 {
break;
}
} else {
break;
}
if self.process_control_messages() {
break;
}
std::thread::sleep(POLL_INTERVAL);
}
} else {
let estimated_drain =
Duration::from_secs_f64(self.state.scheduled_ahead as f64 / pps as f64);
let wait_time = estimated_drain.min(timeout);
const SLEEP_SLICE: Duration = Duration::from_millis(10);
let mut remaining = wait_time;
while remaining > Duration::ZERO && Instant::now() < deadline {
let slice = remaining.min(SLEEP_SLICE);
std::thread::sleep(slice);
remaining = remaining.saturating_sub(slice);
if self.process_control_messages() {
break;
}
}
}
self.blank_and_close_shutter();
}
/// Closes the shutter and writes a short run of 16 blanked points at the
/// origin. Backend errors are ignored; the shutter is recorded as closed
/// even when there is no backend.
fn blank_and_close_shutter(&mut self) {
    self.state.shutter_open = false;

    let Some(backend) = self.backend.as_mut() else {
        return;
    };
    let _ = backend.set_shutter(false);

    let blanks = [LaserPoint::blanked(0.0, 0.0); 16];
    let _ = backend.try_write(self.config.pps, &blanks);
}
}
/// Dropping a stream stops and disconnects its backend on a best-effort basis.
impl Drop for Stream {
    fn drop(&mut self) {
        // Errors cannot be reported from `drop`, so they are discarded.
        self.stop().ok();
    }
}
/// A discovered DAC that has not yet been turned into a stream or frame session.
pub struct Dac {
// Descriptor of the device.
info: DacInfo,
// Backend for the device; taken out when a stream or session is started.
backend: Option<BackendKind>,
// Reconnect target, required when a reconnect configuration is supplied.
pub(crate) reconnect_target: Option<ReconnectTarget>,
}
impl Dac {
/// Wraps a backend and its descriptor into a `Dac` with no reconnect target.
pub fn new(info: DacInfo, backend: BackendKind) -> Self {
Self {
info,
backend: Some(backend),
reconnect_target: None,
}
}
/// Installs a discovery factory for reconnects and returns the updated `Dac`.
///
/// An existing reconnect target keeps its device id and only has its factory
/// replaced. Otherwise a new target is created for this device's id.
pub fn with_discovery_factory<F>(mut self, factory: F) -> Self
where
    F: Fn() -> DacDiscovery + Send + 'static,
{
    if let Some(target) = self.reconnect_target.as_mut() {
        target.discovery_factory = Some(Box::new(factory));
    } else {
        self.reconnect_target = Some(ReconnectTarget {
            device_id: self.info.id.clone(),
            discovery_factory: Some(Box::new(factory)),
        });
    }
    self
}
/// Returns the device descriptor.
pub fn info(&self) -> &DacInfo {
&self.info
}
/// Returns the device id from the descriptor.
pub fn id(&self) -> &str {
&self.info.id
}
/// Returns the device name from the descriptor.
pub fn name(&self) -> &str {
&self.info.name
}
/// Returns the device type from the descriptor.
pub fn kind(&self) -> &DacType {
&self.info.kind
}
/// Returns the device capabilities from the descriptor.
pub fn caps(&self) -> &DacCapabilities {
&self.info.caps
}
/// Returns `true` while this `Dac` still holds its backend.
pub fn has_backend(&self) -> bool {
self.backend.is_some()
}
/// Consumes the `Dac` and returns its backend, or `None` if it holds none.
pub(crate) fn into_backend(mut self) -> Option<BackendKind> {
self.backend.take()
}
/// Returns `true` when a backend is present and reports itself connected.
pub fn is_connected(&self) -> bool {
    matches!(&self.backend, Some(backend) if backend.is_connected())
}
pub fn start_stream(mut self, mut cfg: StreamConfig) -> Result<(Stream, DacInfo)> {
let reconnect_config = cfg.reconnect.take();
let mut backend = self.backend.take().ok_or_else(|| {
Error::invalid_config("device backend has already been used for a stream")
})?;
if backend.is_frame_swap() {
return Err(Error::invalid_config(
"streaming is not supported on frame-swap DACs (e.g. Helios); \
use start_frame_session() instead",
));
}
let cfg = Self::apply_backend_buffer_defaults(&self.info.caps, cfg);
Self::validate_pps(&self.info.caps, cfg.pps)?;
if !backend.is_connected() {
backend.connect()?;
}
let mut stream = Stream::with_backend(self.info.clone(), backend, cfg);
stream.reconnect_target = self.reconnect_target.take();
if let Some(rc) = reconnect_config {
let target = stream.reconnect_target.take().ok_or_else(|| {
Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
})?;
stream.reconnect_policy = Some(ReconnectPolicy::new(rc, target));
}
Ok((stream, self.info))
}
/// Swaps in the network buffer defaults for `NetworkFifo` and `UdpTimed`
/// devices, but only when both buffer settings still hold the generic
/// default values.
fn apply_backend_buffer_defaults(
    caps: &DacCapabilities,
    mut cfg: StreamConfig,
) -> StreamConfig {
    let is_network_model = matches!(
        caps.output_model,
        OutputModel::NetworkFifo | OutputModel::UdpTimed
    );
    let customized = cfg.target_buffer != StreamConfig::DEFAULT_TARGET_BUFFER
        || cfg.min_buffer != StreamConfig::DEFAULT_MIN_BUFFER;

    if is_network_model && !customized {
        cfg.target_buffer = StreamConfig::NETWORK_DEFAULT_TARGET_BUFFER;
        cfg.min_buffer = StreamConfig::NETWORK_DEFAULT_MIN_BUFFER;
    }
    cfg
}
fn validate_pps(caps: &DacCapabilities, pps: u32) -> Result<()> {
if pps < caps.pps_min || pps > caps.pps_max {
return Err(Error::invalid_config(format!(
"PPS {} is outside device range [{}, {}]",
pps, caps.pps_min, caps.pps_max
)));
}
Ok(())
}
pub fn start_frame_session(
mut self,
mut config: crate::presentation::FrameSessionConfig,
) -> Result<(crate::presentation::FrameSession, DacInfo)> {
let reconnect_config = config.reconnect.take();
let backend = self.backend.take().ok_or_else(|| {
Error::invalid_config("device backend has already been used for a session")
})?;
Self::validate_pps(backend.caps(), config.pps)?;
let reconnect_policy = match reconnect_config {
Some(rc) => {
let target = self.reconnect_target.take().ok_or_else(|| {
Error::invalid_config("reconnect requires a reconnect target — use open_device(), open_device_with(), or Dac::with_discovery_factory()")
})?;
Some(ReconnectPolicy::new(rc, target))
}
None => None,
};
let session = crate::presentation::FrameSession::start(backend, config, reconnect_policy)?;
Ok((session, self.info))
}
}
#[cfg(test)]
mod tests;