use std::io::{self, BufRead, Read, Write};
use std::cmp::min;
use error::*;
use super::{Error, pack, unpack};
/// Most significant bit of a 32-bit fragment header word.
///
/// When set, the fragment is the last one of its record; the remaining 31 bits of the
/// header carry the fragment length.
const LAST_REC: u32 = 0x8000_0000;
fn mapioerr(xdrerr: Error) -> io::Error {
match xdrerr {
Error(ErrorKind::IOError(ioerr), _) => ioerr,
other => io::Error::new(io::ErrorKind::Other, other),
}
}
/// Reader over a stream of header-prefixed fragments.
///
/// Each fragment starts with a 32-bit header: the top bit flags the last fragment of a
/// record and the low 31 bits give the fragment's length in bytes. The reader exposes the
/// fragment payloads through `Read`/`BufRead` and tracks where it is within the current
/// fragment.
#[derive(Debug)]
pub struct XdrRecordReader<R: BufRead> {
    /// Payload length of the current fragment, as decoded from its header.
    size: usize,
    /// How many payload bytes of the current fragment have been consumed so far.
    consumed: usize,
    /// Whether the current fragment's header had the last-fragment bit set.
    eor: bool,
    /// Underlying buffered input the fragments are read from.
    reader: R,
}
impl<R: BufRead> XdrRecordReader<R> {
pub fn new(rd: R) -> XdrRecordReader<R> {
XdrRecordReader {
size: 0,
consumed: 0,
eor: false,
reader: rd,
}
}
fn nextrec(&mut self) -> io::Result<bool> {
assert_eq!(self.consumed, self.size);
let rechdr: u32 = match unpack(&mut self.reader) {
Ok(v) => v,
Err(Error(ErrorKind::IOError(ref err), _))
if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(true),
Err(e) => return Err(mapioerr(e)),
};
self.size = (rechdr & !LAST_REC) as usize;
self.consumed = 0;
self.eor = (rechdr & LAST_REC) != 0;
Ok(false)
}
fn totremains(&self) -> usize {
self.size - self.consumed
}
pub fn eor(&self) -> bool {
self.eor
}
}
impl<R: BufRead> Read for XdrRecordReader<R> {
    /// Copy fragment payload bytes into `buf`.
    ///
    /// Takes whatever `fill_buf` currently exposes, copies as much of it as fits into
    /// `buf`, marks that many bytes as consumed and returns the count. A return of `0`
    /// means either `buf` was empty or `fill_buf` had nothing to offer.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let copied = {
            let avail = self.fill_buf()?;
            let n = min(avail.len(), buf.len());
            buf[..n].copy_from_slice(&avail[..n]);
            n
        };
        self.consume(copied);
        Ok(copied)
    }
}
impl<R: BufRead> BufRead for XdrRecordReader<R> {
    /// Expose buffered payload bytes of the current fragment.
    ///
    /// When the current fragment is exhausted, headers are read until a fragment with a
    /// non-zero length is found. If `nextrec` reports end-of-file, an empty slice is
    /// returned. The returned slice never extends past the end of the current fragment.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        loop {
            if self.totremains() != 0 {
                break;
            }
            // Current fragment is used up (or was empty): move on to the next header.
            if self.nextrec()? {
                return Ok(&[]);
            }
        }

        let limit = self.totremains();
        let avail = self.reader.fill_buf()?;
        // The underlying buffer may already hold the next header; hide everything past
        // the end of this fragment.
        let visible = min(avail.len(), limit);
        Ok(&avail[..visible])
    }

    /// Mark `sz` payload bytes of the current fragment as read.
    ///
    /// # Panics
    ///
    /// Panics if `sz` exceeds the number of bytes remaining in the current fragment.
    fn consume(&mut self, sz: usize) {
        assert!(sz <= self.totremains());
        self.consumed += sz;
        self.reader.consume(sz);
    }
}
/// Consuming a reader yields an iterator whose items are whole records as byte vectors.
impl<R: BufRead> IntoIterator for XdrRecordReader<R> {
    type Item = io::Result<Vec<u8>>;
    type IntoIter = XdrRecordReaderIter<R>;

    /// Hand ownership of the reader to a new `XdrRecordReaderIter`.
    fn into_iter(self) -> Self::IntoIter {
        let reader = Some(self);
        XdrRecordReaderIter(reader)
    }
}
/// Iterator that assembles the fragments read by an `XdrRecordReader` into whole records.
#[derive(Debug)]
pub struct XdrRecordReaderIter<R: BufRead>(
    /// The wrapped reader; left as `None` once iteration has ended or failed.
    Option<XdrRecordReader<R>>,
);
impl<R: BufRead> Iterator for XdrRecordReaderIter<R> {
    type Item = io::Result<Vec<u8>>;

    /// Assemble and return the next complete record.
    ///
    /// Fragments are appended to one buffer until a fragment flagged as last has been
    /// read in full. Returns `None` on end-of-file or when a fragment is cut short, and
    /// `Some(Err(_))` on an I/O error. In all of those cases the reader is dropped, so
    /// every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // Take the reader out; it is only put back after a record was fully assembled.
        let mut rr = match self.0.take() {
            Some(reader) => reader,
            None => return None,
        };

        let mut record = Vec::new();
        let mut complete = false;
        while !complete {
            // Fetch a new fragment header once the current fragment is used up.
            if rr.totremains() == 0 {
                match rr.nextrec() {
                    Ok(false) => {}
                    Ok(true) => return None,
                    Err(e) => return Some(Err(e)),
                }
            }

            let want = rr.totremains();
            let last = rr.eor();

            // Read exactly the rest of this fragment; `take` keeps the read from
            // running on into the following fragment.
            match rr.by_ref().take(want as u64).read_to_end(&mut record) {
                Err(e) => return Some(Err(e)),
                Ok(got) if got != want => return None,
                Ok(_) => {}
            }

            complete = last;
        }

        self.0 = Some(rr);
        Some(Ok(record))
    }
}
/// Default write-buffer size in bytes (64 KiB) used by `XdrRecordWriter::new`.
const WRBUF: usize = 64 * 1024;
/// Writer that buffers output and emits it as header-prefixed fragments.
pub struct XdrRecordWriter<W: Write> {
    /// Payload bytes accumulated for the next fragment.
    buf: Vec<u8>,
    /// Maximum number of bytes held in `buf` before a fragment is flushed.
    bufsz: usize,
    /// Whether the most recent flush was written with the last-fragment flag.
    eor: bool,
    /// Destination the fragments are written to.
    writer: W,
}
impl<W: Write> XdrRecordWriter<W> {
    /// Create a record writer over `w` with the default buffer size (`WRBUF` bytes).
    pub fn new(w: W) -> XdrRecordWriter<W> {
        XdrRecordWriter::with_buffer(w, WRBUF)
    }

    /// Create a record writer over `w` that buffers up to `bufsz` bytes before a
    /// fragment is flushed.
    ///
    /// # Panics
    ///
    /// Panics if `bufsz` is zero.
    pub fn with_buffer(w: W, bufsz: usize) -> XdrRecordWriter<W> {
        if bufsz == 0 {
            panic!("bufsz must be non-zero")
        }
        XdrRecordWriter {
            buf: Vec::with_capacity(bufsz),
            bufsz: bufsz,
            eor: false,
            writer: w,
        }
    }

    /// Write out the buffered bytes as a fragment and flush the underlying writer.
    ///
    /// With `eor == true` the data is flagged as the last fragment of the record; an
    /// empty, flagged fragment is written if nothing is buffered. With `eor == false`
    /// and an empty buffer this is a no-op.
    ///
    /// A fragment header can only describe lengths below 2^31, because the top bit is
    /// the last-fragment flag. If the buffer holds more than that, it is emitted as
    /// several consecutive fragments and only the final one carries the flag, so the
    /// length can never spill into the flag bit.
    ///
    /// # Errors
    ///
    /// Returns any error from encoding the header, writing the payload, or flushing the
    /// underlying writer. On error the buffer is left untouched.
    pub fn flush_eor(&mut self, eor: bool) -> io::Result<()> {
        if !eor && self.buf.is_empty() {
            return Ok(());
        }

        // Largest payload length representable in the 31-bit length field.
        let max_frag = (LAST_REC - 1) as usize;

        {
            let mut rest: &[u8] = &self.buf;
            loop {
                let cut = rest.len().min(max_frag);
                let (frag, tail) = rest.split_at(cut);
                let is_final = tail.is_empty();

                // `frag.len() <= max_frag`, so the cast is lossless and cannot touch
                // the flag bit.
                let mut rechdr = frag.len() as u32;
                if eor && is_final {
                    rechdr |= LAST_REC;
                }

                pack(&rechdr, &mut self.writer).map_err(mapioerr)?;
                self.writer.write_all(frag)?;

                if is_final {
                    break;
                }
                rest = tail;
            }
        }

        self.buf.clear();
        self.eor = eor;
        self.writer.flush()
    }
}
/// On drop, terminate the record if data is still buffered or the last flush did not
/// carry the last-fragment flag.
impl<W: Write> Drop for XdrRecordWriter<W> {
    fn drop(&mut self) {
        let record_open = !self.eor;
        let has_pending = self.buf.len() > 0;
        if has_pending || record_open {
            // Errors cannot be reported from `drop`, so the result is discarded.
            let _ = self.flush_eor(true);
        }
    }
}
impl<W: Write> Write for XdrRecordWriter<W> {
    /// Buffer all of `buf`, flushing fragments as the buffer fills up.
    ///
    /// The input is handled in pieces of at most `bufsz` bytes. Whenever a piece would
    /// push the buffer past `bufsz`, the buffered data is first flushed as a fragment
    /// without the last-fragment flag. On success the whole of `buf` has been accepted
    /// and its length is returned.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut rest = buf;
        while !rest.is_empty() {
            let take = min(rest.len(), self.bufsz);
            let (piece, tail) = rest.split_at(take);

            // Make room first if this piece does not fit alongside what is buffered.
            if self.buf.len() + piece.len() > self.bufsz {
                self.flush()?;
            }

            self.buf.extend_from_slice(piece);
            rest = tail;
        }
        Ok(buf.len())
    }

    /// Emit any buffered data as a fragment without the last-fragment flag.
    fn flush(&mut self) -> io::Result<()> {
        self.flush_eor(false)
    }
}