#![cfg(feature = "use_std")]
#[macro_use]
extern crate futures;
use futures::prelude::*;
use futures::future::{lazy, ok};
use futures::stream::unfold;
use futures::sync::mpsc;
use futures::sync::oneshot;
use std::thread;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
mod support;
use support::*;
/// Marker trait with a `Send` supertrait: an `impl` for a type only compiles
/// when that type is `Send`, so the impls below act as compile-time checks.
trait AssertSend: Send {}

// Receiving half of an `i32` channel must be `Send`.
impl AssertSend for mpsc::Receiver<i32> {}

// Sending half of an `i32` channel must be `Send`.
impl AssertSend for mpsc::Sender<i32> {}
/// Sends a single value through a channel created with a buffer argument of
/// 16 and checks that the blocking receiver yields exactly that value.
#[test]
fn send_recv() {
    let (sender, receiver) = mpsc::channel::<i32>(16);
    let mut incoming = receiver.wait();

    // `send` consumes the sender; the sender handed back by the future is
    // dropped at the end of this statement.
    sender.send(1).wait().unwrap();

    let first = incoming.next().unwrap();
    assert_eq!(first, Ok(1));
}
/// Drives a channel created with a buffer argument of 0 by hand and pins the
/// readiness reported by the sender after each step.
///
/// All polling happens inside a `lazy` future that `wait()` runs to
/// completion.
#[test]
fn send_recv_no_buffer() {
    let (mut sender, mut receiver) = mpsc::channel::<i32>(0);

    lazy(move || {
        // A fresh sender reports ready for both flushing and sending.
        assert!(sender.poll_complete().unwrap().is_ready());
        assert!(sender.poll_ready().unwrap().is_ready());

        // The first item is accepted, after which the sender is not ready.
        let accepted = sender.start_send(1).unwrap();
        assert!(is_ready(&accepted));
        assert!(sender.poll_ready().unwrap().is_not_ready());

        // A second item is handed back while the first one is still queued.
        let rejected = sender.start_send(2).unwrap();
        assert!(!is_ready(&rejected));

        // Receiving the queued item makes the sender ready again.
        assert_eq!(receiver.poll().unwrap(), Async::Ready(Some(1)));
        assert!(sender.poll_ready().unwrap().is_ready());

        // Retrying the second item now succeeds.
        let retried = sender.start_send(2).unwrap();
        assert!(is_ready(&retried));
        assert!(sender.poll_ready().unwrap().is_not_ready());

        assert_eq!(receiver.poll().unwrap(), Async::Ready(Some(2)));
        assert!(sender.poll_ready().unwrap().is_ready());

        Ok::<(), ()>(())
    })
    .wait()
    .unwrap();
}
/// Sends one value from each of two sender handles (an original and its
/// clone) and checks that the single receiver sees both, in send order.
#[test]
fn send_shared_recv() {
    let (original, receiver) = mpsc::channel::<i32>(16);
    let cloned = original.clone();
    let mut incoming = receiver.wait();

    original.send(1).wait().unwrap();
    let first = incoming.next().unwrap();
    assert_eq!(first, Ok(1));

    cloned.send(2).wait().unwrap();
    let second = incoming.next().unwrap();
    assert_eq!(second, Ok(2));
}
/// Sends a value from a spawned thread and receives it on the test thread.
///
/// The sender thread is joined before the test returns so that a panic
/// inside it fails the test instead of being lost with a detached thread.
#[test]
fn send_recv_threads() {
    let (tx, rx) = mpsc::channel::<i32>(16);
    let mut rx = rx.wait();

    let sender_thread = thread::spawn(move || {
        tx.send(1).wait().unwrap();
    });

    assert_eq!(rx.next().unwrap(), Ok(1));

    // `rx` is still alive here, so the sender is not racing a dropped receiver.
    sender_thread.join().unwrap();
}
/// Sends two values across threads through a channel created with a buffer
/// argument of 0, using a second channel of `()` as a "ready" signal.
///
/// For each value the worker joins the data send with a signal send; the test
/// thread first takes the signal and then the data value.
#[test]
fn send_recv_threads_no_capacity() {
    let (data_tx, data_rx) = mpsc::channel::<i32>(0);
    let mut data_rx = data_rx.wait();

    let (signal_tx, signal_rx) = mpsc::channel::<()>(2);
    let mut signal_rx = signal_rx.wait();

    let worker = thread::spawn(move || {
        // Map the signal sink's error so both joined futures share one error
        // type; a failure on the signal channel panics.
        let signal_tx = signal_tx.sink_map_err(|_| panic!());

        let first_round = data_tx.send(1).join(signal_tx.send(()));
        let (data_tx, signal_tx) = first_round.wait().unwrap();

        let second_round = data_tx.send(2).join(signal_tx.send(()));
        second_round.wait().unwrap();
    });

    drop(signal_rx.next().unwrap());
    assert_eq!(data_rx.next().unwrap(), Ok(1));

    drop(signal_rx.next().unwrap());
    assert_eq!(data_rx.next().unwrap(), Ok(2));

    worker.join().unwrap();
}
/// After the receiver calls `close()`, polling it yields `None` and the
/// sender's `poll_ready` reports an error.
#[test]
fn recv_close_gets_none() {
    let (mut sender, mut receiver) = mpsc::channel::<i32>(10);

    lazy(move || {
        receiver.close();

        let end_of_stream = receiver.poll();
        assert_eq!(end_of_stream, Ok(Async::Ready(None)));
        assert!(sender.poll_ready().is_err());

        drop(sender);

        Ok::<(), ()>(())
    })
    .wait()
    .unwrap();
}
/// With the only sender already dropped, the receiver yields `None`, and
/// keeps yielding `None` when polled again.
#[test]
fn tx_close_gets_none() {
    // The `_` pattern does not bind the sender, so it is dropped right away.
    let (_, mut receiver) = mpsc::channel::<i32>(10);

    lazy(move || {
        // Poll twice: the end-of-stream result must be repeatable.
        for _ in 0..2 {
            assert_eq!(receiver.poll(), Ok(Async::Ready(None)));
        }

        Ok::<(), ()>(())
    })
    .wait()
    .unwrap();
}
/// Spawns an endless counting stream (0, 1, 2, ...) through `mpsc::spawn` and
/// checks that the first four items come out of the returned receiver.
#[test]
fn spawn_sends_items() {
    let core = local_executor::Core::new();

    // Each step yields the current state and advances it by one.
    let counter = unfold(0, |n| Some(ok::<_, u8>((n, n + 1))));
    let rx = mpsc::spawn(counter, &core, 1);

    let first_four = core.run(rx.take(4).collect()).unwrap();
    assert_eq!(first_four, [0, 1, 2, 3]);
}
/// Checks that dropping the receiver returned by `mpsc::spawn` also drops a
/// spawned stream that never makes progress.
///
/// The stream owns a `oneshot::Sender`; the test expects the matching
/// receiver to resolve with `oneshot::Canceled` before a one-second watchdog
/// fires.
#[test]
fn spawn_kill_dead_stream() {
    use std::thread;
    use std::time::Duration;
    use futures::future::Either;
    use futures::sync::oneshot;

    // A stream whose `poll` always returns `NotReady`. The `done` sender is
    // never used; it only exists so that dropping `Dead` can be observed.
    #[derive(Debug)]
    struct Dead {
        done: oneshot::Sender<()>,
    }

    impl Stream for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            Ok(Async::NotReady)
        }
    }

    // Watchdog: completes after one second so the test cannot hang forever.
    let (watchdog_tx, watchdog_rx) = oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = watchdog_tx.send(());
    });

    let core = local_executor::Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let rx = mpsc::spawn(Dead { done: done_tx }, &core, 1);

    // Drop the receiver from inside the executor, then race `done_rx`
    // against the watchdog.
    let outcome = core.run(
        Ok::<_, ()>(())
            .into_future()
            .then(move |_| {
                drop(rx);
                done_rx
            })
            .select2(watchdog_rx),
    );

    // Only "left side failed with Canceled" counts as success.
    match outcome {
        Err(Either::A((oneshot::Canceled, _))) => (),
        _ => {
            panic!("dead stream wasn't canceled");
        }
    }
}
/// Stress test: `NTHREADS` producer threads each push `AMT` items into one
/// unbounded channel while a consumer thread checks that exactly
/// `AMT * NTHREADS` items arrive and that the stream then ends.
#[test]
fn stress_shared_unbounded() {
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;

    let (tx, rx) = mpsc::unbounded::<i32>();
    let mut incoming = rx.wait();

    let consumer = thread::spawn(move || {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(incoming.next().unwrap(), Ok(1));
        }

        // Nothing may follow the expected number of items.
        if incoming.next().is_some() {
            panic!();
        }
    });

    for _ in 0..NTHREADS {
        let producer_tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.unbounded_send(1).unwrap();
            }
        });
    }

    // Drop the test thread's own handle so only the producers hold senders.
    drop(tx);

    consumer.join().ok().unwrap();
}
/// Stress test: same shape as the unbounded variant, but on a channel created
/// with a buffer argument of 0, with each producer waiting for every send to
/// complete before issuing the next.
#[test]
fn stress_shared_bounded_hard() {
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;

    let (tx, rx) = mpsc::channel::<i32>(0);
    let mut incoming = rx.wait();

    let consumer = thread::spawn(move || {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(incoming.next().unwrap(), Ok(1));
        }

        // Nothing may follow the expected number of items.
        if incoming.next().is_some() {
            panic!();
        }
    });

    for _ in 0..NTHREADS {
        let mut producer_tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                // `send` consumes the sender and hands it back on completion.
                producer_tx = producer_tx.send(1).wait().unwrap();
            }
        });
    }

    // Drop the test thread's own handle so only the producers hold senders.
    drop(tx);

    consumer.join().ok().unwrap();
}
/// Stress test: one receiver is passed back and forth between `NTHREADS`
/// worker threads through an `Arc<Mutex<Option<_>>>`, and every worker polls
/// it from its own task.
///
/// Each worker loops: lock the slot, take the receiver out, try to receive,
/// and put the receiver back unless the stream has ended. Every fifth
/// attempt blocks on `into_future().wait()`; the other attempts do a single
/// `poll` inside a `lazy` future. The test thread sends `AMT` items and
/// finally checks that the workers counted exactly `AMT` receptions.
#[test]
fn stress_receiver_multi_task_bounded_hard() {
    const AMT: usize = 10_000;
    const NTHREADS: u32 = 2;

    let (mut tx, rx) = mpsc::channel::<usize>(0);
    let shared_rx = Arc::new(Mutex::new(Some(rx)));
    let received = Arc::new(AtomicUsize::new(0));
    let mut workers = vec![];

    for _ in 0..NTHREADS {
        let shared_rx = shared_rx.clone();
        let received = received.clone();

        let worker = thread::spawn(move || {
            let mut attempt = 0;
            loop {
                attempt += 1;

                let mut slot = shared_rx.lock().ok().unwrap();
                match slot.take() {
                    Some(mut stream) => {
                        if attempt % 5 == 0 {
                            // Blocking receive of the next item.
                            let (item, rest) = stream.into_future().wait().ok().unwrap();
                            if item.is_none() {
                                // End of stream: the slot stays empty.
                                break;
                            }
                            received.fetch_add(1, Ordering::Relaxed);
                            *slot = Some(rest);
                        } else {
                            // Single poll. The closure takes ownership of
                            // the guard, the receiver and a counter handle.
                            let received = received.clone();
                            let finished = lazy(move || {
                                let finished = match stream.poll().unwrap() {
                                    Async::Ready(Some(_)) => {
                                        received.fetch_add(1, Ordering::Relaxed);
                                        *slot = Some(stream);
                                        false
                                    }
                                    Async::Ready(None) => {
                                        true
                                    }
                                    Async::NotReady => {
                                        *slot = Some(stream);
                                        false
                                    }
                                };
                                Ok::<bool, ()>(finished)
                            })
                            .wait()
                            .unwrap();

                            if finished {
                                break;
                            }
                        }
                    }
                    // Another worker already observed the end of the stream.
                    None => break,
                }
            }
        });

        workers.push(worker);
    }

    for value in 0..AMT {
        tx = tx.send(value).wait().unwrap();
    }
    drop(tx);

    for worker in workers {
        worker.join().unwrap();
    }

    assert_eq!(AMT, received.load(Ordering::Relaxed));
}
/// Repeatedly builds a stream fed by a chain of three sends that is detached
/// with `forget()`, and checks that the stream yields 1, 2, 3 and then ends.
#[test]
fn stress_drop_sender() {
    // Returns the receiving side as a boxed stream. The channel carries
    // `Result` values, which `then` unwraps into the stream's items/errors.
    fn list() -> Box<Stream<Item=i32, Error=u32>> {
        let (sender, receiver) = mpsc::channel(1);

        let feed = sender.send(Ok(1))
            .and_then(|s| s.send(Ok(2)))
            .and_then(|s| s.send(Ok(3)));
        feed.forget();

        Box::new(receiver.then(|item| item.unwrap()))
    }

    for _ in 0..10000 {
        let collected = list().wait().collect::<Result<Vec<_>, _>>();
        assert_eq!(collected, Ok(vec![1, 2, 3]));
    }
}
/// One round of the close-receiver stress test.
///
/// A producer thread sends 1, 2, 3, ... on an unbounded channel until a send
/// fails, then reports the rejected value through a `std` channel. The
/// consumer reads the first item, closes the receiver, and keeps draining.
/// The drained items must be consecutive, and the first rejected value must
/// be the one right after the last drained item.
fn stress_close_receiver_iter() {
    let (tx, rx) = mpsc::unbounded();
    let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();

    let producer = thread::spawn(move || {
        for value in 1.. {
            if tx.unbounded_send(value).is_err() {
                unwritten_tx.send(value).expect("unwritten_tx");
                return;
            }
        }
    });

    let mut incoming = rx.wait();
    assert_eq!(Some(Ok(1)), incoming.next());

    // Close the underlying receiver while the producer may still be sending.
    incoming.get_mut().close();

    for expected in 2.. {
        match incoming.next() {
            Some(Ok(got)) => assert!(expected == got),
            Some(Err(_)) => unreachable!(),
            None => {
                let unwritten = unwritten_rx.recv().expect("unwritten_rx");
                assert_eq!(unwritten, expected);
                producer.join().unwrap();
                return;
            }
        }
    }
}
/// Runs `stress_close_receiver_iter` 10000 times.
#[test]
fn stress_close_receiver() {
    let mut remaining = 10000;
    while remaining > 0 {
        stress_close_receiver_iter();
        remaining -= 1;
    }
}
/// Stress test for `Sender::poll_ready`: several threads each run a future
/// that only calls `start_send` after `poll_ready` reported ready, and assert
/// that every such `start_send` is accepted.
#[test]
fn stress_poll_ready() {
    // Future that sends `count`, `count - 1`, ..., 1 on `sender`.
    struct SenderTask {
        sender: mpsc::Sender<u32>,
        count: u32,
    }

    impl Future for SenderTask {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            loop {
                if self.count == 0 {
                    return Ok(Async::Ready(()));
                }

                // Returns early with `NotReady` until the sender has room.
                try_ready!(self.sender.poll_ready().map_err(|_| ()));
                assert!(self.sender.start_send(self.count).unwrap().is_ready());
                self.count -= 1;
            }
        }
    }

    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;

    // Runs one round against a channel created with the given buffer size.
    fn stress(capacity: usize) {
        let (tx, rx) = mpsc::channel(capacity);

        let handles: Vec<_> = (0..NTHREADS)
            .map(|_| {
                let sender = tx.clone();
                thread::spawn(move || {
                    SenderTask {
                        sender: sender,
                        count: AMT,
                    }.wait()
                })
            })
            .collect();

        // Only the worker threads hold senders from here on.
        drop(tx);

        let mut incoming = rx.wait();
        for _ in 0..AMT * NTHREADS {
            assert!(incoming.next().is_some());
        }
        assert!(incoming.next().is_none());

        for handle in handles {
            // Outer unwrap: the thread did not panic; inner: the task succeeded.
            handle.join().unwrap().unwrap();
        }
    }

    for &capacity in [0, 1, 8, 16].iter() {
        stress(capacity);
    }
}
/// Returns `true` when `res` is `AsyncSink::Ready`, and `false` for any other
/// variant.
fn is_ready<T>(res: &AsyncSink<T>) -> bool {
    if let AsyncSink::Ready = *res {
        true
    } else {
        false
    }
}
/// A spawned thread pushes `0..N` through a channel created with a buffer
/// argument of 0, retrying `try_send` until each value is accepted.
///
/// The receiving side checks that the values arrive in order and that all
/// `N` of them arrive. Without the count check, a stream that ended early
/// would still pass.
#[test]
fn try_send_1() {
    const N: usize = 3000;
    let (mut tx, rx) = mpsc::channel(0);

    let t = thread::spawn(move || {
        for i in 0..N {
            // Spin until the channel accepts this value.
            while tx.try_send(i).is_err() {}
        }
    });

    let mut received = 0;
    for (i, j) in rx.wait().enumerate() {
        assert_eq!(i, j.unwrap());
        received += 1;
    }
    // The loop above ends when the stream ends; make sure nothing was lost.
    assert_eq!(received, N);

    t.join().unwrap();
}
/// Fills a channel created with a buffer argument of 0 via `try_send`, then
/// checks from another thread that `start_send` is refused while the item is
/// still queued, and that a later blocking `send` gets through.
///
/// A oneshot channel is used purely as a signal: the worker drops its sender
/// after the `start_send` check, and the test thread waits for that before
/// it starts receiving.
#[test]
fn try_send_2() {
    let (mut tx, rx) = mpsc::channel(0);
    tx.try_send("hello").unwrap();

    let (signal_tx, signal_rx) = oneshot::channel::<()>();

    let worker = thread::spawn(|| {
        // The inner closure only borrows `tx`, which is consumed by `send`
        // further down.
        lazy(|| {
            assert!(tx.start_send("fail").unwrap().is_not_ready());
            Ok::<_, ()>(())
        })
        .wait()
        .unwrap();

        drop(signal_tx);
        tx.send("goodbye").wait().unwrap();
    });

    let mut incoming = rx.wait();

    // Resolves once the worker has dropped `signal_tx`; the result is ignored.
    drop(signal_rx.wait());

    assert_eq!(incoming.next(), Some(Ok("hello")));
    assert_eq!(incoming.next(), Some(Ok("goodbye")));
    assert!(incoming.next().is_none());

    worker.join().unwrap();
}
/// On a channel created with a buffer argument of 0, a second `try_send`
/// fails while the first item is still queued, and succeeds again once that
/// item has been received.
#[test]
fn try_send_fail() {
    let (mut sender, receiver) = mpsc::channel(0);
    let mut incoming = receiver.wait();

    sender.try_send("hello").unwrap();

    // "hello" has not been received yet, so this one is refused.
    assert!(sender.try_send("fail").is_err());

    assert_eq!(incoming.next(), Some(Ok("hello")));

    sender.try_send("goodbye").unwrap();
    drop(sender);

    assert_eq!(incoming.next(), Some(Ok("goodbye")));
    assert!(incoming.next().is_none());
}
/// On a channel created with a buffer argument of 0, `poll_complete` is not
/// ready while a sent item is still queued, and becomes ready once the
/// receiver has taken it.
#[test]
fn bounded_is_really_bounded() {
    use futures::Async::*;

    let (mut sender, mut receiver) = mpsc::channel(0);

    lazy(|| {
        assert!(sender.start_send(1).unwrap().is_ready());

        // The item is still queued, so the flush does not complete.
        assert!(sender.poll_complete().unwrap().is_not_ready());

        assert_eq!(receiver.poll().unwrap(), Ready(Some(1)));

        // The queue is drained, so the flush now completes.
        assert!(sender.poll_complete().unwrap().is_ready());

        Ok::<_, ()>(())
    })
    .wait()
    .unwrap();
}