tokio-codec 0.1.1

Utilities for encoding and decoding frames.
Documentation
extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;

use tokio_io::AsyncRead;
use tokio_codec::{FramedRead, Decoder};

use bytes::{BytesMut, Buf, IntoBuf};
use futures::Stream;
use futures::Async::{Ready, NotReady};

use std::io::{self, Read};
use std::collections::VecDeque;

macro_rules! mock {
    ($($x:expr,)*) => {{
        let mut v = VecDeque::new();
        v.extend(vec![$($x),*]);
        Mock { calls: v }
    }};
}

struct U32Decoder;

impl Decoder for U32Decoder {
    type Item = u32;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
        if buf.len() < 4 {
            return Ok(None);
        }

        let n = buf.split_to(4).into_buf().get_u32_be();
        Ok(Some(n))
    }
}

#[test]
fn read_multi_frame_in_packet() {
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(Some(1)), framed.poll().unwrap());
    assert_eq!(Ready(Some(2)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());
}

#[test]
fn read_multi_frame_across_packets() {
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(Some(1)), framed.poll().unwrap());
    assert_eq!(Ready(Some(2)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());
}

#[test]
fn read_not_ready() {
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(NotReady, framed.poll().unwrap());
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(Some(1)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());
}

#[test]
fn read_partial_then_not_ready() {
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(NotReady, framed.poll().unwrap());
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(Some(1)), framed.poll().unwrap());
    assert_eq!(Ready(Some(2)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());
}

#[test]
fn read_err() {
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
}

#[test]
fn read_partial_then_err() {
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
}

#[test]
fn read_partial_would_block_then_err() {
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };

    let mut framed = FramedRead::new(mock, U32Decoder);
    assert_eq!(NotReady, framed.poll().unwrap());
    assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
}

#[test]
fn huge_size() {
    let data = [0; 32 * 1024];

    let mut framed = FramedRead::new(&data[..], BigDecoder);
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());

    struct BigDecoder;

    impl Decoder for BigDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if buf.len() < 32 * 1024 {
                return Ok(None);
            }
            buf.split_to(32 * 1024);
            Ok(Some(0))
        }
    }
}

#[test]
fn data_remaining_is_error() {
    let data = [0; 5];

    let mut framed = FramedRead::new(&data[..], U32Decoder);
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert!(framed.poll().is_err());
}

#[test]
fn multi_frames_on_eof() {
    struct MyDecoder(Vec<u32>);

    impl Decoder for MyDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            unreachable!();
        }

        fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if self.0.is_empty() {
                return Ok(None);
            }

            Ok(Some(self.0.remove(0)))
        }
    }

    let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
    assert_eq!(Ready(Some(0)), framed.poll().unwrap());
    assert_eq!(Ready(Some(1)), framed.poll().unwrap());
    assert_eq!(Ready(Some(2)), framed.poll().unwrap());
    assert_eq!(Ready(Some(3)), framed.poll().unwrap());
    assert_eq!(Ready(None), framed.poll().unwrap());
}

// ===== Mock ======

struct Mock {
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl Read for Mock {
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        match self.calls.pop_front() {
            Some(Ok(data)) => {
                debug_assert!(dst.len() >= data.len());
                dst[..data.len()].copy_from_slice(&data[..]);
                Ok(data.len())
            }
            Some(Err(e)) => Err(e),
            None => Ok(0),
        }
    }
}

impl AsyncRead for Mock {
}