tokio-util_wasi 0.7.4

Additional utilities for working with Tokio.
#![warn(rust_2018_idioms)]

use tokio::io::{AsyncRead, ReadBuf};
use tokio_test::assert_ready;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};

use bytes::{Buf, BytesMut};
use futures::Stream;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

macro_rules! mock {
    ($($x:expr,)*) => {{
        let mut v = VecDeque::new();
        v.extend(vec![$($x),*]);
        Mock { calls: v }
    }};
}

macro_rules! assert_read {
    ($e:expr, $n:expr) => {{
        let val = assert_ready!($e);
        assert_eq!(val.unwrap().unwrap(), $n);
    }};
}

macro_rules! pin {
    ($id:ident) => {
        Pin::new(&mut $id)
    };
}

struct U32Decoder;

impl Decoder for U32Decoder {
    type Item = u32;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
        if buf.len() < 4 {
            return Ok(None);
        }

        let n = buf.split_to(4).get_u32();
        Ok(Some(n))
    }
}

struct U64Decoder;

impl Decoder for U64Decoder {
    type Item = u64;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
        if buf.len() < 8 {
            return Ok(None);
        }

        let n = buf.split_to(8).get_u64();
        Ok(Some(n))
    }
}

#[test]
fn read_multi_frame_in_packet() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_multi_frame_across_packets() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_multi_frame_in_packet_after_codec_changed() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0x04);

        let mut framed = framed.map_decoder(|_| U64Decoder);
        assert_read!(pin!(framed).poll_next(cx), 0x08);

        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_not_ready() {
    let mut task = task::spawn(());
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_partial_then_not_ready() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn read_partial_then_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn read_partial_would_block_then_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn huge_size() {
    let mut task = task::spawn(());
    let data = &[0; 32 * 1024][..];
    let mut framed = FramedRead::new(data, BigDecoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });

    struct BigDecoder;

    impl Decoder for BigDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if buf.len() < 32 * 1024 {
                return Ok(None);
            }
            buf.advance(32 * 1024);
            Ok(Some(0))
        }
    }
}

#[test]
fn data_remaining_is_error() {
    let mut task = task::spawn(());
    let slice = &[0; 5][..];
    let mut framed = FramedRead::new(slice, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
    });
}

#[test]
fn multi_frames_on_eof() {
    let mut task = task::spawn(());
    struct MyDecoder(Vec<u32>);

    impl Decoder for MyDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            unreachable!();
        }

        fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if self.0.is_empty() {
                return Ok(None);
            }

            Ok(Some(self.0.remove(0)))
        }
    }

    let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert_read!(pin!(framed).poll_next(cx), 3);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_eof_then_resume() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x03".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert_read!(pin!(framed).poll_next(cx), 3);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

// ===== Mock ======

struct Mock {
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl AsyncRead for Mock {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        use io::ErrorKind::WouldBlock;

        match self.calls.pop_front() {
            Some(Ok(data)) => {
                debug_assert!(buf.remaining() >= data.len());
                buf.put_slice(&data);
                Ready(Ok(()))
            }
            Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
            Some(Err(e)) => Ready(Err(e)),
            None => Ready(Ok(())),
        }
    }
}