djin_protocol/wire/reader.rs
1use crate::{Error, ErrorKind, Parcel, Settings};
2use std::io;
3use std::io::prelude::*;
4
/// Accumulates incoming bytes and yields parcels once they are complete.
///
/// Received bytes are appended to an internal buffer. Polling the reader
/// attempts to parse a value from the front of that buffer and reports
/// "not ready yet" while too few bytes have arrived.
///
/// Because readiness is detected by attempting to parse, this mechanism
/// can be used to send parcels without length prefixes.
///
/// # Feeding the reader
///
/// `Reader` implements `std::io::Write`. Every byte written to the
/// reader is queued at the end of the receive buffer for later parsing.
///
/// # Example
///
/// A `u32` only becomes available after all four of its bytes have
/// been buffered.
///
/// ```
/// use djin_protocol;
/// use std::io::Write;
///
/// let mut reader = djin_protocol::wire::Reader::new();
/// let settings = djin_protocol::Settings::default();
///
/// // Nothing has been received yet.
/// assert_eq!(None, reader.poll::<u32>(&settings).unwrap());
///
/// // Two of four bytes received.
/// reader.write_all(&[0xff, 0x00]).unwrap();
/// assert_eq!(None, reader.poll::<u32>(&settings).unwrap());
///
/// // Three of four bytes received.
/// reader.write_all(&[0x00]).unwrap();
/// assert_eq!(None, reader.poll::<u32>(&settings).unwrap());
///
/// // The final byte arrives and the value can be parsed.
/// reader.write_all(&[0x00]).unwrap();
/// assert_eq!(Some(0xff000000), reader.poll::<u32>(&settings).unwrap());
/// ```
#[derive(Debug)]
pub struct Reader {
    /// Bytes that have been received but not yet consumed by a
    /// successfully parsed parcel.
    receive_buffer: Vec<u8>,
}
53
54impl Reader {
55 /// Creates a new parcel reader.
56 pub fn new() -> Self {
57 Reader {
58 receive_buffer: Vec::new(),
59 }
60 }
61
62 /// Polls the reader for a value.
63 ///
64 /// Returns `Ok(None)` if further data must be received in order
65 /// to interpret the value.
66 ///
67 /// Returns `Ok(Some(value))` if the value is ready to be read
68 /// from the stream.
69 ///
70 /// Returns `Err(e)` on error.
71 pub fn poll<P>(&mut self,
72 settings: &Settings)
73 -> Result<Option<P>, Error>
74 where P: Parcel {
75 let mut cursor = io::Cursor::new(self.receive_buffer.clone());
76
77 match Parcel::read(&mut cursor, settings) {
78 Ok(value) => {
79 // Remove the interpreted bytes from the receive buffer.
80 let bytes_read = cursor.position() as usize;
81 self.receive_buffer.drain(0..bytes_read);
82
83 Ok(Some(value))
84 },
85 Err(e) => match e.0 {
86 ErrorKind::Io(io) => {
87 // Ignore errors caused by the receive buffer
88 // not having enough data yet.
89 if io.kind() == io::ErrorKind::UnexpectedEof {
90 Ok(None)
91 } else {
92 // An actual IO error.
93 Err(ErrorKind::Io(io).into())
94 }
95 },
96 _ => Err(e),
97 },
98 }
99 }
100}
101
102impl Write for Reader {
103 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
104 self.receive_buffer.extend(buf);
105 Ok(buf.len())
106 }
107
108 fn flush(&mut self) -> io::Result<()> { Ok(()) }
109}
110
111impl Default for Reader {
112 fn default() -> Self {
113 Reader::new()
114 }
115}
116