use bytes::BytesMut;
use futures::{try_ready, Async, Poll, Stream};
use log::trace;
use tokio_codec::Decoder;
use tokio_io::AsyncRead;
/// A `Stream` of decoded frames read from `T`.
///
/// `T` supplies both the bytes (through `AsyncRead`) and the decoding logic (through
/// `Decoder`); see the `Stream` implementation for how the two are combined.
pub struct FramedRead2<T> {
// The wrapped reader/decoder.
inner: T,
// Set once a read from `inner` has returned zero bytes.
eof: bool,
// True when `buffer` should be handed to the decoder before reading again.
is_readable: bool,
// Bytes that have been read from `inner` but not yet consumed by the decoder.
buffer: BytesMut,
}
/// Capacity, in bytes, that the read buffer is created with (8 KiB).
const INITIAL_CAPACITY: usize = 8 * 1024;
/// Wraps `inner` in a [`FramedRead2`] with an empty read buffer.
///
/// The buffer is allocated up front with `INITIAL_CAPACITY` bytes of capacity. The
/// returned stream starts out neither readable nor at end-of-file, so its first poll
/// reads from `inner` before attempting to decode anything.
pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
    let buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
    FramedRead2 {
        inner,
        eof: false,
        is_readable: false,
        buffer,
    }
}
/// Wraps `inner` in a [`FramedRead2`] that uses `buf` as its initial read buffer.
///
/// Bytes already present in `buf` are kept. When `buf` is non-empty the stream starts
/// out readable, so those bytes are offered to the decoder before `inner` is read.
/// If `buf` has less than `INITIAL_CAPACITY` bytes of capacity it is grown to at least
/// that size.
pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
    if buf.capacity() < INITIAL_CAPACITY {
        // `BytesMut::reserve` counts additional bytes on top of `len()`, not on top of
        // `capacity()`. Measuring the request from the current length is what actually
        // guarantees `capacity() >= INITIAL_CAPACITY` afterwards; measuring it from the
        // capacity can be a no-op when the buffer has unused room.
        // `len() <= capacity() < INITIAL_CAPACITY`, so the subtraction cannot underflow.
        buf.reserve(INITIAL_CAPACITY - buf.len());
    }
    FramedRead2 {
        inner,
        eof: false,
        is_readable: !buf.is_empty(),
        buffer: buf,
    }
}
impl<T> FramedRead2<T> {
pub fn get_ref(&self) -> &T {
&self.inner
}
pub fn into_inner(self) -> T {
self.inner
}
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T> Stream for FramedRead2<T>
where
    T: AsyncRead + Decoder,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Yields the next decoded frame.
    ///
    /// Each loop iteration first drains the buffer through the decoder while it is
    /// marked readable, then reads more bytes from `inner`:
    ///
    /// * readable and at end-of-file: the result of `decode_eof` is returned as-is,
    ///   so a `None` from the decoder ends the stream;
    /// * readable, not at end-of-file: `decode` is tried; a frame is returned
    ///   immediately, otherwise the buffer is marked as not readable;
    /// * not readable: at least one spare byte is reserved and `read_buf` is called.
    ///   A read of zero bytes sets the end-of-file flag. `try_ready!` returns early
    ///   when the read is not ready or fails.
    ///
    /// # Panics
    ///
    /// Panics if a read is about to be attempted while the end-of-file flag is set.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            if self.is_readable {
                if self.eof {
                    // Give the decoder a final chance to consume what is left.
                    return self.inner.decode_eof(&mut self.buffer).map(Async::Ready);
                }

                trace!("attempting to decode a frame");
                match self.inner.decode(&mut self.buffer)? {
                    Some(frame) => {
                        trace!("frame decoded from buffer");
                        return Ok(Async::Ready(Some(frame)));
                    }
                    // Not enough buffered data for a frame: go read more.
                    None => self.is_readable = false,
                }
            }

            assert!(!self.eof);

            // Make room for at least one more byte before reading.
            self.buffer.reserve(1);
            let bytes_read = try_ready!(self.inner.read_buf(&mut self.buffer));
            if bytes_read == 0 {
                self.eof = true;
            }
            self.is_readable = true;
        }
    }
}