1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
use Debug;
use StreamItem;
use ;
use StreamExt;
use Stream;
use Duration;
use select;
use sleep;
/// Receives a value from an UnboundedReceiver with a timeout.
///
/// # Panics
/// Panics if no item is received within the timeout.
pub async
/// Asserts that no item is received from an UnboundedReceiver within a timeout.
///
/// # Panics
/// Panics if an item is received within the timeout.
pub async
/// Unwraps a `StreamItem::Value`, panicking if it's an error.
///
/// This helper eliminates the common `.unwrap().unwrap()` pattern in tests
/// where you need to extract the value from both the `Option` and `StreamItem`.
///
/// # Panics
///
/// Panics if the `StreamItem` is an `Error` variant.
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::{test_channel, unwrap_value, unwrap_stream, Sequenced};
/// use fluxion_test_utils::test_data::person_alice;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, mut stream) = test_channel();
/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
///
/// // Instead of: let item = stream.next().await.unwrap().unwrap();
/// // Prefer the async helper which waits safely for spawned tasks:
/// let item = unwrap_value(Some(unwrap_stream(&mut stream, 500).await));
/// # }
/// ```
/// Unwraps a value from a stream with a timeout for spawned tasks to process.
///
/// This function polls the stream with a timeout to allow spawned background tasks
/// time to process items. This is useful when testing streams that use `tokio::spawn`
/// internally.
///
/// # Panics
///
/// Panics if:
/// - The stream ends (returns `None`) before an item arrives
/// - No item is received within the given timeout (in milliseconds)
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::{test_channel, unwrap_stream, Sequenced};
/// use fluxion_test_utils::test_data::person_alice;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, mut stream) = test_channel();
/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
///
/// // Waits up to 500ms for the item to arrive
/// let item = unwrap_stream(&mut stream, 500).await;
/// # }
/// ```
pub async
/// Creates a test channel that automatically wraps values in `StreamItem::Value`.
///
/// This helper simplifies test setup by handling the `StreamItem` wrapping automatically,
/// allowing tests to send plain values while the stream receives `StreamItem<T>`.
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::{test_channel, Sequenced};
/// use fluxion_test_utils::test_data::person_alice;
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, mut stream) = test_channel();
///
/// // Send plain values
/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
///
/// // Receive StreamItem-wrapped values (prefer using `unwrap_stream` in async tests)
/// // Option -> StreamItem -> Value
/// // Example using the async helper:
/// // let item = unwrap_value(Some(unwrap_stream(&mut stream, 500).await));
/// # }
/// ```
/// Creates a test channel that accepts `StreamItem<T>` for testing error propagation.
///
/// This helper allows tests to explicitly send both values and errors through the stream,
/// enabling comprehensive error handling tests.
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::test_channel_with_errors;
/// use fluxion_core::{StreamItem, FluxionError};
/// use futures::StreamExt;
///
/// # async fn example() {
/// let (tx, mut stream) = test_channel_with_errors();
///
/// // Send values
/// tx.unbounded_send(StreamItem::Value(42)).unwrap();
///
/// // Send errors
/// tx.unbounded_send(StreamItem::Error(FluxionError::stream_error("test error"))).unwrap();
///
/// let value = stream.next().await.unwrap();
/// let error = stream.next().await.unwrap();
/// # }
/// ```
/// Asserts that no element is emitted within the given timeout.
///
/// # Panics
/// Panics if the stream emits an element before the timeout elapses.
pub async
/// Asserts that the stream has ended (returns `None`) within a timeout.
///
/// This prevents tests from hanging when checking if a stream has terminated.
/// If the stream doesn't end within the timeout, the test will panic.
///
/// # Panics
/// Panics if:
/// - The stream returns a value instead of None
/// - The stream doesn't end within the timeout
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::{test_channel, assert_stream_ended};
/// # async fn example() {
/// let (tx, mut stream) = test_channel::<i32>();
/// drop(tx); // Close the stream
///
/// // This will pass because the stream has ended
/// assert_stream_ended(&mut stream, 500).await;
/// # }
/// ```
pub async
/// Collects all values from a stream until it times out waiting for the next item.
///
/// This function repeatedly polls the stream with a per-item timeout. It collects
/// all `StreamItem::Value` items, ignoring errors, until no more items arrive within
/// the timeout period.
///
/// # Returns
/// A `Vec<T>` containing all the inner values from `StreamItem::Value` items.
///
/// # Example
///
/// ```rust
/// use fluxion_test_utils::{test_channel, unwrap_all, Sequenced};
/// use fluxion_test_utils::test_data::{person_alice, person_bob};
///
/// # async fn example() {
/// let (tx, mut stream) = test_channel();
/// tx.unbounded_send(Sequenced::new(person_alice())).unwrap();
/// tx.unbounded_send(Sequenced::new(person_bob())).unwrap();
/// drop(tx);
///
/// let results = unwrap_all(&mut stream, 100).await;
/// assert_eq!(results.len(), 2);
/// # }
/// ```
pub async
/// Macro that wraps test bodies with a timeout to prevent hanging tests.