piper 0.2.5

An asynchronous single-consumer single-producer pipe for bytes.
Documentation
use easy_parallel::Parallel;
use futures_lite::{
    future::{self, block_on},
    AsyncReadExt, AsyncWriteExt,
};
use piper::pipe;

use std::future::poll_fn;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;

#[test]
fn smoke() {
    let (mut r, mut w) = pipe(8);
    let mut buf = [0u8; 8];

    future::block_on(w.write_all(&[1, 2, 3, 4, 5, 6, 7, 8])).unwrap();
    future::block_on(r.read_exact(&mut buf)).unwrap();

    assert_eq!(buf, [1, 2, 3, 4, 5, 6, 7, 8]);

    future::block_on(w.write_all(&[9, 10, 11, 12, 13, 14, 15, 16])).unwrap();
    future::block_on(r.read_exact(&mut buf)).unwrap();

    assert_eq!(buf, [9, 10, 11, 12, 13, 14, 15, 16]);

    drop(w);
    assert_eq!(future::block_on(r.read(&mut buf)).ok(), Some(0));
}

#[test]
fn read() {
    let (mut r, mut w) = pipe(100);
    let ms = Duration::from_micros;

    Parallel::new()
        .add(move || {
            let mut buf = [0u8; 3];
            sleep(ms(1000));
            future::block_on(r.read_exact(&mut buf)).unwrap();
            assert_eq!(buf, [1, 2, 3]);

            sleep(ms(1000));
            future::block_on(r.read_exact(&mut buf)).unwrap();
            assert_eq!(buf, [4, 5, 6]);
        })
        .add(move || {
            sleep(ms(1500));
            future::block_on(w.write_all(&[1, 2, 3, 4, 5, 6])).unwrap();
        })
        .run();
}

#[test]
fn buf_read() {
    use futures_lite::io::AsyncBufReadExt;
    let (mut r, mut w) = pipe(8);
    let ms = Duration::from_micros;

    Parallel::new()
        .add(move || {
            let mut line = String::new();
            future::block_on(r.read_line(&mut line)).unwrap();
            assert_eq!(line, "hello world\n");
            line.clear();

            future::block_on(r.read_line(&mut line)).unwrap();
            assert_eq!(line, "line2\n");
            line.clear();

            future::block_on(r.read_line(&mut line)).unwrap();
            assert_eq!(line, "line3");
        })
        .add(move || {
            sleep(ms(500));
            future::block_on(w.write_all(b"hello world\nline2\n")).unwrap();
            sleep(ms(100));
            future::block_on(w.write_all(b"line3")).unwrap();
        })
        .run();
}

#[test]
#[should_panic]
fn excessive_consume() {
    let (mut r, _) = pipe(8);
    r.consume(1);
}

#[test]
#[should_panic]
fn excessive_produced() {
    let (_, mut w) = pipe(8);
    w.produced(9);
}

#[should_panic]
#[test]
fn zero_cap_pipe() {
    let _ = pipe(0);
}

#[should_panic]
#[test]
fn large_pipe() {
    let _ = pipe(core::usize::MAX);
}

#[test]
fn dropping_reader_wakes_writer() {
    let (r, mut w) = pipe(1);

    Parallel::new()
        .add(move || {
            sleep(Duration::from_millis(100));
            drop(r);
        })
        .add(move || {
            future::block_on(w.write_all(&[0u8])).unwrap();
            sleep(Duration::from_millis(200));
            with_cx(|cx| {
                assert_eq!(w.poll_fill_bytes(cx, &[0u8]), Poll::Ready(0));
                assert!(w.is_closed());
            });
        })
        .run();
}

#[test]
fn dropping_writer_wakes_reader() {
    let (mut r, w) = pipe(1);

    Parallel::new()
        .add(move || {
            drop(w);
        })
        .add(move || {
            sleep(Duration::from_millis(100));
            with_cx(|cx| {
                assert_eq!(r.poll_drain_bytes(cx, &mut [0u8]), Poll::Ready(0));
                assert!(r.is_closed());
            });
        })
        .run();
}

#[test]
fn len() {
    let (mut r, mut w) = pipe(100);

    assert_eq!(r.len(), 0);
    assert_eq!(w.len(), 0);
    assert!(r.is_empty());
    assert!(w.is_empty());
    assert!(!r.is_full());
    assert!(!w.is_full());

    let buf = [0u8; 10];
    future::block_on(w.write_all(&buf)).unwrap();

    assert_eq!(r.len(), 10);
    assert_eq!(w.len(), 10);
    assert!(!r.is_empty());
    assert!(!w.is_empty());
    assert!(!r.is_full());
    assert!(!w.is_full());

    // Fill up the pipe
    let buf = [0u8; 90];
    future::block_on(w.write_all(&buf)).unwrap();

    assert_eq!(r.len(), 100);
    assert_eq!(w.len(), 100);
    assert!(!r.is_empty());
    assert!(!w.is_empty());
    assert!(r.is_full());
    assert!(w.is_full());

    // Read some bytes.
    let mut buf = [0u8; 15];
    future::block_on(r.read_exact(&mut buf)).unwrap();

    assert_eq!(r.len(), 85);
    assert_eq!(w.len(), 85);
    assert!(!r.is_empty());
    assert!(!w.is_empty());
    assert!(!r.is_full());
    assert!(!w.is_full());

    // Write some more to loop around the capacity.
    let buf = [0u8; 10];
    future::block_on(w.write_all(&buf)).unwrap();

    assert_eq!(r.len(), 95);
    assert_eq!(w.len(), 95);
    assert!(!r.is_empty());
    assert!(!w.is_empty());
    assert!(!r.is_full());
    assert!(!w.is_full());
}

fn with_cx<R, F: FnOnce(&mut Context<'_>) -> R>(f: F) -> R {
    let mut f = Some(f);
    future::block_on(future::poll_fn(|cx| Poll::Ready((f.take().unwrap())(cx))))
}

#[test]
fn dropping_writer_does_not_lose_writes() {
    let (mut r, mut w) = pipe(20);

    const TEXT: &str = "hello world";
    Parallel::new()
        .add(move || {
            block_on(async {
                let mut bytes = Vec::with_capacity(20);
                // Keep draining until we're out of bytes.
                while poll_fn(|cx| r.poll_drain(cx, &mut bytes)).await.unwrap() != 0 {}
                assert_eq!(str::from_utf8(&bytes).unwrap(), TEXT);
            });
        })
        .add(move || {
            block_on(async move {
                w.write_all(TEXT.as_bytes()).await.unwrap();
                drop(w);
            });
        })
        .run();
}