use crate::Point;
use crate::util::{clamp, map_range};
use derive_more::From;
use failure::Fail;
use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{self, AtomicBool};
use std::sync::{mpsc, Arc, Mutex};
/// Shorthand trait for callables of the form `Fn(&mut M, &mut Buffer)`.
///
/// The trait declares no items of its own; it only lets bounds be written as `RenderFn<M>`
/// rather than spelling out the full `Fn` signature each time.
pub trait RenderFn<M>: Fn(&mut M, &mut Buffer) {}

/// Blanket implementation: every `Fn(&mut M, &mut Buffer)` is automatically a `RenderFn<M>`.
impl<M, F: Fn(&mut M, &mut Buffer)> RenderFn<M> for F {}
pub struct Stream<M> {
update_tx: mpsc::Sender<Box<FnMut(&mut M) + 'static + Send>>,
shared: Arc<Shared<M>>,
}
/// State that a `Stream` handle holds behind an `Arc`.
struct Shared<M> {
/// The model, guarded by a mutex. The `Option` allows the value to be temporarily moved out
/// with `take()` while the lock is held and then put back.
model: Arc<Mutex<Option<M>>>,
/// Read by `Stream::send`: when `true`, updates are applied to `model` directly under the lock
/// instead of being queued on the update channel.
is_paused: AtomicBool,
}
/// A buffer of `Point`s that is passed to the render function to be filled.
///
/// `run_laser_stream` creates one per loop iteration, pre-filled with `Point::centered_blank()`,
/// and submits its contents to the DAC after the render function returns.
#[derive(Debug)]
pub struct Buffer {
/// The point rate the stream was begun with (see `Buffer::point_hz`).
pub(crate) point_hz: u32,
/// The latency, in points, that the stream tries to keep buffered on the DAC.
pub(crate) latency_points: u32,
/// The points to be rendered. The buffer dereferences to this slice.
pub(crate) points: Box<[Point]>,
}
/// Builder for a raw laser `Stream` with model type `M` and render function type `F`.
pub struct Builder<M, F> {
/// Handle to the API internals; used by the stream thread to detect DACs.
pub(crate) api_inner: Arc<super::super::Inner>,
/// Stream options (target DAC, point rate, latency) set through the builder methods.
pub builder: super::Builder,
/// The initial model value; moved into the stream when it is built.
pub model: M,
/// The render function called by the stream thread with the model and a `Buffer`.
pub render: F,
}
/// Errors that may be returned from the raw laser stream thread.
#[derive(Debug, Fail, From)]
pub enum RawStreamError {
/// An error specific to streaming to an Ether Dream DAC.
#[fail(display = "an Ether Dream DAC stream error occurred: {}", err)]
EtherDreamStream {
/// The underlying Ether Dream stream error.
#[fail(cause)]
err: EtherDreamStreamError,
}
}
/// Errors that may occur while detecting, connecting to or streaming to an Ether Dream DAC.
///
/// Each variant corresponds to one stage of `run_laser_stream`.
#[derive(Debug, Fail, From)]
pub enum EtherDreamStreamError {
/// DAC detection failed. The stream thread treats this as fatal and returns the error.
#[fail(display = "laser DAC detection failed: {}", err)]
FailedToDetectDacs {
/// The I/O error reported by DAC detection.
#[fail(cause)]
err: io::Error,
},
/// Establishing the connection to the DAC failed. The stream thread retries, and re-detects
/// the DAC once its attempt counter reaches zero.
#[fail(display = "failed to connect the DAC stream: {}", err)]
FailedToConnectStream {
/// The communication error reported while connecting.
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
/// Submitting the `prepare_stream` command failed. The stream thread attempts to reconnect.
#[fail(display = "failed to prepare the DAC stream: {}", err)]
FailedToPrepareStream {
/// The communication error reported while preparing.
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
/// Submitting the initial data together with the `begin` command failed. The stream thread
/// attempts to reconnect.
#[fail(display = "failed to begin the DAC stream: {}", err)]
FailedToBeginStream {
/// The communication error reported while beginning.
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
/// Submitting rendered point data failed. The stream thread attempts to reconnect.
#[fail(display = "failed to submit data over the DAC stream: {}", err)]
FailedToSubmitData {
/// The communication error reported while submitting data.
#[fail(cause)]
err: ether_dream::dac::stream::CommunicationError,
},
}
impl<M> Stream<M> {
pub fn send<F>(
&self,
update: F,
) -> Result<(), mpsc::SendError<Box<FnMut(&mut M) + Send + 'static>>>
where
F: FnOnce(&mut M) + Send + 'static,
{
if self.shared.is_paused.load(atomic::Ordering::Relaxed) {
if let Ok(mut guard) = self.shared.model.lock() {
let mut model = guard.take().unwrap();
update(&mut model);
*guard = Some(model);
}
} else {
let mut update_opt = Some(update);
let update_fn = move |model: &mut M| {
if let Some(update) = update_opt.take() {
update(model);
}
};
self.update_tx.send(Box::new(update_fn))?;
}
Ok(())
}
}
impl Buffer {
pub fn point_hz(&self) -> u32 {
self.point_hz
}
pub fn latency_points(&self) -> u32 {
self.latency_points
}
}
impl<M, F> Builder<M, F> {
pub fn detected_dac(mut self, dac: super::super::DetectedDac) -> Self {
self.builder.dac = Some(dac);
self
}
pub fn point_hz(mut self, point_hz: u32) -> Self {
self.builder.point_hz = Some(point_hz);
self
}
pub fn latency_points(mut self, points: u32) -> Self {
self.builder.latency_points = Some(points);
self
}
pub fn build(self) -> io::Result<Stream<M>>
where
M: 'static + Send,
F: 'static + RenderFn<M> + Send,
{
let Builder { api_inner, mut builder, model, render } = self;
let model = Arc::new(Mutex::new(Some(model)));
let model_2 = model.clone();
let (update_tx, update_rx) = mpsc::channel();
std::thread::Builder::new()
.name("raw_laser_stream_thread".into())
.spawn(move || {
let mut connect_attempts = 3;
loop {
if connect_attempts == 0 {
connect_attempts = 3;
if let Some(ref mut dac) = builder.dac {
let dac_id = dac.id();
eprintln!("re-attempting to detect DAC with id: {:?}", dac_id);
*dac = match api_inner.detect_dac(dac_id) {
Ok(dac) => dac,
Err(err) => {
let err = EtherDreamStreamError::FailedToDetectDacs { err };
return Err(RawStreamError::EtherDreamStream { err });
}
};
}
}
match run_laser_stream(&model_2, &render, &api_inner, &builder, &update_rx) {
Ok(()) => return Ok(()),
Err(RawStreamError::EtherDreamStream { err }) => match err {
EtherDreamStreamError::FailedToConnectStream { err } => {
eprintln!("failed to connect to stream: {}", err);
connect_attempts -= 1;
eprintln!(
"connection attempts remaining before re-detecting DAC: {}",
connect_attempts,
);
std::thread::sleep(std::time::Duration::from_millis(16));
}
EtherDreamStreamError::FailedToPrepareStream { .. }
| EtherDreamStreamError::FailedToBeginStream { .. }
| EtherDreamStreamError::FailedToSubmitData { .. } => {
eprintln!("{} - will now attempt to reconnect", err);
}
err => return Err(RawStreamError::EtherDreamStream { err }),
}
}
}
})?;
let is_paused = AtomicBool::new(false);
let shared = Arc::new(Shared { model, is_paused });
let stream = Stream { shared, update_tx };
Ok(stream)
}
}
/// A `Buffer` dereferences to the slice of points it holds.
impl Deref for Buffer {
    type Target = [Point];

    fn deref(&self) -> &Self::Target {
        &self.points[..]
    }
}
/// Mutable access to the slice of points held by a `Buffer`.
impl DerefMut for Buffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.points[..]
    }
}
/// The latency, in points, used when the builder does not specify one.
///
/// Computed as `super::points_per_frame(point_hz, 60)`; the second argument is presumably a
/// frame rate in Hz (confirm against `points_per_frame`).
pub fn default_latency_points(point_hz: u32) -> u32 {
    let frame_hz = 60;
    super::points_per_frame(point_hz, frame_hz)
}
/// Connect to a DAC and stream rendered points to it until an error occurs.
///
/// Steps, in order:
///
/// 1. Pick the DAC: the one stored in `builder`, or else the first one detection yields.
/// 2. Derive the point rate and latency from `builder`, capped at the DAC's limits.
/// 3. Connect, prepare the stream, fill the DAC buffer with blank points and begin playback.
/// 4. Loop forever: apply queued model updates, ask `render` to fill a `Buffer` sized to bring
///    the DAC back up to the latency target, and submit the result.
///
/// The loop contains no `break`, so this function only returns by propagating an error.
///
/// # Errors
///
/// Returns a `RawStreamError::EtherDreamStream` wrapping the `EtherDreamStreamError` variant for
/// the stage that failed (detect, connect, prepare, begin or submit).
///
/// # Panics
///
/// Panics if the DAC detection iterator yields `None`.
fn run_laser_stream<M, F>(
    model: &Arc<Mutex<Option<M>>>,
    render: F,
    api_inner: &Arc<super::super::Inner>,
    builder: &super::Builder,
    update_rx: &mpsc::Receiver<Box<dyn FnMut(&mut M) + 'static + Send>>,
) -> Result<(), RawStreamError>
where
    F: RenderFn<M>,
{
    // Use the requested DAC, or fall back to the first one that detection reports.
    let dac = match builder.dac {
        Some(ref dac) => dac.clone(),
        None => api_inner.detect_dacs()
            .map_err(|err| EtherDreamStreamError::FailedToDetectDacs { err })?
            .next()
            .expect("ether dream DAC detection iterator should never return `None`")
            .map_err(|err| EtherDreamStreamError::FailedToDetectDacs { err })?,
    };

    // Currently unused.
    let _dac_id = dac.id();

    // Never exceed what the DAC reports it can handle.
    let point_hz = {
        let hz = builder.point_hz.unwrap_or(super::DEFAULT_POINT_HZ);
        std::cmp::min(hz, dac.max_point_hz())
    };
    let latency_points = {
        let points = builder.latency_points.unwrap_or_else(|| default_latency_points(point_hz));
        std::cmp::min(points, dac.buffer_capacity())
    };

    let (broadcast, src_addr) = match dac {
        super::super::DetectedDac::EtherDream { broadcast, source_addr } => {
            (broadcast, source_addr)
        }
    };

    // Updates received from the channel but not yet applied to the model.
    let mut pending_updates: Vec<Box<dyn FnMut(&mut M) + 'static + Send>> = Vec::new();

    let ip = src_addr.ip().clone();
    let mut stream = ether_dream::dac::stream::connect(&broadcast, ip)
        .map_err(|err| EtherDreamStreamError::FailedToConnectStream { err })?;

    stream
        .queue_commands()
        .prepare_stream()
        .submit()
        .map_err(|err| EtherDreamStreamError::FailedToPrepareStream { err })?;

    // The DAC bookkeeping below works in `u16`. Clamp before narrowing so that an oversized
    // value saturates at `u16::MAX` instead of silently wrapping around.
    let latency_points = std::cmp::min(latency_points, u32::from(std::u16::MAX)) as u16;

    // Fill the DAC's remaining buffer space with blank points, then begin playback.
    let low_water_mark = 0;
    let n_points = dac_remaining_buffer_capacity(stream.dac());
    stream
        .queue_commands()
        .data((0..n_points).map(|_| centered_blank()))
        .begin(low_water_mark, point_hz)
        .submit()
        .map_err(|err| EtherDreamStreamError::FailedToBeginStream { err })?;

    loop {
        // Collect every update queued since the last iteration and apply them in order.
        // If the lock is poisoned or the model slot is empty, the updates stay pending.
        pending_updates.extend(update_rx.try_iter());
        if !pending_updates.is_empty() {
            if let Ok(mut guard) = model.lock() {
                if let Some(model) = guard.as_mut() {
                    for mut update in pending_updates.drain(..) {
                        update(&mut *model);
                    }
                }
            }
        }

        // Ask for exactly as many points as are needed to reach the latency target.
        let n_points = points_to_generate(stream.dac(), latency_points) as usize;
        let mut buffer = Buffer {
            point_hz,
            latency_points: u32::from(latency_points),
            points: vec![Point::centered_blank(); n_points].into_boxed_slice(),
        };

        // Render in place. If the lock cannot be taken the buffer is submitted still blank.
        if let Ok(mut guard) = model.lock() {
            if let Some(m) = guard.as_mut() {
                render(m, &mut buffer);
            }
        }

        let points = buffer.iter().cloned().map(point_to_ether_dream_point);
        stream
            .queue_commands()
            .data(points)
            .submit()
            .map_err(|err| EtherDreamStreamError::FailedToSubmitData { err })?;
    }
}
fn dac_remaining_buffer_capacity(dac: ðer_dream::dac::Dac) -> u16 {
dac.buffer_capacity - 1 - dac.status.buffer_fullness
}
fn points_to_generate(dac: ðer_dream::dac::Dac, latency_points: u16) -> u16 {
let remaining_capacity = dac_remaining_buffer_capacity(dac);
let n = if dac.status.buffer_fullness < latency_points {
latency_points - dac.status.buffer_fullness
} else {
0
};
std::cmp::min(n, remaining_capacity)
}
/// A DAC point with every field set to zero: position at the origin and all colour channels off.
fn centered_blank() -> ether_dream::protocol::DacPoint {
    let (control, x, y) = (0, 0, 0);
    let (r, g, b, i) = (0, 0, 0, 0);
    let (u1, u2) = (0, 0);
    ether_dream::protocol::DacPoint { control, x, y, r, g, b, i, u1, u2 }
}
/// Convert a position to a pair of `i16` DAC coordinates.
///
/// Each component is clamped to `[-1.0, 1.0]` and then mapped onto the range
/// `[i16::MIN, i16::MAX]` before being cast to `i16`.
fn position_to_ether_dream_position([px, py]: crate::point::Position) -> [i16; 2] {
    let (lo, hi) = (f64::from(std::i16::MIN), f64::from(std::i16::MAX));
    let clamped_x = clamp(px, -1.0, 1.0);
    let clamped_y = clamp(py, -1.0, 1.0);
    [
        map_range(clamped_x, -1.0, 1.0, lo, hi) as i16,
        map_range(clamped_y, -1.0, 1.0, lo, hi) as i16,
    ]
}
/// Convert an RGB colour to three `u16` DAC channel values.
///
/// Each channel is clamped to `[0.0, 1.0]`, scaled by `u16::MAX` and cast to `u16`.
fn color_to_ether_dream_color([pr, pg, pb]: crate::point::Rgb) -> [u16; 3] {
    let full_scale = f32::from(std::u16::MAX);
    let red = clamp(pr, 0.0, 1.0) * full_scale;
    let green = clamp(pg, 0.0, 1.0) * full_scale;
    let blue = clamp(pb, 0.0, 1.0) * full_scale;
    [red as u16, green as u16, blue as u16]
}
/// Convert a `Point` into the DAC's point representation.
///
/// Position and colour are converted with `position_to_ether_dream_position` and
/// `color_to_ether_dream_color`; the `control`, `i`, `u1` and `u2` fields are set to zero.
fn point_to_ether_dream_point(p: Point) -> ether_dream::protocol::DacPoint {
    let xy = position_to_ether_dream_position(p.position);
    let rgb = color_to_ether_dream_color(p.color);
    ether_dream::protocol::DacPoint {
        control: 0,
        x: xy[0],
        y: xy[1],
        r: rgb[0],
        g: rgb[1],
        b: rgb[2],
        i: 0,
        u1: 0,
        u2: 0,
    }
}