use easy_parallel::Parallel;
use futures_lite::{
future::{self, block_on},
AsyncReadExt, AsyncWriteExt,
};
use piper::pipe;
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
/// Round-trips two full-capacity chunks through an 8-byte pipe on a single
/// thread, then checks that reading after the writer is dropped yields 0 bytes.
#[test]
fn smoke() {
    let (mut reader, mut writer) = pipe(8);
    let mut scratch = [0u8; 8];

    let chunks = [
        [1u8, 2, 3, 4, 5, 6, 7, 8],
        [9, 10, 11, 12, 13, 14, 15, 16],
    ];
    for chunk in chunks.iter() {
        // Each chunk fills the pipe completely and is drained before the next one.
        future::block_on(writer.write_all(chunk)).unwrap();
        future::block_on(reader.read_exact(&mut scratch)).unwrap();
        assert_eq!(scratch, *chunk);
    }

    // With the writer gone, a read must report zero bytes.
    drop(writer);
    let read_after_close = future::block_on(reader.read(&mut scratch)).ok();
    assert_eq!(read_after_close, Some(0));
}
/// Reads two 3-byte groups on one thread while another thread writes all six
/// bytes in a single call.
///
/// The sleeps are in microseconds and only nudge the interleaving; the
/// assertions hold regardless of which side runs first because `read_exact`
/// is driven to completion by `block_on`.
#[test]
fn read() {
    let (mut r, mut w) = pipe(100);
    let micros = Duration::from_micros;

    let consumer = move || {
        let mut chunk = [0u8; 3];

        sleep(micros(1000));
        future::block_on(r.read_exact(&mut chunk)).unwrap();
        assert_eq!(chunk, [1, 2, 3]);

        sleep(micros(1000));
        future::block_on(r.read_exact(&mut chunk)).unwrap();
        assert_eq!(chunk, [4, 5, 6]);
    };

    let producer = move || {
        sleep(micros(1500));
        future::block_on(w.write_all(&[1, 2, 3, 4, 5, 6])).unwrap();
    };

    Parallel::new().add(consumer).add(producer).run();
}
/// Reads three lines through `read_line` from an 8-byte pipe while another
/// thread writes them in two batches.
///
/// The first batch is longer than the pipe's capacity, and the last line has
/// no trailing newline, so it is only returned once the writer goes away at
/// the end of the writer closure.
#[test]
fn buf_read() {
    use futures_lite::io::AsyncBufReadExt;

    let (mut r, mut w) = pipe(8);
    let micros = Duration::from_micros;

    let line_reader = move || {
        let expected_lines = ["hello world\n", "line2\n", "line3"];
        let mut line = String::new();
        for expected in expected_lines.iter() {
            // `read_line` appends, so start every iteration from an empty buffer.
            line.clear();
            future::block_on(r.read_line(&mut line)).unwrap();
            assert_eq!(line, *expected);
        }
    };

    let line_writer = move || {
        sleep(micros(500));
        future::block_on(w.write_all(b"hello world\nline2\n")).unwrap();
        sleep(micros(100));
        future::block_on(w.write_all(b"line3")).unwrap();
    };

    Parallel::new().add(line_reader).add(line_writer).run();
}
/// Consuming a byte from a freshly created pipe (nothing has been written)
/// is expected to panic.
#[test]
#[should_panic]
fn excessive_consume() {
    // Only the reader half is kept; the writer half is dropped right away.
    let mut reader = pipe(8).0;
    reader.consume(1);
}
/// Reporting 9 produced bytes on a pipe created with capacity 8 is expected
/// to panic.
#[test]
#[should_panic]
fn excessive_produced() {
    // Only the writer half is kept; the reader half is dropped right away.
    let mut writer = pipe(8).1;
    writer.produced(9);
}
/// Creating a pipe with zero capacity is expected to panic.
#[test]
#[should_panic]
fn zero_cap_pipe() {
    drop(pipe(0));
}
/// Creating a pipe with the largest representable capacity is expected to
/// panic.
#[test]
#[should_panic]
fn large_pipe() {
    // `usize::MAX` is the associated constant that supersedes the legacy
    // `core::usize::MAX` module constant.
    let _ = pipe(usize::MAX);
}
/// After the reader half is dropped on another thread, the writer must see
/// the pipe as closed: a fill attempt completes immediately with 0 bytes.
#[test]
fn dropping_reader_wakes_writer() {
    let (reader, mut writer) = pipe(1);

    let drop_reader_later = move || {
        sleep(Duration::from_millis(100));
        drop(reader);
    };

    let fill_then_probe = move || {
        // Fill the single-byte pipe so that a further write has no room.
        future::block_on(writer.write_all(&[0u8])).unwrap();

        // Sleep longer than the other thread so the reader is gone by now.
        sleep(Duration::from_millis(200));

        with_cx(|cx| {
            let outcome = writer.poll_fill_bytes(cx, &[0u8]);
            assert_eq!(outcome, Poll::Ready(0));
            assert!(writer.is_closed());
        });
    };

    Parallel::new()
        .add(drop_reader_later)
        .add(fill_then_probe)
        .run();
}
/// After the writer half is dropped on another thread, the reader must see
/// the pipe as closed: a drain attempt completes immediately with 0 bytes.
#[test]
fn dropping_writer_wakes_reader() {
    let (mut reader, writer) = pipe(1);

    let drop_writer = move || {
        drop(writer);
    };

    let probe_reader = move || {
        // Give the other thread time to drop the writer first.
        sleep(Duration::from_millis(100));

        with_cx(|cx| {
            let outcome = reader.poll_drain_bytes(cx, &mut [0u8]);
            assert_eq!(outcome, Poll::Ready(0));
            assert!(reader.is_closed());
        });
    };

    Parallel::new().add(drop_writer).add(probe_reader).run();
}
/// Checks that `len`, `is_empty` and `is_full` agree on both halves of a
/// 100-byte pipe as data is written and read.
#[test]
fn len() {
    const CAPACITY: usize = 100;

    // Asserts that reader and writer both report `$expected` buffered bytes,
    // are empty only at 0 and are full only at `CAPACITY`.
    macro_rules! assert_fill {
        ($reader:ident, $writer:ident, $expected:expr) => {{
            let expected = $expected;
            assert_eq!($reader.len(), expected);
            assert_eq!($writer.len(), expected);
            assert_eq!($reader.is_empty(), expected == 0);
            assert_eq!($writer.is_empty(), expected == 0);
            assert_eq!($reader.is_full(), expected == CAPACITY);
            assert_eq!($writer.is_full(), expected == CAPACITY);
        }};
    }

    let (mut r, mut w) = pipe(CAPACITY);
    assert_fill!(r, w, 0);

    // 0 -> 10 bytes buffered.
    future::block_on(w.write_all(&[0u8; 10])).unwrap();
    assert_fill!(r, w, 10);

    // 10 -> 100 bytes buffered: the pipe is now full.
    future::block_on(w.write_all(&[0u8; 90])).unwrap();
    assert_fill!(r, w, 100);

    // 100 -> 85 bytes buffered after reading 15.
    let mut sink = [0u8; 15];
    future::block_on(r.read_exact(&mut sink)).unwrap();
    assert_fill!(r, w, 85);

    // 85 -> 95 bytes buffered.
    future::block_on(w.write_all(&[0u8; 10])).unwrap();
    assert_fill!(r, w, 95);
}
/// Runs `f` once with a task `Context` and returns its result.
///
/// The closure is wrapped in a future that completes on its first poll, and
/// that future is driven by `block_on`, so `f` receives the context supplied
/// by that executor.
fn with_cx<R, F: FnOnce(&mut Context<'_>) -> R>(f: F) -> R {
    // `FnOnce` cannot be called from inside an `FnMut` closure directly, so it
    // is parked in an `Option` and moved out on the single poll.
    let mut slot = Some(f);
    let run_once = future::poll_fn(|cx| {
        let callback = slot.take().unwrap();
        Poll::Ready(callback(cx))
    });
    future::block_on(run_once)
}
/// Data written just before the writer is dropped must still reach the
/// reader in full.
///
/// One thread writes `TEXT` and immediately drops the writer; the other
/// drains the pipe into a `Vec` until a drain reports 0 bytes, then checks
/// that exactly `TEXT` arrived.
#[test]
fn dropping_writer_does_not_lose_writes() {
    const TEXT: &str = "hello world";

    let (mut r, mut w) = pipe(20);

    Parallel::new()
        .add(move || {
            block_on(async {
                let mut bytes = Vec::with_capacity(20);
                // Keep draining until a drain reports 0 bytes.
                while poll_fn(|cx| r.poll_drain(cx, &mut bytes)).await.unwrap() != 0 {}
                // Fully qualified path: the inherent `str::from_utf8` only
                // exists on recent toolchains and `std::str` is not imported.
                assert_eq!(std::str::from_utf8(&bytes).unwrap(), TEXT);
            });
        })
        .add(move || {
            block_on(async move {
                w.write_all(TEXT.as_bytes()).await.unwrap();
                // Explicit drop: closing right after the write is the point of the test.
                drop(w);
            });
        })
        .run();
}