event_scanner/test_utils/
macros.rs1use alloy::primitives::LogData;
2use tokio_stream::Stream;
3
4use crate::{ScannerMessage, event_scanner::EventScannerResult};
5
/// Asserts on the next item yielded by `$stream`.
///
/// Supported forms (each also accepts a trailing `timeout = <secs>`, default `5`):
///
/// - `assert_next!(stream, Err(expected_err))` — the next item, taken as a whole, is compared
///   against `expected_err` with `assert_eq!`, so the item type must be comparable with it.
/// - `assert_next!(stream, expected)` — the next item must be `Ok(value)` with
///   `value == expected`.
///
/// Panics with `"timed out"` when nothing arrives within the timeout, and with a descriptive
/// message when the stream is closed or the item does not match.
///
/// The expansion is a sequence of statements containing `.await`, so the macro has to be
/// invoked in statement position inside an async context.
#[macro_export]
macro_rules! assert_next {
    // Error expectation, default timeout.
    ($stream: expr, Err($expected_err:expr)) => {
        $crate::assert_next!($stream, Err($expected_err), timeout = 5)
    };
    // Error expectation, explicit timeout.
    ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
        let next_item = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        match next_item {
            Some(actual) => {
                let expected = &$expected_err;
                assert_eq!(&actual, expected, "Expected error {:?}, got {:?}", expected, actual);
            }
            None => {
                panic!("Expected error {:?}, but channel was closed", $expected_err);
            }
        }
    };

    // Value expectation, default timeout.
    ($stream: expr, $expected: expr) => {
        $crate::assert_next!($stream, $expected, timeout = 5)
    };
    // Value expectation, explicit timeout.
    ($stream: expr, $expected: expr, timeout = $secs: expr) => {
        let next_item = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        let expected = $expected;
        match next_item {
            std::option::Option::None => {
                panic!("Expected Ok({:?}), but channel was closed", expected);
            }
            std::option::Option::Some(std::result::Result::Err(e)) => {
                panic!("Expected Ok({:?}), got Err({:?})", expected, e);
            }
            std::option::Option::Some(std::result::Result::Ok(actual)) => {
                assert_eq!(actual, expected, "Expected {:?}, got {:?}", expected, actual);
            }
        }
    };
}
52
/// Asserts that `$stream` is closed, i.e. that its next item is `None`.
///
/// Accepts an optional trailing `timeout = <secs>` (default `5`). Panics with `"timed out"`
/// when the stream neither yields nor closes within the timeout, and fails the assertion when
/// the stream yields an item instead of closing.
///
/// The expansion is a sequence of statements containing `.await`, so the macro has to be
/// invoked in statement position inside an async context.
#[macro_export]
macro_rules! assert_closed {
    ($stream: expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is invoked by path
        // from another crate without importing it by name.
        $crate::assert_closed!($stream, timeout = 5)
    };
    ($stream: expr, timeout = $secs: expr) => {
        let message = tokio::time::timeout(
            std::time::Duration::from_secs($secs),
            tokio_stream::StreamExt::next(&mut $stream),
        )
        .await
        .expect("timed out");
        assert!(message.is_none())
    };
}
68
/// Asserts that `$stream` has no pending messages.
///
/// The stream is consumed through `into_inner()`, the inner value is checked with
/// `is_empty()`, and the macro evaluates to a fresh
/// `tokio_stream::wrappers::ReceiverStream` wrapping that inner value, so the caller can
/// rebind the stream and keep using it.
#[macro_export]
macro_rules! assert_empty {
    ($stream: expr) => {{
        let receiver = $stream.into_inner();
        assert!(receiver.is_empty(), "Stream should have no pending messages");
        tokio_stream::wrappers::ReceiverStream::new(receiver)
    }};
}
77
/// Asserts that `$stream` yields the given events, in order.
///
/// The events may be given as an inline list (`[a, b]` or `&[a, b]`, at least one element) or
/// as any expression offering `.iter()` over events. Every event is encoded with
/// `alloy::sol_types::SolEvent::encode_log_data` and the comparison itself is delegated to the
/// `assert_event_sequence` function of this module.
///
/// An optional trailing `timeout = <secs>` (default `5`) is forwarded to that function.
///
/// Passing an empty collection expression panics: use `assert_empty!` to check that no
/// messages are pending.
///
/// The expansion is a sequence of statements containing `.await`, so the macro has to be
/// invoked in statement position inside an async context.
#[macro_export]
macro_rules! assert_event_sequence {
    // Inline list, default timeout.
    ($stream: expr, [$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    // Inline list, explicit timeout.
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
    };
    // Borrowed inline list, default timeout.
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = 5)
    };
    // Borrowed inline list, explicit timeout: encode each event and run the check.
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
        let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];

        $crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options, $secs).await
    };
    // Arbitrary collection expression, default timeout.
    ($stream: expr, $events: expr) => {
        $crate::assert_event_sequence!($stream, $events, timeout = 5)
    };
    // Arbitrary collection expression, explicit timeout. Unlike the inline-list arms, the
    // collection can be empty here, which is rejected up front.
    ($stream: expr, $events: expr, timeout = $secs: expr) => {
        let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
        if expected_options.is_empty() {
            panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
        }
        $crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
    };
}
162
/// Asserts that `$stream` yields the given events, in order, and that nothing else is pending
/// afterwards.
///
/// Accepts the same argument forms as `assert_event_sequence!` (inline list, borrowed inline
/// list, or collection expression, each with an optional trailing `timeout = <secs>`,
/// default `5`). It expands to `assert_event_sequence!` followed by `assert_empty!`, and
/// evaluates to the value produced by `assert_empty!`.
///
/// The expansion contains `.await`, so the macro has to be invoked inside an async context.
#[macro_export]
macro_rules! assert_event_sequence_final {
    // Inline list, default timeout.
    ($stream: expr, [$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+])
    }};
    // Inline list, explicit timeout.
    ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
    }};
    // Borrowed inline list, default timeout.
    ($stream: expr, &[$($event:expr),+ $(,)?]) => {{
        $crate::assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
    }};
    // Borrowed inline list, explicit timeout.
    ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
        $crate::assert_empty!($stream)
    }};
    // Arbitrary collection expression, default timeout.
    ($stream: expr, $events: expr) => {{
        $crate::assert_event_sequence_final!($stream, $events, timeout = 5)
    }};
    // Arbitrary collection expression, explicit timeout.
    ($stream: expr, $events: expr, timeout = $secs: expr) => {{
        $crate::assert_event_sequence!($stream, $events, timeout = $secs);
        $crate::assert_empty!($stream)
    }};
}
190
191#[allow(clippy::missing_panics_doc)]
192pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
193 stream: &mut S,
194 expected_options: impl IntoIterator<Item = &LogData>,
195 timeout_secs: u64,
196) {
197 let mut remaining = expected_options.into_iter();
198 let start = std::time::Instant::now();
199 let timeout_duration = std::time::Duration::from_secs(timeout_secs);
200
201 while let Some(expected) = remaining.next() {
202 let elapsed = start.elapsed();
203
204 assert!(
205 elapsed < timeout_duration,
206 "Timed out waiting for events. Still expecting: {:#?}",
207 remaining.collect::<Vec<_>>()
208 );
209
210 let time_left = timeout_duration - elapsed;
211 let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
212 .await
213 .expect("timed out waiting for next batch");
214
215 match message {
216 Some(Ok(ScannerMessage::Data(batch))) => {
217 let mut batch = batch.iter();
218 let event = batch.next().expect("Streamed batch should not be empty");
219 assert_eq!(
220 expected,
221 event.data(),
222 "\nRemaining: {:#?}\n",
223 remaining.collect::<Vec<_>>()
224 );
225 while let Some(event) = batch.next() {
226 let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
227 assert_eq!(
228 expected,
229 event.data(),
230 "\nRemaining: {:#?}\n",
231 remaining.collect::<Vec<_>>()
232 );
233 }
234 }
235 Some(Ok(other)) => {
236 panic!("Expected Message::Data,\nGot: {other:#?}");
237 }
238 Some(Err(e)) => {
239 panic!("Expected Ok(Message::Data),\nGot Err: {e:#?}");
240 }
241 None => {
242 panic!(
243 "Stream closed while still expecting:\n{:#?}",
244 remaining.collect::<Vec<_>>()
245 );
246 }
247 }
248 }
249}
250
/// Asserts that the block ranges streamed by `$stream` cover `$expected_range` contiguously.
///
/// `$expected_range` may be any `RangeBounds<u64>`; an unbounded start is treated as `0` and
/// an unbounded end as `u64::MAX`. Every `ScannerMessage::Data(range)` taken from the stream
/// must be non-empty, start exactly where the previous one stopped, and must not run past
/// the end of the expected range. The macro returns once the end of the expected range has
/// been reached; an empty expected range is trivially covered and nothing is read.
///
/// An optional trailing `timeout = <secs>` (default `5`) bounds the whole check.
///
/// Panics on timeout, on a gap/overlap/overrun, when the stream yields an `Err` or a message
/// other than `ScannerMessage::Data`, or when the stream closes early.
///
/// The expansion contains `.await`, so the macro has to be invoked inside an async context.
#[macro_export]
macro_rules! assert_range_coverage {
    ($stream: expr, $expected_range: expr) => {
        $crate::assert_range_coverage!($stream, $expected_range, timeout = 5)
    };
    ($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
        /// Inclusive `(start, end)` of `range`, or `None` when the range is empty.
        ///
        /// Checked arithmetic keeps `Excluded(0)` as an end bound and `Excluded(u64::MAX)`
        /// as a start bound from under-/overflowing; both describe empty ranges.
        fn bounds<R: ::std::ops::RangeBounds<u64>>(
            range: &R,
        ) -> ::std::option::Option<(u64, u64)> {
            let start_bound = match range.start_bound() {
                ::std::ops::Bound::Unbounded => 0,
                ::std::ops::Bound::Excluded(&x) => x.checked_add(1)?,
                ::std::ops::Bound::Included(&x) => x,
            };
            let end_bound = match range.end_bound() {
                ::std::ops::Bound::Unbounded => u64::MAX,
                ::std::ops::Bound::Excluded(&x) => x.checked_sub(1)?,
                ::std::ops::Bound::Included(&x) => x,
            };
            if start_bound <= end_bound {
                ::std::option::Option::Some((start_bound, end_bound))
            } else {
                ::std::option::Option::None
            }
        }

        let expected_bounds = bounds(&$expected_range);

        let start_time = ::std::time::Instant::now();
        let timeout_duration = ::std::time::Duration::from_secs($secs);

        // Every range received so far, kept only for the failure message.
        let mut streamed_ranges = vec![];

        if let ::std::option::Option::Some((mut start, end)) = expected_bounds {
            loop {
                let elapsed = start_time.elapsed();

                assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);

                let time_left = timeout_duration - elapsed;
                let message =
                    tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
                        .await
                        .unwrap_or_else(|timeout_err| {
                            panic!(
                                "Timed out waiting for the next block range: {timeout_err:?}\nStill expecting: {:#?}",
                                start..=end
                            )
                        });

                match message {
                    std::option::Option::Some(std::result::Result::Ok($crate::ScannerMessage::Data(range))) => {
                        let streamed_bounds = bounds(&range);
                        streamed_ranges.push(range.clone());
                        let streamed_end = match streamed_bounds {
                            ::std::option::Option::Some((streamed_start, streamed_end))
                                if start == streamed_start && streamed_end <= end =>
                            {
                                streamed_end
                            }
                            _ => panic!(
                                "Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
                                start..=end,
                                range,
                                streamed_ranges,
                            ),
                        };
                        if streamed_end == end {
                            break;
                        }
                        // Cannot overflow: `streamed_end < end <= u64::MAX` at this point.
                        start = streamed_end + 1;
                    }
                    std::option::Option::Some(std::result::Result::Ok(other)) => {
                        panic!("Expected a block range, got: {other:#?}");
                    }
                    std::option::Option::Some(std::result::Result::Err(e)) => {
                        panic!("Expected Ok(Message::Data), got Err: {e:#?}");
                    }
                    std::option::Option::None => {
                        panic!("Stream closed without covering range: {:#?}", start..=end);
                    }
                }
            }
        }
    }};
}
365
#[cfg(test)]
mod tests {
    use alloy::sol;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    // Minimal event type used to build (empty) expected-event collections.
    sol! {
        #[derive(Debug)]
        event Transfer(address indexed from, address indexed to, uint256 value);
    }

    /// An empty `Vec` of expected events must be rejected by `assert_event_sequence!`.
    #[tokio::test]
    #[should_panic(
        expected = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."
    )]
    async fn assert_event_sequence_macro_with_empty_vec() {
        // The sender is kept alive so the channel stays open for the duration of the test.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events = Vec::<Transfer>::new();
        assert_event_sequence!(stream, no_events);
    }

    /// An empty slice of expected events must be rejected by `assert_event_sequence!`.
    #[tokio::test]
    #[should_panic(
        expected = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."
    )]
    async fn assert_event_sequence_macro_with_empty_slice() {
        // The sender is kept alive so the channel stays open for the duration of the test.
        let (_sender, receiver) = mpsc::channel(10);
        let mut stream = ReceiverStream::new(receiver);

        let no_events: &[Transfer] = &[];
        assert_event_sequence!(stream, no_events);
    }
}