fluxion_test_utils/
helpers.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use core::fmt::Debug;
6use fluxion_core::StreamItem;
7use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
8use futures::stream::StreamExt;
9use futures::Stream;
10use std::time::Duration;
11use tokio::select;
12use tokio::time::sleep;
13
14/// Receives a value from an UnboundedReceiver with a timeout.
15///
16/// # Panics
17/// Panics if no item is received within the timeout.
18pub async fn recv_timeout<T>(rx: &mut UnboundedReceiver<T>, timeout_ms: u64) -> Option<T> {
19    select! {
20        item = rx.next() => item,
21        () = sleep(Duration::from_millis(timeout_ms)) => {
22            panic!("Timeout: No item received within {} ms", timeout_ms)
23        }
24    }
25}
26
27/// Asserts that no item is received from an UnboundedReceiver within a timeout.
28///
29/// # Panics
30/// Panics if an item is received within the timeout.
31pub async fn assert_no_recv<T>(rx: &mut UnboundedReceiver<T>, timeout_ms: u64) {
32    select! {
33        item = rx.next() => {
34            if item.is_some() { panic!("Unexpected item received within {} ms", timeout_ms) }
35        }
36        () = sleep(Duration::from_millis(timeout_ms)) => {
37            // Timeout occurred, which is success
38        }
39    }
40}
41
42/// Unwraps a `StreamItem::Value`, panicking if it's an error.
43///
44/// This helper eliminates the common `.unwrap().unwrap()` pattern in tests
45/// where you need to extract the value from both the `Option` and `StreamItem`.
46///
47/// # Panics
48///
49/// Panics if the `StreamItem` is an `Error` variant.
50///
51/// # Example
52///
53/// ```rust
54/// use fluxion_test_utils::{test_channel, unwrap_value, unwrap_stream, Sequenced};
55/// use fluxion_test_utils::test_data::person_alice;
56/// use futures::StreamExt;
57///
58/// # async fn example() {
59/// let (tx, mut stream) = test_channel();
60/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
61///
62/// // Instead of: let item = stream.next().await.unwrap().unwrap();
63/// // Prefer the async helper which waits safely for spawned tasks:
64/// let item = unwrap_value(Some(unwrap_stream(&mut stream, 500).await));
65/// # }
66/// ```
67pub fn unwrap_value<T>(item: Option<StreamItem<T>>) -> T {
68    match item {
69        Some(StreamItem::Value(value)) => value,
70        Some(StreamItem::Error(e)) => panic!("Expected Value but got Error: {}", e),
71        None => panic!("Expected Value but stream ended"),
72    }
73}
74
75/// Unwraps a value from a stream with a timeout for spawned tasks to process.
76///
77/// This function polls the stream with a timeout to allow spawned background tasks
78/// time to process items. This is useful when testing streams that use `tokio::spawn`
79/// internally.
80///
81/// # Panics
82///
83/// Panics if:
84/// - The stream ends (returns `None`) before an item arrives
85/// - No item is received within the 500ms timeout
86///
87/// # Example
88///
89/// ```rust
90/// use fluxion_test_utils::{test_channel, unwrap_stream, Sequenced};
91/// use fluxion_test_utils::test_data::person_alice;
92/// use futures::StreamExt;
93///
94/// # async fn example() {
95/// let (tx, mut stream) = test_channel();
96/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
97///
98/// // Waits up to 500ms for the item to arrive
99/// let item = unwrap_stream(&mut stream, 500).await;
100/// # }
101/// ```
102pub async fn unwrap_stream<T, S>(stream: &mut S, timeout_ms: u64) -> StreamItem<T>
103where
104    S: Stream<Item = StreamItem<T>> + Unpin,
105{
106    select! {
107        item = stream.next() => {
108            item.expect("Expected StreamItem but stream ended")
109        }
110        () = sleep(Duration::from_millis(timeout_ms)) => {
111            panic!("Timeout: No item received within {} ms", timeout_ms)
112        }
113    }
114}
115
116/// Creates a test channel that automatically wraps values in `StreamItem::Value`.
117///
118/// This helper simplifies test setup by handling the `StreamItem` wrapping automatically,
119/// allowing tests to send plain values while the stream receives `StreamItem<T>`.
120///
121/// # Example
122///
123/// ```rust
124/// use fluxion_test_utils::{test_channel, Sequenced};
125/// use fluxion_test_utils::test_data::person_alice;
126/// use futures::StreamExt;
127///
128/// # async fn example() {
129/// let (tx, mut stream) = test_channel();
130///
131/// // Send plain values
132/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
133///
134/// // Receive StreamItem-wrapped values (prefer using `unwrap_stream` in async tests)
135/// // Option -> StreamItem -> Value
136/// // Example using the async helper:
137/// // let item = unwrap_value(Some(unwrap_stream(&mut stream, 500).await));
138/// # }
139/// ```
140pub fn test_channel<T: Send + 'static>(
141) -> (UnboundedSender<T>, impl Stream<Item = StreamItem<T>> + Send) {
142    let (tx, rx) = unbounded();
143    let stream = rx.map(StreamItem::Value);
144    (tx, stream)
145}
146
147/// Creates a test channel that accepts `StreamItem<T>` for testing error propagation.
148///
149/// This helper allows tests to explicitly send both values and errors through the stream,
150/// enabling comprehensive error handling tests.
151///
152/// # Example
153///
154/// ```rust
155/// use fluxion_test_utils::test_channel_with_errors;
156/// use fluxion_core::{StreamItem, FluxionError};
157/// use futures::StreamExt;
158///
159/// # async fn example() {
160/// let (tx, mut stream) = test_channel_with_errors();
161///
162/// // Send values
163/// tx.unbounded_send(StreamItem::Value(42)).unwrap();
164///
165/// // Send errors
166/// tx.unbounded_send(StreamItem::Error(FluxionError::stream_error("test error"))).unwrap();
167///
168/// let value = stream.next().await.unwrap();
169/// let error = stream.next().await.unwrap();
170/// # }
171/// ```
172pub fn test_channel_with_errors<T: Send + 'static>() -> (
173    UnboundedSender<StreamItem<T>>,
174    impl Stream<Item = StreamItem<T>> + Send,
175) {
176    let (tx, rx) = unbounded();
177    (tx, rx)
178}
179
180/// Assert that no element is emitted within the given timeout.
181///
182/// # Panics
183/// Panics if the stream emits an element before the timeout elapses.
184pub async fn assert_no_element_emitted<S, T>(stream: &mut S, timeout_ms: u64)
185where
186    S: Stream<Item = T> + Send + Unpin,
187    T: Debug,
188{
189    select! {
190        state = stream.next() => {
191            panic!(
192                "Unexpected combination emitted {:?}, expected no output.",
193                state
194            );
195        }
196        () = sleep(Duration::from_millis(timeout_ms)) => {
197        }
198    }
199}
200
201/// Assert that the stream has ended (returns None) within a timeout.
202///
203/// This prevents tests from hanging when checking if a stream has terminated.
204/// If the stream doesn't end within the timeout, the test will panic.
205///
206/// # Panics
207/// Panics if:
208/// - The stream returns a value instead of None
209/// - The stream doesn't end within the timeout
210///
211/// # Example
212///
213/// ```rust
214/// use fluxion_test_utils::{test_channel, assert_stream_ended};
215/// # async fn example() {
216/// let (tx, mut stream) = test_channel::<i32>();
217/// drop(tx); // Close the stream
218///
219/// // This will pass because the stream has ended
220/// assert_stream_ended(&mut stream, 500).await;
221/// # }
222/// ```
223pub async fn assert_stream_ended<S, T>(stream: &mut S, timeout_ms: u64)
224where
225    S: Stream<Item = T> + Unpin,
226{
227    select! {
228        item = stream.next() => {
229            if item.is_some() { panic!("Expected stream to end but it returned a value") }
230        }
231        () = sleep(Duration::from_millis(timeout_ms)) => {
232            panic!("Timeout: Stream did not end within {} ms", timeout_ms)
233        }
234    }
235}
236
237/// Collects all values from a stream until it times out waiting for the next item.
238///
239/// This function repeatedly polls the stream with a per-item timeout. It collects
240/// all `StreamItem::Value` items, ignoring errors, until no more items arrive within
241/// the timeout period.
242///
243/// # Returns
244/// A `Vec<T>` containing all the inner values from `StreamItem::Value` items.
245///
246/// # Example
247///
248/// ```rust
249/// use fluxion_test_utils::{test_channel, unwrap_all, Sequenced};
250/// use fluxion_test_utils::test_data::{person_alice, person_bob};
251///
252/// # async fn example() {
253/// let (tx, mut stream) = test_channel();
254/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
255/// tx.unbounded_send(Sequenced::new(person_bob())).unwrap();
256/// drop(tx);
257///
258/// let results = unwrap_all(&mut stream, 100).await;
259/// assert_eq!(results.len(), 2);
260/// # }
261/// ```
262pub async fn unwrap_all<T, S>(stream: &mut S, timeout_ms: u64) -> Vec<T>
263where
264    S: Stream<Item = StreamItem<T>> + Unpin,
265{
266    let mut results = Vec::new();
267    loop {
268        let item = select! {
269            item = stream.next() => item,
270            () = sleep(Duration::from_millis(timeout_ms)) => break,
271        };
272
273        match item {
274            Some(StreamItem::Value(val)) => results.push(val),
275            Some(StreamItem::Error(_)) => continue, // Ignore errors
276            None => break,                          // Stream ended
277        }
278    }
279    results
280}
281
/// Runs a test body under a five-second deadline so a stalled test fails instead of hanging.
///
/// The expansion wraps the body in an `async` block, passes it to `timeout`
/// together with `Duration::from_secs(5)`, awaits the result and calls
/// `expect` on it. `timeout` and `Duration` are referenced by their bare
/// names, so both must be in scope where the macro is invoked, and the
/// invocation must sit inside an async context.
/// NOTE(review): `timeout` is presumably `tokio::time::timeout` — confirm against callers.
///
/// # Panics
/// Panics with "Test timed out after 5 seconds" when the awaited `timeout`
/// call yields a value that `expect` rejects.
#[macro_export]
macro_rules! with_timeout {
    ($body:expr) => {
        timeout(Duration::from_secs(5), async { $body })
            .await
            .expect("Test timed out after 5 seconds")
    };
}