use futures::{Stream, StreamExt};
use std::{collections::HashMap, hash::Hash, time::Duration};
/// Tallies how many times each distinct item occurs in `items`.
///
/// The input is consumed. The returned map has one entry per distinct item, whose value is
/// the number of times that item was yielded. An empty input produces an empty map.
// NOTE(review): `mutable_key_type` is presumably allowed because the key type is generic and
// callers may pass keys with interior mutability — confirm before removing.
#[allow(dead_code)]
#[allow(clippy::mutable_key_type)]
pub fn get_item_counts<I>(items: I) -> HashMap<I::Item, usize>
where
    I: IntoIterator,
    I::Item: Hash + Eq,
{
    let mut tally: HashMap<I::Item, usize> = HashMap::new();
    for item in items {
        // A missing key starts at zero, so the first sighting is counted as 1.
        *tally.entry(item).or_default() += 1;
    }
    tally
}
/// Collects items from a stream into a `Vec`, panicking if the stream stalls.
///
/// Two forms are accepted (a trailing comma is allowed in both):
///
/// * `collect_stream!(stream, take = n, timeout = duration)` collects until `n` items have
///   been received or the stream ends, whichever comes first. `take = 0` returns an empty
///   `Vec` without polling the stream.
/// * `collect_stream!(stream, timeout = duration)` collects until the stream ends.
///
/// `timeout` bounds each individual wait for the next item, not the collection as a whole.
///
/// The expansion contains `.await`, so the macro must be invoked in an async context. It also
/// expands to `use futures::StreamExt` and `use tokio::time`, so both crates must be
/// resolvable at the call site.
///
/// In the form without `take`, the `$stream` expression is evaluated again for every item, so
/// pass a place expression (for example a variable) rather than an expression that builds a
/// new stream.
///
/// # Panics
///
/// Panics if `timeout` elapses while waiting for the next item. The message reports how many
/// items had been collected so far.
#[macro_export]
macro_rules! collect_stream {
    ($stream:expr, take=$take:expr, timeout=$timeout:expr $(,)?) => {{
        use futures::StreamExt;
        use tokio::time;

        let stream = &mut $stream;
        // Evaluate the caller's expressions exactly once rather than on every iteration.
        let take: usize = $take;
        let timeout = $timeout;
        let mut items = Vec::new();
        // Checking the count before polling lets `take = 0` return immediately instead of
        // draining the stream.
        while items.len() < take {
            match time::timeout(timeout, stream.next()).await {
                Ok(Some(item)) => items.push(item),
                // The stream ended early: return what was collected.
                Ok(None) => break,
                // The message is only built on the failure path. The trailing `: {:?}` keeps
                // the text identical to what `Result::expect` would have produced.
                Err(err) => panic!(
                    "Timeout before stream could collect {} item(s). Got {} item(s).: {:?}",
                    take,
                    items.len(),
                    err
                ),
            }
        }
        items
    }};
    ($stream:expr, timeout=$timeout:expr $(,)?) => {{
        use futures::StreamExt;
        use tokio::time;

        // Evaluate the caller's timeout expression exactly once.
        let timeout = $timeout;
        let mut items = Vec::new();
        loop {
            match time::timeout(timeout, $stream.next()).await {
                Ok(Some(item)) => items.push(item),
                // The stream was closed: collection is complete.
                Ok(None) => break,
                // The message is only built on the failure path. The trailing `: {:?}` keeps
                // the text identical to what `Result::expect` would have produced.
                Err(err) => panic!(
                    "Timeout before stream was closed. Got {} items.: {:?}",
                    items.len(),
                    err
                ),
            }
        }
        items
    }};
}
/// Collects items from a stream and counts how many times each distinct item occurred.
///
/// Accepts the same two forms as `collect_stream!` and forwards its arguments to it unchanged
/// (a trailing comma is allowed in both):
///
/// * `collect_stream_count!(stream, take = n, timeout = duration)`
/// * `collect_stream_count!(stream, timeout = duration)`
///
/// The collected items are passed to `$crate::streams::get_item_counts`, and the value of the
/// expansion is whatever that function returns.
///
/// Any panic raised by `collect_stream!` propagates out of this macro.
#[macro_export]
macro_rules! collect_stream_count {
    ($stream:expr, take=$take:expr, timeout=$timeout:expr $(,)?) => {{
        let items = $crate::collect_stream!($stream, take = $take, timeout = $timeout);
        $crate::streams::get_item_counts(items)
    }};
    ($stream:expr, timeout=$timeout:expr $(,)?) => {{
        let items = $crate::collect_stream!($stream, timeout = $timeout);
        $crate::streams::get_item_counts(items)
    }};
}
/// Polls `stream` until `predicate` yields a value, then returns that value.
///
/// Every item received is handed to `predicate`. Items for which it returns `None` are
/// discarded and polling continues; the first `Some(value)` ends the search.
///
/// `timeout` bounds each individual wait for the next item, not the call as a whole.
///
/// # Panics
///
/// * If `timeout` elapses while waiting for the next item.
/// * If the stream ends before `predicate` has returned `Some`.
pub async fn assert_in_stream<S, P, R>(stream: &mut S, mut predicate: P, timeout: Duration) -> R
where
    S: Stream + Unpin,
    P: FnMut(S::Item) -> Option<R>,
{
    while let Some(item) = tokio::time::timeout(timeout, stream.next())
        .await
        .expect("Timeout before stream emitted")
    {
        if let Some(matched) = predicate(item) {
            return matched;
        }
    }
    // Reaching this point means the stream yielded `None`, i.e. it ended without a match.
    panic!("Predicate did not return true before the stream ended")
}