use std::time::Duration;
use super::*;
use async_stream::stream;
/// Turns a vector into an asynchronous input stream.
///
/// The vector is moved into the stream, which yields every element of
/// `generator_vec` in its original order and then ends.
///
/// The stream is pinned on the heap with `Box::pin` before being returned as
/// a [`PinnedInputStream`].
pub fn gen_input_stream<T>(generator_vec: Vec<T>) -> PinnedInputStream<T>
where
    T: Send + Sync + 'static,
{
    Box::pin(stream! {
        for element in generator_vec {
            yield element;
        }
    })
}
/// Turns a vector into an asynchronous input stream that pauses before each item.
///
/// The vector is moved into the stream. For every element of `generator_vec`,
/// in order, the stream first awaits `tokio::time::sleep(delay)` and only then
/// yields the element, so the first item is delayed as well.
///
/// The stream is pinned on the heap with `Box::pin` before being returned as
/// a [`PinnedInputStream`].
pub fn gen_input_stream_with_delay<T>(
    generator_vec: Vec<T>,
    delay: Duration,
) -> PinnedInputStream<T>
where
    T: Send + Sync + 'static,
{
    Box::pin(stream! {
        for element in generator_vec {
            // The pause comes before the yield, so every item (including the
            // first one) is preceded by one full `delay`.
            tokio::time::sleep(delay).await;
            yield element;
        }
    })
}
/// Checks that `gen_input_stream` yields the vector's items in order and then ends.
#[tokio::test]
async fn test_gen_input_stream() {
    use futures_util::StreamExt;

    let mut input_stream = gen_input_stream(vec![1, 2, 3]);

    // Assert every yielded value instead of merely polling three times, so the
    // test fails if items are dropped, duplicated or reordered.
    for expected in 1..=3 {
        pretty_assertions::assert_eq!(input_stream.next().await, Some(expected));
    }

    // After the last item the stream must report completion.
    pretty_assertions::assert_eq!(input_stream.next().await, None);
}
/// Checks that `gen_input_stream_with_delay` yields the vector's items in
/// order, waits at least `delay` before each of them, and then ends.
#[tokio::test]
async fn test_gen_input_stream_with_delay() {
    use futures_util::StreamExt;

    let delay = Duration::from_millis(100);
    let start_time = std::time::Instant::now();
    let mut input_stream = gen_input_stream_with_delay(vec![1, 2, 3], delay);

    // Assert every yielded value instead of merely polling three times, so the
    // test fails if items are dropped, duplicated or reordered.
    for expected in 1..=3 {
        pretty_assertions::assert_eq!(input_stream.next().await, Some(expected));
    }

    // Capture the elapsed time before the final poll so that only the three
    // delayed items are measured.
    let elapsed = start_time.elapsed();

    // After the last item the stream must report completion.
    pretty_assertions::assert_eq!(input_stream.next().await, None);

    // Three items, each preceded by one `delay`.
    assert!(
        elapsed >= delay * 3,
        "expected at least {:?} to elapse, but only {:?} did",
        delay * 3,
        elapsed
    );
}