xdr_codec/
record.rs

1//! XDR record marking
2//!
3//! This module implements wrappers for `Write` and `BufRead` which
4//! implement "Record Marking" from [RFC1831](https://tools.ietf.org/html/rfc1831.html#section-10),
5//! used for encoding XDR structures onto a bytestream such as TCP.
6//!
7//! The format is simple - each record is broken up into one or more
8//! record fragments. Each record fragment is prefixed with a 32-bit
9//! big-endian value. The low 31 bits is the fragment size, and the
10//! top bit is the "end of record" marker, indicating the last
11//! fragment of the record.
12//!
13//! There's no magic number or other way to determine whether a stream
14//! is using record marking; both ends must agree.
15use std::io::{self, BufRead, Read, Write};
16use std::cmp::min;
17
18use error::*;
19
20use super::{Error, pack, unpack};
21
22const LAST_REC: u32 = 1u32 << 31;
23
24fn mapioerr(xdrerr: Error) -> io::Error {
25    match xdrerr {
26        Error(ErrorKind::IOError(ioerr), _) => ioerr,
27        other => io::Error::new(io::ErrorKind::Other, other),
28    }
29}
30
31/// Read records from a bytestream.
32///
33/// Reads will read up to the end of the current fragment, and not
34/// beyond. The `BufRead` trait doesn't otherwise allow for record
35/// boundaries to be deliniated. Callers can use the `eor` method to
36/// determine record ends.
37#[derive(Debug)]
38pub struct XdrRecordReader<R: BufRead> {
39    size: usize, // record size
40    consumed: usize, // bytes consumed
41    eor: bool, // is last record
42
43    reader: R, // reader
44}
45
46impl<R: BufRead> XdrRecordReader<R> {
47    /// Wrapper a record reader around an existing implementation of
48    /// `BufRead`, such as `BufReader`.
49    pub fn new(rd: R) -> XdrRecordReader<R> {
50        XdrRecordReader {
51            size: 0,
52            consumed: 0,
53            eor: false,
54            reader: rd,
55        }
56    }
57
58    // read next record, returns true on EOF
59    fn nextrec(&mut self) -> io::Result<bool> {
60        assert_eq!(self.consumed, self.size);
61
62        let rechdr: u32 = match unpack(&mut self.reader) {
63            Ok(v) => v,
64            Err(Error(ErrorKind::IOError(ref err), _))
65                if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(true),
66            Err(e) => return Err(mapioerr(e)),
67        };
68
69        self.size = (rechdr & !LAST_REC) as usize;
70        self.consumed = 0;
71        self.eor = (rechdr & LAST_REC) != 0;
72
73        Ok(false)
74    }
75
76    fn totremains(&self) -> usize {
77        self.size - self.consumed
78    }
79
80    /// Current fragment is the end of the record.
81    pub fn eor(&self) -> bool {
82        self.eor
83    }
84}
85
86impl<R: BufRead> Read for XdrRecordReader<R> {
87    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
88        let nread = {
89            let data = self.fill_buf()?;
90            let len = min(buf.len(), data.len());
91
92            (&data[..len]).read(buf)?
93        };
94
95        self.consume(nread);
96        Ok(nread)
97    }
98}
99
100impl<R: BufRead> BufRead for XdrRecordReader<R> {
101    fn fill_buf(&mut self) -> io::Result<&[u8]> {
102        while self.totremains() == 0 {
103            if self.nextrec()? {
104                return Ok(&[]);
105            }
106        }
107
108        let remains = self.totremains();
109        let data = self.reader.fill_buf()?;
110        Ok(&data[..min(data.len(), remains)])
111    }
112
113    fn consume(&mut self, sz: usize) {
114        assert!(sz <= self.totremains());
115        self.consumed += sz;
116        self.reader.consume(sz);
117    }
118}
119
120impl<R: BufRead> IntoIterator for XdrRecordReader<R> {
121    type Item = io::Result<Vec<u8>>;
122    type IntoIter = XdrRecordReaderIter<R>;
123
124    fn into_iter(self) -> Self::IntoIter {
125        XdrRecordReaderIter(Some(self))
126    }
127}
128
129/// Iterator over records in the stream.
130///
131/// Each iterator result is either:
132///
133///  * A complete record, or
134///  * an IO error.
135///
136/// It will return an IO error once, and then end the iterator.
137/// A short read or an unterminated record will also end the iterator. It will not return a partial
138/// record.
139#[derive(Debug)]
140pub struct XdrRecordReaderIter<R: BufRead>(Option<XdrRecordReader<R>>);
141
142impl<R: BufRead> Iterator for XdrRecordReaderIter<R> {
143    type Item = io::Result<Vec<u8>>;
144
145    fn next(&mut self) -> Option<Self::Item> {
146        if let Some(mut rr) = self.0.take() {
147            let mut buf = Vec::new();
148
149            // loop over fragments until we get a complete record
150            loop {
151                // Do we need next fragment?
152                if rr.totremains() == 0 {
153                    match rr.nextrec() {
154                        Err(e) => return Some(Err(e)),  // IO error
155                        Ok(true) => return None,        // EOF
156                        Ok(false) => (),                // keep going
157                    }
158                }
159
160                let remains = rr.totremains();
161                let eor = rr.eor();
162
163                match rr.by_ref().take(remains as u64).read_to_end(&mut buf) {
164                    Ok(sz) if sz == remains => (),  // OK, keep going
165                    Ok(_) => return None,           // short read
166                    Err(e) => return Some(Err(e)),  // error
167                };
168
169                if eor {
170                    break;
171                }
172            }
173            self.0 = Some(rr);
174            Some(Ok(buf))
175        } else {
176            None
177        }
178    }
179}
180
181const WRBUF: usize = 65536;
182
183/// Write records into a bytestream.
184///
185/// Flushes the current buffer as end of record when destroyed.
186pub struct XdrRecordWriter<W: Write> {
187    buf: Vec<u8>, // accumulated record fragment
188    bufsz: usize, // max fragment size
189    eor: bool, // last fragment was eor
190    writer: W, // writer we're passing on to
191}
192
193impl<W: Write> XdrRecordWriter<W> {
194    /// Create a new `XdrRecordWriter` wrapped around a `Write`
195    /// implementation, using a default buffer size (64k).
196    pub fn new(w: W) -> XdrRecordWriter<W> {
197        XdrRecordWriter::with_buffer(w, WRBUF)
198    }
199
200    /// Create an instance with a specific buffer size. Panics if the
201    /// size is zero.
202    pub fn with_buffer(w: W, bufsz: usize) -> XdrRecordWriter<W> {
203        if bufsz == 0 {
204            panic!("bufsz must be non-zero")
205        }
206        XdrRecordWriter {
207            buf: Vec::with_capacity(bufsz),
208            bufsz: bufsz,
209            eor: false,
210            writer: w,
211        }
212    }
213
214    /// Flush the current buffer. If `eor` is true, the end of record
215    /// marker is set.
216    pub fn flush_eor(&mut self, eor: bool) -> io::Result<()> {
217        if !eor && self.buf.len() == 0 {
218            return Ok(());
219        }
220
221        let rechdr = self.buf.len() as u32 | (if eor { LAST_REC } else { 0 });
222
223        pack(&rechdr, &mut self.writer).map_err(mapioerr)?;
224        let _ = self.writer.write_all(&self.buf).map(|_| ())?;
225        self.buf.truncate(0);
226
227        self.eor = eor;
228        self.writer.flush()
229    }
230}
231
232impl<W: Write> Drop for XdrRecordWriter<W> {
233    fn drop(&mut self) {
234        if self.buf.len() > 0 || !self.eor {
235            let _ = self.flush_eor(true);
236        }
237    }
238}
239
240impl<W: Write> Write for XdrRecordWriter<W> {
241    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
242        let mut off = 0;
243
244        while off < buf.len() {
245            let chunk = &buf[off..off + min(buf.len() - off, self.bufsz)];
246            if self.buf.len() + chunk.len() > self.bufsz {
247                self.flush()?;
248            }
249
250            self.buf.extend(chunk);
251            off += chunk.len();
252        }
253
254        Ok(off)
255    }
256
257    fn flush(&mut self) -> io::Result<()> {
258        self.flush_eor(false)
259    }
260}