use std::io;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use tokio_io::{AsyncRead, AsyncWrite};
use {IoBuf, WriteBuf, ReadBuf, Buf};
pub trait Decode: Sized {
type Item;
fn decode(&mut self, buf: &mut Buf)
-> Result<Option<Self::Item>, io::Error>;
fn done(&mut self, buf: &mut Buf) -> io::Result<Self::Item> {
match self.decode(buf)? {
Some(frame) => Ok(frame),
None => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
}
}
}
pub trait Encode {
type Item: Sized;
fn encode(&mut self, value: Self::Item, buf: &mut Buf);
}
pub struct Framed<T, C>(IoBuf<T>, C);
pub struct ReadFramed<T, C>(ReadBuf<T>, C);
pub struct WriteFramed<T, C>(WriteBuf<T>, C);
impl<T: AsyncRead, C: Decode> Stream for Framed<T, C> {
type Item = C::Item;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
loop {
if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
return Ok(Async::Ready(Some(frame)));
} else {
let nbytes = self.0.read()?;
if nbytes == 0 {
if self.0.done() {
return Ok(Async::Ready(None));
} else {
return Ok(Async::NotReady);
}
}
}
}
}
}
impl<T: AsyncWrite, C: Encode> Sink for Framed<T, C> {
type SinkItem = C::Item;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
self.1.encode(item, &mut self.0.out_buf);
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.0.flush()?;
Ok(Async::Ready(()))
}
}
pub fn framed<T, C>(io: IoBuf<T>, codec: C) -> Framed<T, C> {
Framed(io, codec)
}
impl<T, C> Framed<T, C> {
pub fn get_ref(&self) -> &IoBuf<T> {
&self.0
}
pub fn get_mut(&mut self) -> &mut IoBuf<T> {
&mut self.0
}
pub fn into_inner(self) -> IoBuf<T> {
self.0
}
}
impl<T: AsyncRead, C: Decode> Stream for ReadFramed<T, C> {
type Item = C::Item;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
loop {
if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
return Ok(Async::Ready(Some(frame)));
} else {
let nbytes = self.0.read()?;
if nbytes == 0 {
if self.0.done() {
return Ok(Async::Ready(None));
} else {
return Ok(Async::NotReady);
}
}
}
}
}
}
pub fn read_framed<T, C>(io: ReadBuf<T>, codec: C)
-> ReadFramed<T, C>
{
ReadFramed(io, codec)
}
impl<T, C> ReadFramed<T, C> {
pub fn get_ref(&self) -> &ReadBuf<T> {
&self.0
}
pub fn get_mut(&mut self) -> &mut ReadBuf<T> {
&mut self.0
}
pub fn into_inner(self) -> ReadBuf<T> {
self.0
}
}
impl<T: AsyncWrite, C: Encode> Sink for WriteFramed<T, C> {
type SinkItem = C::Item;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
self.1.encode(item, &mut self.0.out_buf);
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.0.flush()?;
Ok(Async::Ready(()))
}
}
pub fn write_framed<T, C>(io: WriteBuf<T>, codec: C) -> WriteFramed<T, C> {
WriteFramed(io, codec)
}
impl<T, C> WriteFramed<T, C> {
pub fn get_ref(&self) -> &WriteBuf<T> {
&self.0
}
pub fn get_mut(&mut self) -> &mut WriteBuf<T> {
&mut self.0
}
pub fn into_inner(self) -> WriteBuf<T> {
self.0
}
}