tk-bufstream 0.3.0

A buffered stream backed by contiguous buffers (netbuf) for tokio
Documentation
// This module contains some code from tokio-core/src/io/framed.rs
use std::io;

use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use tokio_io::{AsyncRead, AsyncWrite};

use {IoBuf, WriteBuf, ReadBuf, Buf};


/// Decoding of a frame from an internal buffer.
///
/// This trait is used when constructing an instance of `Framed`. It defines how
/// to decode the incoming bytes on a stream to the specified type of frame for
/// that framed I/O stream.
///
/// The primary method of this trait, `decode`, attempts to decode a
/// frame from a buffer of bytes. It has the option of returning `NotReady`,
/// indicating that more bytes need to be read before decoding can
/// continue.
pub trait Decode: Sized {
    /// Decoded message
    type Item;
    /// Attempts to decode a frame from the provided buffer of bytes.
    ///
    /// This method is called by `Framed` whenever bytes are ready to be parsed.
    /// The provided buffer of bytes is what's been read so far, and this
    /// instance of `Decode` can determine whether an entire frame is in the
    /// buffer and is ready to be returned.
    ///
    /// If an entire frame is available, then this instance will remove those
    /// bytes from the buffer provided and return them as a decoded
    /// frame. Note that removing bytes from the provided buffer doesn't always
    /// necessarily copy the bytes, so this should be an efficient operation in
    /// most circumstances.
    ///
    /// If the bytes look valid, but a frame isn't fully available yet, then
    /// `Ok(None)` is returned. This indicates to the `Framed` instance that
    /// it needs to read some more bytes before calling this method again.
    ///
    /// Finally, if the bytes in the buffer are malformed then an error is
    /// returned indicating why. This informs `Framed` that the stream is now
    /// corrupt and should be terminated.
    fn decode(&mut self, buf: &mut Buf)
        -> Result<Option<Self::Item>, io::Error>;

    /// A default method available to be called when there are no more bytes
    /// available to be read from the underlying I/O.
    ///
    /// This method defaults to calling `decode` and returns an error if
    /// `Ok(None)` is returned. Typically this doesn't need to be implemented
    /// unless the framing protocol differs near the end of the stream.
    fn done(&mut self, buf: &mut Buf) -> io::Result<Self::Item> {
        match self.decode(buf)? {
            Some(frame) => Ok(frame),
            None => Err(io::Error::new(io::ErrorKind::Other,
                                       "bytes remaining on stream")),
        }
    }
}

/// A trait for encoding frames into a byte buffer.
///
/// This trait is used as a building block of `Framed` to define how frames are
/// encoded into bytes to get passed to the underlying byte stream. each
/// frame written to `Framed` will be encoded with this trait to an internal
/// buffer. That buffer is then written out when possible to the underlying I/O
/// stream.
pub trait Encode {
    /// Value to encode
    type Item: Sized;
    /// Encodes a frame into the buffer provided.
    ///
    /// This method will encode `msg` into the byte buffer provided by `buf`.
    /// The `buf` provided is an internal buffer of the `Framed` instance and
    /// will be written out when possible.
    fn encode(&mut self, value: Self::Item, buf: &mut Buf);
}

/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
/// the `Encode` and `Decode` traits to encode and decode frames.
pub struct Framed<T, C>(IoBuf<T>, C);

/// A `Stream` interface to `ReadBuf` object
pub struct ReadFramed<T, C>(ReadBuf<T>, C);

/// A `Sink` interface to `WriteBuf` object
pub struct WriteFramed<T, C>(WriteBuf<T>, C);

impl<T: AsyncRead, C: Decode> Stream for Framed<T, C> {
    type Item = C::Item;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        loop {
            if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
                return Ok(Async::Ready(Some(frame)));
            } else {
                let nbytes = self.0.read()?;
                if nbytes == 0 {
                    if self.0.done() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady);
                    }
                }
            }
        }
    }
}

impl<T: AsyncWrite, C: Encode> Sink for Framed<T, C> {
    type SinkItem = C::Item;
    type SinkError = io::Error;

    fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
        self.1.encode(item, &mut self.0.out_buf);
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        self.0.flush()?;
        Ok(Async::Ready(()))
    }
}

pub fn framed<T, C>(io: IoBuf<T>, codec: C) -> Framed<T, C> {
    Framed(io, codec)
}

impl<T, C> Framed<T, C> {
    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
    pub fn get_ref(&self) -> &IoBuf<T> {
        &self.0
    }

    /// Returns a mutable reference to the underlying I/O stream wrapped by
    /// `Framed`.
    ///
    /// Note that care should be taken to not tamper with the underlying stream
    /// of data coming in as it may corrupt the stream of frames otherwise being
    /// worked with.
    pub fn get_mut(&mut self) -> &mut IoBuf<T> {
        &mut self.0
    }

    /// Consumes the `Framed`, returning its underlying I/O stream.
    ///
    /// Note that stream may contain both input and output data buffered.
    pub fn into_inner(self) -> IoBuf<T> {
        self.0
    }
}

impl<T: AsyncRead, C: Decode> Stream for ReadFramed<T, C> {
    type Item = C::Item;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        loop {
            if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
                return Ok(Async::Ready(Some(frame)));
            } else {
                let nbytes = self.0.read()?;
                if nbytes == 0 {
                    if self.0.done() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady);
                    }
                }
            }
        }
    }
}

pub fn read_framed<T, C>(io: ReadBuf<T>, codec: C)
    -> ReadFramed<T, C>
{
    ReadFramed(io, codec)
}

impl<T, C> ReadFramed<T, C> {
    /// Returns a reference to the underlying I/O stream wrapped by `ReadFramed`.
    pub fn get_ref(&self) -> &ReadBuf<T> {
        &self.0
    }

    /// Returns a mutable reference to the underlying I/O stream wrapped by
    /// `ReadFramed`.
    ///
    /// Note that care should be taken to not tamper with the underlying stream
    /// of data coming in as it may corrupt the stream of frames otherwise being
    /// worked with.
    pub fn get_mut(&mut self) -> &mut ReadBuf<T> {
        &mut self.0
    }

    /// Consumes the `ReadFramed`, returning its underlying I/O stream.
    ///
    /// Note that stream may contain both input and output data buffered.
    pub fn into_inner(self) -> ReadBuf<T> {
        self.0
    }
}

impl<T: AsyncWrite, C: Encode> Sink for WriteFramed<T, C> {
    type SinkItem = C::Item;
    type SinkError = io::Error;

    fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
        self.1.encode(item, &mut self.0.out_buf);
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        self.0.flush()?;
        Ok(Async::Ready(()))
    }
}

pub fn write_framed<T, C>(io: WriteBuf<T>, codec: C) -> WriteFramed<T, C> {
    WriteFramed(io, codec)
}

impl<T, C> WriteFramed<T, C> {
    /// Returns a reference to the underlying I/O stream wrapped by `WriteFramed`.
    pub fn get_ref(&self) -> &WriteBuf<T> {
        &self.0
    }

    /// Returns a mutable reference to the underlying I/O stream wrapped by
    /// `WriteFramed`.
    ///
    /// Note that care should be taken to not tamper with the underlying stream
    /// of data coming in as it may corrupt the stream of frames otherwise being
    /// worked with.
    pub fn get_mut(&mut self) -> &mut WriteBuf<T> {
        &mut self.0
    }

    /// Consumes the `WriteFramed`, returning its underlying I/O stream.
    ///
    /// Note that stream may contain both input and output data buffered.
    pub fn into_inner(self) -> WriteBuf<T> {
        self.0
    }
}