use crate::{DetectedDac, RawPoint};
use crate::util::{clamp, map_range};
use derive_more::From;
use failure::Fail;
use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{mpsc, Arc, Mutex};
pub trait RenderFn<M>: Fn(&mut M, &mut Buffer) {}
impl<M, F> RenderFn<M> for F where F: Fn(&mut M, &mut Buffer) {}
#[derive(Clone)]
pub struct Stream<M> {
state_update_tx: mpsc::Sender<StateUpdate>,
model_update_tx: mpsc::Sender<ModelUpdate<M>>,
shared: Arc<Shared<M>>,
}
#[derive(Clone)]
struct State {
point_hz: u32,
latency_points: u32,
}
struct Shared<M> {
model: Arc<Mutex<Option<M>>>,
is_paused: AtomicBool,
is_closed: Arc<AtomicBool>,
}
#[derive(Debug)]
pub struct Buffer {
pub(crate) point_hz: u32,
pub(crate) latency_points: u32,
pub(crate) points: Box<[RawPoint]>,
}
pub struct Builder<M, F> {
pub(crate) api_inner: Arc<super::super::Inner>,
pub builder: super::Builder,
pub model: M,
pub render: F,
}
type StateUpdate = Box<FnMut(&mut State) + 'static + Send>;
pub type ModelUpdate<M> = Box<FnMut(&mut M) + 'static + Send>;
#[derive(Debug, Fail, From)]
pub enum RawStreamError {
#[fail(display = "an Ether Dream DAC stream error occurred: {}", err)]
EtherDreamStream {
#[fail(cause)]
err: EtherDreamStreamError,
}
}
#[derive(Debug, Fail, From)]
pub enum EtherDreamStreamError {
#[fail(display = "laser DAC detection failed: {}", err)]
FailedToDetectDacs {
#[fail(cause)]
err: io::Error,
},
#[fail(display = "failed to connect the DAC stream: {}", err)]
FailedToConnectStream {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
#[fail(display = "failed to prepare the DAC stream: {}", err)]
FailedToPrepareStream {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
#[fail(display = "failed to begin the DAC stream: {}", err)]
FailedToBeginStream {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
#[fail(display = "failed to submit data over the DAC stream: {}", err)]
FailedToSubmitData {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
#[fail(display = "failed to submit point rate change over the DAC stream: {}", err)]
FailedToSubmitPointRate {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
#[fail(display = "failed to submit stop command to the DAC stream: {}", err)]
FailedToStopStream {
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
}
impl<M> Stream<M> {
pub fn set_point_hz(&self, point_hz: u32) -> Result<(), mpsc::SendError<()>> {
self.send_raw_state_update(move |state| state.point_hz = point_hz)
.map_err(|_| mpsc::SendError(()))
}
pub fn set_latency_points(&self, points: u32) -> Result<(), mpsc::SendError<()>> {
self.send_raw_state_update(move |state| state.latency_points = points)
.map_err(|_| mpsc::SendError(()))
}
pub fn send<F>(&self, update: F) -> Result<(), mpsc::SendError<ModelUpdate<M>>>
where
F: FnOnce(&mut M) + Send + 'static,
{
if self.shared.is_paused.load(atomic::Ordering::Relaxed) {
if let Ok(mut guard) = self.shared.model.lock() {
let mut model = guard.take().unwrap();
update(&mut model);
*guard = Some(model);
}
} else {
let mut update_opt = Some(update);
let update_fn = move |model: &mut M| {
if let Some(update) = update_opt.take() {
update(model);
}
};
self.model_update_tx.send(Box::new(update_fn))?;
}
Ok(())
}
fn send_raw_state_update<F>(&self, update: F) -> Result<(), mpsc::SendError<StateUpdate>>
where
F: FnOnce(&mut State) + Send + 'static,
{
let mut update_opt = Some(update);
let update_fn = move |state: &mut State| {
if let Some(update) = update_opt.take() {
update(state);
}
};
self.state_update_tx.send(Box::new(update_fn))
}
}
impl Buffer {
pub fn point_hz(&self) -> u32 {
self.point_hz
}
pub fn latency_points(&self) -> u32 {
self.latency_points
}
}
impl<M, F> Builder<M, F> {
pub fn detected_dac(mut self, dac: DetectedDac) -> Self {
self.builder.dac = Some(dac);
self
}
pub fn point_hz(mut self, point_hz: u32) -> Self {
self.builder.point_hz = Some(point_hz);
self
}
pub fn latency_points(mut self, points: u32) -> Self {
self.builder.latency_points = Some(points);
self
}
pub fn build(self) -> io::Result<Stream<M>>
where
M: 'static + Send,
F: 'static + RenderFn<M> + Send,
{
let Builder { api_inner, builder, model, render } = self;
let model = Arc::new(Mutex::new(Some(model)));
let model_2 = model.clone();
let (model_update_tx, m_rx) = mpsc::channel();
let (state_update_tx, s_rx) = mpsc::channel();
let point_hz = builder.point_hz.unwrap_or(super::DEFAULT_POINT_HZ);
let latency_points = builder.latency_points
.unwrap_or_else(|| default_latency_points(point_hz));
let state = Arc::new(Mutex::new(State { point_hz, latency_points }));
let mut maybe_dac = builder.dac;
let is_closed = Arc::new(AtomicBool::new(false));
let is_closed2 = is_closed.clone();
std::thread::Builder::new()
.name("raw_laser_stream_thread".into())
.spawn(move || {
let mut connect_attempts = 3;
while !is_closed2.load(atomic::Ordering::Relaxed) {
if connect_attempts == 0 {
connect_attempts = 3;
if let Some(ref mut dac) = maybe_dac {
let dac_id = dac.id();
eprintln!("re-attempting to detect DAC with id: {:?}", dac_id);
*dac = match api_inner.detect_dac(dac_id) {
Ok(dac) => dac,
Err(err) => {
let err = EtherDreamStreamError::FailedToDetectDacs { err };
return Err(RawStreamError::EtherDreamStream { err });
}
};
}
}
let dac = match maybe_dac {
Some(ref dac) => dac.clone(),
None => api_inner.detect_dacs()
.map_err(|err| EtherDreamStreamError::FailedToDetectDacs { err })?
.next()
.expect("ether dream DAC detection iterator should never return `None`")
.map_err(|err| EtherDreamStreamError::FailedToDetectDacs { err })?,
};
match run_laser_stream(&dac, &state, &model_2, &render, &s_rx, &m_rx, &is_closed2) {
Ok(()) => return Ok(()),
Err(RawStreamError::EtherDreamStream { err }) => match err {
EtherDreamStreamError::FailedToConnectStream { err } => {
eprintln!("failed to connect to stream: {}", err);
connect_attempts -= 1;
eprintln!(
"connection attempts remaining before re-detecting DAC: {}",
connect_attempts,
);
std::thread::sleep(std::time::Duration::from_millis(16));
}
EtherDreamStreamError::FailedToPrepareStream { .. }
| EtherDreamStreamError::FailedToBeginStream { .. }
| EtherDreamStreamError::FailedToSubmitData { .. }
| EtherDreamStreamError::FailedToSubmitPointRate { .. } => {
eprintln!("{} - will now attempt to reconnect", err);
}
err => return Err(RawStreamError::EtherDreamStream { err }),
}
}
}
Ok(())
})?;
let is_paused = AtomicBool::new(false);
let shared = Arc::new(Shared { model, is_paused, is_closed });
let stream = Stream { shared, state_update_tx, model_update_tx };
Ok(stream)
}
}
impl Deref for Buffer {
type Target = [RawPoint];
fn deref(&self) -> &Self::Target {
&self.points
}
}
impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.points
}
}
impl<M> Drop for Stream<M> {
fn drop(&mut self) {
self.shared.is_closed.store(true, atomic::Ordering::Relaxed)
}
}
pub fn default_latency_points(point_hz: u32) -> u32 {
super::points_per_frame(point_hz, 60)
}
fn run_laser_stream<M, F>(
dac: &DetectedDac,
state: &Arc<Mutex<State>>,
model: &Arc<Mutex<Option<M>>>,
render: F,
state_update_rx: &mpsc::Receiver<StateUpdate>,
model_update_rx: &mpsc::Receiver<ModelUpdate<M>>,
is_closed: &AtomicBool,
) -> Result<(), RawStreamError>
where
F: RenderFn<M>,
{
let (broadcast, src_addr) = match dac {
DetectedDac::EtherDream { broadcast, source_addr } => {
(broadcast, source_addr)
}
};
let mut pending_model_updates: Vec<ModelUpdate<M>> = Vec::new();
let ip = src_addr.ip().clone();
let mut stream = ether_dream::dac::stream::connect(&broadcast, ip)
.map_err(|err| EtherDreamStreamError::FailedToConnectStream { err })?;
stream
.queue_commands()
.prepare_stream()
.submit()
.map_err(|err| EtherDreamStreamError::FailedToPrepareStream { err })?;
let dac_max_point_hz = dac.max_point_hz();
let init_point_hz = {
let hz = state.lock().expect("failed to acquire raw state lock").point_hz;
std::cmp::min(hz, dac_max_point_hz)
};
let low_water_mark = 0;
let n_points = dac_remaining_buffer_capacity(stream.dac());
stream
.queue_commands()
.data((0..n_points).map(|_| centered_blank()))
.begin(low_water_mark, init_point_hz)
.submit()
.map_err(|err| EtherDreamStreamError::FailedToBeginStream { err })?;
let mut ether_dream_points = vec![];
while !is_closed.load(atomic::Ordering::Relaxed) {
pending_model_updates.extend(model_update_rx.try_iter());
if !pending_model_updates.is_empty() {
if let Ok(mut guard) = model.lock() {
let mut model = guard.take().unwrap();
for mut update in pending_model_updates.drain(..) {
update(&mut model);
}
*guard = Some(model);
}
}
let (state, prev_point_hz) = {
let mut state = state.lock().expect("failed to acquare raw state lock");
let prev_point_hz = std::cmp::min(state.point_hz, dac.max_point_hz());
for mut state_update in state_update_rx.try_iter() {
(*state_update)(&mut state);
}
(state.clone(), prev_point_hz)
};
let point_hz = std::cmp::min(state.point_hz, dac.max_point_hz());
let point_rate_changed = point_hz != prev_point_hz;
if point_rate_changed {
stream
.queue_commands()
.point_rate(point_hz)
.submit()
.map_err(|err| EtherDreamStreamError::FailedToSubmitPointRate { err })?;
}
let latency_points = std::cmp::min(state.latency_points, dac.buffer_capacity());
let n_points = points_to_generate(stream.dac(), latency_points as u16) as usize;
let mut buffer = Buffer {
point_hz,
latency_points: latency_points as _,
points: vec![RawPoint::centered_blank(); n_points].into_boxed_slice(),
};
if let Ok(mut guard) = model.lock() {
let mut m = guard.take().unwrap();
render(&mut m, &mut buffer);
*guard = Some(m);
}
ether_dream_points.extend(buffer.iter().cloned().map(point_to_ether_dream_point));
if point_rate_changed {
ether_dream_points[0].control = ether_dream::dac::PointControl::CHANGE_RATE.bits();
}
stream
.queue_commands()
.data(ether_dream_points.drain(..))
.submit()
.map_err(|err| EtherDreamStreamError::FailedToSubmitData { err })?;
}
stream
.queue_commands()
.stop()
.submit()
.map_err(|err| EtherDreamStreamError::FailedToStopStream { err })?;
Ok(())
}
fn dac_remaining_buffer_capacity(dac: ðer_dream::dac::Dac) -> u16 {
dac.buffer_capacity - 1 - dac.status.buffer_fullness
}
fn points_to_generate(dac: ðer_dream::dac::Dac, latency_points: u16) -> u16 {
let remaining_capacity = dac_remaining_buffer_capacity(dac);
let n = if dac.status.buffer_fullness < latency_points {
latency_points - dac.status.buffer_fullness
} else {
0
};
std::cmp::min(n, remaining_capacity)
}
fn centered_blank() -> ether_dream::protocol::DacPoint {
ether_dream::protocol::DacPoint {
control: 0,
x: 0,
y: 0,
r: 0,
g: 0,
b: 0,
i: 0,
u1: 0,
u2: 0,
}
}
fn position_to_ether_dream_position([px, py]: crate::point::Position) -> [i16; 2] {
let min = std::i16::MIN;
let max = std::i16::MAX;
let x = map_range(clamp(px, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16;
let y = map_range(clamp(py, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16;
[x, y]
}
fn color_to_ether_dream_color([pr, pg, pb]: crate::point::Rgb) -> [u16; 3] {
let r = (clamp(pr, 0.0, 1.0) * std::u16::MAX as f32) as u16;
let g = (clamp(pg, 0.0, 1.0) * std::u16::MAX as f32) as u16;
let b = (clamp(pb, 0.0, 1.0) * std::u16::MAX as f32) as u16;
[r, g, b]
}
fn point_to_ether_dream_point(p: RawPoint) -> ether_dream::protocol::DacPoint {
let [x, y] = position_to_ether_dream_position(p.position);
let [r, g, b] = color_to_ether_dream_color(p.color);
let (control, i, u1, u2) = (0, 0, 0, 0);
ether_dream::protocol::DacPoint { control, x, y, r, g, b, i, u1, u2 }
}