use crate::protocols::lasercube_wifi::dac::buffer_estimator::BufferEstimator;
use crate::protocols::lasercube_wifi::dac::Addressed;
use crate::protocols::lasercube_wifi::error::CommunicationError;
use crate::protocols::lasercube_wifi::protocol::{
command, BufferStatus, Point, WriteBytes, CMD_PORT, DATA_HEADER_SIZE, DATA_PORT,
MAX_POINTS_PER_PACKET, POINT_SIZE_BYTES,
};
use std::net::{SocketAddr, UdpSocket};
use std::time::{Duration, Instant};
/// Default read timeout applied to the command socket by `Stream::connect`.
const CMD_TIMEOUT: Duration = Duration::from_millis(500);
/// Point rate sent to the DAC during initialization and used to seed the buffer estimator.
const INITIAL_POINT_RATE: u32 = 30_000;
/// Pause, in microseconds, inserted after each data packet when a single write spans
/// more than one packet.
const CHUNK_DELAY_US: u64 = 10;
/// Number of times every command datagram is sent on the command socket.
const COMMAND_REPEAT_COUNT: usize = 2;
/// Number of blank-point packets sent during warm-up before the stream is marked initialized.
const REQUIRED_WARMUP_PACKETS: usize = 300;
/// A connected point stream to a single DAC.
///
/// Owns one UDP socket for commands and one for point data / buffer-status
/// responses, plus the bookkeeping needed to number packets and estimate how
/// full the DAC's buffer is.
pub struct Stream {
/// Local copy of the DAC descriptor (cloned at connect time); its `status`
/// fields are updated as commands are sent and buffer statuses arrive.
dac: Addressed,
/// UDP socket connected to the DAC's `CMD_PORT`; used to send commands.
cmd_socket: UdpSocket,
/// UDP socket connected to the DAC's `DATA_PORT`; point packets are sent on it
/// and buffer-status responses are read from it.
data_socket: UdpSocket,
/// Wrapping counter placed in each data packet header; incremented per packet sent.
message_number: u8,
/// Wrapping counter placed in each data packet header; incremented per `write_frame` call.
frame_number: u8,
/// Rate most recently sent to the DAC with a set-rate command (0 until initialization).
current_rate: u32,
/// Set to `true` once `initialize` has completed; `write_frame` refuses to run before that.
initialized: bool,
/// Reusable buffer in which each outgoing data packet is assembled; pre-sized for
/// one header plus `MAX_POINTS_PER_PACKET` points.
send_buffer: Vec<u8>,
/// Scratch buffer for datagrams received on the data socket.
recv_buffer: [u8; 1500],
/// Records packet sends and received acknowledgements to estimate buffer fullness.
buffer_estimator: BufferEstimator,
}
impl Stream {
    /// Connects to `dac` using the default command-socket read timeout (`CMD_TIMEOUT`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Stream::connect_with_timeout`].
    pub fn connect(dac: &Addressed) -> Result<Self, CommunicationError> {
        Self::connect_with_timeout(dac, CMD_TIMEOUT)
    }

    /// Connects to `dac`, using `timeout` as the read timeout of the command socket.
    ///
    /// Two UDP sockets are opened and connected to `dac.ip_addr`: one to `CMD_PORT`
    /// and one to `DATA_PORT` (the latter with a fixed 1000 µs read timeout). The
    /// stream is then initialized (commands sent, warm-up packets streamed) before
    /// it is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if a socket cannot be bound, configured or connected
    /// (the standard library rejects a zero `timeout`), or if initialization fails.
    pub fn connect_with_timeout(
        dac: &Addressed,
        timeout: Duration,
    ) -> Result<Self, CommunicationError> {
        let cmd_socket = Self::open_socket(dac.ip_addr, CMD_PORT, timeout)?;
        let data_socket =
            Self::open_socket(dac.ip_addr, DATA_PORT, Duration::from_micros(1000))?;
        let mut stream = Stream {
            buffer_estimator: BufferEstimator::new(dac.max_buffer_space, INITIAL_POINT_RATE),
            dac: dac.clone(),
            cmd_socket,
            data_socket,
            message_number: 0,
            frame_number: 0,
            current_rate: 0,
            initialized: false,
            send_buffer: Vec::with_capacity(
                DATA_HEADER_SIZE + MAX_POINTS_PER_PACKET * POINT_SIZE_BYTES,
            ),
            recv_buffer: [0u8; 1500],
        };
        stream.initialize()?;
        Ok(stream)
    }

    /// Binds a UDP socket on an OS-assigned port, applies `read_timeout`, and
    /// connects it to `ip:port`.
    ///
    /// The local wildcard address is chosen from the same address family as `ip`,
    /// so an IPv6 peer gets an IPv6 socket instead of failing to connect from an
    /// IPv4-bound one.
    fn open_socket(
        ip: std::net::IpAddr,
        port: u16,
        read_timeout: Duration,
    ) -> std::io::Result<UdpSocket> {
        let local_ip = match ip {
            std::net::IpAddr::V4(_) => std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
            std::net::IpAddr::V6(_) => std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
        };
        let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0))?;
        socket.set_read_timeout(Some(read_timeout))?;
        socket.connect(SocketAddr::new(ip, port))?;
        Ok(socket)
    }

    /// Brings the DAC into a streaming state and marks the stream as initialized.
    ///
    /// In order: enables buffer-size responses, clears the ring buffer, enables
    /// output, sets the rate to `INITIAL_POINT_RATE`, then runs the warm-up.
    /// Local status fields are updated after the corresponding command is sent.
    fn initialize(&mut self) -> Result<(), CommunicationError> {
        self.send_command_repeated(&command::enable_buffer_size_response(true))?;
        self.send_command_repeated(&command::clear_ringbuffer())?;
        self.send_command_repeated(&command::set_output(true))?;
        self.dac.status.output_enabled = true;
        self.send_command_repeated(&command::set_rate(INITIAL_POINT_RATE))?;
        self.current_rate = INITIAL_POINT_RATE;
        self.dac.status.point_rate = INITIAL_POINT_RATE;
        self.buffer_estimator.set_point_rate(INITIAL_POINT_RATE);
        self.warmup()?;
        self.initialized = true;
        Ok(())
    }

    /// Sends `REQUIRED_WARMUP_PACKETS` packets of blank points, polling for a
    /// buffer status after each one.
    fn warmup(&mut self) -> Result<(), CommunicationError> {
        let blank_points = [Point::blank(); MAX_POINTS_PER_PACKET];
        for _ in 0..REQUIRED_WARMUP_PACKETS {
            self.send_points_internal(&blank_points)?;
            self.receive_buffer_status()?;
        }
        Ok(())
    }

    /// Sends `cmd` on the command socket `COMMAND_REPEAT_COUNT` times.
    ///
    /// NOTE(review): the repetition is presumably a guard against UDP packet
    /// loss; no reply is read here.
    fn send_command_repeated(&mut self, cmd: &[u8]) -> Result<(), CommunicationError> {
        for _ in 0..COMMAND_REPEAT_COUNT {
            self.cmd_socket.send(cmd)?;
        }
        Ok(())
    }

    /// Attempts to read one buffer-status datagram from the data socket.
    ///
    /// Returns `Ok(Some(status))` when a datagram of at least 4 bytes parses as a
    /// `BufferStatus`; the acknowledgement is then recorded in the estimator and
    /// the cached free buffer space is updated. Returns `Ok(None)` when the read
    /// timed out or the datagram was too short or unparsable.
    ///
    /// # Errors
    ///
    /// Any receive error other than a timeout is returned to the caller.
    fn receive_buffer_status(&mut self) -> Result<Option<BufferStatus>, CommunicationError> {
        match self.data_socket.recv(&mut self.recv_buffer) {
            Ok(len) if len >= 4 => {
                if let Ok(status) = BufferStatus::from_response(&self.recv_buffer[..len]) {
                    let now = Instant::now();
                    self.buffer_estimator
                        .record_ack(status.message_number, status.free_space, now);
                    self.dac.status.free_buffer_space = status.free_space;
                    return Ok(Some(status));
                }
            }
            Ok(_) => {}
            // A read timeout surfaces as `WouldBlock` or `TimedOut` depending on the platform.
            Err(ref e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                ) => {}
            Err(e) => return Err(e.into()),
        }
        Ok(None)
    }

    /// Best-effort poll: tries to read up to two buffer statuses, stopping at the
    /// first receive error. Errors are deliberately not reported.
    fn try_receive_buffer_status(&mut self) {
        for _ in 0..2 {
            if self.receive_buffer_status().is_err() {
                break;
            }
        }
    }

    /// Sends `points` to the DAC, split into packets of at most
    /// `MAX_POINTS_PER_PACKET` points.
    ///
    /// Each packet carries the current message and frame numbers; the message
    /// number advances (wrapping) after every packet. After each packet the send
    /// is recorded in the estimator and pending buffer statuses are polled. An
    /// empty slice sends nothing.
    fn send_points_internal(&mut self, points: &[Point]) -> Result<(), CommunicationError> {
        for chunk in points.chunks(MAX_POINTS_PER_PACKET) {
            self.send_buffer.clear();
            let header = command::sample_data_header(self.message_number, self.frame_number);
            self.send_buffer.extend_from_slice(&header);
            for point in chunk {
                self.send_buffer.write_bytes(point)?;
            }
            self.data_socket.send(&self.send_buffer)?;
            let now = Instant::now();
            self.buffer_estimator
                .record_send(self.message_number, chunk.len() as u16, now);
            self.message_number = self.message_number.wrapping_add(1);
            // Only multi-packet writes are paced; a single packet is sent without delay.
            if points.len() > MAX_POINTS_PER_PACKET {
                std::thread::sleep(Duration::from_micros(CHUNK_DELAY_US));
            }
            self.try_receive_buffer_status();
        }
        Ok(())
    }

    /// Returns the locally tracked DAC descriptor, including its cached status.
    pub fn dac(&self) -> &Addressed {
        &self.dac
    }

    /// Returns the free buffer space from the most recently received buffer
    /// status (or the value set by [`Stream::stop`]).
    pub fn free_buffer_space(&self) -> u16 {
        self.dac.status.free_buffer_space
    }

    /// Returns the buffer estimator's fullness estimate for the current instant.
    pub fn estimated_buffer_fullness(&self) -> u16 {
        self.buffer_estimator
            .estimated_buffer_fullness(Instant::now())
    }

    /// Polls for pending buffer statuses, then returns how many points the buffer
    /// estimator considers safe to add right now.
    pub fn safe_writable_points(&mut self) -> u16 {
        self.try_receive_buffer_status();
        self.buffer_estimator.max_points_to_add(Instant::now())
    }

    /// Returns the rate most recently sent to the DAC.
    pub fn point_rate(&self) -> u32 {
        self.current_rate
    }

    /// Requests a new point rate.
    ///
    /// The set-rate command is only sent (and the cached rate only updated) when
    /// `rate` differs from the current rate by more than 3. The buffer estimator
    /// is given `rate` in either case.
    ///
    /// # Errors
    ///
    /// Returns an error if sending the command fails.
    pub fn set_rate(&mut self, rate: u32) -> Result<(), CommunicationError> {
        if (rate as i64 - self.current_rate as i64).unsigned_abs() > 3 {
            self.send_command_repeated(&command::set_rate(rate))?;
            self.current_rate = rate;
            self.dac.status.point_rate = rate;
        }
        self.buffer_estimator.set_point_rate(rate);
        Ok(())
    }

    /// Sends the set-output command and records the new state in the cached status.
    ///
    /// # Errors
    ///
    /// Returns an error if sending the command fails; the cached state is then
    /// left unchanged.
    pub fn set_output(&mut self, enabled: bool) -> Result<(), CommunicationError> {
        self.send_command_repeated(&command::set_output(enabled))?;
        self.dac.status.output_enabled = enabled;
        Ok(())
    }

    /// Writes one frame of `points` at the given `rate`.
    ///
    /// Applies `rate` via [`Stream::set_rate`], polls for buffer statuses if the
    /// estimator reports that sending is not currently advisable, sends the
    /// points, and advances the (wrapping) frame number.
    ///
    /// # Errors
    ///
    /// Returns `CommunicationError::NotInitialized` if the stream has not been
    /// initialized, or any error from setting the rate or sending the points.
    pub fn write_frame(&mut self, points: &[Point], rate: u32) -> Result<(), CommunicationError> {
        if !self.initialized {
            return Err(CommunicationError::NotInitialized);
        }
        self.set_rate(rate)?;
        if !self.buffer_estimator.can_send(Instant::now()) {
            self.try_receive_buffer_status();
        }
        self.send_points_internal(points)?;
        self.frame_number = self.frame_number.wrapping_add(1);
        Ok(())
    }

    /// Disables output and clears the DAC's ring buffer.
    ///
    /// On success the cached free buffer space is reset to the DAC's maximum and
    /// the buffer estimator is reset.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as either command fails to send; later steps are
    /// then skipped.
    pub fn stop(&mut self) -> Result<(), CommunicationError> {
        self.send_command_repeated(&command::set_output(false))?;
        self.dac.status.output_enabled = false;
        self.send_command_repeated(&command::clear_ringbuffer())?;
        self.dac.status.free_buffer_space = self.dac.max_buffer_space;
        self.buffer_estimator.reset();
        Ok(())
    }

    /// Sets the read timeout of the command socket (`None` means block indefinitely).
    ///
    /// # Errors
    ///
    /// Returns an error if the socket rejects the value; the standard library
    /// rejects `Some(Duration::ZERO)`.
    pub fn set_cmd_timeout(&self, timeout: Option<Duration>) -> Result<(), CommunicationError> {
        self.cmd_socket.set_read_timeout(timeout)?;
        Ok(())
    }

    /// Sets the read timeout of the data socket (`None` means block indefinitely).
    ///
    /// # Errors
    ///
    /// Returns an error if the socket rejects the value; the standard library
    /// rejects `Some(Duration::ZERO)`.
    pub fn set_data_timeout(&self, timeout: Option<Duration>) -> Result<(), CommunicationError> {
        self.data_socket.set_read_timeout(timeout)?;
        Ok(())
    }
}
impl Drop for Stream {
    /// Best-effort shutdown when the stream goes out of scope.
    ///
    /// Calls `stop()`; its result is discarded because `drop` cannot report failure.
    fn drop(&mut self) {
        self.stop().ok();
    }
}
pub fn connect(dac: &Addressed) -> Result<Stream, CommunicationError> {
Stream::connect(dac)
}
/// Opens a [`Stream`] to `dac`, using `timeout` as the command-socket read timeout.
///
/// # Errors
///
/// Returns any error produced while connecting or initializing the stream.
pub fn connect_timeout(dac: &Addressed, timeout: Duration) -> Result<Stream, CommunicationError> {
    let stream = Stream::connect_with_timeout(dac, timeout)?;
    Ok(stream)
}
/// Returns how much of the buffer is in use: `max_buffer_space` minus
/// `free_buffer_space`, floored at zero when the reported free space exceeds
/// the maximum.
#[inline]
pub fn calculate_buffer_used(max_buffer_space: u16, free_buffer_space: u16) -> u16 {
    // Clamp the free space first so the subtraction can never underflow.
    let free = free_buffer_space.min(max_buffer_space);
    max_buffer_space - free
}