use std::{pin::Pin, time::Duration};
use fast_ordered_buffer::FobStreamExt;
use futures::{
Stream, StreamExt,
channel::oneshot,
future::{self, Future},
stream,
};
use pretty_assertions::assert_eq;
use rand::{Rng, rng};
use tokio::time::sleep;
/// Runs the future produced by `test_closure` and panics if it does not finish in time.
///
/// The closure is invoked exactly once, so only `FnOnce` is required; any closure that
/// satisfied the previous, stricter bounds is still accepted.
///
/// # Panics
///
/// Panics with a message naming the time limit when the future is still pending after
/// `TIMEOUT` has elapsed.
async fn run_with_timeout<F, Fut>(test_closure: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()>,
{
    const TIMEOUT: Duration = Duration::from_secs(2);

    if tokio::time::timeout(TIMEOUT, test_closure()).await.is_err() {
        // An explicit message makes a hung test distinguishable from an assertion failure.
        panic!("test did not complete within {:?}", TIMEOUT);
    }
}
/// Builds a stream with one future per element of `items`.
///
/// Each element is cloned out of the slice and wrapped with [`future::ready`].
fn stream_of_instant_futures<T: Clone + Send + 'static>(
    items: &[T],
) -> impl Stream<Item = impl Future<Output = T> + Send> {
    stream::iter(items.iter().cloned()).map(future::ready)
}
/// Builds a stream of futures that each sleep for a random time before yielding an item.
///
/// For every element cloned from `items`, a delay in the inclusive range
/// `0..=max_delay_ms` milliseconds is drawn at the moment the stream produces the
/// future; the future then sleeps for that long and returns the element.
fn stream_of_random_delayed_futures<T: Clone + Send + 'static>(
    items: &[T],
    max_delay_ms: u64,
) -> impl Stream<Item = impl Future<Output = T> + Send> {
    let mut delay_source = rng();
    stream::iter(items.iter().cloned()).map(move |item| {
        // The delay is picked outside the async block so the future only captures
        // the chosen duration and the item, not the generator.
        let pause = Duration::from_millis(delay_source.random_range(0..=max_delay_ms));
        async move {
            sleep(pause).await;
            item
        }
    })
}
/// Drains `stream` into a vector and asserts that it equals `expected`, element for
/// element and in the same order.
async fn assert_stream_matches_expected<T: std::fmt::Debug + PartialEq>(
    stream: impl Stream<Item = T> + Unpin,
    expected: &[T],
) {
    let collected = stream.collect::<Vec<T>>().await;
    assert_eq!(collected, expected);
}
/// Buffering an empty stream (limit 5) must immediately report the end of the stream.
#[tokio::test]
async fn test_empty_stream() {
    let body = || async {
        let mut buffered = stream::empty::<future::Ready<()>>().fast_ordered_buffer(5);
        assert_eq!(buffered.next().await, None);
    };
    run_with_timeout(body).await;
}
/// A one-element stream buffered with a limit of 1 yields that element, then ends.
#[tokio::test]
async fn test_single_item() {
    run_with_timeout(|| async {
        let input = vec![42];
        let mut buffered = stream_of_instant_futures(&input).fast_ordered_buffer(1);

        let only_item = buffered.next().await;
        assert_eq!(only_item, Some(42));

        let after_end = buffered.next().await;
        assert_eq!(after_end, None);
    })
    .await;
}
/// Already-completed futures buffered with a limit of 2 come out in input order.
#[tokio::test]
async fn test_ordering_for_instant_futures() {
    run_with_timeout(|| async {
        let expected: Vec<i32> = (1..=5).collect();
        let buffered = stream_of_instant_futures(&expected).fast_ordered_buffer(2);
        assert_stream_matches_expected(buffered, &expected).await;
    })
    .await;
}
/// Ten futures with random delays of up to 30 ms, buffered with a limit of 2, must
/// still be yielded in input order.
#[tokio::test]
async fn test_random_delays_small_concurrency() {
    run_with_timeout(|| async {
        let expected: Vec<i32> = (1..=10).collect();
        let buffered = stream_of_random_delayed_futures(&expected, 30).fast_ordered_buffer(2);
        assert_stream_matches_expected(buffered, &expected).await;
    })
    .await;
}
/// Ten futures with random delays of up to 30 ms, buffered with a limit of 8, must
/// still be yielded in input order.
#[tokio::test]
async fn test_random_delays_larger_concurrency() {
    run_with_timeout(|| async {
        let expected: Vec<i32> = (1..=10).collect();
        let buffered = stream_of_random_delayed_futures(&expected, 30).fast_ordered_buffer(8);
        assert_stream_matches_expected(buffered, &expected).await;
    })
    .await;
}
/// Feeds a slow future (100 ms) followed by a quick one (10 ms) through a buffer with a
/// limit of 2.
///
/// Outputs must arrive in stream order ("first", then "second", then end of stream),
/// and both futures must have sent their value on their oneshot channel.
#[tokio::test]
async fn test_does_not_block_new_futures_waiting_for_first() {
    run_with_timeout(|| async {
        type BoxedStr = Pin<Box<dyn Future<Output = &'static str> + Send>>;

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let slow: BoxedStr = Box::pin(async move {
            sleep(Duration::from_millis(100)).await;
            tx1.send("first").unwrap();
            "first"
        });
        let quick: BoxedStr = Box::pin(async move {
            sleep(Duration::from_millis(10)).await;
            tx2.send("second").unwrap();
            "second"
        });

        let mut buffered = stream::iter(vec![slow, quick]).fast_ordered_buffer(2);

        // Pull everything out before asserting, then check all three outputs at once.
        let head = buffered.next().await;
        let tail = buffered.next().await;
        let end = buffered.next().await;
        assert_eq!((head, tail, end), (Some("first"), Some("second"), None));

        // Each future reports on its channel right before returning.
        assert_eq!(rx1.await.unwrap(), "first");
        assert_eq!(rx2.await.unwrap(), "second");
    })
    .await;
}
/// A limit (9999) far above the number of items (3) still yields every item in order.
#[tokio::test]
async fn test_concurrency_larger_than_stream_len() {
    let body = || async {
        let expected = vec![10, 20, 30];
        let buffered = stream_of_instant_futures(&expected).fast_ordered_buffer(9999);
        assert_stream_matches_expected(buffered, &expected).await;
    };
    run_with_timeout(body).await;
}
/// Checks `size_hint` on a four-future stream (limit 2) as values are released through
/// oneshot channels and items are consumed.
///
/// The futures are placed in the stream in the order rx1, rx3, rx2, rx4, so the expected
/// output order is 10, 30, 20, 40.
#[tokio::test]
async fn test_size_hint() {
    run_with_timeout(|| async {
        /// Boxes a future that resolves to the value sent on `rx`.
        fn pending_on(rx: oneshot::Receiver<i32>) -> Pin<Box<dyn Future<Output = i32> + Send>> {
            Box::pin(async move { rx.await.unwrap() })
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();
        let (tx4, rx4) = oneshot::channel();

        let pending = vec![
            pending_on(rx1),
            pending_on(rx3),
            pending_on(rx2),
            pending_on(rx4),
        ];
        let mut buffered = stream::iter(pending).fast_ordered_buffer(2);
        assert_eq!(buffered.size_hint(), (4, Some(4)));

        tx1.send(10).unwrap();
        assert_eq!(buffered.next().await, Some(10));
        assert_eq!(buffered.size_hint(), (3, Some(3)));

        // Releasing a value without consuming anything must leave the hint unchanged.
        tx2.send(20).unwrap();
        assert_eq!(buffered.size_hint(), (3, Some(3)));

        tx3.send(30).unwrap();
        assert_eq!(buffered.next().await, Some(30));
        assert_eq!(buffered.size_hint(), (2, Some(2)));

        assert_eq!(buffered.next().await, Some(20));
        assert_eq!(buffered.size_hint(), (1, Some(1)));

        tx4.send(40).unwrap();
        assert_eq!(buffered.next().await, Some(40));
        assert_eq!(buffered.size_hint(), (0, Some(0)));

        assert_eq!(buffered.next().await, None);
    })
    .await;
}
/// Checks that `size_hint` stays at 4 while the first future of a four-future stream
/// (limit 2) is still pending, even as the later futures' values are released.
///
/// Values are released in reverse order (40, 30, 20). After each release, `next()` is
/// expected to time out after 10 ms and the hint must stay `(4, Some(4))`. Once the first
/// value (10) is released, all four items must come out in stream order.
#[tokio::test]
async fn test_size_hint_many_pending() {
    run_with_timeout(|| async {
        /// Boxes a future that resolves to the value sent on `rx`.
        fn pending_on(rx: oneshot::Receiver<i32>) -> Pin<Box<dyn Future<Output = i32> + Send>> {
            Box::pin(async move { rx.await.unwrap() })
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();
        let (tx4, rx4) = oneshot::channel();

        let pending = vec![
            pending_on(rx1),
            pending_on(rx2),
            pending_on(rx3),
            pending_on(rx4),
        ];
        let mut buffered = stream::iter(pending).fast_ordered_buffer(2);
        assert_eq!(buffered.size_hint(), (4, Some(4)));

        // Release everything except the head of the stream, last one first.
        for (tx, value) in [(tx4, 40), (tx3, 30), (tx2, 20)] {
            tx.send(value).unwrap();
            let attempt = tokio::time::timeout(Duration::from_millis(10), buffered.next()).await;
            assert!(attempt.is_err());
            assert_eq!(buffered.size_hint(), (4, Some(4)));
        }

        tx1.send(10).unwrap();
        for expected in [10, 20, 30, 40] {
            assert_eq!(buffered.next().await, Some(expected));
        }
    })
    .await;
}
/// Two hundred futures with random delays of up to 50 ms, buffered with a limit of 10,
/// must be yielded in input order.
#[tokio::test]
async fn test_large_out_of_order() {
    run_with_timeout(|| async {
        let expected: Vec<i32> = (0..200).collect();
        let buffered = stream_of_random_delayed_futures(&expected, 50).fast_ordered_buffer(10);
        assert_stream_matches_expected(buffered, &expected).await;
    })
    .await;
}
/// Futures whose output is a `Result` pass through unchanged: the `Err` in the middle is
/// yielded like any other item, and the stream keeps going afterwards.
#[tokio::test]
async fn test_futures_returning_results() {
    run_with_timeout(|| async {
        let inputs = vec![
            Box::pin(async { Ok::<_, &'static str>(1) })
                as Pin<Box<dyn Future<Output = Result<_, _>> + Send>>,
            Box::pin(async { Err::<i32, &'static str>("boom!") })
                as Pin<Box<dyn Future<Output = Result<_, _>> + Send>>,
            Box::pin(async { Ok::<_, &'static str>(3) })
                as Pin<Box<dyn Future<Output = Result<_, _>> + Send>>,
        ];

        let outcomes: Vec<_> = stream::iter(inputs)
            .fast_ordered_buffer(2)
            .collect()
            .await;

        assert_eq!(outcomes, vec![Ok(1), Err("boom!"), Ok(3)]);
    })
    .await;
}
/// A limit of 0 must still let the stream make progress: all three items are yielded in
/// order, followed by the end of the stream.
#[tokio::test]
async fn test_concurrency_zero() {
    run_with_timeout(|| async {
        let input = vec![1, 2, 3];
        let mut buffered = stream_of_instant_futures(&input).fast_ordered_buffer(0);

        for expected in [Some(1), Some(2), Some(3), None] {
            assert_eq!(buffered.next().await, expected);
        }
    })
    .await;
}