use fluxion_core::StreamItem;
use futures::stream::StreamExt;
use futures::Stream;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_stream::wrappers::UnboundedReceiverStream;
pub fn unwrap_value<T>(item: Option<StreamItem<T>>) -> T {
match item {
Some(StreamItem::Value(value)) => value,
Some(StreamItem::Error(e)) => panic!("Expected Value but got Error: {}", e),
None => panic!("Expected Value but stream ended"),
}
}
/// Waits up to `timeout_ms` milliseconds for the next item of `stream`.
///
/// The item is returned as-is, whether it is a value or an error; use
/// `unwrap_value` on the result to additionally require a value.
///
/// # Panics
///
/// Panics if the stream ends before producing an item, or if no item
/// arrives within `timeout_ms` milliseconds.
pub async fn unwrap_stream<T, S>(stream: &mut S, timeout_ms: u64) -> StreamItem<T>
where
    S: Stream<Item = StreamItem<T>> + Unpin,
{
    let limit = Duration::from_millis(timeout_ms);
    let polled = timeout(limit, stream.next()).await;

    match polled {
        // The deadline elapsed before the stream produced anything.
        Err(_elapsed) => panic!("Timeout: No item received within {} ms", timeout_ms),
        Ok(next) => match next {
            Some(item) => item,
            None => panic!("Expected StreamItem but stream ended"),
        },
    }
}
/// Builds an unbounded channel whose receiving side is exposed as a stream.
///
/// Every `T` pushed through the returned sender comes out of the stream
/// wrapped in `StreamItem::Value`. The stream ends once all senders have
/// been dropped.
pub fn test_channel<T: Send + 'static>(
) -> (UnboundedSender<T>, impl Stream<Item = StreamItem<T>> + Send) {
    let (sender, receiver) = unbounded_channel::<T>();
    let values = UnboundedReceiverStream::new(receiver).map(StreamItem::Value);
    (sender, values)
}
/// Builds an unbounded channel that carries `StreamItem<T>` directly.
///
/// Unlike `test_channel`, nothing is wrapped on the way through: the caller
/// sends complete `StreamItem`s, which makes it possible to inject
/// `StreamItem::Error` entries alongside values.
pub fn test_channel_with_errors<T: Send + 'static>() -> (
    UnboundedSender<StreamItem<T>>,
    impl Stream<Item = StreamItem<T>> + Send,
) {
    let (sender, receiver) = unbounded_channel::<StreamItem<T>>();
    (sender, UnboundedReceiverStream::new(receiver))
}
/// Asserts that `stream` stays silent for `timeout_ms` milliseconds.
///
/// The stream is polled ahead of the timer (`biased;`). Without it,
/// `select!` chooses which branch to poll first at random, so an item that
/// is ready on the same poll as the timer could go unnoticed and the
/// assertion would pass nondeterministically.
///
/// # Panics
///
/// Panics if `stream.next()` completes before the timer does. This covers
/// an emitted item and also the stream ending (`None`).
pub async fn assert_no_element_emitted<S, T>(stream: &mut S, timeout_ms: u64)
where
    S: Stream<Item = T> + Unpin,
{
    select! {
        // Poll branches top to bottom so a ready item always wins over the timer.
        biased;

        _state = stream.next() => {
            panic!(
                "Unexpected combination emitted, expected no output."
            );
        }
        // The quiet period elapsed without the stream resolving: success.
        () = sleep(Duration::from_millis(timeout_ms)) => {}
    }
}
/// Asserts that `stream` terminates within `timeout_ms` milliseconds.
///
/// Succeeds only when the next poll result is `None`.
///
/// # Panics
///
/// Panics if the stream yields another item instead of ending, or if it
/// neither yields nor ends before the timeout elapses.
pub async fn assert_stream_ended<S, T>(stream: &mut S, timeout_ms: u64)
where
    S: Stream<Item = T> + Unpin,
{
    let limit = Duration::from_millis(timeout_ms);
    let polled = timeout(limit, stream.next()).await;

    match polled {
        Err(_elapsed) => panic!("Timeout: Stream did not end within {} ms", timeout_ms),
        Ok(Some(_)) => panic!("Expected stream to end but it returned a value"),
        // `None` is the end-of-stream marker we were waiting for.
        Ok(None) => {}
    }
}
/// Runs an async test body under a fixed five-second deadline and evaluates
/// to the body's result.
///
/// The expansion names `timeout` and `Duration` without a path, so both
/// must be in scope wherever the macro is invoked. It also uses `.await`,
/// so it has to be invoked from an async context.
///
/// # Panics
///
/// Panics with a message starting with "Test timed out after 5 seconds"
/// when the body does not finish within five seconds.
#[macro_export]
macro_rules! with_timeout {
    ($body:expr) => {{
        let guarded = async { $body };
        timeout(Duration::from_secs(5), guarded)
            .await
            .expect("Test timed out after 5 seconds")
    }};
}
#[cfg(test)]
mod tests {
    use super::*;
    use fluxion_core::FluxionError;

    /// An open channel with nothing sent stays silent for the whole window.
    #[tokio::test]
    async fn test_assert_no_element_emitted() {
        // The sender is kept alive so the stream does not terminate.
        let (_sender, mut items) = test_channel::<i32>();

        assert_no_element_emitted(&mut items, 100).await;
    }

    /// Waiting on an open but idle channel hits the timeout panic.
    #[tokio::test]
    #[should_panic(expected = "Timeout: No item received within 100 ms")]
    async fn test_unwrap_stream_timeout() {
        let (_sender, mut items) = test_channel::<i32>();

        unwrap_stream(&mut items, 100).await;
    }

    /// Dropping the only sender ends the stream, which `unwrap_stream` rejects.
    #[tokio::test]
    #[should_panic(expected = "Expected StreamItem but stream ended")]
    async fn test_unwrap_stream_empty() {
        let (sender, mut items) = test_channel::<i32>();
        drop(sender);

        unwrap_stream(&mut items, 500).await;
    }

    /// An injected error passes through `unwrap_stream` and trips `unwrap_value`.
    #[tokio::test]
    #[should_panic(
        expected = "Expected Value but got Error: Stream processing error: injected error"
    )]
    async fn test_unwrap_stream_error_injected() {
        let (sender, mut items) = test_channel_with_errors::<i32>();
        let injected = StreamItem::Error(FluxionError::stream_error("injected error"));
        sender.send(injected).unwrap();

        let received = unwrap_stream(&mut items, 500).await;
        unwrap_value(Some(received));
    }

    /// A stream whose sender was dropped is reported as ended.
    #[tokio::test]
    async fn test_assert_stream_ended_success() {
        let (sender, mut items) = test_channel::<i32>();
        drop(sender);

        assert_stream_ended(&mut items, 500).await;
    }

    /// A pending value means the stream has not ended.
    #[tokio::test]
    #[should_panic(expected = "Expected stream to end but it returned a value")]
    async fn test_assert_stream_ended_returns_value() {
        let (sender, mut items) = test_channel::<i32>();
        sender.send(42).unwrap();

        assert_stream_ended(&mut items, 500).await;
    }

    /// An open, idle stream never ends, so the assertion times out.
    #[tokio::test]
    #[should_panic(expected = "Timeout: Stream did not end within 100 ms")]
    async fn test_assert_stream_ended_timeout() {
        let (_sender, mut items) = test_channel::<i32>();

        assert_stream_ended(&mut items, 100).await;
    }
}