use crate::util::{clamp, map_range};
use crate::Inner as ApiInner;
use crate::{DetectedDac, RawPoint};
use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
/// The function called by the laser stream thread to fill a [`Buffer`] with points.
///
/// It receives mutable access to the user's model `M` and to the buffer that will be
/// submitted to the DAC.
pub trait RenderFn<M>: Fn(&mut M, &mut Buffer) {}
// Blanket impl: any `Fn(&mut M, &mut Buffer)` closure or function is a `RenderFn<M>`.
impl<M, F> RenderFn<M> for F where F: Fn(&mut M, &mut Buffer) {}
/// The function called by the laser stream thread when a [`StreamError`] occurs.
///
/// It receives mutable access to the user's model `M`, the error that occurred, and a
/// [`StreamErrorAction`] that it may overwrite to tell the thread how to proceed.
pub trait StreamErrorFn<M>: Fn(&mut M, &StreamError, &mut StreamErrorAction) {}
// Blanket impl: any function or closure with the matching signature is a `StreamErrorFn<M>`.
impl<M, F> StreamErrorFn<M> for F where F: Fn(&mut M, &StreamError, &mut StreamErrorAction) {}
/// A handle to a raw laser stream running on its own thread.
///
/// Cloning the handle clones the two channel senders and the `Arc` to the shared state, so
/// all clones refer to the same stream.
#[derive(Clone)]
pub struct Stream<M> {
/// Sends closures that mutate the stream's `State` (point rate, latency).
state_update_tx: mpsc::Sender<StateUpdate>,
/// Sends closures that mutate the user's model on the stream thread.
model_update_tx: mpsc::Sender<ModelUpdate<M>>,
/// State shared between all handles to this stream.
shared: Arc<Shared<M>>,
}
/// The mutable stream parameters, updated on the stream thread via `StateUpdate` closures.
#[derive(Clone)]
struct State {
/// The requested point rate; the stream loop clamps it to the DAC's maximum before use.
point_hz: u32,
/// The requested number of latency points; the stream loop clamps it to the DAC's buffer
/// capacity before use.
latency_points: u32,
}
/// State shared between every `Stream` handle and (in part) the stream thread.
struct Shared<M> {
/// The user's model. It is `take`n out of the `Option` while in use and put back afterwards,
/// always while the mutex is held.
model: Arc<Mutex<Option<M>>>,
/// When `true`, `Stream::send` applies updates directly to the model instead of queueing
/// them. Set to `true` once the thread has been joined in `close_inner`.
is_paused: AtomicBool,
/// The DAC given to the builder, if any.
dac: Option<DetectedDac>,
/// Set when the stream has been asked to close or the stream thread has finished.
is_closed: Arc<AtomicBool>,
/// The stream thread's join handle; `None` once it has been joined.
thread: Mutex<Option<std::thread::JoinHandle<Result<(), StreamError>>>>,
}
/// The buffer of points handed to the render function on each iteration of the stream loop.
///
/// Dereferences to the slice of [`RawPoint`]s to be filled.
#[derive(Debug)]
pub struct Buffer {
/// The point rate in effect for this buffer (already clamped to the DAC's maximum).
pub(crate) point_hz: u32,
/// The latency in points in effect for this buffer (already clamped to the DAC's capacity).
pub(crate) latency_points: u32,
/// The points to be rendered; initialised to centered blank points by the stream loop.
pub(crate) points: Box<[RawPoint]>,
}
/// A type for configuring and building a raw laser [`Stream`].
///
/// `M` is the user's model, `F` the render function and `E` the stream error handler.
pub struct Builder<M, F, E = DefaultStreamErrorFn<M>> {
/// Handle to the API internals, used by the stream thread for DAC detection.
pub(crate) api_inner: Arc<super::super::Inner>,
/// The stream parameters common to stream builders (DAC, point rate, latency, TCP timeout).
pub builder: super::Builder,
/// The user's model, moved onto the stream when built.
pub model: M,
/// The function used to render points into each `Buffer`.
pub render: F,
/// The function called when a stream error occurs.
pub stream_error: E,
}
/// The function-pointer type used as the default stream error handler parameter of `Builder`.
pub type DefaultStreamErrorFn<M> = fn(&mut M, &StreamError, &mut StreamErrorAction);
// A boxed closure sent to the stream thread to mutate its `State`.
type StateUpdate = Box<dyn FnMut(&mut State) + 'static + Send>;
/// A boxed closure sent to the stream thread to mutate the user's model.
pub type ModelUpdate<M> = Box<dyn FnMut(&mut M) + 'static + Send>;
/// Errors that may occur on the laser stream thread.
#[derive(Debug, Error)]
pub enum StreamError {
/// An error produced while streaming to an Ether Dream DAC.
#[error("an Ether Dream DAC stream error occurred: {err}")]
EtherDreamStream {
// `#[from]` lets `EtherDreamStreamError` convert into `StreamError` via `From`/`?`.
#[from]
err: EtherDreamStreamError,
},
}
#[derive(Debug, Error)]
pub enum EtherDreamStreamError {
#[error("laser DAC detection failed: {err}")]
FailedToDetectDacs {
#[source]
err: io::Error,
attempts: u32,
},
#[error("failed to connect the DAC stream (attempt {attempts}): {err}")]
FailedToConnectStream {
#[source]
err: ether_dream::dac::stream::CommunicationError,
attempts: u32,
},
#[error("failed to prepare the DAC stream: {err}")]
FailedToPrepareStream {
#[source]
err: ether_dream::dac::stream::CommunicationError,
},
#[error("failed to begin the DAC stream: {err}")]
FailedToBeginStream {
#[source]
err: ether_dream::dac::stream::CommunicationError,
},
#[error("failed to submit data over the DAC stream: {err}")]
FailedToSubmitData {
#[source]
err: ether_dream::dac::stream::CommunicationError,
},
#[error("failed to submit point rate change over the DAC stream: {err}")]
FailedToSubmitPointRate {
#[source]
err: ether_dream::dac::stream::CommunicationError,
},
#[error("failed to submit stop command to the DAC stream: {err}")]
FailedToStopStream {
#[source]
err: ether_dream::dac::stream::CommunicationError,
},
}
/// The action the stream thread should take after a [`StreamError`] has been handled.
#[derive(Clone, Debug)]
pub enum StreamErrorAction {
/// Continue the stream thread's loop and attempt the connection again.
ReattemptConnect,
/// Request that the DAC be detected again.
RedetectDac {
/// The timeout to use for subsequent DAC detection; `None` means no timeout is set.
timeout: Option<Duration>,
},
/// Stop the stream thread, returning the error from it. This is the `Default` action.
CloseThread,
}
impl<M> Stream<M> {
    /// Queue a change of the stream's point rate.
    ///
    /// Returns an error if the update could not be sent to the stream thread.
    pub fn set_point_hz(&self, point_hz: u32) -> Result<(), mpsc::SendError<()>> {
        match self.send_raw_state_update(move |state| state.point_hz = point_hz) {
            Ok(()) => Ok(()),
            Err(_) => Err(mpsc::SendError(())),
        }
    }

    /// Queue a change of the stream's latency, expressed as a number of points.
    ///
    /// Returns an error if the update could not be sent to the stream thread.
    pub fn set_latency_points(&self, points: u32) -> Result<(), mpsc::SendError<()>> {
        match self.send_raw_state_update(move |state| state.latency_points = points) {
            Ok(()) => Ok(()),
            Err(_) => Err(mpsc::SendError(())),
        }
    }

    /// A clone of the DAC that was given to the builder, if there was one.
    pub fn dac(&self) -> Option<DetectedDac> {
        self.shared.dac.as_ref().cloned()
    }

    /// Apply `update` to the model.
    ///
    /// While the stream is running the update is queued for the stream thread. Once the
    /// stream is flagged as paused the update is applied immediately on the calling thread
    /// (and silently skipped if the model mutex is poisoned).
    ///
    /// # Panics
    ///
    /// Panics in the paused case if the model slot is unexpectedly empty.
    pub fn send<F>(&self, update: F) -> Result<(), mpsc::SendError<ModelUpdate<M>>>
    where
        F: FnOnce(&mut M) + Send + 'static,
    {
        let paused = self.shared.is_paused.load(atomic::Ordering::Relaxed);
        if !paused {
            // Wrap the `FnOnce` in an `Option` so it can be driven through an `FnMut` box.
            let mut pending = Some(update);
            let boxed: ModelUpdate<M> = Box::new(move |model: &mut M| {
                if let Some(apply) = pending.take() {
                    apply(model);
                }
            });
            return self.model_update_tx.send(boxed);
        }
        if let Ok(mut slot) = self.shared.model.lock() {
            let mut model = slot.take().unwrap();
            update(&mut model);
            *slot = Some(model);
        }
        Ok(())
    }

    /// Whether the stream has been flagged as closed.
    pub fn is_closed(&self) -> bool {
        let flag = &self.shared.is_closed;
        flag.load(atomic::Ordering::Relaxed)
    }

    /// Close the stream and wait for its thread to finish.
    ///
    /// Returns `None` if the thread handle was already taken or its mutex is poisoned,
    /// otherwise the result of joining the thread.
    pub fn close(self) -> Option<std::thread::Result<Result<(), StreamError>>> {
        self.shared.close_inner()
    }

    // Box the given `FnOnce` as a `StateUpdate` and send it to the stream thread.
    fn send_raw_state_update<F>(&self, update: F) -> Result<(), mpsc::SendError<StateUpdate>>
    where
        F: FnOnce(&mut State) + Send + 'static,
    {
        let mut pending = Some(update);
        let boxed: StateUpdate = Box::new(move |state: &mut State| {
            if let Some(apply) = pending.take() {
                apply(state);
            }
        });
        self.state_update_tx.send(boxed)
    }

    // Borrowing counterpart of `close`; delegates to the shared state.
    fn close_inner(&self) -> Option<std::thread::Result<Result<(), StreamError>>> {
        let shared = &self.shared;
        shared.close_inner()
    }
}
impl<M> Shared<M> {
    // Flag the stream as closed, join the stream thread, then flag the stream as paused.
    //
    // Returns `None` if the handle mutex is poisoned or the handle was already taken.
    fn close_inner(&self) -> Option<std::thread::Result<Result<(), StreamError>>> {
        // The guard is deliberately kept alive until the end of the function so the lock is
        // held for the duration of the join.
        let mut handle_slot = self.thread.lock().ok()?;
        let handle = handle_slot.take()?;
        self.is_closed.store(true, atomic::Ordering::Relaxed);
        let joined = handle.join();
        self.is_paused.store(true, atomic::Ordering::Relaxed);
        Some(joined)
    }
}
impl Buffer {
    /// The point rate in effect for this buffer.
    pub fn point_hz(&self) -> u32 {
        let Self { point_hz, .. } = self;
        *point_hz
    }

    /// The latency, in points, in effect for this buffer.
    pub fn latency_points(&self) -> u32 {
        let Self { latency_points, .. } = self;
        *latency_points
    }
}
impl<M, F, E> Builder<M, F, E> {
    /// Use the given DAC for the stream instead of detecting one.
    pub fn detected_dac(self, dac: DetectedDac) -> Self {
        let mut this = self;
        this.builder.dac = Some(dac);
        this
    }

    /// Set the initial point rate of the stream.
    pub fn point_hz(self, point_hz: u32) -> Self {
        let mut this = self;
        this.builder.point_hz = Some(point_hz);
        this
    }

    /// Set the initial latency of the stream, as a number of points.
    pub fn latency_points(self, points: u32) -> Self {
        let mut this = self;
        this.builder.latency_points = Some(points);
        this
    }

    /// Set the TCP timeout stored in the builder and handed to the stream thread.
    pub fn tcp_timeout(self, tcp_timeout: Option<Duration>) -> Self {
        let mut this = self;
        this.builder.tcp_timeout = tcp_timeout;
        this
    }

    /// Replace the stream error handler, discarding the previous one.
    pub fn stream_error<E2>(self, stream_error: E2) -> Builder<M, F, E2> {
        Builder {
            api_inner: self.api_inner,
            builder: self.builder,
            model: self.model,
            render: self.render,
            stream_error,
        }
    }

    /// Spawn the stream thread and return a handle to it.
    ///
    /// Unset parameters fall back to `super::DEFAULT_POINT_HZ` and
    /// `default_latency_points`. Returns an error if the thread could not be spawned.
    pub fn build(self) -> io::Result<Stream<M>>
    where
        M: 'static + Send,
        F: 'static + RenderFn<M> + Send,
        E: 'static + StreamErrorFn<M> + Send,
    {
        let Self {
            api_inner,
            builder,
            model,
            render,
            stream_error,
        } = self;

        // Resolve the stream parameters, falling back to defaults where unset.
        let point_hz = builder.point_hz.unwrap_or(super::DEFAULT_POINT_HZ);
        let latency_points = match builder.latency_points {
            Some(points) => points,
            None => default_latency_points(point_hz),
        };
        let tcp_timeout = builder.tcp_timeout;
        let dac = builder.dac;

        // State owned by, or shared with, the stream thread.
        let state = Arc::new(Mutex::new(State {
            point_hz,
            latency_points,
        }));
        let model = Arc::new(Mutex::new(Some(model)));
        let is_closed = Arc::new(AtomicBool::new(false));
        let (model_update_tx, model_update_rx) = mpsc::channel();
        let (state_update_tx, state_update_rx) = mpsc::channel();

        // Clones that are moved onto the stream thread.
        let thread_model = Arc::clone(&model);
        let thread_dac = dac.clone();
        let thread_is_closed = Arc::clone(&is_closed);

        let handle = std::thread::Builder::new()
            .name("raw_laser_stream_thread".into())
            .spawn(move || {
                let result = run_laser_stream(
                    &api_inner,
                    thread_dac,
                    tcp_timeout,
                    &state,
                    &thread_model,
                    render,
                    stream_error,
                    &state_update_rx,
                    &model_update_rx,
                    &thread_is_closed,
                );
                // However the loop ended, flag the stream as closed.
                thread_is_closed.store(true, atomic::Ordering::Relaxed);
                result
            })?;

        Ok(Stream {
            state_update_tx,
            model_update_tx,
            shared: Arc::new(Shared {
                model,
                is_paused: AtomicBool::new(false),
                dac,
                is_closed,
                thread: Mutex::new(Some(handle)),
            }),
        })
    }
}
impl Default for StreamErrorAction {
fn default() -> Self {
StreamErrorAction::CloseThread
}
}
impl Deref for Buffer {
    type Target = [RawPoint];

    /// Borrow the buffer's points as a slice.
    fn deref(&self) -> &[RawPoint] {
        &*self.points
    }
}
impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.points
}
}
impl<M> Drop for Shared<M> {
    /// Close the stream and join its thread when the shared state is dropped.
    fn drop(&mut self) {
        // The join result cannot be reported from `drop`, so it is discarded.
        let _ = self.close_inner();
    }
}
/// The default latency, in points, for the given point rate.
///
/// Computed as `super::points_per_frame(point_hz, 60)`.
pub fn default_latency_points(point_hz: u32) -> u32 {
    // NOTE(review): presumably a frame rate of 60 Hz; confirm against `points_per_frame`.
    let frame_hz = 60;
    super::points_per_frame(point_hz, frame_hz)
}
/// The body of the laser stream thread.
///
/// Repeatedly obtains a DAC (the one supplied, or one found via detection), runs
/// `run_laser_stream_tcp_loop` against it and, on failure, consults `stream_error` to decide
/// whether to reconnect, re-detect the DAC, or stop.
///
/// Returns `Ok(())` when the TCP loop finishes cleanly or `is_closed` is set. Returns the
/// triggering error when the handler chooses `StreamErrorAction::CloseThread`, or when the
/// model mutex is poisoned while an error is being handled.
///
/// # Panics
///
/// Panics if the model slot is empty while an error is being handled, or if the DAC detection
/// iterator yields `None`.
fn run_laser_stream<M, F, E>(
    api_inner: &ApiInner,
    mut maybe_dac: Option<DetectedDac>,
    tcp_timeout: Option<Duration>,
    state: &Arc<Mutex<State>>,
    model: &Arc<Mutex<Option<M>>>,
    render: F,
    stream_error: E,
    state_update_rx: &mpsc::Receiver<StateUpdate>,
    model_update_rx: &mpsc::Receiver<ModelUpdate<M>>,
    is_closed: &AtomicBool,
) -> Result<(), StreamError>
where
    F: RenderFn<M>,
    E: StreamErrorFn<M>,
{
    // Lock the given mutex, or return the given error from this function if it is poisoned.
    macro_rules! lock_or_return_err {
        ($mutex:expr, $err:expr) => {
            match $mutex.lock() {
                Ok(guard) => guard,
                Err(_) => return Err($err),
            }
        };
    }

    let mut connect_attempts = 0;
    let mut detect_attempts = 0;
    // The detection timeout starts as the TCP timeout and may be overridden by the handler.
    let mut detect_timeout = tcp_timeout;
    let mut redetect_dac = false;

    while !is_closed.load(atomic::Ordering::Relaxed) {
        // If re-detection was requested and a specific DAC is in use, look it up again by ID.
        if redetect_dac {
            redetect_dac = false;
            if let Some(ref mut dac) = maybe_dac {
                let dac_id = dac.id();
                detect_attempts += 1;
                *dac = match api_inner.detect_dac(dac_id) {
                    Ok(dac) => {
                        detect_attempts = 0;
                        dac
                    }
                    Err(err) => {
                        let attempts = detect_attempts;
                        let err = EtherDreamStreamError::FailedToDetectDacs { err, attempts };
                        let err = StreamError::from(err);
                        let mut guard = lock_or_return_err!(model, err);
                        let mut model = guard.take().unwrap();
                        let mut action = StreamErrorAction::default();
                        stream_error(&mut model, &err, &mut action);
                        *guard = Some(model);
                        match action {
                            StreamErrorAction::CloseThread => return Err(err),
                            StreamErrorAction::ReattemptConnect => continue,
                            StreamErrorAction::RedetectDac { timeout } => {
                                redetect_dac = true;
                                detect_timeout = timeout;
                                continue;
                            }
                        }
                    }
                };
            }
        }

        // Use the known DAC, or detect the first available one.
        let dac = match maybe_dac {
            Some(ref dac) => dac.clone(),
            None => {
                detect_attempts += 1;
                let attempts = detect_attempts;
                // A reference to the closure is used so it can be passed to several adaptors.
                let detect_err = &|err| EtherDreamStreamError::FailedToDetectDacs { err, attempts };
                match api_inner
                    .detect_dacs()
                    .map_err(detect_err)
                    .and_then(|detect_dacs| {
                        detect_dacs
                            .set_timeout(detect_timeout)
                            .map_err(detect_err)?;
                        Ok(detect_dacs)
                    })
                    .and_then(|mut dacs| {
                        dacs.next()
                            .expect("ether dream DAC detection iterator should never return `None`")
                            .map_err(detect_err)
                    }) {
                    Ok(dac) => {
                        detect_attempts = 0;
                        dac
                    }
                    Err(err) => {
                        let err = StreamError::from(err);
                        let mut guard = lock_or_return_err!(model, err);
                        let mut model = guard.take().unwrap();
                        let mut action = StreamErrorAction::default();
                        stream_error(&mut model, &err, &mut action);
                        *guard = Some(model);
                        match action {
                            StreamErrorAction::CloseThread => return Err(err),
                            StreamErrorAction::ReattemptConnect => continue,
                            // No specific DAC is held on this path, so the next iteration
                            // performs a fresh detection anyway; only the timeout changes.
                            StreamErrorAction::RedetectDac { timeout } => {
                                detect_timeout = timeout;
                                continue;
                            }
                        }
                    }
                }
            }
        };

        // Connect to the DAC and stream until closed or an error occurs.
        match run_laser_stream_tcp_loop(
            &dac,
            tcp_timeout,
            state,
            model,
            &render,
            state_update_rx,
            model_update_rx,
            is_closed,
            &mut connect_attempts,
        ) {
            Ok(()) => break,
            Err(err) => {
                let mut guard = lock_or_return_err!(model, err);
                let mut model = guard.take().unwrap();
                let mut action = StreamErrorAction::default();
                stream_error(&mut model, &err, &mut action);
                *guard = Some(model);
                match action {
                    StreamErrorAction::CloseThread => return Err(err),
                    StreamErrorAction::ReattemptConnect => continue,
                    StreamErrorAction::RedetectDac { timeout } => {
                        // Without setting this flag the re-detection branch at the top of
                        // the loop is never entered and the stale DAC is reused forever.
                        redetect_dac = true;
                        detect_timeout = timeout;
                        continue;
                    }
                }
            }
        }
    }
    Ok(())
}
// Connect a stream to the given DAC and run the render/submit loop until `is_closed` is set.
//
// On a failed connection `connection_attempts` is incremented and reported in the returned
// error; on a successful connection it is reset to zero. Any communication failure is returned
// as the corresponding `EtherDreamStreamError` (converted into `StreamError`). When the loop
// exits because `is_closed` was set, a stop command is submitted before returning `Ok(())`.
//
// Panics if the state mutex is poisoned or if the model slot is unexpectedly empty.
fn run_laser_stream_tcp_loop<M, F>(
dac: &DetectedDac,
tcp_timeout: Option<Duration>,
state: &Arc<Mutex<State>>,
model: &Arc<Mutex<Option<M>>>,
render: F,
state_update_rx: &mpsc::Receiver<StateUpdate>,
model_update_rx: &mpsc::Receiver<ModelUpdate<M>>,
is_closed: &AtomicBool,
connection_attempts: &mut u32,
) -> Result<(), StreamError>
where
F: RenderFn<M>,
{
// Only the `EtherDream` variant is matched; take its broadcast and source address.
let (broadcast, src_addr) = match dac {
DetectedDac::EtherDream {
broadcast,
source_addr,
} => (broadcast, source_addr),
};
// Model updates that have been received but not yet applied.
let mut pending_model_updates: Vec<ModelUpdate<M>> = Vec::new();
let ip = src_addr.ip().clone();
// Connect, with a timeout if one was given; the same timeout is then set on the stream.
let result = match tcp_timeout {
None => ether_dream::dac::stream::connect(&broadcast, ip),
Some(timeout) => ether_dream::dac::stream::connect_timeout(&broadcast, ip, timeout)
.and_then(|stream| {
stream.set_timeout(Some(timeout))?;
Ok(stream)
}),
};
let mut stream = match result {
Ok(stream) => stream,
Err(err) => {
*connection_attempts += 1;
let attempts = *connection_attempts;
return Err(EtherDreamStreamError::FailedToConnectStream { err, attempts }.into());
}
};
// The connection succeeded, so reset the attempt counter.
*connection_attempts = 0;
stream
.queue_commands()
.prepare_stream()
.submit()
.map_err(|err| EtherDreamStreamError::FailedToPrepareStream { err })?;
// The initial point rate is the requested rate clamped to the DAC's maximum.
let dac_max_point_hz = dac.max_point_hz();
let init_point_hz = {
let hz = state
.lock()
.expect("failed to acquire raw state lock")
.point_hz;
std::cmp::min(hz, dac_max_point_hz)
};
let low_water_mark = 0;
// Fill the DAC's remaining buffer capacity with blank points, then begin playback.
let n_points = dac_remaining_buffer_capacity(stream.dac());
stream
.queue_commands()
.data((0..n_points).map(|_| centered_blank()))
.begin(low_water_mark, init_point_hz)
.submit()
.map_err(|err| EtherDreamStreamError::FailedToBeginStream { err })?;
// Reused across iterations; drained on every data submission.
let mut ether_dream_points = vec![];
while !is_closed.load(atomic::Ordering::Relaxed) {
// Collect and apply any queued model updates. If the model lock cannot be acquired the
// updates stay in `pending_model_updates` for a later iteration.
pending_model_updates.extend(model_update_rx.try_iter());
if !pending_model_updates.is_empty() {
if let Ok(mut guard) = model.lock() {
let mut model = guard.take().unwrap();
for mut update in pending_model_updates.drain(..) {
update(&mut model);
}
*guard = Some(model);
}
}
// Apply any queued state updates and take a snapshot of the state, remembering the
// (clamped) point rate from before the updates so a change can be detected.
let (state, prev_point_hz) = {
let mut state = state.lock().expect("failed to acquare raw state lock");
let prev_point_hz = std::cmp::min(state.point_hz, dac.max_point_hz());
for mut state_update in state_update_rx.try_iter() {
(*state_update)(&mut state);
}
(state.clone(), prev_point_hz)
};
let point_hz = std::cmp::min(state.point_hz, dac.max_point_hz());
let point_rate_changed = point_hz != prev_point_hz;
// Queue the new point rate on the DAC if it changed.
if point_rate_changed {
stream
.queue_commands()
.point_rate(point_hz)
.submit()
.map_err(|err| EtherDreamStreamError::FailedToSubmitPointRate { err })?;
}
// Clamp the latency to the DAC's buffer capacity and work out how many points to render.
let latency_points = std::cmp::min(state.latency_points, dac.buffer_capacity());
let n_points = points_to_generate(stream.dac(), latency_points as u16) as usize;
let mut buffer = Buffer {
point_hz,
latency_points: latency_points as _,
points: vec![RawPoint::centered_blank(); n_points].into_boxed_slice(),
};
// Render into the buffer; if the model lock cannot be acquired the blank points are sent.
if let Ok(mut guard) = model.lock() {
let mut m = guard.take().unwrap();
render(&mut m, &mut buffer);
*guard = Some(m);
}
ether_dream_points.extend(buffer.iter().cloned().map(point_to_ether_dream_point));
// Flag the first point with `CHANGE_RATE` when the point rate changed this iteration.
if point_rate_changed && !ether_dream_points.is_empty() {
ether_dream_points[0].control = ether_dream::dac::PointControl::CHANGE_RATE.bits();
}
stream
.queue_commands()
.data(ether_dream_points.drain(..))
.submit()
.map_err(|err| EtherDreamStreamError::FailedToSubmitData { err })?;
}
// The stream was closed: tell the DAC to stop.
stream
.queue_commands()
.stop()
.submit()
.map_err(|err| EtherDreamStreamError::FailedToStopStream { err })?;
Ok(())
}
fn dac_remaining_buffer_capacity(dac: ðer_dream::dac::Dac) -> u16 {
dac.buffer_capacity - 1 - dac.status.buffer_fullness
}
fn points_to_generate(dac: ðer_dream::dac::Dac, latency_points: u16) -> u16 {
let remaining_capacity = dac_remaining_buffer_capacity(dac);
let n = if dac.status.buffer_fullness < latency_points {
latency_points - dac.status.buffer_fullness
} else {
0
};
std::cmp::min(n, remaining_capacity)
}
/// A DAC point with every field set to zero.
fn centered_blank() -> ether_dream::protocol::DacPoint {
    let control = 0;
    let (x, y) = (0, 0);
    let (r, g, b) = (0, 0, 0);
    let (i, u1, u2) = (0, 0, 0);
    ether_dream::protocol::DacPoint {
        control,
        x,
        y,
        r,
        g,
        b,
        i,
        u1,
        u2,
    }
}
/// Convert a position to the DAC's `i16` coordinates.
///
/// Each component is clamped to `[-1.0, 1.0]` and then mapped onto `[i16::MIN, i16::MAX]`.
fn position_to_ether_dream_position([px, py]: crate::point::Position) -> [i16; 2] {
    let out_min = f64::from(std::i16::MIN);
    let out_max = f64::from(std::i16::MAX);
    let clamped_x = clamp(px, -1.0, 1.0);
    let clamped_y = clamp(py, -1.0, 1.0);
    [
        map_range(clamped_x, -1.0, 1.0, out_min, out_max) as i16,
        map_range(clamped_y, -1.0, 1.0, out_min, out_max) as i16,
    ]
}
/// Convert an RGB colour to the DAC's `u16` channels.
///
/// Each channel is clamped to `[0.0, 1.0]` and then scaled by `u16::MAX`.
fn color_to_ether_dream_color([pr, pg, pb]: crate::point::Rgb) -> [u16; 3] {
    let scale = f32::from(std::u16::MAX);
    let to_channel = |channel: f32| (clamp(channel, 0.0, 1.0) * scale) as u16;
    [to_channel(pr), to_channel(pg), to_channel(pb)]
}
/// Convert a `RawPoint` into a DAC point.
///
/// Position and colour are converted with the helpers above; `control`, `i`, `u1` and `u2`
/// are all set to zero.
fn point_to_ether_dream_point(p: RawPoint) -> ether_dream::protocol::DacPoint {
    let xy = position_to_ether_dream_position(p.position);
    let rgb = color_to_ether_dream_color(p.color);
    ether_dream::protocol::DacPoint {
        control: 0,
        x: xy[0],
        y: xy[1],
        r: rgb[0],
        g: rgb[1],
        b: rgb[2],
        i: 0,
        u1: 0,
        u2: 0,
    }
}
/// The default stream error handler.
///
/// Chooses the action as follows:
///
/// - DAC detection failure: re-detect (2 second timeout) while fewer than 3 attempts have
///   been made, otherwise close the thread.
/// - Connection failure: sleep 16 ms and reattempt while fewer than 3 attempts have been
///   made, re-detect the DAC on exactly the 3rd attempt, otherwise close the thread.
/// - Failure to prepare, begin, submit data or submit a point rate: reattempt the connection.
/// - Failure to stop the stream: close the thread.
pub fn default_stream_error_fn<M>(
    _model: &mut M,
    err: &StreamError,
    action: &mut StreamErrorAction,
) {
    // Only Ether Dream errors exist, so the inner error can be extracted directly.
    let StreamError::EtherDreamStream { err: cause } = err;

    let redetect = || StreamErrorAction::RedetectDac {
        timeout: Some(Duration::from_secs(2)),
    };

    let chosen = match cause {
        EtherDreamStreamError::FailedToDetectDacs { attempts, .. } => {
            if *attempts < 3 {
                redetect()
            } else {
                StreamErrorAction::CloseThread
            }
        }
        EtherDreamStreamError::FailedToConnectStream { attempts, .. } => match attempts.cmp(&3) {
            std::cmp::Ordering::Less => {
                // Brief pause before the next connection attempt.
                std::thread::sleep(std::time::Duration::from_millis(16));
                StreamErrorAction::ReattemptConnect
            }
            std::cmp::Ordering::Equal => redetect(),
            std::cmp::Ordering::Greater => StreamErrorAction::CloseThread,
        },
        EtherDreamStreamError::FailedToPrepareStream { .. }
        | EtherDreamStreamError::FailedToBeginStream { .. }
        | EtherDreamStreamError::FailedToSubmitData { .. }
        | EtherDreamStreamError::FailedToSubmitPointRate { .. } => {
            StreamErrorAction::ReattemptConnect
        }
        EtherDreamStreamError::FailedToStopStream { .. } => StreamErrorAction::CloseThread,
    };
    *action = chosen;
}