1use crate::dac;
2use crate::protocol::{self, Command, ReadBytes, SizeBytes, WriteBytes, WriteToBytes};
3use std::borrow::Cow;
4use std::error::Error;
5use std::io::{self, Read, Write};
6use std::{self, fmt, mem, net, ops, time};
7
/// A TCP connection to a DAC together with the state needed to exchange commands with it.
///
/// Instances are produced by `connect` and `connect_timeout`.
pub struct Stream {
    /// Local representation of the DAC; refreshed from the status carried by each response.
    dac: dac::Addressed,
    /// The underlying TCP connection used for both commands and responses.
    tcp_stream: net::TcpStream,
    /// Commands queued via `queue_commands` that are waiting to be submitted.
    command_buffer: Vec<QueuedCommand>,
    /// Backing storage for the points referenced by queued `Data` commands.
    point_buffer: Vec<protocol::DacPoint>,
    /// Scratch buffer reused for encoding outgoing commands and decoding incoming responses.
    bytes: Vec<u8>,
}
21
/// A command that has been queued on a `Stream` but not yet sent to the DAC.
///
/// Each variant is translated into the matching `protocol::command` type when the queue is
/// submitted.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum QueuedCommand {
    /// Sent as `protocol::command::PrepareStream`.
    PrepareStream,
    /// Sent as the wrapped `protocol::command::Begin`.
    Begin(protocol::command::Begin),
    /// Sent as the wrapped `protocol::command::PointRate`.
    PointRate(protocol::command::PointRate),
    /// Sent as `protocol::command::Data`; the range indexes into the stream's point buffer.
    Data(ops::Range<usize>),
    /// Sent as `protocol::command::Stop`.
    Stop,
    /// Sent as `protocol::command::EmergencyStop`.
    EmergencyStop,
    /// Sent as `protocol::command::ClearEmergencyStop`.
    ClearEmergencyStop,
    /// Sent as `protocol::command::Ping`.
    Ping,
}
34
/// A by-value builder that queues commands on a `Stream`.
///
/// Nothing is written to the network until `submit` is called.
pub struct CommandQueue<'a> {
    /// The stream whose buffers receive the queued commands.
    stream: &'a mut Stream,
}
39
/// Errors that may occur while connecting to, or exchanging commands with, the DAC.
#[derive(Debug)]
pub enum CommunicationError {
    /// An I/O error, e.g. from reading or writing the TCP stream.
    Io(io::Error),
    /// A `dac::ProtocolError`, e.g. from building or updating the local DAC representation.
    Protocol(dac::ProtocolError),
    /// The DAC's response failed validation.
    Response(ResponseError),
}
47
/// Describes a response from the DAC that failed validation.
#[derive(Debug)]
pub struct ResponseError {
    /// The offending response, as received.
    pub response: protocol::DacResponse,
    /// The reason the response was rejected.
    pub kind: ResponseErrorKind,
}
56
/// The reasons a DAC response may be rejected.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum ResponseErrorKind {
    /// The response's command byte did not match the expected one; carries the byte received.
    UnexpectedCommand(u8),
    /// The response byte was one of the recognised NAK codes.
    Nak(Nak),
}
65
/// The NAK codes recognised in a DAC response.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Nak {
    /// Corresponds to `protocol::DacResponse::NAK_FULL`.
    Full,
    /// Corresponds to `protocol::DacResponse::NAK_INVALID`.
    Invalid,
    /// Corresponds to `protocol::DacResponse::NAK_STOP_CONDITION`.
    StopCondition,
}
77
78impl Stream {
79 fn send_command<C>(&mut self, command: C) -> io::Result<()>
80 where
81 C: Command + WriteToBytes,
82 {
83 let Stream {
84 ref mut bytes,
85 ref mut tcp_stream,
86 ..
87 } = *self;
88 send_command(bytes, tcp_stream, command)
89 }
90
91 fn recv_response(&mut self, expected_command: u8) -> Result<(), CommunicationError> {
92 let Stream {
93 ref mut bytes,
94 ref mut tcp_stream,
95 ref mut dac,
96 ..
97 } = *self;
98 recv_response(bytes, tcp_stream, dac, expected_command)
99 }
100
101 pub fn dac(&self) -> &dac::Addressed {
103 &self.dac
104 }
105
106 pub fn queue_commands(&mut self) -> CommandQueue {
108 self.command_buffer.clear();
109 self.point_buffer.clear();
110 CommandQueue { stream: self }
111 }
112
113 pub fn set_nodelay(&self, b: bool) -> io::Result<()> {
124 self.tcp_stream.set_nodelay(b)
125 }
126
127 pub fn nodelay(&self) -> io::Result<bool> {
131 self.tcp_stream.nodelay()
132 }
133
134 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
141 self.tcp_stream.set_ttl(ttl)
142 }
143
144 pub fn ttl(&self) -> io::Result<u32> {
148 self.tcp_stream.ttl()
149 }
150
151 pub fn set_read_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
157 self.tcp_stream.set_read_timeout(duration)
158 }
159
160 pub fn set_write_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
166 self.tcp_stream.set_write_timeout(duration)
167 }
168
169 pub fn set_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
176 self.set_read_timeout(duration)?;
177 self.set_write_timeout(duration)
178 }
179}
180
181impl<'a> CommandQueue<'a> {
182 pub fn prepare_stream(self) -> Self {
188 self.stream
189 .command_buffer
190 .push(QueuedCommand::PrepareStream);
191 self
192 }
193
194 pub fn begin(self, low_water_mark: u16, point_rate: u32) -> Self {
206 let begin = protocol::command::Begin {
207 low_water_mark,
208 point_rate,
209 };
210 self.stream.command_buffer.push(QueuedCommand::Begin(begin));
211 self
212 }
213
214 pub fn point_rate(self, point_rate: u32) -> Self {
225 let point_rate = protocol::command::PointRate(point_rate);
226 self.stream
227 .command_buffer
228 .push(QueuedCommand::PointRate(point_rate));
229 self
230 }
231
232 pub fn data<I>(self, points: I) -> Self
234 where
235 I: IntoIterator<Item = protocol::DacPoint>,
236 {
237 let start = self.stream.point_buffer.len();
238 self.stream.point_buffer.extend(points);
239 let end = self.stream.point_buffer.len();
240 assert!(
241 end - start < std::u16::MAX as usize,
242 "the number of points exceeds the `u16` MAX"
243 );
244 self.stream
245 .command_buffer
246 .push(QueuedCommand::Data(start..end));
247 self
248 }
249
250 pub fn stop(self) -> Self {
256 self.stream.command_buffer.push(QueuedCommand::Stop);
257 self
258 }
259
260 pub fn emergency_stop(self) -> Self {
264 self.stream.command_buffer.push(QueuedCommand::Stop);
265 self
266 }
267
268 pub fn clear_emergency_stop(self) -> Self {
278 self.stream.command_buffer.push(QueuedCommand::Stop);
279 self
280 }
281
282 pub fn ping(self) -> Self {
286 self.stream.command_buffer.push(QueuedCommand::Ping);
287 self
288 }
289
290 pub fn submit(self) -> Result<(), CommunicationError> {
295 let CommandQueue { stream } = self;
296
297 let mut command_bytes = vec![];
299
300 let mut command_buffer = mem::replace(&mut stream.command_buffer, Vec::new());
302
303 for command in command_buffer.drain(..) {
305 match command {
306 QueuedCommand::PrepareStream => {
307 stream.send_command(protocol::command::PrepareStream)?;
308 command_bytes.push(protocol::command::PrepareStream::START_BYTE);
309 }
310 QueuedCommand::Begin(begin) => {
311 stream.send_command(begin)?;
312 command_bytes.push(protocol::command::Begin::START_BYTE);
313 }
314 QueuedCommand::PointRate(point_rate) => {
315 stream.send_command(point_rate)?;
316 command_bytes.push(protocol::command::PointRate::START_BYTE);
317 }
318 QueuedCommand::Data(range) => {
319 let Stream {
320 ref mut bytes,
321 ref mut tcp_stream,
322 ref point_buffer,
323 ..
324 } = *stream;
325 let points = Cow::Borrowed(&point_buffer[range]);
326 let data = protocol::command::Data { points };
327 send_command(bytes, tcp_stream, data)?;
328 command_bytes.push(protocol::command::Data::START_BYTE);
329 }
330 QueuedCommand::Stop => {
331 stream.send_command(protocol::command::Stop)?;
332 command_bytes.push(protocol::command::Stop::START_BYTE);
333 }
334 QueuedCommand::EmergencyStop => {
335 stream.send_command(protocol::command::EmergencyStop)?;
336 command_bytes.push(protocol::command::EmergencyStop::START_BYTE);
337 }
338 QueuedCommand::ClearEmergencyStop => {
339 stream.send_command(protocol::command::ClearEmergencyStop)?;
340 command_bytes.push(protocol::command::ClearEmergencyStop::START_BYTE);
341 }
342 QueuedCommand::Ping => {
343 stream.send_command(protocol::command::Ping)?;
344 command_bytes.push(protocol::command::Ping::START_BYTE);
345 }
346 }
347 }
348
349 mem::swap(&mut stream.command_buffer, &mut command_buffer);
351
352 for command_byte in command_bytes.drain(..) {
354 stream.recv_response(command_byte)?;
355 }
356
357 Ok(())
358 }
359}
360
361impl protocol::DacResponse {
362 fn check_errors(&self, expected_command: u8) -> Result<(), ResponseError> {
364 if self.command != expected_command {
365 let response = self.clone();
366 let kind = ResponseErrorKind::UnexpectedCommand(self.command);
367 let err = ResponseError { response, kind };
368 return Err(err);
369 }
370
371 if let Some(nak) = Nak::from_protocol(self.response) {
372 let response = self.clone();
373 let kind = ResponseErrorKind::Nak(nak);
374 let err = ResponseError { response, kind };
375 return Err(err);
376 }
377
378 Ok(())
379 }
380}
381
382impl Nak {
383 pub fn from_protocol(nak: u8) -> Option<Self> {
385 match nak {
386 protocol::DacResponse::NAK_FULL => Some(Nak::Full),
387 protocol::DacResponse::NAK_INVALID => Some(Nak::Invalid),
388 protocol::DacResponse::NAK_STOP_CONDITION => Some(Nak::StopCondition),
389 _ => None,
390 }
391 }
392
393 pub fn to_protocol(&self) -> u8 {
395 match *self {
396 Nak::Full => protocol::DacResponse::NAK_FULL,
397 Nak::Invalid => protocol::DacResponse::NAK_INVALID,
398 Nak::StopCondition => protocol::DacResponse::NAK_STOP_CONDITION,
399 }
400 }
401}
402
403impl Error for CommunicationError {
404 fn cause(&self) -> Option<&dyn Error> {
405 match *self {
406 CommunicationError::Io(ref err) => Some(err as _),
407 CommunicationError::Protocol(ref err) => Some(err as _),
408 CommunicationError::Response(ref err) => Some(err as _),
409 }
410 }
411}
412
// No methods are overridden here; the trait's default implementations apply.
impl Error for ResponseError {}
414
415impl fmt::Display for CommunicationError {
416 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
417 match *self {
418 CommunicationError::Io(ref err) => err.fmt(f),
419 CommunicationError::Protocol(ref err) => err.fmt(f),
420 CommunicationError::Response(ref err) => err.fmt(f),
421 }
422 }
423}
424
425impl fmt::Display for ResponseError {
426 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427 let s = match self.kind {
428 ResponseErrorKind::UnexpectedCommand(_) => {
429 "the received response was to an unexpected command"
430 }
431 ResponseErrorKind::Nak(ref nak) => match *nak {
432 Nak::Full => "DAC responded with \"NAK - Full\"",
433 Nak::Invalid => "DAC responded with \"NAK - Invalid\"",
434 Nak::StopCondition => "DAC responded with \"NAK - Stop Condition\"",
435 },
436 };
437 write!(f, "{}", s)
438 }
439}
440
441impl From<io::Error> for CommunicationError {
442 fn from(err: io::Error) -> Self {
443 CommunicationError::Io(err)
444 }
445}
446
447impl From<dac::ProtocolError> for CommunicationError {
448 fn from(err: dac::ProtocolError) -> Self {
449 CommunicationError::Protocol(err)
450 }
451}
452
453impl From<ResponseError> for CommunicationError {
454 fn from(err: ResponseError) -> Self {
455 CommunicationError::Response(err)
456 }
457}
458
459pub fn connect(
468 broadcast: &protocol::DacBroadcast,
469 dac_ip: net::IpAddr,
470) -> Result<Stream, CommunicationError> {
471 connect_inner(broadcast, dac_ip, &net::TcpStream::connect)
472}
473
474pub fn connect_timeout(
478 broadcast: &protocol::DacBroadcast,
479 dac_ip: net::IpAddr,
480 timeout: time::Duration,
481) -> Result<Stream, CommunicationError> {
482 let connect = |addr| net::TcpStream::connect_timeout(&addr, timeout);
483 connect_inner(broadcast, dac_ip, &connect)
484}
485
486fn connect_inner(
488 broadcast: &protocol::DacBroadcast,
489 dac_ip: net::IpAddr,
490 connect: &dyn Fn(net::SocketAddr) -> io::Result<net::TcpStream>,
491) -> Result<Stream, CommunicationError> {
492 let mut dac = dac::Addressed::from_broadcast(broadcast)?;
494
495 let dac_addr = net::SocketAddr::new(dac_ip, protocol::COMMUNICATION_PORT);
497 let mut tcp_stream = connect(dac_addr)?;
498
499 tcp_stream.set_nodelay(true)?;
501
502 let mut bytes = vec![];
504
505 recv_response(
507 &mut bytes,
508 &mut tcp_stream,
509 &mut dac,
510 protocol::command::Ping::START_BYTE,
511 )?;
512
513 let stream = Stream {
515 dac,
516 tcp_stream,
517 command_buffer: vec![],
518 point_buffer: vec![],
519 bytes,
520 };
521
522 Ok(stream)
523}
524
525fn send_command<C>(
527 bytes: &mut Vec<u8>,
528 tcp_stream: &mut net::TcpStream,
529 command: C,
530) -> io::Result<()>
531where
532 C: Command + WriteToBytes,
533{
534 bytes.clear();
535 bytes.write_bytes(command)?;
536 tcp_stream.write(bytes)?;
537 Ok(())
538}
539
540fn recv_response(
542 bytes: &mut Vec<u8>,
543 tcp_stream: &mut net::TcpStream,
544 dac: &mut dac::Addressed,
545 expected_command: u8,
546) -> Result<(), CommunicationError> {
547 bytes.resize(protocol::DacResponse::SIZE_BYTES, 0);
549 tcp_stream.read_exact(bytes)?;
550 let response = (&bytes[..]).read_bytes::<protocol::DacResponse>()?;
551 response.check_errors(expected_command)?;
552 dac.update_status(&response.dac_status)?;
554 Ok(())
555}