event_scanner/test_utils/
macros.rs

1use alloy::primitives::LogData;
2use tokio_stream::Stream;
3
4use crate::{ScannerMessage, event_scanner::EventScannerResult};
5
/// Awaits the next item of a stream and asserts on what was received.
///
/// Two forms are accepted, each with an optional trailing `timeout = N` argument (seconds,
/// default 5):
///
/// * `assert_next!(stream, Err(expected))` compares the raw stream item against `expected`
///   using `assert_eq!`, so the item type must be comparable with `expected`.
/// * `assert_next!(stream, expected)` requires the item to be `Ok(value)` and compares `value`
///   against `expected`.
///
/// # Panics
///
/// Panics if the timeout elapses, if the stream has ended, or if the received item does not
/// match the expectation.
#[macro_export]
macro_rules! assert_next {
    // Error form: the stream item itself is compared against the expected value.
    ($stream: expr, Err($expected_err:expr)) => {
        $crate::assert_next!($stream, Err($expected_err), timeout = 5)
    };
    ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
        let limit = std::time::Duration::from_secs($secs);
        let next_item = tokio::time::timeout(limit, tokio_stream::StreamExt::next(&mut $stream))
            .await
            .expect("timed out");
        match next_item {
            std::option::Option::Some(received) => {
                let wanted = &$expected_err;
                assert_eq!(&received, wanted, "Expected error {:?}, got {:?}", wanted, received);
            }
            std::option::Option::None => {
                panic!("Expected error {:?}, but channel was closed", $expected_err);
            }
        }
    };

    // Success form: the item must be `Ok(..)`; its payload is compared against the expectation.
    ($stream: expr, $expected: expr) => {
        $crate::assert_next!($stream, $expected, timeout = 5)
    };
    ($stream: expr, $expected: expr, timeout = $secs: expr) => {
        let limit = std::time::Duration::from_secs($secs);
        let next_item = tokio::time::timeout(limit, tokio_stream::StreamExt::next(&mut $stream))
            .await
            .expect("timed out");
        let wanted = $expected;
        match next_item {
            std::option::Option::None => {
                panic!("Expected Ok({:?}), but channel was closed", wanted);
            }
            std::option::Option::Some(std::result::Result::Err(error)) => {
                panic!("Expected Ok({:?}), got Err({:?})", wanted, error);
            }
            std::option::Option::Some(std::result::Result::Ok(received)) => {
                assert_eq!(received, wanted, "Expected {:?}, got {:?}", wanted, received);
            }
        }
    };
}
52
/// Asserts that a stream has ended, i.e. that awaiting its next item yields `None`.
///
/// The next item is awaited for at most `timeout = N` seconds (default 5).
///
/// # Panics
///
/// Panics if the timeout elapses or if the stream yields another item instead of ending.
#[macro_export]
macro_rules! assert_closed {
    ($stream: expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is invoked through a
        // path and has not been imported into the caller's scope.
        $crate::assert_closed!($stream, timeout = 5)
    };
    // Wrapped in a block so the expansion is a single expression and can be used anywhere an
    // expression is allowed, not only in statement position.
    ($stream: expr, timeout = $secs: expr) => {{
        let message = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        assert!(message.is_none(), "Expected stream to be closed, but it yielded another item");
    }};
}
68
/// Asserts that a stream currently holds no pending messages.
///
/// The stream expression is consumed through `into_inner()`. After the check, the inner value
/// is wrapped in a new `tokio_stream::wrappers::ReceiverStream`, which becomes the value of the
/// macro invocation.
///
/// # Panics
///
/// Panics with "Stream should have no pending messages" if the inner value is not empty.
#[macro_export]
macro_rules! assert_empty {
    ($stream: expr) => {{
        let receiver = $stream.into_inner();
        if !receiver.is_empty() {
            panic!("Stream should have no pending messages");
        }
        tokio_stream::wrappers::ReceiverStream::new(receiver)
    }};
}
77
/// Asserts that a stream emits a specific sequence of events in order.
///
/// This macro consumes messages from a stream and verifies that the provided events are emitted
/// in the exact order specified, regardless of how they are batched. The stream may emit events
/// across multiple batches or all at once—the macro handles both cases. It ensures no unexpected
/// events appear between the expected ones and that the sequence completes exactly as specified.
///
/// The macro accepts events of any type implementing [`SolEvent`](alloy::sol_types::SolEvent).
/// Events are compared by their encoded log data, allowing flexible matching across different
/// batch configurations while maintaining strict ordering requirements.
///
/// The expected events may be given as an array literal (`[a, b]`), a borrowed array literal
/// (`&[a, b]`), or any expression offering `.iter()` over events (for example a `Vec` or a
/// slice variable).
///
/// # Examples
///
/// ```no_run
/// # use alloy::sol;
/// sol! {
///     event CountIncreased(uint256 newCount);
/// }
///
/// #[tokio::test]
/// async fn test_event_order() {
///     // scanner setup...
///
///     let mut stream = scanner.subscribe(EventFilter::new().contract_address(contract_address));
///
///     // Assert these two events are emitted in order
///     assert_event_sequence!(
///         stream,
///         &[
///             CountIncreased { newCount: U256::from(1) },
///             CountIncreased { newCount: U256::from(2) },
///         ]
///     );
/// }
/// ```
///
/// The assertion passes whether events arrive in separate batches or together:
/// * **Separate batches**: `[Event1]`, then `[Event2]`
/// * **Single batch**: `[Event1, Event2]`
///
/// # Panics
///
/// * **Timeout**: The stream doesn't produce the next expected event within the timeout period
///   (default 5 seconds, configurable via `timeout = N` parameter).
/// * **Wrong event**: The stream emits a different event than the next expected one in the
///   sequence.
/// * **Extra events**: The stream emits more events than expected after the sequence completes.
/// * **Stream closed early**: The stream ends before all expected events are emitted.
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
///   when an event is expected.
/// * **Empty sequence**: The macro is called with an empty event collection (use `assert_empty!`
///   instead).
///
/// On panic, the error message includes the remaining expected events for debugging.
#[macro_export]
macro_rules! assert_event_sequence {
    // Recursive calls go through `$crate::` so they resolve even when the macro is invoked by
    // path and is not imported into the caller's scope.

    // owned slices just pass to the borrowed slices variant
    ($stream: expr, [$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
    };
    // borrowed slices
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    // The terminal arms are wrapped in a block so the expansion is a single expression.
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        // The `+` repetition guarantees at least one event, so no emptiness check is needed.
        let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];

        $crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options, $secs)
            .await;
    }};
    // variables and non-slice expressions
    ($stream: expr, $events: expr) => {
        $crate::assert_event_sequence!($stream, $events, timeout = 5)
    };
    ($stream: expr, $events: expr, timeout = $secs: expr) => {{
        let expected_options = $events
            .iter()
            .map(alloy::sol_types::SolEvent::encode_log_data)
            .collect::<Vec<_>>();
        // An arbitrary expression can be empty at runtime; reject it explicitly.
        if expected_options.is_empty() {
            panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
        }
        $crate::test_utils::macros::assert_event_sequence(
            &mut $stream,
            expected_options.iter(),
            $secs,
        )
        .await;
    }};
}
162
/// Same as [`assert_event_sequence!`], but invokes [`assert_empty!`] at the end.
///
/// Accepts the same argument forms as [`assert_event_sequence!`] (array literal, borrowed
/// array literal, or any other expression, each with an optional `timeout = N` in seconds,
/// default 5). The invocation evaluates to whatever [`assert_empty!`] evaluates to.
///
/// # Panics
///
/// Panics whenever [`assert_event_sequence!`] or [`assert_empty!`] would panic.
#[macro_export]
macro_rules! assert_event_sequence_final {
    // Recursive calls go through `$crate::` (like the calls to the sibling macros below) so
    // they resolve even when this macro is invoked by path without being imported.

    // owned slices
    ($stream: expr, [$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+])
    }};
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
    }};
    // borrowed slices
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
    }};
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
        $crate::assert_empty!($stream)
    }};
    // variables and non-slice expressions
    ($stream: expr, $events: expr) => {{
        $crate::assert_event_sequence_final!($stream, $events, timeout = 5)
    }};
    ($stream: expr, $events: expr, timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, $events, timeout = $secs);
        $crate::assert_empty!($stream)
    }};
}
190
191#[allow(clippy::missing_panics_doc)]
192pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
193    stream: &mut S,
194    expected_options: impl IntoIterator<Item = &LogData>,
195    timeout_secs: u64,
196) {
197    let mut remaining = expected_options.into_iter();
198    let start = std::time::Instant::now();
199    let timeout_duration = std::time::Duration::from_secs(timeout_secs);
200
201    while let Some(expected) = remaining.next() {
202        let elapsed = start.elapsed();
203
204        assert!(
205            elapsed < timeout_duration,
206            "Timed out waiting for events. Still expecting: {:#?}",
207            remaining.collect::<Vec<_>>()
208        );
209
210        let time_left = timeout_duration - elapsed;
211        let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
212            .await
213            .expect("timed out waiting for next batch");
214
215        match message {
216            Some(Ok(ScannerMessage::Data(batch))) => {
217                let mut batch = batch.iter();
218                let event = batch.next().expect("Streamed batch should not be empty");
219                assert_eq!(
220                    expected,
221                    event.data(),
222                    "\nRemaining: {:#?}\n",
223                    remaining.collect::<Vec<_>>()
224                );
225                while let Some(event) = batch.next() {
226                    let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
227                    assert_eq!(
228                        expected,
229                        event.data(),
230                        "\nRemaining: {:#?}\n",
231                        remaining.collect::<Vec<_>>()
232                    );
233                }
234            }
235            Some(Ok(other)) => {
236                panic!("Expected Message::Data, got: {other:#?}");
237            }
238            Some(Err(e)) => {
239                panic!("Expected Ok(Message::Data), got Err: {e:#?}");
240            }
241            None => {
242                panic!("Stream closed while still expecting: {:#?}", remaining.collect::<Vec<_>>());
243            }
244        }
245    }
246}
247
/// Asserts that a stream of block ranges completely covers an expected block range.
///
/// This macro consumes messages from a stream and verifies that the block ranges received
/// sequentially cover the entire `expected_range` without gaps or overlaps. Each streamed
/// range must start exactly where the previous one ended, and all ranges must fit within
/// the expected bounds.
///
/// The macro expects the stream to yield `ScannerMessage::Data(range)` variants containing
/// `RangeInclusive<u64>` values representing block ranges. It tracks coverage by ensuring
/// each new range starts at the next expected block number and doesn't exceed the end of
/// the expected range. Once the entire range is covered, the assertion succeeds.
///
/// An empty expected range (for example `0..0`) is trivially covered: the macro returns
/// without reading from the stream.
///
/// # Example
///
/// ```rust
/// use event_scanner::{ScannerMessage, assert_range_coverage};
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::ReceiverStream;
///
/// #[tokio::test]
/// async fn test_scanner_covers_range() {
///     let (tx, rx) = mpsc::channel(10);
///     let mut stream = ReceiverStream::new(rx);
///
///     // Simulate a scanner that splits blocks 100-199 into chunks
///     tokio::spawn(async move {
///         tx.send(ScannerMessage::Data(100..=149)).await.unwrap();
///         tx.send(ScannerMessage::Data(150..=199)).await.unwrap();
///     });
///
///     // Assert that the stream covers blocks 100-199
///     assert_range_coverage!(stream, 100..=199);
/// }
/// ```
///
/// # Panics
///
/// * **Timeout**: The stream doesn't produce the next expected range within the timeout period
///   (default 5 seconds, configurable via `timeout = N` parameter).
/// * **Gap or overlap**: A streamed range doesn't start exactly at the next expected block number,
///   indicating a gap or overlap in coverage.
/// * **Out of bounds**: A streamed range extends beyond the end of the expected range.
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or
///   `Notification`) when a block range is expected.
/// * **Stream closed early**: The stream ends before the entire expected range is covered.
///
/// On panic, the error message includes the expected remaining range and all previously
/// streamed ranges for debugging.
#[macro_export]
macro_rules! assert_range_coverage {
    ($stream: expr, $expected_range: expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is invoked by path.
        $crate::assert_range_coverage!($stream, $expected_range, timeout = 5)
    };
    ($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
        // Bounds are widened to `i128` so that exclusive bounds at the numeric limits
        // (e.g. the end of `0..0`) and stepping past `u64::MAX` cannot overflow.
        fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (i128, i128) {
            let start_bound = match range.start_bound() {
                ::std::ops::Bound::Unbounded => 0,
                ::std::ops::Bound::Excluded(&x) => i128::from(x) + 1,
                ::std::ops::Bound::Included(&x) => i128::from(x),
            };
            let end_bound = match range.end_bound() {
                ::std::ops::Bound::Unbounded => i128::from(u64::MAX),
                ::std::ops::Bound::Excluded(&x) => i128::from(x) - 1,
                ::std::ops::Bound::Included(&x) => i128::from(x),
            };
            (start_bound, end_bound)
        }

        // `start` is the next block that still has to be covered; `end` is inclusive.
        let (mut start, end) = bounds(&$expected_range);

        let start_time = ::std::time::Instant::now();
        let timeout_duration = ::std::time::Duration::from_secs($secs);

        // log all streamed ranges on failures
        let mut streamed_ranges = vec![];

        while start <= end {
            let elapsed = start_time.elapsed();

            assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);

            let time_left = timeout_duration - elapsed;
            let message = match tokio::time::timeout(
                time_left,
                tokio_stream::StreamExt::next(&mut $stream),
            )
            .await
            {
                ::std::result::Result::Ok(message) => message,
                ::std::result::Result::Err(_) => panic!(
                    "Timed out waiting for the next block range. Still expecting: {:#?}\nPrevious streams:\n{:#?}",
                    start..=end,
                    streamed_ranges,
                ),
            };

            match message {
                // `$crate::` (not a hard-coded crate name) so the pattern also resolves inside
                // the defining crate and when the dependency is renamed.
                ::std::option::Option::Some(::std::result::Result::Ok(
                    $crate::ScannerMessage::Data(range),
                )) => {
                    let (streamed_start, streamed_end) = bounds(&range);
                    streamed_ranges.push(range.clone());
                    assert!(
                        start == streamed_start && streamed_end <= end,
                        "Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
                        start..=end,
                        range,
                        streamed_ranges,
                    );
                    start = streamed_end + 1;
                }
                ::std::option::Option::Some(::std::result::Result::Ok(other)) => {
                    panic!("Expected a block range, got: {other:#?}");
                }
                ::std::option::Option::Some(::std::result::Result::Err(e)) => {
                    panic!("Expected Ok(Message::Data), got Err: {e:#?}");
                }
                ::std::option::Option::None => {
                    panic!("Stream closed without covering range: {:#?}", start..=end);
                }
            }
        }
    }};
}
362
#[cfg(test)]
mod tests {
    use alloy::sol;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    sol! {
        #[derive(Debug)]
        event Transfer(address indexed from, address indexed to, uint256 value);
    }

    /// Passing an empty `Vec` of events must hit the macro's empty-collection panic.
    #[tokio::test]
    #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
    async fn assert_event_sequence_macro_with_empty_vec() {
        // The sender half stays bound for the whole test; it is never used to send.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events = Vec::<Transfer>::new();
        assert_event_sequence!(stream, no_events);
    }

    /// Passing an empty slice variable must hit the same empty-collection panic.
    #[tokio::test]
    #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
    async fn assert_event_sequence_macro_with_empty_slice() {
        // The sender half stays bound for the whole test; it is never used to send.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events: &[Transfer] = &[];
        assert_event_sequence!(stream, no_events);
    }
}
393}