use alloy::primitives::LogData;
use tokio_stream::Stream;
use crate::{ScannerMessage, event_scanner::EventScannerResult};
#[macro_export]
macro_rules! assert_next {
($stream: expr, Err($expected_err:expr)) => {
$crate::assert_next!($stream, Err($expected_err), timeout = 5)
};
($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => {
let message = tokio::time::timeout(
std::time::Duration::from_secs($secs),
tokio_stream::StreamExt::next(&mut $stream),
)
.await
.expect("timed out");
if let Some(msg) = message {
let expected = &$expected_err;
assert_eq!(&msg, expected, "Expected error {:?}, got {:?}", expected, msg);
} else {
panic!("Expected error {:?}, but channel was closed", $expected_err);
}
};
($stream: expr, $expected: expr) => {
$crate::assert_next!($stream, $expected, timeout = 5)
};
($stream: expr, $expected: expr, timeout = $secs: expr) => {
let message = tokio::time::timeout(
std::time::Duration::from_secs($secs),
tokio_stream::StreamExt::next(&mut $stream),
)
.await
.expect("timed out");
let expected = $expected;
match message {
std::option::Option::Some(std::result::Result::Ok(msg)) => {
assert_eq!(msg, expected, "Expected {:?}, got {:?}", expected, msg);
}
std::option::Option::Some(std::result::Result::Err(e)) => {
panic!("Expected Ok({:?}), got Err({:?})", expected, e);
}
std::option::Option::None => {
panic!("Expected Ok({:?}), but channel was closed", expected);
}
}
};
}
#[macro_export]
macro_rules! assert_closed {
($stream: expr) => {
assert_closed!($stream, timeout = 5)
};
($stream: expr, timeout = $secs: expr) => {
let message = tokio::time::timeout(
std::time::Duration::from_secs($secs),
tokio_stream::StreamExt::next(&mut $stream),
)
.await
.expect("timed out");
assert!(message.is_none())
};
}
#[macro_export]
macro_rules! assert_empty {
($stream: expr) => {{
let inner = $stream.into_inner();
assert!(inner.is_empty(), "Stream should have no pending messages");
tokio_stream::wrappers::ReceiverStream::new(inner)
}};
}
#[macro_export]
macro_rules! assert_event_sequence {
($stream: expr, [$($event:expr),+ $(,)?]) => {
assert_event_sequence!($stream, &[$($event),+], timeout = 5)
};
($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
};
($stream: expr, &[$($event:expr),+ $(,)?]) => {
assert_event_sequence!($stream, &[$($event),+], timeout = 5)
};
($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];
$crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options, $secs).await
};
($stream: expr, $events: expr) => {
assert_event_sequence!($stream, $events, timeout = 5)
};
($stream: expr, $events: expr, timeout = $secs: expr) => {
let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
if expected_options.is_empty() {
panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
}
$crate::macros::test_utils::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
};
}
#[macro_export]
macro_rules! assert_event_sequence_final {
($stream: expr, [$($event:expr),+ $(,)?]) => {{
assert_event_sequence_final!($stream, &[$($event),+])
}};
($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
}};
($stream: expr, &[$($event:expr),+ $(,)?]) => {{
assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
}};
($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
$crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
$crate::assert_empty!($stream)
}};
($stream: expr, $events: expr) => {{
assert_event_sequence_final!($stream, $events, timeout = 5)
}};
($stream: expr, $events: expr, timeout = $secs: expr) => {{
$crate::assert_event_sequence!($stream, $events, timeout = $secs);
$crate::assert_empty!($stream)
}};
}
#[allow(clippy::missing_panics_doc)]
pub async fn assert_event_sequence<S: Stream<Item = EventScannerResult> + Unpin>(
stream: &mut S,
expected_options: impl IntoIterator<Item = &LogData>,
timeout_secs: u64,
) {
let mut remaining = expected_options.into_iter();
let start = std::time::Instant::now();
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
while let Some(expected) = remaining.next() {
let elapsed = start.elapsed();
assert!(
elapsed < timeout_duration,
"Timed out waiting for events. Still expecting: {:#?}",
remaining.collect::<Vec<_>>()
);
let time_left = timeout_duration - elapsed;
let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
.await
.expect("timed out waiting for next batch");
match message {
Some(Ok(ScannerMessage::Data(batch))) => {
let mut batch = batch.iter();
let event = batch.next().expect("Streamed batch should not be empty");
assert_eq!(
expected,
event.data(),
"\nRemaining: {:#?}\n",
remaining.collect::<Vec<_>>()
);
while let Some(event) = batch.next() {
let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
assert_eq!(
expected,
event.data(),
"\nRemaining: {:#?}\n",
remaining.collect::<Vec<_>>()
);
}
}
Some(Ok(other)) => {
panic!("Expected Message::Data,\nGot: {other:#?}");
}
Some(Err(e)) => {
panic!("Expected Ok(Message::Data),\nGot Err: {e:#?}");
}
None => {
panic!(
"Stream closed while still expecting:\n{:#?}",
remaining.collect::<Vec<_>>()
);
}
}
}
}
#[macro_export]
macro_rules! assert_range_coverage {
($stream: expr, $expected_range: expr) => {
assert_range_coverage!($stream, $expected_range, timeout = 5)
};
($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (u64, u64) {
let start_bound = match range.start_bound() {
::std::ops::Bound::Unbounded => 0,
::std::ops::Bound::Excluded(&x) => x + 1,
::std::ops::Bound::Included(&x) => x,
};
let end_bound = match range.end_bound() {
::std::ops::Bound::Unbounded => u64::MAX,
::std::ops::Bound::Excluded(&x) => x - 1,
::std::ops::Bound::Included(&x) => x,
};
(start_bound, end_bound)
}
let original_bounds = bounds(&$expected_range);
let (mut start, end) = original_bounds;
let start_time = ::std::time::Instant::now();
let timeout_duration = ::std::time::Duration::from_secs($secs);
let mut streamed_ranges = vec![];
while start <= end {
let elapsed = start_time.elapsed();
assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);
let time_left = timeout_duration - elapsed;
let message =
tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
.await
.expect("Timed out waiting for the next block range");
match message {
std::option::Option::Some(std::result::Result::Ok(event_scanner::ScannerMessage::Data(range))) => {
let (streamed_start, streamed_end) = bounds(&range);
streamed_ranges.push(range.clone());
assert!(
start == streamed_start && streamed_end <= end,
"Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
start..=end,
range,
streamed_ranges,
);
start = streamed_end + 1;
}
std::option::Option::Some(std::result::Result::Ok(other)) => {
panic!("Expected a block range, got: {other:#?}");
}
std::option::Option::Some(std::result::Result::Err(e)) => {
panic!("Expected Ok(Message::Data), got Err: {e:#?}");
}
std::option::Option::None => {
panic!("Stream closed without covering range: {:#?}", start..=end);
}
}
}
}};
}
#[cfg(test)]
mod tests {
use alloy::sol;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
sol! {
#[derive(Debug)]
event Transfer(address indexed from, address indexed to, uint256 value);
}
#[tokio::test]
#[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
async fn assert_event_sequence_macro_with_empty_vec() {
let (_tx, rx) = mpsc::channel(10);
let mut stream = ReceiverStream::new(rx);
let empty_vec: Vec<Transfer> = Vec::new();
assert_event_sequence!(stream, empty_vec);
}
#[tokio::test]
#[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
async fn assert_event_sequence_macro_with_empty_slice() {
let (_tx, rx) = mpsc::channel(10);
let mut stream = ReceiverStream::new(rx);
let empty_vec: &[Transfer] = &[];
assert_event_sequence!(stream, empty_vec);
}
}