use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt, stream};
use join_me_maybe::join;
use std::cell::Cell;
use std::future::ready;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use tokio::time::{Duration, sleep};
use tokio_stream::StreamMap;
#[tokio::test]
async fn test_maybe() {
let ret = join!(
maybe ready(1),
ready(2),
ready(3),
maybe ready(4),
);
assert_eq!(ret, (Some(1), 2, 3, None));
}
#[tokio::test]
async fn test_cancel() {
let ret = join!(
maybe ready(0),
ready(1),
foo: async {
sleep(Duration::from_secs(1_000_000)).await;
2 },
bar: maybe async {
sleep(Duration::from_secs(1_000_000)).await;
3 },
maybe async {
foo.cancel();
4
},
async {
bar.cancel();
5
},
baz: maybe ready(6),
);
assert_eq!(ret, (Some(0), 1, None, None, Some(4), 5, None));
}
#[tokio::test]
async fn test_early_exit() {
let ret = join!(
maybe async {
foo.cancel();
sleep(Duration::from_secs(0)).await;
0
},
foo: ready(1),
);
assert_eq!(ret, (None, None));
}
#[tokio::test]
async fn test_cancel_already_finished() {
let ret = join!(
foo: ready(0),
async {
foo.cancel();
1
},
async {
2
},
);
assert_eq!(ret, (Some(0), 1, 2));
}
#[tokio::test]
async fn test_drop_promptly() {
let mutex1 = tokio::sync::Mutex::new(());
let mutex2 = tokio::sync::Mutex::new(());
let mutex3 = tokio::sync::Mutex::new(());
let ret = join!(
foo: async {
let _guard = mutex1.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
maybe async {
let _guard = mutex2.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
bar: maybe async {
let _guard = mutex3.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
_ = async {
foo.cancel();
let _guard = mutex1.lock().await;
} => {
let _guard = mutex2.lock().await;
let _guard = mutex3.lock().await;
}
);
assert_eq!(ret, (None, None, None, ()));
}
#[tokio::test]
async fn test_future_items_drop_promptly() {
let mutex = tokio::sync::Mutex::new(());
let mut foo_guard_acquired = false;
let mut foo_body_ran = false;
join!(
maybe _ = ready(()) => {
sleep(Duration::from_secs(1_000_000)).await;
},
foo: _guard = async {
let guard = mutex.lock().await;
foo_guard_acquired = true;
guard
} => {
foo_body_ran = true;
},
async {
foo.cancel();
let _guard = mutex.lock().await;
}
);
assert!(foo_guard_acquired);
assert!(!foo_body_ran);
}
#[tokio::test]
async fn test_stream_items_drop_promptly() {
let mutex = tokio::sync::Mutex::new(());
let mut foo_guard_acquired = false;
let mut foo_body_ran = false;
join!(
maybe _ = ready(()) => {
sleep(Duration::from_secs(1_000_000)).await;
},
foo: _guard in stream::once(async {
let guard = mutex.lock().await;
foo_guard_acquired = true;
guard
}) => {
foo_body_ran = true;
},
async {
foo.cancel();
let _guard = mutex.lock().await;
}
);
assert!(foo_guard_acquired);
assert!(!foo_body_ran);
}
#[tokio::test]
async fn test_drop_promptly_within_future_body() {
let mutex = tokio::sync::Mutex::new(());
let ret = join!(
foo: async {
let _guard = mutex.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
_ = ready(()) => {
foo.cancel();
let _guard = mutex.lock().await;
assert!(foo.with_pin_mut(|foo| foo.is_none()));
42
}
);
assert_eq!(ret, (None, 42));
}
#[tokio::test]
async fn test_drop_promptly_within_stream_body() {
let mutex = tokio::sync::Mutex::new(());
let mut evidence = None;
join!(
foo: async {
let _guard = mutex.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
_ in stream::iter([()]) => {
foo.cancel();
let _guard = mutex.lock().await;
assert!(foo.with_pin_mut(|foo| foo.is_none()));
evidence = Some(42);
}
);
assert_eq!(evidence, Some(42));
}
#[tokio::test]
async fn test_drop_promptly_within_stream_finally() {
let mutex = tokio::sync::Mutex::new(());
let ret = join!(
foo: async {
let _guard = mutex.lock().await;
sleep(Duration::from_secs(1_000_000)).await;
},
_ in stream::iter(0..3) => {} finally {
foo.cancel();
let _guard = mutex.lock().await;
assert!(foo.with_pin_mut(|foo| foo.is_none()));
42
}
);
assert_eq!(ret, (None, 42));
}
#[tokio::test]
async fn test_nontrivial_futures() {
let ret = join!(
maybe async {
sleep(Duration::from_millis(1)).await;
1
},
async {
sleep(Duration::from_millis(10)).await;
2
},
);
assert_eq!(ret, (Some(1), 2));
}
#[tokio::test]
async fn test_future_arms_with_bodies() {
let mut counter = 0;
let ret = join!(
maybe x = ready(1) => {
assert_eq!(x, 1);
counter += 1;
"hello"
},
y = ready(2) => {
counter += 1;
10 * y
},
_ = sleep(Duration::from_millis(1)) => counter += 1,
maybe _ = sleep(Duration::from_millis(10)) => counter += 1,
);
assert_eq!(ret, (Some("hello"), 20, (), None));
assert_eq!(counter, 3);
}
#[tokio::test]
async fn test_stream_arms() {
let mut elements1 = Vec::new();
let mut elements2 = Vec::new();
let mut counter = 0;
let ret = join!(
x in stream::iter(0..5) => {
elements1.push(x);
counter+= 1;
},
x in stream::iter(5..8) => {
elements2.push(x);
counter+= 1;
},
_ = ready(()) => counter += 100,
);
assert_eq!(elements1, [0, 1, 2, 3, 4]);
assert_eq!(elements2, [5, 6, 7]);
assert_eq!(counter, 108);
assert_eq!(ret, ((), (), ()));
}
#[tokio::test]
async fn test_stream_body_sleep_does_not_miss_items() {
let mut elements = Vec::new();
join!(
x in stream::iter(0..3) => {
sleep(Duration::from_millis(1)).await;
elements.push(x);
},
);
assert_eq!(elements, [0, 1, 2]);
}
#[tokio::test]
async fn test_stream_body_sleep_does_not_miss_other_stream_items() {
let mut slow_elements = Vec::new();
let mut fast_elements = Vec::new();
join!(
x in stream::iter(0..3) => {
sleep(Duration::from_millis(1)).await;
slow_elements.push(x);
},
x in stream::iter(0..3) => {
fast_elements.push(x);
},
);
assert_eq!(slow_elements, [0, 1, 2]);
assert_eq!(fast_elements, [0, 1, 2]);
}
#[tokio::test]
async fn test_future_body_sleep_does_not_miss_stream_items() {
let mut elements = Vec::new();
join!(
_ = ready(()) => {
sleep(Duration::from_millis(1)).await;
},
x in stream::iter(0..3) => {
elements.push(x);
},
);
assert_eq!(elements, [0, 1, 2]);
}
#[tokio::test]
async fn test_finally_sleep_does_not_miss_stream_items() {
let mut elements = Vec::new();
join!(
_ in stream::empty::<()>() => {} finally {
sleep(Duration::from_millis(1)).await;
},
x in stream::iter(0..3) => {
elements.push(x);
},
);
assert_eq!(elements, [0, 1, 2]);
}
fn resuming<S>(stream: S) -> ResumingStream<S> {
ResumingStream {
stream: Box::pin(stream),
waker: None,
ended: false,
}
}
struct ResumingStream<S> {
stream: Pin<Box<S>>,
waker: Option<Waker>,
ended: bool,
}
impl<S> ResumingStream<S> {
fn inner(self: Pin<&mut Self>) -> Pin<&mut S> {
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
self.get_mut().stream.as_mut()
}
fn end(&mut self) {
self.ended = true;
}
}
impl<S: Stream> Stream for ResumingStream<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.stream.as_mut().poll_next(cx) {
Poll::Ready(None) if self.ended => Poll::Ready(None),
Poll::Pending | Poll::Ready(None) => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
Poll::Ready(Some(item)) => {
self.waker = None;
Poll::Ready(Some(item))
}
}
}
}
#[tokio::test]
async fn test_canceller_mut_futuresunordered() {
let inputs = futures::stream::iter(0..5).then(|i| async move {
sleep(Duration::from_millis(1)).await;
i
});
let mut outputs = Vec::new();
join!(
i in inputs => {
unordered.with_pin_mut(|unordered| {
unordered.unwrap().inner().push(async move { i });
});
} finally unordered.with_mut(|unordered| unordered.unwrap().end()),
unordered: i in resuming(FuturesUnordered::new()) => outputs.push(i),
);
outputs.sort();
assert_eq!(outputs, [0, 1, 2, 3, 4]);
}
struct WellBehavedStreamMap<K, V> {
map: StreamMap<K, V>,
waker: Option<Waker>,
drain: bool,
}
impl<K, V> WellBehavedStreamMap<K, V> {
fn new() -> Self {
Self {
map: StreamMap::new(),
waker: None,
drain: false,
}
}
fn start_drain(&mut self) {
self.drain = true;
}
}
impl<K: Hash + Eq, V: Stream> WellBehavedStreamMap<K, V> {
fn insert(&mut self, key: K, stream: V) {
assert!(!self.drain, "already draining");
self.map.insert(key, stream);
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
}
}
impl<K: Clone + Unpin, V: Stream + Unpin> Stream for WellBehavedStreamMap<K, V> {
type Item = (K, V::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.map).poll_next(cx) {
Poll::Ready(None) if self.drain => {
Poll::Ready(None)
}
Poll::Pending | Poll::Ready(None) => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
Poll::Ready(Some(item)) => {
self.waker = None;
Poll::Ready(Some(item))
}
}
}
}
#[tokio::test]
async fn test_canceller_mut_streammap() {
let inputs = futures::stream::iter(0..5).then(|i| async move {
sleep(Duration::from_millis(1)).await;
i
});
let mut outputs = Vec::new();
join!(
i in inputs => {
stream_map.with_pin_mut(|stream_map| {
stream_map.unwrap().insert(i, futures::stream::iter(vec![i; i]));
});
} finally {
stream_map.with_pin_mut(|stream_map| {
stream_map.unwrap().start_drain();
});
},
stream_map: (_k, v) in WellBehavedStreamMap::new() => outputs.push(v),
);
outputs.sort();
assert_eq!(outputs, [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]);
}
#[tokio::test]
async fn test_potentially_ambiguous_colons() {
let ret = join!(
ready(0),
core::future::ready(1),
::core::future::ready(2),
label3: ready(3),
label4: core::future::ready(4),
label5: ::core::future::ready(5),
);
assert_eq!(ret, (0, 1, 2, Some(3), Some(4), Some(5)));
}
#[tokio::test]
async fn test_finally_values() {
let ret = join!(
maybe _ in futures::stream::iter([()]) => {},
maybe _ in futures::stream::iter([()]) => {} finally 99,
_ in futures::stream::iter([()]) => {},
_ in futures::stream::iter([()]) => {} finally 42,
maybe _ in futures::stream::iter([()]) => {} finally 1_000_000,
);
assert_eq!(ret, ((), None, (), 42, None));
}
#[tokio::test]
async fn test_await_in_bodies() {
let mut x = 0;
let ret = join!(
y = ready(1) => {
x += y;
ready(42).await
},
z in stream::iter([10]) => {
x += z;
sleep(Duration::from_millis(1)).await;
} finally {
x += 100;
ready(99).await;
x
},
);
assert_eq!(ret, (42, 111));
}
#[tokio::test]
async fn test_return_in_bodies() {
fn io_fail() -> std::io::Result<i32> {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "fail!!!"))
}
async fn foo() -> std::io::Result<()> {
join!(
sleep(Duration::from_secs(1_000_000)),
_ = sleep(Duration::from_millis(10)) => {
io_fail()?;
}
);
Ok(())
}
assert!(foo().await.is_err());
}
#[tokio::test]
#[should_panic = "already mutably borrowed"]
async fn test_with_pin_mut_panic() {
join!(
foo: sleep(Duration::from_secs(1_000_000)),
_ = async {} => {
foo.with_pin_mut(|_| {
foo.with_pin_mut(|_| {});
});
},
);
}
#[tokio::test]
async fn test_manipulate_canceller_generically() {
fn do_something<T>(_: T) {}
join!(
foo: sleep(Duration::from_millis(1)),
async {
do_something(foo);
},
);
}
#[tokio::test]
async fn test_canceller_send_sync_when_future_is_send() {
fn assert_send_sync<T: Send + Sync>(_: &T) {}
join!(
foo: async {
let _x = std::cell::RefCell::new(());
sleep(Duration::from_millis(1)).await;
drop(_x);
},
_ = async {
assert_send_sync::<join_me_maybe::Canceller>(foo);
} => {
assert_send_sync::<join_me_maybe::CancellerMut<_>>(foo);
},
);
}
#[tokio::test]
async fn test_canceller_send_sync_when_future_is_not_send() {
fn assert_send_sync<T: Send + Sync>(_: &T) {}
join!(
foo: async {
let _x = std::rc::Rc::new(());
sleep(Duration::from_millis(1)).await;
drop(_x);
},
async {
assert_send_sync::<join_me_maybe::Canceller>(foo);
},
);
}
#[tokio::test]
async fn test_cancel_self() {
let mut did_write = false;
join!(
foo: async {
foo.cancel();
did_write = true;
sleep(Duration::from_secs(1_000_000)).await;
},
);
assert!(did_write);
}
#[tokio::test]
async fn test_circular_cancellation() {
let mut did_write1 = false;
let mut did_write2 = false;
join!(
foo: async {
bar.cancel();
sleep(Duration::from_millis(1)).await;
did_write1 = true;
},
bar: async {
foo.cancel();
did_write2 = true;
},
);
assert!(did_write1);
assert!(!did_write2);
}
#[tokio::test]
#[should_panic]
async fn test_panic_in_body() {
join!(
_ = ready(2) => todo!(),
_ in stream::iter([1, 2, 3]) => todo!() finally todo!(),
);
}
#[tokio::test]
async fn test_type_annotation_needed() {
join!(
x = ready(Some(true)) => {
let x: Option<bool> = x;
x.unwrap();
}
);
}
#[tokio::test]
async fn test_maybe_bodies_cancelled() {
let mut definitely_finished = false;
let mut definitely_body_finished = false;
let mut maybe_started = false;
let mut maybe_finished = false;
join!(
_ = async {
sleep(Duration::from_millis(10)).await;
definitely_finished = true;
} => {
definitely_body_finished = true;
},
maybe _ = ready(()) => {
maybe_started = true;
sleep(Duration::from_secs(1_000_000)).await;
maybe_finished = true;
},
);
assert!(definitely_finished);
assert!(definitely_body_finished);
assert!(maybe_started);
assert!(!maybe_finished);
}
#[tokio::test]
async fn test_cancelled_bodies_cancelled() {
let mut labeled_started = false;
let mut labeled_finished = false;
join!(
async {
sleep(Duration::from_millis(10)).await;
labeled.cancel();
},
labeled: _ = ready(()) => {
labeled_started = true;
sleep(Duration::from_secs(1_000_000)).await;
labeled_finished = true;
},
);
assert!(labeled_started);
assert!(!labeled_finished);
}
#[tokio::test]
async fn test_maybe_stream_body_cancelled() {
let mut body_started = false;
let mut body_finished = false;
join!(
sleep(Duration::from_millis(10)),
maybe x in stream::iter([1]) => {
_ = x;
body_started = true;
sleep(Duration::from_secs(1_000_000)).await;
body_finished = true;
},
);
assert!(body_started);
assert!(!body_finished);
}
#[tokio::test]
async fn test_maybe_finally_cancelled_mid_execution() {
let mut finally_started = false;
let mut finally_finished = false;
join!(
sleep(Duration::from_millis(10)),
maybe _ in stream::iter([()]) => {} finally {
finally_started = true;
sleep(Duration::from_secs(1_000_000)).await;
finally_finished = true;
42
},
);
assert!(finally_started);
assert!(!finally_finished);
}
#[tokio::test]
async fn test_labeled_maybe_body_cancelled_explicitly() {
let mut started = false;
let mut finished = false;
join!(
async {
sleep(Duration::from_millis(10)).await;
labeled.cancel();
},
labeled: maybe _ = ready(()) => {
started = true;
sleep(Duration::from_secs(1_000_000)).await;
finished = true;
},
);
assert!(started);
assert!(!finished);
}
#[tokio::test]
async fn test_body_self_cancellation() {
let mut started = false;
let mut finished = false;
join!(
foo: _ = ready(()) => {
started = true;
foo.cancel();
sleep(Duration::from_secs(1_000_000)).await;
finished = true;
},
);
assert!(started);
assert!(!finished);
}
#[tokio::test]
async fn test_multiple_maybe_bodies_all_cancelled() {
let mut started1 = false;
let mut finished1 = false;
let mut started2 = false;
let mut finished2 = false;
join!(
sleep(Duration::from_millis(10)),
maybe _ = ready(()) => {
started1 = true;
sleep(Duration::from_secs(1_000_000)).await;
finished1 = true;
},
maybe _ = ready(()) => {
started2 = true;
sleep(Duration::from_secs(1_000_000)).await;
finished2 = true;
},
);
assert!(started1);
assert!(!started2);
assert!(!finished1);
assert!(!finished2);
}
#[tokio::test]
async fn test_stream_body_cancelled_finally_skipped() {
let mut body_started = false;
let mut finally_ran = false;
let ret = join!(
sleep(Duration::from_millis(10)),
maybe _ in stream::iter([()]) => {
body_started = true;
sleep(Duration::from_secs(1_000_000)).await;
} finally {
finally_ran = true;
99
},
);
assert!(body_started);
assert!(!finally_ran);
assert_eq!(ret, ((), None));
}
#[tokio::test]
async fn test_maybe_body_skipped() {
let mut maybe_body_ran = false;
let mut definitely_body_ran = false;
let ret = join!(
maybe _ = ready(()) => {
maybe_body_ran = true;
},
_ = ready(()) => {
definitely_body_ran = true;
},
);
assert!(!maybe_body_ran);
assert!(definitely_body_ran);
assert_eq!(ret, (None, ()));
}
#[tokio::test]
async fn test_maybe_body_cancels_last_definitely_arm() {
let mut maybe_finished = false;
join!(
foo: sleep(Duration::from_secs(1_000_000)),
maybe _ = ready(()) => {
foo.cancel();
sleep(Duration::from_millis(10)).await;
maybe_finished = true;
},
);
assert!(!maybe_finished);
}
#[tokio::test]
async fn test_self_cancel_immediately_ready() {
let ret = join!(
foo: _ = ready(()) => {
foo.cancel();
42
},
);
assert_eq!(ret, (Some(42),));
}
#[tokio::test]
async fn test_labeled_stream_body_self_cancellation() {
join!(
foo: _ in stream::iter([1]) => {
foo.cancel();
sleep(Duration::from_secs(1_000_000)).await;
},
);
}
#[tokio::test]
async fn test_cancel_from_finally_block() {
let ret = join!(
bar: sleep(Duration::from_secs(1_000_000)),
_ in stream::iter([()]) => {} finally bar.cancel(),
);
assert_eq!(ret, (None, ()));
}
#[tokio::test]
async fn test_labeled_definitely_stream_body_cancelled() {
let mut body_started = false;
let mut body_finished = false;
let mut finally_ran = false;
let ret = join!(
async {
sleep(Duration::from_millis(10)).await;
labeled.cancel();
},
labeled: _ in stream::iter([1]) => {
body_started = true;
sleep(Duration::from_secs(1_000_000)).await;
body_finished = true;
} finally {
finally_ran = true;
99
},
);
assert!(body_started);
assert!(!body_finished);
assert!(!finally_ran);
assert_eq!(ret, ((), None));
}
#[tokio::test]
async fn test_dont_poll_streams_concurrently_with_their_own_bodies() {
let foo_running = Cell::new(false);
let foo = async || {
assert!(!foo_running.get(), "only one call to `foo` at a time");
foo_running.set(true);
sleep(Duration::from_millis(10)).await;
foo_running.set(false);
42
};
let foo_stream = stream::iter(0..3).then(|_| foo());
join!(
x in foo_stream => {
assert_eq!(x, 42);
foo().await;
},
);
}