tokio-util 0.3.1

Additional utilities for working with Tokio.
Documentation
#![warn(rust_2018_idioms)]

use tokio::io::AsyncWrite;
use tokio_test::{assert_ready, task};
use tokio_util::codec::{Encoder, FramedWrite};

use bytes::{BufMut, BytesMut};
use futures_sink::Sink;
use std::collections::VecDeque;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

macro_rules! mock {
    ($($x:expr,)*) => {{
        let mut v = VecDeque::new();
        v.extend(vec![$($x),*]);
        Mock { calls: v }
    }};
}

macro_rules! pin {
    ($id:ident) => {
        Pin::new(&mut $id)
    };
}

struct U32Encoder;

impl Encoder<u32> for U32Encoder {
    type Error = io::Error;

    fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
        // Reserve space
        dst.reserve(4);
        dst.put_u32(item);
        Ok(())
    }
}

#[test]
fn write_multi_frame_in_packet() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedWrite::new(mock, U32Encoder);

    task.enter(|cx, _| {
        assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
        assert!(pin!(framed).start_send(0).is_ok());
        assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
        assert!(pin!(framed).start_send(1).is_ok());
        assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
        assert!(pin!(framed).start_send(2).is_ok());

        // Nothing written yet
        assert_eq!(1, framed.get_ref().calls.len());

        // Flush the writes
        assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());

        assert_eq!(0, framed.get_ref().calls.len());
    });
}

#[test]
fn write_hits_backpressure() {
    const ITER: usize = 2 * 1024;

    let mut mock = mock! {
        // Block the `ITER`th write
        Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
        Ok(b"".to_vec()),
    };

    for i in 0..=ITER {
        let mut b = BytesMut::with_capacity(4);
        b.put_u32(i as u32);

        // Append to the end
        match mock.calls.back_mut().unwrap() {
            Ok(ref mut data) => {
                // Write in 2kb chunks
                if data.len() < ITER {
                    data.extend_from_slice(&b[..]);
                    continue;
                } // else fall through and create a new buffer
            }
            _ => unreachable!(),
        }

        // Push a new new chunk
        mock.calls.push_back(Ok(b[..].to_vec()));
    }
    // 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer
    assert_eq!(mock.calls.len(), 6);

    let mut task = task::spawn(());
    let mut framed = FramedWrite::new(mock, U32Encoder);
    task.enter(|cx, _| {
        // Send 8KB. This fills up FramedWrite2 buffer
        for i in 0..ITER {
            assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
            assert!(pin!(framed).start_send(i as u32).is_ok());
        }

        // Now we poll_ready which forces a flush. The mock pops the front message
        // and decides to block.
        assert!(pin!(framed).poll_ready(cx).is_pending());

        // We poll again, forcing another flush, which this time succeeds
        // The whole 8KB buffer is flushed
        assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());

        // Send more data. This matches the final message expected by the mock
        assert!(pin!(framed).start_send(ITER as u32).is_ok());

        // Flush the rest of the buffer
        assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());

        // Ensure the mock is empty
        assert_eq!(0, framed.get_ref().calls.len());
    })
}

// // ===== Mock ======

struct Mock {
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl Write for Mock {
    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
        match self.calls.pop_front() {
            Some(Ok(data)) => {
                assert!(src.len() >= data.len());
                assert_eq!(&data[..], &src[..data.len()]);
                Ok(data.len())
            }
            Some(Err(e)) => Err(e),
            None => panic!("unexpected write; {:?}", src),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl AsyncWrite for Mock {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        match Pin::get_mut(self).write(buf) {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
            other => Ready(other),
        }
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        match Pin::get_mut(self).flush() {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
            other => Ready(other),
        }
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        unimplemented!()
    }
}