event_scanner/test_utils/
macros.rs1#[macro_export]
2macro_rules! assert_next {
3 ($stream: expr, $expected: expr) => {
4 assert_next!($stream, $expected, timeout = 5)
5 };
6 ($stream: expr, $expected: expr, timeout = $secs: expr) => {
7 let message = tokio::time::timeout(
8 std::time::Duration::from_secs($secs),
9 tokio_stream::StreamExt::next(&mut $stream),
10 )
11 .await
12 .expect("timed out");
13 if let Some(msg) = message {
14 assert_eq!(msg, $expected)
15 } else {
16 panic!("Expected {:?}, got: {message:?}", $expected)
17 }
18 };
19}
20
21#[macro_export]
22macro_rules! assert_closed {
23 ($stream: expr) => {
24 assert_closed!($stream, timeout = 5)
25 };
26 ($stream: expr, timeout = $secs: expr) => {
27 let message = tokio::time::timeout(
28 std::time::Duration::from_secs($secs),
29 tokio_stream::StreamExt::next(&mut $stream),
30 )
31 .await
32 .expect("timed out");
33 assert!(message.is_none())
34 };
35}
36
37#[macro_export]
38macro_rules! assert_empty {
39 ($stream: expr) => {{
40 let inner = $stream.into_inner();
41 assert!(inner.is_empty(), "Stream should have no pending messages");
42 tokio_stream::wrappers::ReceiverStream::new(inner)
43 }};
44}