event_scanner/test_utils/
macros.rs

1use alloy::primitives::LogData;
2use tokio_stream::Stream;
3
4use crate::{ScannerMessage, event_scanner::EventScannerResult};
5
6#[macro_export]
7macro_rules! assert_next {
8    // 1. Explicit Error Matching (Value based) - uses the new PartialEq implementation
9    ($stream: expr, Err($expected_err:expr)) => {
10        $crate::assert_next!($stream, Err($expected_err), timeout = 5)
11    };
12    ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
13        let message = tokio::time::timeout(
14            std::time::Duration::from_secs($secs),
15            tokio_stream::StreamExt::next(&mut $stream),
16        )
17        .await
18        .expect("timed out");
19        if let Some(msg) = message {
20            let expected = &$expected_err;
21            assert_eq!(&msg, expected, "Expected error {:?}, got {:?}", expected, msg);
22        } else {
23            panic!("Expected error {:?}, but channel was closed", $expected_err);
24        }
25    };
26
27    // 2. Success Matching (Implicit unwrapping) - existing behavior
28    ($stream: expr, $expected: expr) => {
29        $crate::assert_next!($stream, $expected, timeout = 5)
30    };
31    ($stream: expr, $expected: expr, timeout = $secs: expr) => {
32        let message = tokio::time::timeout(
33            std::time::Duration::from_secs($secs),
34            tokio_stream::StreamExt::next(&mut $stream),
35        )
36        .await
37        .expect("timed out");
38        let expected = $expected;
39        match message {
40            std::option::Option::Some(std::result::Result::Ok(msg)) => {
41                assert_eq!(msg, expected, "Expected {:?}, got {:?}", expected, msg);
42            }
43            std::option::Option::Some(std::result::Result::Err(e)) => {
44                panic!("Expected Ok({:?}), got Err({:?})", expected, e);
45            }
46            std::option::Option::None => {
47                panic!("Expected Ok({:?}), but channel was closed", expected);
48            }
49        }
50    };
51}
52
53#[macro_export]
54macro_rules! assert_closed {
55    ($stream: expr) => {
56        assert_closed!($stream, timeout = 5)
57    };
58    ($stream: expr, timeout = $secs: expr) => {
59        let message = tokio::time::timeout(
60            std::time::Duration::from_secs($secs),
61            tokio_stream::StreamExt::next(&mut $stream),
62        )
63        .await
64        .expect("timed out");
65        assert!(message.is_none())
66    };
67}
68
69#[macro_export]
70macro_rules! assert_empty {
71    ($stream: expr) => {{
72        let inner = $stream.into_inner();
73        assert!(inner.is_empty(), "Stream should have no pending messages");
74        tokio_stream::wrappers::ReceiverStream::new(inner)
75    }};
76}
77
78/// Asserts that a stream emits a specific sequence of events in order.
79///
80/// This macro consumes messages from a stream and verifies that the provided events are emitted
81/// in the exact order specified, regardless of how they are batched. The stream may emit events
82/// across multiple batches or all at once—the macro handles both cases. It ensures no unexpected
83/// events appear between the expected ones and that the sequence completes exactly as specified.
84///
85/// The macro accepts events of any type implementing [`SolEvent`](alloy::sol_types::SolEvent).
86/// Events are compared by their encoded log data, allowing flexible matching across different
87/// batch configurations while maintaining strict ordering requirements.
88///
89/// # Examples
90///
91/// ```no_run
92/// # use alloy::sol;
93/// sol! {
94///     event CountIncreased(uint256 newCount);
95/// }
96///
97/// #[tokio::test]
98/// async fn test_event_order() {
99///     // scanner setup...
100///
101///     let mut stream = scanner.subscribe(EventFilter::new().contract_address(contract_address));
102///
103///     // Assert these two events are emitted in order
104///     assert_event_sequence!(
105///         stream,
106///         &[
107///             CountIncreased { newCount: U256::from(1) },
108///             CountIncreased { newCount: U256::from(2) },
109///         ]
110///     );
111/// }
112/// ```
113///
114/// The assertion passes whether events arrive in separate batches or together:
115/// * **Separate batches**: `[Event1]`, then `[Event2]`
116/// * **Single batch**: `[Event1, Event2]`
117///
118/// # Panics
119///
120/// * **Timeout**: The stream doesn't produce the next expected event within the timeout period
121///   (default 5 seconds, configurable via `timeout = N` parameter).
122/// * **Wrong event**: The stream emits a different event than the next expected one in the
123///   sequence.
124/// * **Extra events**: The stream emits more events than expected after the sequence completes.
125/// * **Stream closed early**: The stream ends before all expected events are emitted.
126/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
127///   when an event is expected.
128/// * **Empty sequence**: The macro is called with an empty event collection (use `assert_empty!`
129///   instead).
130///
131/// On panic, the error message includes the remaining expected events for debugging.
132#[macro_export]
133macro_rules! assert_event_sequence {
134    // owned slices just pass to the borrowed slices variant
135    ($stream: expr, [$($event:expr),+ $(,)?]) => {
136        assert_event_sequence!($stream, &[$($event),+], timeout = 5)
137    };
138    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
139        assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
140    };
141    // borrowed slices
142    ($stream: expr, &[$($event:expr),+ $(,)?]) => {
143        assert_event_sequence!($stream, &[$($event),+], timeout = 5)
144    };
145    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
146        let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];
147
148       $crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options, $secs).await
149    };
150    // variables and non-slice expressions
151    ($stream: expr, $events: expr) => {
152        assert_event_sequence!($stream, $events, timeout = 5)
153    };
154    ($stream: expr, $events: expr, timeout = $secs: expr) => {
155        let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
156        if expected_options.is_empty() {
157            panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
158        }
159        $crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
160    };
161}
162
163/// Same as [`assert_event_sequence!`], but invokes [`assert_empty!`] at the end.
164#[macro_export]
165macro_rules! assert_event_sequence_final {
166    // owned slices
167    ($stream: expr, [$($event:expr),+ $(,)?]) => {{
168        assert_event_sequence_final!($stream, &[$($event),+])
169    }};
170    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
171        assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
172    }};
173    // borrowed slices
174    ($stream: expr, &[$($event:expr),+ $(,)?]) => {{
175        assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
176    }};
177    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
178        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
179        $crate::assert_empty!($stream)
180    }};
181    // variables and non-slice expressions
182    ($stream: expr, $events: expr) => {{
183        assert_event_sequence_final!($stream, $events, timeout = 5)
184    }};
185    ($stream: expr, $events: expr, timeout = $secs: expr) => {{
186        $crate::assert_event_sequence!($stream, $events, timeout = $secs);
187        $crate::assert_empty!($stream)
188    }};
189}
190
191#[allow(clippy::missing_panics_doc)]
192pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
193    stream: &mut S,
194    expected_options: impl IntoIterator<Item = &LogData>,
195    timeout_secs: u64,
196) {
197    let mut remaining = expected_options.into_iter();
198    let start = std::time::Instant::now();
199    let timeout_duration = std::time::Duration::from_secs(timeout_secs);
200
201    while let Some(expected) = remaining.next() {
202        let elapsed = start.elapsed();
203
204        assert!(
205            elapsed < timeout_duration,
206            "Timed out waiting for events. Still expecting: {:#?}",
207            remaining.collect::<Vec<_>>()
208        );
209
210        let time_left = timeout_duration - elapsed;
211        let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
212            .await
213            .expect("timed out waiting for next batch");
214
215        match message {
216            Some(Ok(ScannerMessage::Data(batch))) => {
217                let mut batch = batch.iter();
218                let event = batch.next().expect("Streamed batch should not be empty");
219                assert_eq!(
220                    expected,
221                    event.data(),
222                    "\nRemaining: {:#?}\n",
223                    remaining.collect::<Vec<_>>()
224                );
225                while let Some(event) = batch.next() {
226                    let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
227                    assert_eq!(
228                        expected,
229                        event.data(),
230                        "\nRemaining: {:#?}\n",
231                        remaining.collect::<Vec<_>>()
232                    );
233                }
234            }
235            Some(Ok(other)) => {
236                panic!("Expected Message::Data,\nGot: {other:#?}");
237            }
238            Some(Err(e)) => {
239                panic!("Expected Ok(Message::Data),\nGot Err: {e:#?}");
240            }
241            None => {
242                panic!(
243                    "Stream closed while still expecting:\n{:#?}",
244                    remaining.collect::<Vec<_>>()
245                );
246            }
247        }
248    }
249}
250
251/// Asserts that a stream of block ranges completely covers an expected block range.
252///
253/// This macro consumes messages from a stream and verifies that the block ranges received
254/// sequentially cover the entire `expected_range` without gaps or overlaps. Each streamed
255/// range must start exactly where the previous one ended, and all ranges must fit within
256/// the expected bounds.
257///
258/// The macro expects the stream to yield `ScannerMessage::Data(range)` variants containing
259/// `RangeInclusive<u64>` values representing block ranges. It tracks coverage by ensuring
260/// each new range starts at the next expected block number and doesn't exceed the end of
261/// the expected range. Once the entire range is covered, the assertion succeeds.
262///
263/// # Example
264///
265/// ```rust
266/// use event_scanner::{ScannerMessage, assert_range_coverage};
267/// use tokio::sync::mpsc;
268/// use tokio_stream::wrappers::ReceiverStream;
269///
270/// #[tokio::test]
271/// async fn test_scanner_covers_range() {
272///     let (tx, rx) = mpsc::channel(10);
273///     let mut stream = ReceiverStream::new(rx);
274///
275///     // Simulate a scanner that splits blocks 100-199 into chunks
276///     tokio::spawn(async move {
277///         tx.send(ScannerMessage::Data(100..=149)).await.unwrap();
278///         tx.send(ScannerMessage::Data(150..=199)).await.unwrap();
279///     });
280///
281///     // Assert that the stream covers blocks 100-199
282///     assert_range_coverage!(stream, 100..=199);
283/// }
284/// ```
285///
286/// # Panics
287///
288/// * **Timeout**: The stream doesn't produce the next expected range within the timeout period
289///   (default 5 seconds, configurable via `timeout = N` parameter).
290/// * **Gap or overlap**: A streamed range doesn't start exactly at the next expected block number,
291///   indicating a gap or overlap in coverage.
292/// * **Out of bounds**: A streamed range extends beyond the end of the expected range.
293/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or
294///   `Notification`) when a block range is expected.
295/// * **Stream closed early**: The stream ends before the entire expected range is covered.
296///
297/// On panic, the error message includes the expected remaining range and all previously
298/// streamed ranges for debugging.
299#[macro_export]
300macro_rules! assert_range_coverage {
301    ($stream: expr, $expected_range: expr) => {
302        assert_range_coverage!($stream, $expected_range, timeout = 5)
303    };
304    ($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
305        fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (u64, u64) {
306            let start_bound = match range.start_bound() {
307                ::std::ops::Bound::Unbounded => 0,
308                ::std::ops::Bound::Excluded(&x) => x + 1,
309                ::std::ops::Bound::Included(&x) => x,
310            };
311            let end_bound = match range.end_bound() {
312                ::std::ops::Bound::Unbounded => u64::MAX,
313                ::std::ops::Bound::Excluded(&x) => x - 1,
314                ::std::ops::Bound::Included(&x) => x,
315            };
316            (start_bound, end_bound)
317        }
318
319        let original_bounds = bounds(&$expected_range);
320        let (mut start, end) = original_bounds;
321
322        let start_time = ::std::time::Instant::now();
323        let timeout_duration = ::std::time::Duration::from_secs($secs);
324
325        // log all streamed ranges on failures
326        let mut streamed_ranges = vec![];
327
328        while start <= end {
329            let elapsed = start_time.elapsed();
330
331            assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);
332
333            let time_left = timeout_duration - elapsed;
334            let message =
335                tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
336                    .await
337                    .expect("Timed out waiting for the next block range");
338
339            match message {
340                std::option::Option::Some(std::result::Result::Ok(event_scanner::ScannerMessage::Data(range))) => {
341                    let (streamed_start, streamed_end) = bounds(&range);
342                    streamed_ranges.push(range.clone());
343                    assert!(
344                        start == streamed_start && streamed_end <= end,
345                        "Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
346                        start..=end,
347                        range,
348                        streamed_ranges,
349                    );
350                    start = streamed_end + 1;
351                }
352                std::option::Option::Some(std::result::Result::Ok(other)) => {
353                    panic!("Expected a block range, got: {other:#?}");
354                }
355                std::option::Option::Some(std::result::Result::Err(e)) => {
356                    panic!("Expected Ok(Message::Data), got Err: {e:#?}");
357                }
358                std::option::Option::None => {
359                    panic!("Stream closed without covering range: {:#?}", start..=end);
360                }
361            }
362        }
363    }};
364}
365
366#[cfg(test)]
367mod tests {
368    use alloy::sol;
369    use tokio::sync::mpsc;
370    use tokio_stream::wrappers::ReceiverStream;
371
372    sol! {
373        #[derive(Debug)]
374        event Transfer(address indexed from, address indexed to, uint256 value);
375    }
376
377    #[tokio::test]
378    #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
379    async fn assert_event_sequence_macro_with_empty_vec() {
380        let (_tx, rx) = mpsc::channel(10);
381        let mut stream = ReceiverStream::new(rx);
382
383        let empty_vec: Vec<Transfer> = Vec::new();
384        assert_event_sequence!(stream, empty_vec);
385    }
386
387    #[tokio::test]
388    #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
389    async fn assert_event_sequence_macro_with_empty_slice() {
390        let (_tx, rx) = mpsc::channel(10);
391        let mut stream = ReceiverStream::new(rx);
392
393        let empty_vec: &[Transfer] = &[];
394        assert_event_sequence!(stream, empty_vec);
395    }
396}