#![feature(futures_api, arbitrary_self_types, pin)]
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{Future, FutureExt, poll_fn};
use futures::task::{self, Poll};
use std::pin::PinMut;
use std::sync::mpsc;
use std::thread;
#[test]
fn smoke_poll() {
let (mut tx, rx) = oneshot::channel::<u32>();
let mut rx = Some(rx);
let f = poll_fn(|cx| {
assert!(tx.poll_cancel(cx).is_pending());
assert!(tx.poll_cancel(cx).is_pending());
drop(rx.take());
assert!(tx.poll_cancel(cx).is_ready());
assert!(tx.poll_cancel(cx).is_ready());
Poll::Ready(())
});
block_on(f);
}
#[test]
fn cancel_notifies() {
let (tx, rx) = oneshot::channel::<u32>();
let t = thread::spawn(|| {
block_on(WaitForCancel { tx: tx });
});
drop(rx);
t.join().unwrap();
}
struct WaitForCancel {
tx: Sender<u32>,
}
impl Future for WaitForCancel {
type Output = ();
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
self.tx.poll_cancel(cx)
}
}
#[test]
fn cancel_lots() {
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (tx, tx2) in rx {
block_on(WaitForCancel { tx });
tx2.send(()).unwrap();
}
});
for _ in 0..20000 {
let (otx, orx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
tx.send((otx, tx2)).unwrap();
drop(orx);
rx2.recv().unwrap();
}
drop(tx);
t.join().unwrap();
}
#[test]
fn close() {
let (mut tx, mut rx) = oneshot::channel::<u32>();
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
Poll::Ready(Err(_)) => {},
_ => panic!(),
};
assert!(tx.poll_cancel(cx).is_ready());
Poll::Ready(())
}));
}
#[test]
fn close_wakes() {
let (tx, mut rx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
let t = thread::spawn(move || {
rx.close();
rx2.recv().unwrap();
});
block_on(WaitForCancel { tx: tx });
tx2.send(()).unwrap();
t.join().unwrap();
}
#[test]
fn is_canceled() {
let (tx, rx) = oneshot::channel::<u32>();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
}
#[test]
fn cancel_sends() {
let (tx, rx) = mpsc::channel::<Sender<_>>();
let t = thread::spawn(move || {
for otx in rx {
let _ = otx.send(42);
}
});
for _ in 0..20000 {
let (otx, mut orx) = oneshot::channel::<u32>();
tx.send(otx).unwrap();
orx.close();
drop(block_on(orx));
}
drop(tx);
t.join().unwrap();
}