use bytes::BytesMut;
use futures::Async;
use futures::Poll;
use futures::Stream;
use protocol::frames::Frame;
use protocol::frames::FramingError;
use tokio_io::codec::length_delimited;
use tokio_io::AsyncRead;
/// Reads length-delimited chunks from an underlying source `T` and turns
/// each chunk into a protocol `Frame`.
///
/// The chunking itself is delegated to `length_delimited::FramedRead`;
/// this type only adds the `Frame` decoding step on top of it.
pub struct FrameReader<T> {
// Length-delimited reader wrapping the underlying source.
src: length_delimited::FramedRead<T>,
}
impl<T: AsyncRead> FrameReader<T> {
    /// Builds a reader over `src`.
    ///
    /// The wire format is configured as a 4-byte big-endian length field
    /// located at offset 0. The value read from that field is adjusted by
    /// `-4` before being used as the payload length, which matches a format
    /// whose length field counts its own four bytes (assumed — confirm
    /// against the protocol specification).
    pub fn new(src: T) -> Self {
        FrameReader {
            src: length_delimited::Builder::new()
                .big_endian()
                .length_field_offset(0)
                .length_field_length(4)
                .length_adjustment(-4)
                .new_read(src),
        }
    }

    /// Decodes one already-delimited chunk of bytes into a `Frame`.
    ///
    /// This is a thin wrapper around `Frame::decode_from`; whatever error
    /// that function reports is returned unchanged.
    pub fn decode_frame(&self, bytes: BytesMut) -> Result<Frame, FramingError> {
        Frame::decode_from(bytes)
    }

    /// Polls the underlying reader for the next chunk and decodes it.
    ///
    /// * `Ok(Async::NotReady)` — the underlying reader has no complete chunk
    ///   yet (propagated by `try_ready!`).
    /// * `Ok(Async::Ready(Some(frame)))` — a chunk was read and decoded.
    /// * `Ok(Async::Ready(None))` — the underlying reader reached its end.
    ///
    /// # Errors
    ///
    /// Errors from the underlying reader are wrapped in `FramingError::Io`;
    /// decoding failures from `decode_frame` are returned as-is.
    pub fn poll_frame(&mut self) -> Poll<Option<Frame>, FramingError> {
        let next_chunk = try_ready!(self.src.poll().map_err(FramingError::Io));

        // End of the underlying stream: nothing left to decode.
        let chunk = match next_chunk {
            Some(chunk) => chunk,
            None => return Ok(Async::Ready(None)),
        };

        let frame = self.decode_frame(chunk)?;
        Ok(Async::Ready(Some(frame)))
    }
}
impl<T: AsyncRead> Stream for FrameReader<T> {
type Item = Frame;
type Error = FramingError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_frame()
}
}