use dac;
use protocol::{self, Command, ReadBytes, SizeBytes, WriteBytes, WriteToBytes};
use std::borrow::Cow;
use std::error::Error;
use std::{self, fmt, mem, net, ops, time};
use std::io::{self, Read, Write};
pub struct Stream {
dac: dac::Addressed,
tcp_stream: net::TcpStream,
timeout: Option<time::Duration>,
command_buffer: Vec<QueuedCommand>,
point_buffer: Vec<protocol::DacPoint>,
bytes: Vec<u8>,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum QueuedCommand {
PrepareStream,
Begin(protocol::command::Begin),
PointRate(protocol::command::PointRate),
Data(ops::Range<usize>),
Stop,
EmergencyStop,
ClearEmergencyStop,
Ping,
}
pub struct CommandQueue<'a> {
stream: &'a mut Stream,
}
#[derive(Debug)]
pub enum CommunicationError {
Io(io::Error),
Protocol(dac::ProtocolError),
Response(ResponseError),
}
#[derive(Debug)]
pub struct ResponseError {
pub response: protocol::DacResponse,
pub kind: ResponseErrorKind,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum ResponseErrorKind {
UnexpectedCommand(u8),
Nak(Nak),
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Nak {
Full,
Invalid,
StopCondition,
}
impl Stream {
fn send_command<C>(&mut self, command: C) -> io::Result<()>
where
C: Command + WriteToBytes,
{
let Stream { ref mut bytes, ref mut tcp_stream, .. } = *self;
send_command(bytes, tcp_stream, command)
}
fn recv_response(&mut self, expected_command: u8) -> Result<(), CommunicationError> {
let Stream { ref mut bytes, ref mut tcp_stream, ref mut dac, .. } = *self;
recv_response(bytes, tcp_stream, dac, expected_command)
}
pub fn dac(&self) -> &dac::Addressed {
&self.dac
}
pub fn queue_commands(&mut self) -> CommandQueue {
self.command_buffer.clear();
self.point_buffer.clear();
CommandQueue { stream: self }
}
pub fn set_nodelay(&self, b: bool) -> io::Result<()> {
self.tcp_stream.set_nodelay(b)
}
pub fn nodelay(&self) -> io::Result<bool> {
self.tcp_stream.nodelay()
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.tcp_stream.set_ttl(ttl)
}
pub fn ttl(&self) -> io::Result<u32> {
self.tcp_stream.ttl()
}
}
fn send_command<C>(
bytes: &mut Vec<u8>,
tcp_stream: &mut net::TcpStream,
command: C,
) -> io::Result<()>
where
C: Command + WriteToBytes,
{
bytes.clear();
bytes.write_bytes(command)?;
tcp_stream.write(bytes)?;
Ok(())
}
fn recv_response(
bytes: &mut Vec<u8>,
tcp_stream: &mut net::TcpStream,
dac: &mut dac::Addressed,
expected_command: u8,
) -> Result<(), CommunicationError>
{
bytes.resize(protocol::DacResponse::SIZE_BYTES, 0);
tcp_stream.read_exact(bytes)?;
let response = (&bytes[..]).read_bytes::<protocol::DacResponse>()?;
response.check_errors(expected_command)?;
dac.update_status(&response.dac_status)?;
Ok(())
}
impl<'a> CommandQueue<'a> {
pub fn prepare_stream(self) -> Self {
self.stream.command_buffer.push(QueuedCommand::PrepareStream);
self
}
pub fn begin(self, low_water_mark: u16, point_rate: u32) -> Self {
let begin = protocol::command::Begin { low_water_mark, point_rate };
self.stream.command_buffer.push(QueuedCommand::Begin(begin));
self
}
pub fn point_rate(self, point_rate: u32) -> Self {
let point_rate = protocol::command::PointRate(point_rate);
self.stream.command_buffer.push(QueuedCommand::PointRate(point_rate));
self
}
pub fn data<I>(self, points: I) -> Self
where
I: IntoIterator<Item = protocol::DacPoint>,
{
let start = self.stream.point_buffer.len();
self.stream.point_buffer.extend(points);
let end = self.stream.point_buffer.len();
assert!(end - start < std::u16::MAX as usize, "the number of points exceeds the `u16` MAX");
self.stream.command_buffer.push(QueuedCommand::Data(start..end));
self
}
pub fn stop(self) -> Self {
self.stream.command_buffer.push(QueuedCommand::Stop);
self
}
pub fn emergency_stop(self) -> Self {
self.stream.command_buffer.push(QueuedCommand::Stop);
self
}
pub fn clear_emergency_stop(self) -> Self {
self.stream.command_buffer.push(QueuedCommand::Stop);
self
}
pub fn ping(self) -> Self {
self.stream.command_buffer.push(QueuedCommand::Ping);
self
}
pub fn submit(self) -> Result<(), CommunicationError> {
let CommandQueue { stream } = self;
let mut command_bytes = vec![];
let mut command_buffer = mem::replace(&mut stream.command_buffer, Vec::new());
for command in command_buffer.drain(..) {
match command {
QueuedCommand::PrepareStream => {
stream.send_command(protocol::command::PrepareStream)?;
command_bytes.push(protocol::command::PrepareStream::START_BYTE);
}
QueuedCommand::Begin(begin) => {
stream.send_command(begin)?;
command_bytes.push(protocol::command::Begin::START_BYTE);
}
QueuedCommand::PointRate(point_rate) => {
stream.send_command(point_rate)?;
command_bytes.push(protocol::command::PointRate::START_BYTE);
}
QueuedCommand::Data(range) => {
let Stream {
ref mut bytes,
ref mut tcp_stream,
ref point_buffer,
..
} = *stream;
let points = Cow::Borrowed(&point_buffer[range]);
let data = protocol::command::Data { points };
send_command(bytes, tcp_stream, data)?;
command_bytes.push(protocol::command::Data::START_BYTE);
}
QueuedCommand::Stop => {
stream.send_command(protocol::command::Stop)?;
command_bytes.push(protocol::command::Stop::START_BYTE);
},
QueuedCommand::EmergencyStop => {
stream.send_command(protocol::command::EmergencyStop)?;
command_bytes.push(protocol::command::EmergencyStop::START_BYTE);
},
QueuedCommand::ClearEmergencyStop => {
stream.send_command(protocol::command::ClearEmergencyStop)?;
command_bytes.push(protocol::command::ClearEmergencyStop::START_BYTE);
},
QueuedCommand::Ping => {
stream.send_command(protocol::command::Ping)?;
command_bytes.push(protocol::command::Ping::START_BYTE);
},
}
}
mem::swap(&mut stream.command_buffer, &mut command_buffer);
for command_byte in command_bytes.drain(..) {
stream.recv_response(command_byte)?;
}
Ok(())
}
}
impl protocol::DacResponse {
fn check_errors(&self, expected_command: u8) -> Result<(), ResponseError> {
if self.command != expected_command {
let response = self.clone();
let kind = ResponseErrorKind::UnexpectedCommand(self.command);
let err = ResponseError { response, kind };
return Err(err);
}
if let Some(nak) = Nak::from_protocol(self.response) {
let response = self.clone();
let kind = ResponseErrorKind::Nak(nak);
let err = ResponseError { response, kind };
return Err(err);
}
Ok(())
}
}
impl Nak {
pub fn from_protocol(nak: u8) -> Option<Self> {
match nak {
protocol::DacResponse::NAK_FULL => Some(Nak::Full),
protocol::DacResponse::NAK_INVALID => Some(Nak::Invalid),
protocol::DacResponse::NAK_STOP_CONDITION => Some(Nak::StopCondition),
_ => None,
}
}
pub fn to_protocol(&self) -> u8 {
match *self {
Nak::Full => protocol::DacResponse::NAK_FULL,
Nak::Invalid => protocol::DacResponse::NAK_INVALID,
Nak::StopCondition => protocol::DacResponse::NAK_STOP_CONDITION,
}
}
}
impl Error for CommunicationError {
fn description(&self) -> &str {
match *self {
CommunicationError::Io(ref err) => err.description(),
CommunicationError::Protocol(ref err) => err.description(),
CommunicationError::Response(ref err) => err.description(),
}
}
fn cause(&self) -> Option<&Error> {
match *self {
CommunicationError::Io(ref err) => Some(err as _),
CommunicationError::Protocol(ref err) => Some(err as _),
CommunicationError::Response(ref err) => Some(err as _),
}
}
}
impl Error for ResponseError {
fn description(&self) -> &str {
match self.kind {
ResponseErrorKind::UnexpectedCommand(_) => {
"the received response was to an unexpected command"
},
ResponseErrorKind::Nak(ref nak) => match *nak {
Nak::Full => "DAC responded with \"NAK - Full\"",
Nak::Invalid => "DAC responded with \"NAK - Invalid\"",
Nak::StopCondition => "DAC responded with \"NAK - Stop Condition\"",
},
}
}
}
impl fmt::Display for CommunicationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl fmt::Display for ResponseError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl From<io::Error> for CommunicationError {
fn from(err: io::Error) -> Self {
CommunicationError::Io(err)
}
}
impl From<dac::ProtocolError> for CommunicationError {
fn from(err: dac::ProtocolError) -> Self {
CommunicationError::Protocol(err)
}
}
impl From<ResponseError> for CommunicationError {
fn from(err: ResponseError) -> Self {
CommunicationError::Response(err)
}
}
pub fn connect(
broadcast: &protocol::DacBroadcast,
dac_ip: net::IpAddr,
) -> Result<Stream, CommunicationError>
{
let mut dac = dac::Addressed::from_broadcast(broadcast)?;
let dac_addr = net::SocketAddr::new(dac_ip, protocol::COMMUNICATION_PORT);
let mut tcp_stream = net::TcpStream::connect(dac_addr)?;
tcp_stream.set_nodelay(true)?;
let mut bytes = vec![];
recv_response(
&mut bytes,
&mut tcp_stream,
&mut dac,
protocol::command::Ping::START_BYTE,
)?;
let stream = Stream {
dac,
tcp_stream,
timeout: None,
command_buffer: vec![],
point_buffer: vec![],
bytes,
};
Ok(stream)
}