ether_dream/dac/
stream.rs

1use crate::dac;
2use crate::protocol::{self, Command, ReadBytes, SizeBytes, WriteBytes, WriteToBytes};
3use std::borrow::Cow;
4use std::error::Error;
5use std::io::{self, Read, Write};
6use std::{self, fmt, mem, net, ops, time};
7
8/// A bi-directional communication stream between the user and a `Dac`.
9pub struct Stream {
10    /// An up-to-date representation of the `DAC` with which the stream is connected.
11    dac: dac::Addressed,
12    /// The TCP stream used for communicating with the DAC.
13    tcp_stream: net::TcpStream,
14    /// A buffer to re-use for queueing commands via the `queue_commands` method.
15    command_buffer: Vec<QueuedCommand>,
16    /// A buffer to re-use for queueing points for `Data` commands.
17    point_buffer: Vec<protocol::DacPoint>,
18    /// A buffer used for efficiently writing and reading bytes to and from TCP.
19    bytes: Vec<u8>,
20}
21
22/// A runtime representation of any of the possible commands.
23#[derive(Clone, Debug, PartialEq, Eq, Hash)]
24pub enum QueuedCommand {
25    PrepareStream,
26    Begin(protocol::command::Begin),
27    PointRate(protocol::command::PointRate),
28    Data(ops::Range<usize>),
29    Stop,
30    EmergencyStop,
31    ClearEmergencyStop,
32    Ping,
33}
34
35/// A queue of commands that are to be submitted at once before listening for their responses.
36pub struct CommandQueue<'a> {
37    stream: &'a mut Stream,
38}
39
40/// Errors that may occur when connecting a `Stream`.
41#[derive(Debug)]
42pub enum CommunicationError {
43    Io(io::Error),
44    Protocol(dac::ProtocolError),
45    Response(ResponseError),
46}
47
48/// An error indicated by a DAC response.
49#[derive(Debug)]
50pub struct ResponseError {
51    /// The DAC response on which the NAK was received.
52    pub response: protocol::DacResponse,
53    /// The kind of response error that occurred.
54    pub kind: ResponseErrorKind,
55}
56
57/// The kinds of errors that may be interpreted from a DAC response.
58#[derive(Clone, Debug, PartialEq, Eq, Hash)]
59pub enum ResponseErrorKind {
60    /// The response was to a command that was unexpected.
61    UnexpectedCommand(u8),
62    /// The DAC responded with a NAK.
63    Nak(Nak),
64}
65
66/// The NAK message kinds that may be returned by the DAC.
67#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
68pub enum Nak {
69    /// The write command could not be performed because there was not enough buffer space when it
70    /// was received.
71    Full,
72    /// The command contained an invalid `command` byte or parameters.
73    Invalid,
74    /// An emergency-stop condition still exists.
75    StopCondition,
76}
77
78impl Stream {
79    fn send_command<C>(&mut self, command: C) -> io::Result<()>
80    where
81        C: Command + WriteToBytes,
82    {
83        let Stream {
84            ref mut bytes,
85            ref mut tcp_stream,
86            ..
87        } = *self;
88        send_command(bytes, tcp_stream, command)
89    }
90
91    fn recv_response(&mut self, expected_command: u8) -> Result<(), CommunicationError> {
92        let Stream {
93            ref mut bytes,
94            ref mut tcp_stream,
95            ref mut dac,
96            ..
97        } = *self;
98        recv_response(bytes, tcp_stream, dac, expected_command)
99    }
100
101    /// Borrow the inner DAC to examine its state.
102    pub fn dac(&self) -> &dac::Addressed {
103        &self.dac
104    }
105
106    /// Queue one or more commands to be submitted to the DAC at once.
107    pub fn queue_commands(&mut self) -> CommandQueue {
108        self.command_buffer.clear();
109        self.point_buffer.clear();
110        CommandQueue { stream: self }
111    }
112
113    /// This directly calls `set_nodelay` on the inner **TcpStream**. In other words, this sets the
114    /// value of the TCP_NODELAY option for this socket.
115    ///
116    /// Note that due to the necessity for very low-latency communication with the DAC, this API
117    /// enables TCP_NODELAY by default. This method is exposed in order to allow the user to
118    /// disable this if they wish.
119    ///
120    /// When not set, data is buffered until there is a sufficient amount to send out, thereby
121    /// avoiding the frequent sending of small packets. Although perhaps more efficient for the
122    /// network, this may result in DAC underflows if **Data** commands are delayed for too long.
123    pub fn set_nodelay(&self, b: bool) -> io::Result<()> {
124        self.tcp_stream.set_nodelay(b)
125    }
126
127    /// Gets the value of the TCP_NODELAY option for this socket.
128    ///
129    /// For more infnormation about this option, see `set_nodelay`.
130    pub fn nodelay(&self) -> io::Result<bool> {
131        self.tcp_stream.nodelay()
132    }
133
134    /// This directly calls `set_ttl` on the inner **TcpStream**. In other words, this sets the
135    /// value for the `IP_TTL` option on this socket.
136    ///
137    /// This value sets the time-to-live field that is used in every packet sent from this socket.
138    /// Time-to-live describes the number of hops between devices that a packet may make before it
139    /// is discarded/ignored.
140    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
141        self.tcp_stream.set_ttl(ttl)
142    }
143
144    /// Gets the value of the `IP_TTL` option for this socket.
145    ///
146    /// For more information about this option see `set_ttl`.
147    pub fn ttl(&self) -> io::Result<u32> {
148        self.tcp_stream.ttl()
149    }
150
151    /// Sets the read timeout of the underlying `TcpStream`.
152    ///
153    /// See the following for details:
154    ///
155    /// - [`std::net::TcpStream::set_read_timeout`](https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_read_timeout)
156    pub fn set_read_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
157        self.tcp_stream.set_read_timeout(duration)
158    }
159
160    /// Sets the write timeout of the underlying `TcpStream`.
161    ///
162    /// See the following for details:
163    ///
164    /// - [`std::net::TcpStream::set_write_timeout`](https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_write_timeout)
165    pub fn set_write_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
166        self.tcp_stream.set_write_timeout(duration)
167    }
168
169    /// Sets the read and write timeout of the underlying `TcpStream`.
170    ///
171    /// See the following for details:
172    ///
173    /// - [`std::net::TcpStream::set_read_timeout`](https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_read_timeout)
174    /// - [`std::net::TcpStream::set_write_timeout`](https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_write_timeout)
175    pub fn set_timeout(&self, duration: Option<time::Duration>) -> io::Result<()> {
176        self.set_read_timeout(duration)?;
177        self.set_write_timeout(duration)
178    }
179}
180
181impl<'a> CommandQueue<'a> {
182    /// This command causes the playback system to enter the `Prepared` state. The DAC resets its
183    /// buffer to be empty and sets "point_count" to `0`.
184    ///
185    /// This command may only be sent if the light engine is `Ready` and the playback system is
186    /// `Idle`. If so, the DAC replies with ACK. Otherwise, it replies with NAK - Invalid.
187    pub fn prepare_stream(self) -> Self {
188        self.stream
189            .command_buffer
190            .push(QueuedCommand::PrepareStream);
191        self
192    }
193
194    /// Causes the DAC to begin producing output.
195    ///
196    /// ### `low_water_mark`
197    ///
198    /// *Currently unused.*
199    ///
200    /// ### `point_rate`
201    ///
202    /// The number of points per second to be read from the buffer.  If the playback system was
203    /// `Prepared` and there was data in the buffer, then the DAC will reply with ACK. Otherwise,
204    /// it replies with NAK - Invalid.
205    pub fn begin(self, low_water_mark: u16, point_rate: u32) -> Self {
206        let begin = protocol::command::Begin {
207            low_water_mark,
208            point_rate,
209        };
210        self.stream.command_buffer.push(QueuedCommand::Begin(begin));
211        self
212    }
213
214    /// Adds a new point rate to the point rate buffer.
215    ///
216    /// Point rate changes are read out of the buffer when a point with an appropriate flag is
217    /// played (see the `WriteData` command).
218    ///
219    /// If the DAC's playback state is not `Prepared` or `Playing`, it replies with NAK - Invalid.
220    ///
221    /// If the point rate buffer is full, it replies with NAK - Full.
222    ///
223    /// Otherwise, it replies with ACK.
224    pub fn point_rate(self, point_rate: u32) -> Self {
225        let point_rate = protocol::command::PointRate(point_rate);
226        self.stream
227            .command_buffer
228            .push(QueuedCommand::PointRate(point_rate));
229        self
230    }
231
232    /// Indicates to the DAC to add the following point data into its buffer.
233    pub fn data<I>(self, points: I) -> Self
234    where
235        I: IntoIterator<Item = protocol::DacPoint>,
236    {
237        let start = self.stream.point_buffer.len();
238        self.stream.point_buffer.extend(points);
239        let end = self.stream.point_buffer.len();
240        assert!(
241            end - start < std::u16::MAX as usize,
242            "the number of points exceeds the `u16` MAX"
243        );
244        self.stream
245            .command_buffer
246            .push(QueuedCommand::Data(start..end));
247        self
248    }
249
250    /// Causes the DAC to immediately stop playing and return to the `Idle` playback state.
251    ///
252    /// It is ACKed if the DAC was in the `Playing` or `Prepared` playback states.
253    ///
254    /// Otherwise it is replied to with NAK - Invalid.
255    pub fn stop(self) -> Self {
256        self.stream.command_buffer.push(QueuedCommand::Stop);
257        self
258    }
259
260    /// Causes the light engine to enter the E-Stop state, regardless of its previous state.
261    ///
262    /// This command is always ACKed.
263    pub fn emergency_stop(self) -> Self {
264        self.stream.command_buffer.push(QueuedCommand::Stop);
265        self
266    }
267
268    /// If the light engine was in E-Stop state due to an emergency stop command (either from a
269    /// local stop condition or over the network), this command resets it to `Ready`.
270    ///
271    /// It is ACKed if the DAC was previously in E-Stop.
272    ///
273    /// Otherwise it is replied to with a NAK - Invalid.
274    ///
275    /// If the condition that caused the emergency stop is still active (e.g. E-Stop input still
276    /// asserted, temperature still out of bounds, etc) a NAK - Stop Condition is sent.
277    pub fn clear_emergency_stop(self) -> Self {
278        self.stream.command_buffer.push(QueuedCommand::Stop);
279        self
280    }
281
282    /// The DAC will reply to this with an ACK packet.
283    ///
284    /// This serves as a keep-alive for the connection when the DAC is not actively streaming.
285    pub fn ping(self) -> Self {
286        self.stream.command_buffer.push(QueuedCommand::Ping);
287        self
288    }
289
290    /// Finish queueing commands and send them to the DAC.
291    ///
292    /// First all commands are written to the TCP stream, then we block waiting for a response to
293    /// each from the DAC.
294    pub fn submit(self) -> Result<(), CommunicationError> {
295        let CommandQueue { stream } = self;
296
297        // Track the command byte order so that we can ensure we get correct responses.
298        let mut command_bytes = vec![];
299
300        // Retrieve the command buffer so we can drain it.
301        let mut command_buffer = mem::replace(&mut stream.command_buffer, Vec::new());
302
303        // Send each command via the TCP stream.
304        for command in command_buffer.drain(..) {
305            match command {
306                QueuedCommand::PrepareStream => {
307                    stream.send_command(protocol::command::PrepareStream)?;
308                    command_bytes.push(protocol::command::PrepareStream::START_BYTE);
309                }
310                QueuedCommand::Begin(begin) => {
311                    stream.send_command(begin)?;
312                    command_bytes.push(protocol::command::Begin::START_BYTE);
313                }
314                QueuedCommand::PointRate(point_rate) => {
315                    stream.send_command(point_rate)?;
316                    command_bytes.push(protocol::command::PointRate::START_BYTE);
317                }
318                QueuedCommand::Data(range) => {
319                    let Stream {
320                        ref mut bytes,
321                        ref mut tcp_stream,
322                        ref point_buffer,
323                        ..
324                    } = *stream;
325                    let points = Cow::Borrowed(&point_buffer[range]);
326                    let data = protocol::command::Data { points };
327                    send_command(bytes, tcp_stream, data)?;
328                    command_bytes.push(protocol::command::Data::START_BYTE);
329                }
330                QueuedCommand::Stop => {
331                    stream.send_command(protocol::command::Stop)?;
332                    command_bytes.push(protocol::command::Stop::START_BYTE);
333                }
334                QueuedCommand::EmergencyStop => {
335                    stream.send_command(protocol::command::EmergencyStop)?;
336                    command_bytes.push(protocol::command::EmergencyStop::START_BYTE);
337                }
338                QueuedCommand::ClearEmergencyStop => {
339                    stream.send_command(protocol::command::ClearEmergencyStop)?;
340                    command_bytes.push(protocol::command::ClearEmergencyStop::START_BYTE);
341                }
342                QueuedCommand::Ping => {
343                    stream.send_command(protocol::command::Ping)?;
344                    command_bytes.push(protocol::command::Ping::START_BYTE);
345                }
346            }
347        }
348
349        // Place the allocated command buffer back in the stream.
350        mem::swap(&mut stream.command_buffer, &mut command_buffer);
351
352        // Wait for a response to each command.
353        for command_byte in command_bytes.drain(..) {
354            stream.recv_response(command_byte)?;
355        }
356
357        Ok(())
358    }
359}
360
361impl protocol::DacResponse {
362    // Checks the response for unexpected command and NAK errors.
363    fn check_errors(&self, expected_command: u8) -> Result<(), ResponseError> {
364        if self.command != expected_command {
365            let response = self.clone();
366            let kind = ResponseErrorKind::UnexpectedCommand(self.command);
367            let err = ResponseError { response, kind };
368            return Err(err);
369        }
370
371        if let Some(nak) = Nak::from_protocol(self.response) {
372            let response = self.clone();
373            let kind = ResponseErrorKind::Nak(nak);
374            let err = ResponseError { response, kind };
375            return Err(err);
376        }
377
378        Ok(())
379    }
380}
381
382impl Nak {
383    /// Produce a `Nak` from the low-level protocol byte representation.
384    pub fn from_protocol(nak: u8) -> Option<Self> {
385        match nak {
386            protocol::DacResponse::NAK_FULL => Some(Nak::Full),
387            protocol::DacResponse::NAK_INVALID => Some(Nak::Invalid),
388            protocol::DacResponse::NAK_STOP_CONDITION => Some(Nak::StopCondition),
389            _ => None,
390        }
391    }
392
393    /// Convert the `Nak` to the low-level protocol byte representation.
394    pub fn to_protocol(&self) -> u8 {
395        match *self {
396            Nak::Full => protocol::DacResponse::NAK_FULL,
397            Nak::Invalid => protocol::DacResponse::NAK_INVALID,
398            Nak::StopCondition => protocol::DacResponse::NAK_STOP_CONDITION,
399        }
400    }
401}
402
403impl Error for CommunicationError {
404    fn cause(&self) -> Option<&dyn Error> {
405        match *self {
406            CommunicationError::Io(ref err) => Some(err as _),
407            CommunicationError::Protocol(ref err) => Some(err as _),
408            CommunicationError::Response(ref err) => Some(err as _),
409        }
410    }
411}
412
413impl Error for ResponseError {}
414
415impl fmt::Display for CommunicationError {
416    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
417        match *self {
418            CommunicationError::Io(ref err) => err.fmt(f),
419            CommunicationError::Protocol(ref err) => err.fmt(f),
420            CommunicationError::Response(ref err) => err.fmt(f),
421        }
422    }
423}
424
425impl fmt::Display for ResponseError {
426    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
427        let s = match self.kind {
428            ResponseErrorKind::UnexpectedCommand(_) => {
429                "the received response was to an unexpected command"
430            }
431            ResponseErrorKind::Nak(ref nak) => match *nak {
432                Nak::Full => "DAC responded with \"NAK - Full\"",
433                Nak::Invalid => "DAC responded with \"NAK - Invalid\"",
434                Nak::StopCondition => "DAC responded with \"NAK - Stop Condition\"",
435            },
436        };
437        write!(f, "{}", s)
438    }
439}
440
441impl From<io::Error> for CommunicationError {
442    fn from(err: io::Error) -> Self {
443        CommunicationError::Io(err)
444    }
445}
446
447impl From<dac::ProtocolError> for CommunicationError {
448    fn from(err: dac::ProtocolError) -> Self {
449        CommunicationError::Protocol(err)
450    }
451}
452
453impl From<ResponseError> for CommunicationError {
454    fn from(err: ResponseError) -> Self {
455        CommunicationError::Response(err)
456    }
457}
458
459/// Establishes a TCP stream connection with the DAC at the given address.
460///
461/// `TCP_NODELAY` is enabled on the TCP stream in order for better low-latency/realtime
462/// suitability. If necessary, this can be disabled via the `set_nodelay` method on the returned
463/// **Stream**.
464///
465/// Note that this does not "prepare" the DAC for playback. This must be done manually by
466/// submitting the `prepare_stream` command.
467pub fn connect(
468    broadcast: &protocol::DacBroadcast,
469    dac_ip: net::IpAddr,
470) -> Result<Stream, CommunicationError> {
471    connect_inner(broadcast, dac_ip, &net::TcpStream::connect)
472}
473
474/// Establishes a TCP stream connection with the DAC at the given address.
475///
476/// This behaves the same as `connect`, but times out after the given duration.
477pub fn connect_timeout(
478    broadcast: &protocol::DacBroadcast,
479    dac_ip: net::IpAddr,
480    timeout: time::Duration,
481) -> Result<Stream, CommunicationError> {
482    let connect = |addr| net::TcpStream::connect_timeout(&addr, timeout);
483    connect_inner(broadcast, dac_ip, &connect)
484}
485
486/// Shared between the `connect` and `connect_timeout` implementations.
487fn connect_inner(
488    broadcast: &protocol::DacBroadcast,
489    dac_ip: net::IpAddr,
490    connect: &dyn Fn(net::SocketAddr) -> io::Result<net::TcpStream>,
491) -> Result<Stream, CommunicationError> {
492    // Initialise the DAC state representation.
493    let mut dac = dac::Addressed::from_broadcast(broadcast)?;
494
495    // Connect the TCP stream.
496    let dac_addr = net::SocketAddr::new(dac_ip, protocol::COMMUNICATION_PORT);
497    let mut tcp_stream = connect(dac_addr)?;
498
499    // Enable `TCP_NODELAY` for better low-latency suitability.
500    tcp_stream.set_nodelay(true)?;
501
502    // Initialise a buffer for writing bytes to the TCP stream.
503    let mut bytes = vec![];
504
505    // Upon connection, the DAC responds as though it were sent a **Ping** command.
506    recv_response(
507        &mut bytes,
508        &mut tcp_stream,
509        &mut dac,
510        protocol::command::Ping::START_BYTE,
511    )?;
512
513    // Create the stream.
514    let stream = Stream {
515        dac,
516        tcp_stream,
517        command_buffer: vec![],
518        point_buffer: vec![],
519        bytes,
520    };
521
522    Ok(stream)
523}
524
525/// Used within the `Stream::send_command` method.
526fn send_command<C>(
527    bytes: &mut Vec<u8>,
528    tcp_stream: &mut net::TcpStream,
529    command: C,
530) -> io::Result<()>
531where
532    C: Command + WriteToBytes,
533{
534    bytes.clear();
535    bytes.write_bytes(command)?;
536    tcp_stream.write(bytes)?;
537    Ok(())
538}
539
540/// Used within the `Stream::recv_response` and `Stream::connect` methods.
541fn recv_response(
542    bytes: &mut Vec<u8>,
543    tcp_stream: &mut net::TcpStream,
544    dac: &mut dac::Addressed,
545    expected_command: u8,
546) -> Result<(), CommunicationError> {
547    // Read the response.
548    bytes.resize(protocol::DacResponse::SIZE_BYTES, 0);
549    tcp_stream.read_exact(bytes)?;
550    let response = (&bytes[..]).read_bytes::<protocol::DacResponse>()?;
551    response.check_errors(expected_command)?;
552    // Update the DAC representation.
553    dac.update_status(&response.dac_status)?;
554    Ok(())
555}