use std::io::{self, Error, ErrorKind, Write};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::str::FromStr;
use std::u64;
use protobuf::{self, Message};
use proto::scheduler::{Event};
pub struct RecordIOCodec {
len_buf: Option<Vec<u8>>,
buf: Option<Vec<u8>>,
send: Sender<Event>,
}
impl RecordIOCodec {
pub fn new(send: Sender<Event>) -> RecordIOCodec {
RecordIOCodec {
len_buf: None,
buf: None,
send: send,
}
}
}
impl Write for RecordIOCodec {
fn write(&mut self, input: &[u8]) -> io::Result<usize> {
for byte in input {
if self.buf.is_none() {
if *byte == 0xA {
if self.len_buf.is_none() {
continue;
}
let len = try!(parse(self.len_buf.take().unwrap()));
self.buf = Some(Vec::with_capacity(len as usize));
} else {
if *byte < 0x30 || *byte > 0x39 {
println!("got bad byte: {:?}", byte);
return Err(
Error::new(
ErrorKind::InvalidData,
"received invalid bytes representing the size of a recordio frame"
)
);
}
let mut len_buf = self.len_buf.take().unwrap_or(vec![]);
len_buf.push(*byte);
self.len_buf = Some(len_buf);
}
} else {
let mut buf = self.buf.take().unwrap();
buf.push(*byte);
if buf.capacity() - buf.len() == 0 {
let event: Event =
protobuf::parse_from_bytes(&*buf).unwrap();
self.send.send(event);
} else {
self.buf = Some(buf);
}
}
}
Ok(input.len())
}
fn flush(&mut self) -> io::Result<()> { Ok(()) }
}
#[inline]
fn parse(bytes: Vec<u8>) -> io::Result<u64> {
let mut sum: u64 = 0;
for byte in bytes {
if byte < 0x30 || byte > 0x39 {
println!("got bad byte: {:?}", byte);
return Err(
Error::new(
ErrorKind::InvalidData,
"received invalid bytes representing the size of a recordio frame"
)
);
}
else {
sum = (sum * 10) + (byte - 0x30) as u64;
}
}
Ok(sum)
}