Skip to main content

event_scanner/macros/
test_utils.rs

1use alloy::primitives::LogData;
2use tokio_stream::Stream;
3
4use crate::{ScannerMessage, event_scanner::EventScannerResult};
5
/// Asserts on the next item yielded by a stream.
///
/// The macro awaits the next item for at most `timeout` seconds (5 when omitted) and supports
/// two shapes of expectation:
///
/// * `assert_next!(stream, Err(expected))` compares the yielded item as a whole against
///   `expected` using `assert_eq!`, so the item type must be comparable with the expected error.
/// * `assert_next!(stream, expected)` requires the yielded item to be `Ok(value)` and compares
///   `value` against `expected`.
///
/// # Panics
///
/// Panics when the timeout elapses, when the stream has already ended, or when the yielded item
/// does not match the expectation.
#[macro_export]
macro_rules! assert_next {
    // Error expectation: the yielded item itself is compared against the expected error value.
    ($stream: expr, Err($expected_err:expr)) => {
        $crate::assert_next!($stream, Err($expected_err), timeout = 5)
    };
    ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
        let next_item = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        match next_item {
            Some(received) => {
                let wanted = &$expected_err;
                assert_eq!(&received, wanted, "Expected error {:?}, got {:?}", wanted, received);
            }
            None => {
                panic!("Expected error {:?}, but channel was closed", $expected_err);
            }
        }
    };

    // Success expectation: the yielded item must be `Ok`, and its payload is compared.
    ($stream: expr, $expected: expr) => {
        $crate::assert_next!($stream, $expected, timeout = 5)
    };
    ($stream: expr, $expected: expr, timeout = $secs: expr) => {
        let next_item = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        let wanted = $expected;
        match next_item {
            std::option::Option::None => {
                panic!("Expected Ok({:?}), but channel was closed", wanted);
            }
            std::option::Option::Some(std::result::Result::Err(failure)) => {
                panic!("Expected Ok({:?}), got Err({:?})", wanted, failure);
            }
            std::option::Option::Some(std::result::Result::Ok(received)) => {
                assert_eq!(received, wanted, "Expected {:?}, got {:?}", wanted, received);
            }
        }
    };
}
52
/// Asserts that a stream has ended, i.e. that polling it for the next item yields `None`.
///
/// The macro awaits the next item for at most `timeout` seconds (5 when omitted).
///
/// # Panics
///
/// Panics when the timeout elapses before the stream resolves, or when the stream yields an
/// item instead of ending.
#[macro_export]
macro_rules! assert_closed {
    ($stream: expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is invoked by path
        // (e.g. `some_crate::assert_closed!(..)`) without being imported at the call site.
        $crate::assert_closed!($stream, timeout = 5)
    };
    ($stream: expr, timeout = $secs: expr) => {
        let message = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        assert!(message.is_none())
    };
}
68
/// Asserts that a stream has no buffered messages waiting to be read.
///
/// The stream is taken apart with `into_inner()` so the underlying receiver can be checked with
/// `is_empty()`. Because that consumes the stream, the macro evaluates to a fresh
/// `tokio_stream::wrappers::ReceiverStream` wrapping the same receiver; bind the result if the
/// stream is still needed afterwards.
///
/// # Panics
///
/// Panics when the receiver still holds pending messages.
#[macro_export]
macro_rules! assert_empty {
    ($stream: expr) => {{
        let receiver = $stream.into_inner();
        assert!(receiver.is_empty(), "Stream should have no pending messages");
        tokio_stream::wrappers::ReceiverStream::new(receiver)
    }};
}
77
/// Asserts that a stream emits a specific sequence of events in order.
///
/// This macro consumes messages from a stream and verifies that the provided events are emitted
/// in the exact order specified, regardless of how they are batched. The stream may emit events
/// across multiple batches or all at once—the macro handles both cases. It ensures no unexpected
/// events appear between the expected ones and that the sequence completes exactly as specified.
///
/// The macro accepts events of any type implementing [`SolEvent`](alloy::sol_types::SolEvent).
/// Events are compared by their encoded log data, allowing flexible matching across different
/// batch configurations while maintaining strict ordering requirements.
///
/// # Examples
///
/// ```no_run
/// # use alloy::sol;
/// sol! {
///     event CountIncreased(uint256 newCount);
/// }
///
/// #[tokio::test]
/// async fn test_event_order() {
///     // scanner setup...
///
///     let subscription = scanner.subscribe(EventFilter::new().contract_address(contract_address));
///     let handle = scanner.start().await.unwrap();
///     let mut stream = subscription.stream(&handle);
///
///     // Assert these two events are emitted in order
///     assert_event_sequence!(
///         stream,
///         &[
///             CountIncreased { newCount: U256::from(1) },
///             CountIncreased { newCount: U256::from(2) },
///         ]
///     );
/// }
/// ```
///
/// The assertion passes whether events arrive in separate batches or together:
/// * **Separate batches**: `[Event1]`, then `[Event2]`
/// * **Single batch**: `[Event1, Event2]`
///
/// # Panics
///
/// * **Timeout**: The stream doesn't produce the next expected event within the timeout period
///   (default 5 seconds, configurable via `timeout = N` parameter).
/// * **Wrong event**: The stream emits a different event than the next expected one in the
///   sequence.
/// * **Extra events**: The stream emits more events than expected after the sequence completes.
/// * **Stream closed early**: The stream ends before all expected events are emitted.
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
///   when an event is expected.
/// * **Empty sequence**: The macro is called with an empty event collection (use `assert_empty!`
///   instead).
///
/// On panic, the error message includes the remaining expected events for debugging.
#[macro_export]
macro_rules! assert_event_sequence {
    // Recursive calls go through `$crate::` so the macro also works when it is invoked by path
    // without being imported at the call site.

    // owned slices just pass to the borrowed slices variant
    ($stream: expr, [$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
    };
    // borrowed slices
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
        // Slice literals are matched with `+`, so they always hold at least one event and need
        // no emptiness check.
        let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];

        $crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options, $secs).await
    };
    // variables and non-slice expressions
    ($stream: expr, $events: expr) => {
        $crate::assert_event_sequence!($stream, $events, timeout = 5)
    };
    ($stream: expr, $events: expr, timeout = $secs: expr) => {
        let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
        if expected_options.is_empty() {
            panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
        }
        $crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
    };
}
164
/// Same as [`assert_event_sequence!`], but invokes [`assert_empty!`] at the end.
///
/// The macro evaluates to the stream returned by `assert_empty!`, because that check consumes
/// the stream passed in.
#[macro_export]
macro_rules! assert_event_sequence_final {
    // Recursive calls go through `$crate::` so the macro also works when it is invoked by path
    // without being imported at the call site.

    // owned slices
    ($stream: expr, [$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+])
    }};
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
    }};
    // borrowed slices
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
    }};
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
        $crate::assert_empty!($stream)
    }};
    // variables and non-slice expressions
    ($stream: expr, $events: expr) => {{
        $crate::assert_event_sequence_final!($stream, $events, timeout = 5)
    }};
    ($stream: expr, $events: expr, timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, $events, timeout = $secs);
        $crate::assert_empty!($stream)
    }};
}
192
193#[allow(clippy::missing_panics_doc)]
194pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
195    stream: &mut S,
196    expected_options: impl IntoIterator<Item = &LogData>,
197    timeout_secs: u64,
198) {
199    let mut remaining = expected_options.into_iter();
200    let start = std::time::Instant::now();
201    let timeout_duration = std::time::Duration::from_secs(timeout_secs);
202
203    while let Some(expected) = remaining.next() {
204        let elapsed = start.elapsed();
205
206        assert!(
207            elapsed < timeout_duration,
208            "Timed out waiting for events. Still expecting: {:#?}",
209            remaining.collect::<Vec<_>>()
210        );
211
212        let time_left = timeout_duration - elapsed;
213        let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
214            .await
215            .expect("timed out waiting for next batch");
216
217        match message {
218            Some(Ok(ScannerMessage::Data(batch))) => {
219                let mut batch = batch.iter();
220                let event = batch.next().expect("Streamed batch should not be empty");
221                assert_eq!(
222                    expected,
223                    event.data(),
224                    "\nRemaining: {:#?}\n",
225                    remaining.collect::<Vec<_>>()
226                );
227                while let Some(event) = batch.next() {
228                    let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
229                    assert_eq!(
230                        expected,
231                        event.data(),
232                        "\nRemaining: {:#?}\n",
233                        remaining.collect::<Vec<_>>()
234                    );
235                }
236            }
237            Some(Ok(other)) => {
238                panic!("Expected Message::Data,\nGot: {other:#?}");
239            }
240            Some(Err(e)) => {
241                panic!("Expected Ok(Message::Data),\nGot Err: {e:#?}");
242            }
243            None => {
244                panic!(
245                    "Stream closed while still expecting:\n{:#?}",
246                    remaining.collect::<Vec<_>>()
247                );
248            }
249        }
250    }
251}
252
/// Asserts that a stream of block ranges completely covers an expected block range.
///
/// This macro consumes messages from a stream and verifies that the block ranges received
/// sequentially cover the entire `expected_range` without gaps or overlaps. Each streamed
/// range must start exactly where the previous one ended, and all ranges must fit within
/// the expected bounds.
///
/// The macro expects the stream to yield `Ok(ScannerMessage::Data(range))` items containing
/// `u64` block ranges (typically `RangeInclusive<u64>`). It tracks coverage by ensuring
/// each new range starts at the next expected block number and doesn't exceed the end of
/// the expected range. Once the entire range is covered, the assertion succeeds.
///
/// An expected range that contains no block (for example `5..5` or `0..0`) is trivially covered:
/// the macro returns without reading from the stream.
///
/// # Example
///
/// ```rust
/// use event_scanner::{ScannerMessage, assert_range_coverage};
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::ReceiverStream;
///
/// #[tokio::test]
/// async fn test_scanner_covers_range() {
///     // Stream items are `Result`s; in a real test the item type comes from the scanner.
///     let (tx, rx) = mpsc::channel(10);
///     let mut stream = ReceiverStream::new(rx);
///
///     // Simulate a scanner that splits blocks 100-199 into chunks
///     tokio::spawn(async move {
///         tx.send(Ok(ScannerMessage::Data(100..=149))).await.unwrap();
///         tx.send(Ok(ScannerMessage::Data(150..=199))).await.unwrap();
///     });
///
///     // Assert that the stream covers blocks 100-199
///     assert_range_coverage!(stream, 100..=199);
/// }
/// ```
///
/// # Panics
///
/// * **Timeout**: The stream doesn't produce the next expected range within the timeout period
///   (default 5 seconds, configurable via `timeout = N` parameter).
/// * **Gap or overlap**: A streamed range doesn't start exactly at the next expected block number,
///   indicating a gap or overlap in coverage.
/// * **Out of bounds**: A streamed range extends beyond the end of the expected range.
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or
///   `Notification`) when a block range is expected.
/// * **Stream closed early**: The stream ends before the entire expected range is covered.
///
/// On panic, the error message includes the expected remaining range and all previously
/// streamed ranges for debugging.
#[macro_export]
macro_rules! assert_range_coverage {
    ($stream: expr, $expected_range: expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is invoked by path.
        $crate::assert_range_coverage!($stream, $expected_range, timeout = 5)
    };
    ($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
        // Normalises any `u64` range into inclusive `(first, last)` bounds. Returns `None` when
        // an excluded bound sits on the edge of the `u64` domain (`..0`, or a start excluded at
        // `u64::MAX`), i.e. when the range cannot contain any block. Checked arithmetic avoids
        // the overflow panic / silent wrap-around that plain `+ 1` / `- 1` would cause there.
        fn bounds<R: ::std::ops::RangeBounds<u64>>(
            range: &R,
        ) -> ::std::option::Option<(u64, u64)> {
            let first = match range.start_bound() {
                ::std::ops::Bound::Unbounded => 0,
                ::std::ops::Bound::Excluded(&x) => x.checked_add(1)?,
                ::std::ops::Bound::Included(&x) => x,
            };
            let last = match range.end_bound() {
                ::std::ops::Bound::Unbounded => u64::MAX,
                ::std::ops::Bound::Excluded(&x) => x.checked_sub(1)?,
                ::std::ops::Bound::Included(&x) => x,
            };
            ::std::option::Option::Some((first, last))
        }

        // Part of the expected range that still has to be streamed; `None` once nothing is left.
        let mut pending = bounds(&$expected_range).filter(|&(first, last)| first <= last);

        let start_time = ::std::time::Instant::now();
        let timeout_duration = ::std::time::Duration::from_secs($secs);

        // log all streamed ranges on failures
        let mut streamed_ranges = vec![];

        while let ::std::option::Option::Some((start, end)) = pending {
            let elapsed = start_time.elapsed();

            assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);

            // Only the unused part of the overall budget may be spent on this message.
            let time_left = timeout_duration - elapsed;
            let message =
                tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
                    .await
                    .expect("Timed out waiting for the next block range");

            match message {
                ::std::option::Option::Some(::std::result::Result::Ok($crate::ScannerMessage::Data(range))) => {
                    let streamed = bounds(&range);
                    // Recorded before validation so a failure report includes the offending range.
                    streamed_ranges.push(range.clone());
                    let streamed_end = match streamed {
                        ::std::option::Option::Some((streamed_start, streamed_end))
                            if streamed_start == start && streamed_end <= end =>
                        {
                            streamed_end
                        }
                        _ => panic!(
                            "Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
                            start..=end,
                            range,
                            streamed_ranges,
                        ),
                    };
                    // Continue right after the streamed range; stop once `end` is covered. The
                    // checked add also ends the loop cleanly when `end` is `u64::MAX`.
                    pending = streamed_end
                        .checked_add(1)
                        .filter(|&next| next <= end)
                        .map(|next| (next, end));
                }
                ::std::option::Option::Some(::std::result::Result::Ok(other)) => {
                    panic!("Expected a block range, got: {other:#?}");
                }
                ::std::option::Option::Some(::std::result::Result::Err(e)) => {
                    panic!("Expected Ok(Message::Data), got Err: {e:#?}");
                }
                ::std::option::Option::None => {
                    panic!("Stream closed without covering range: {:#?}", start..=end);
                }
            }
        }
    }};
}
367
#[cfg(test)]
mod tests {
    use alloy::sol;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    sol! {
        #[derive(Debug)]
        event Transfer(address indexed from, address indexed to, uint256 value);
    }

    /// Passing an empty `Vec` through the "variable" arm of the macro must be rejected.
    #[tokio::test]
    #[should_panic(
        expected = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."
    )]
    async fn assert_event_sequence_macro_with_empty_vec() {
        // The sender is kept alive (but unused) for the duration of the test.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events = Vec::<Transfer>::new();
        assert_event_sequence!(stream, no_events);
    }

    /// Passing an empty slice through the "variable" arm of the macro must be rejected.
    #[tokio::test]
    #[should_panic(
        expected = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."
    )]
    async fn assert_event_sequence_macro_with_empty_slice() {
        // The sender is kept alive (but unused) for the duration of the test.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events: &[Transfer] = &[];
        assert_event_sequence!(stream, no_events);
    }
}