1use alloy::primitives::LogData;
2use tokio_stream::Stream;
3
4use crate::{ScannerMessage, event_scanner::EventScannerResult};
5
6#[macro_export]
7macro_rules! assert_next {
8 ($stream: expr, Err($expected_err:expr)) => {
10 $crate::assert_next!($stream, Err($expected_err), timeout = 5)
11 };
12 ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
13 let message = tokio::time::timeout(
14 std::time::Duration::from_secs($secs),
15 tokio_stream::StreamExt::next(&mut $stream),
16 )
17 .await
18 .expect("timed out");
19 if let Some(msg) = message {
20 let expected = &$expected_err;
21 assert_eq!(&msg, expected, "Expected error {:?}, got {:?}", expected, msg);
22 } else {
23 panic!("Expected error {:?}, but channel was closed", $expected_err);
24 }
25 };
26
27 ($stream: expr, $expected: expr) => {
29 $crate::assert_next!($stream, $expected, timeout = 5)
30 };
31 ($stream: expr, $expected: expr, timeout = $secs: expr) => {
32 let message = tokio::time::timeout(
33 std::time::Duration::from_secs($secs),
34 tokio_stream::StreamExt::next(&mut $stream),
35 )
36 .await
37 .expect("timed out");
38 let expected = $expected;
39 match message {
40 std::option::Option::Some(std::result::Result::Ok(msg)) => {
41 assert_eq!(msg, expected, "Expected {:?}, got {:?}", expected, msg);
42 }
43 std::option::Option::Some(std::result::Result::Err(e)) => {
44 panic!("Expected Ok({:?}), got Err({:?})", expected, e);
45 }
46 std::option::Option::None => {
47 panic!("Expected Ok({:?}), but channel was closed", expected);
48 }
49 }
50 };
51}
52
53#[macro_export]
54macro_rules! assert_closed {
55 ($stream: expr) => {
56 assert_closed!($stream, timeout = 5)
57 };
58 ($stream: expr, timeout = $secs: expr) => {
59 let message = tokio::time::timeout(
60 std::time::Duration::from_secs($secs),
61 tokio_stream::StreamExt::next(&mut $stream),
62 )
63 .await
64 .expect("timed out");
65 assert!(message.is_none())
66 };
67}
68
69#[macro_export]
70macro_rules! assert_empty {
71 ($stream: expr) => {{
72 let inner = $stream.into_inner();
73 assert!(inner.is_empty(), "Stream should have no pending messages");
74 tokio_stream::wrappers::ReceiverStream::new(inner)
75 }};
76}
77
78#[macro_export]
135macro_rules! assert_event_sequence {
136 ($stream: expr, [$($event:expr),+ $(,)?]) => {
138 assert_event_sequence!($stream, &[$($event),+], timeout = 5)
139 };
140 ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
141 assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
142 };
143 ($stream: expr, &[$($event:expr),+ $(,)?]) => {
145 assert_event_sequence!($stream, &[$($event),+], timeout = 5)
146 };
147 ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
148 let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];
149
150 $crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options, $secs).await
151 };
152 ($stream: expr, $events: expr) => {
154 assert_event_sequence!($stream, $events, timeout = 5)
155 };
156 ($stream: expr, $events: expr, timeout = $secs: expr) => {
157 let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
158 if expected_options.is_empty() {
159 panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
160 }
161 $crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
162 };
163}
164
165#[macro_export]
167macro_rules! assert_event_sequence_final {
168 ($stream: expr, [$($event:expr),+ $(,)?]) => {{
170 assert_event_sequence_final!($stream, &[$($event),+])
171 }};
172 ($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
173 assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
174 }};
175 ($stream: expr, &[$($event:expr),+ $(,)?]) => {{
177 assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
178 }};
179 ($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
180 $crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
181 $crate::assert_empty!($stream)
182 }};
183 ($stream: expr, $events: expr) => {{
185 assert_event_sequence_final!($stream, $events, timeout = 5)
186 }};
187 ($stream: expr, $events: expr, timeout = $secs: expr) => {{
188 $crate::assert_event_sequence!($stream, $events, timeout = $secs);
189 $crate::assert_empty!($stream)
190 }};
191}
192
193#[allow(clippy::missing_panics_doc)]
194pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
195 stream: &mut S,
196 expected_options: impl IntoIterator<Item = &LogData>,
197 timeout_secs: u64,
198) {
199 let mut remaining = expected_options.into_iter();
200 let start = std::time::Instant::now();
201 let timeout_duration = std::time::Duration::from_secs(timeout_secs);
202
203 while let Some(expected) = remaining.next() {
204 let elapsed = start.elapsed();
205
206 assert!(
207 elapsed < timeout_duration,
208 "Timed out waiting for events. Still expecting: {:#?}",
209 remaining.collect::<Vec<_>>()
210 );
211
212 let time_left = timeout_duration - elapsed;
213 let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
214 .await
215 .expect("timed out waiting for next batch");
216
217 match message {
218 Some(Ok(ScannerMessage::Data(batch))) => {
219 let mut batch = batch.iter();
220 let event = batch.next().expect("Streamed batch should not be empty");
221 assert_eq!(
222 expected,
223 event.data(),
224 "\nRemaining: {:#?}\n",
225 remaining.collect::<Vec<_>>()
226 );
227 while let Some(event) = batch.next() {
228 let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
229 assert_eq!(
230 expected,
231 event.data(),
232 "\nRemaining: {:#?}\n",
233 remaining.collect::<Vec<_>>()
234 );
235 }
236 }
237 Some(Ok(other)) => {
238 panic!("Expected Message::Data,\nGot: {other:#?}");
239 }
240 Some(Err(e)) => {
241 panic!("Expected Ok(Message::Data),\nGot Err: {e:#?}");
242 }
243 None => {
244 panic!(
245 "Stream closed while still expecting:\n{:#?}",
246 remaining.collect::<Vec<_>>()
247 );
248 }
249 }
250 }
251}
252
253#[macro_export]
302macro_rules! assert_range_coverage {
303 ($stream: expr, $expected_range: expr) => {
304 assert_range_coverage!($stream, $expected_range, timeout = 5)
305 };
306 ($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
307 fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (u64, u64) {
308 let start_bound = match range.start_bound() {
309 ::std::ops::Bound::Unbounded => 0,
310 ::std::ops::Bound::Excluded(&x) => x + 1,
311 ::std::ops::Bound::Included(&x) => x,
312 };
313 let end_bound = match range.end_bound() {
314 ::std::ops::Bound::Unbounded => u64::MAX,
315 ::std::ops::Bound::Excluded(&x) => x - 1,
316 ::std::ops::Bound::Included(&x) => x,
317 };
318 (start_bound, end_bound)
319 }
320
321 let original_bounds = bounds(&$expected_range);
322 let (mut start, end) = original_bounds;
323
324 let start_time = ::std::time::Instant::now();
325 let timeout_duration = ::std::time::Duration::from_secs($secs);
326
327 let mut streamed_ranges = vec![];
329
330 while start <= end {
331 let elapsed = start_time.elapsed();
332
333 assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);
334
335 let time_left = timeout_duration - elapsed;
336 let message =
337 tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
338 .await
339 .expect("Timed out waiting for the next block range");
340
341 match message {
342 std::option::Option::Some(std::result::Result::Ok(event_scanner::ScannerMessage::Data(range))) => {
343 let (streamed_start, streamed_end) = bounds(&range);
344 streamed_ranges.push(range.clone());
345 assert!(
346 start == streamed_start && streamed_end <= end,
347 "Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
348 start..=end,
349 range,
350 streamed_ranges,
351 );
352 start = streamed_end + 1;
353 }
354 std::option::Option::Some(std::result::Result::Ok(other)) => {
355 panic!("Expected a block range, got: {other:#?}");
356 }
357 std::option::Option::Some(std::result::Result::Err(e)) => {
358 panic!("Expected Ok(Message::Data), got Err: {e:#?}");
359 }
360 std::option::Option::None => {
361 panic!("Stream closed without covering range: {:#?}", start..=end);
362 }
363 }
364 }
365 }};
366}
367
368#[cfg(test)]
369mod tests {
370 use alloy::sol;
371 use tokio::sync::mpsc;
372 use tokio_stream::wrappers::ReceiverStream;
373
374 sol! {
375 #[derive(Debug)]
376 event Transfer(address indexed from, address indexed to, uint256 value);
377 }
378
379 #[tokio::test]
380 #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
381 async fn assert_event_sequence_macro_with_empty_vec() {
382 let (_tx, rx) = mpsc::channel(10);
383 let mut stream = ReceiverStream::new(rx);
384
385 let empty_vec: Vec<Transfer> = Vec::new();
386 assert_event_sequence!(stream, empty_vec);
387 }
388
389 #[tokio::test]
390 #[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
391 async fn assert_event_sequence_macro_with_empty_slice() {
392 let (_tx, rx) = mpsc::channel(10);
393 let mut stream = ReceiverStream::new(rx);
394
395 let empty_vec: &[Transfer] = &[];
396 assert_event_sequence!(stream, empty_vec);
397 }
398}