event_scanner/test_utils/
macros.rs

/// Asserts that the next item yielded by `$stream` equals `$expected`.
///
/// The macro awaits `tokio_stream::StreamExt::next` on a mutable borrow of `$stream`, wrapped in
/// `tokio::time::timeout`. The two-argument form waits up to 5 seconds; the
/// `timeout = <secs>` form passes `<secs>` to `std::time::Duration::from_secs`.
///
/// The expansion contains `.await`, so it must be invoked inside an async context, and `$stream`
/// must be an expression that can be borrowed mutably. The expansion is a single block
/// expression, so the macro can be used in both statement and expression position.
///
/// # Panics
///
/// - With `"timed out"` if no item is produced before the timeout elapses.
/// - If the stream ends (yields `None`) instead of producing an item.
/// - If the produced item is not equal to `$expected` (reported by `assert_eq!`).
///
/// # Examples
///
/// ```ignore
/// assert_next!(stream, expected_message);
/// assert_next!(stream, expected_message, timeout = 10);
/// ```
#[macro_export]
macro_rules! assert_next {
    ($stream: expr, $expected: expr) => {
        // `$crate::` keeps the recursive call resolvable even when the caller invoked the macro
        // by path and never imported it by name.
        $crate::assert_next!($stream, $expected, timeout = 5)
    };
    ($stream: expr, $expected: expr, timeout = $secs: expr) => {{
        let message = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        if let Some(msg) = message {
            assert_eq!(msg, $expected)
        } else {
            // Reached only when the stream ended, i.e. `message` is `None`.
            panic!("Expected {:?}, got: {message:?}", $expected)
        }
    }};
}
20
/// Asserts that `$stream` is closed, i.e. that its next poll yields `None`.
///
/// The macro awaits `tokio_stream::StreamExt::next` on a mutable borrow of `$stream`, wrapped in
/// `tokio::time::timeout`. The one-argument form waits up to 5 seconds; the
/// `timeout = <secs>` form passes `<secs>` to `std::time::Duration::from_secs`.
///
/// The expansion contains `.await`, so it must be invoked inside an async context, and `$stream`
/// must be an expression that can be borrowed mutably. The expansion is a single block
/// expression, so the macro can be used in both statement and expression position.
///
/// # Panics
///
/// - With `"timed out"` if the stream neither yields nor ends before the timeout elapses.
/// - If the stream yields an item instead of ending.
///
/// # Examples
///
/// ```ignore
/// assert_closed!(stream);
/// assert_closed!(stream, timeout = 1);
/// ```
#[macro_export]
macro_rules! assert_closed {
    ($stream: expr) => {
        // `$crate::` keeps the recursive call resolvable even when the caller invoked the macro
        // by path and never imported it by name.
        $crate::assert_closed!($stream, timeout = 5)
    };
    ($stream: expr, timeout = $secs: expr) => {{
        let message = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        // A static message keeps the failure readable without requiring the item type to
        // implement `Debug`.
        assert!(
            message.is_none(),
            "expected the stream to be closed, but it yielded a message"
        )
    }};
}
36
/// Asserts that `$stream` has no pending messages and hands the stream back.
///
/// `$stream` is consumed with `into_inner()`, the resulting value is checked with `is_empty()`,
/// and that same value is then re-wrapped with `tokio_stream::wrappers::ReceiverStream::new`.
/// The macro evaluates to the new `ReceiverStream`, so callers typically rebind it:
///
/// ```ignore
/// let stream = assert_empty!(stream);
/// ```
///
/// # Panics
///
/// Panics with `"Stream should have no pending messages"` if `is_empty()` returns `false`.
#[macro_export]
macro_rules! assert_empty {
    ($stream: expr) => {{
        // Unwrap the stream so the underlying receiver can be inspected directly.
        let receiver = $stream.into_inner();
        let has_no_pending = receiver.is_empty();
        assert!(has_no_pending, "Stream should have no pending messages");
        // Give the caller an equivalent stream back, since the original was consumed above.
        tokio_stream::wrappers::ReceiverStream::new(receiver)
    }};
}