use alloy::json_abi::Event;
use alloy::rpc::types::Filter;
use anyhow::Result;
use ethl::rpc::config::{ProviderOptions, ProviderSettings};
use ethl::rpc::events::EventStreamer;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
// Streams `Transfer` logs over a fixed, closed block range and checks that the
// streamer reports the whole range as a single successful batch whose decoded
// events all have the shape declared in the ABI signature.
//
// Talks to a live RPC endpoint: `TEST_RPC_URL` if it was set when the test
// binary was compiled (`option_env!` is resolved at compile time, not at run
// time), otherwise the public fallback URL below.
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_event_streamer_ranged() -> Result<()> {
    let settings =
        ProviderSettings::http(option_env!("TEST_RPC_URL").unwrap_or("https://0xrpc.io/base"))?;
    let filter = Filter::new().from_block(34762687).to_block(34762688);
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(Some(&filter)).await;
    let batches = stream.collect::<Vec<_>>().await;

    // The test expects the bounded range to be delivered as exactly one item.
    assert_eq!(
        batches.len(),
        1,
        "expected exactly one batch for the bounded block range"
    );
    let (from_block, to_block, decoded_events) = match &batches[0] {
        Ok((from, to, decoded)) => (*from, *to, decoded),
        // Surface the underlying error so a failing RPC call can be diagnosed.
        Err(err) => panic!("Test failed: event stream returned error: {err:?}"),
    };

    assert_eq!(from_block, 34762687);
    assert_eq!(to_block, 34762688);
    assert!(
        !decoded_events.is_empty(),
        "expected at least one Transfer event in the block range"
    );
    // `Transfer` declares two indexed parameters (`from`, `to`) and one
    // non-indexed parameter (`value`).
    assert!(
        decoded_events
            .iter()
            .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2),
        "every decoded Transfer must have 1 body value and 2 indexed values"
    );
    Ok(())
}
// Streams `Transfer` logs with no filter and checks the first batch the
// streamer yields: a positive, well-ordered block range and, if any events
// were decoded, the shape declared in the ABI signature.
//
// Requires a WebSocket endpoint. `TEST_WSS_URL` is read with `option_env!`,
// i.e. at compile time; when it was not set the test is skipped (reported as
// passing) rather than failed.
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_event_streamer_head() -> Result<()> {
    let Some(wss_url) = option_env!("TEST_WSS_URL") else {
        eprintln!("Skipping test_event_streamer_head because TEST_WSS_URL is not set");
        return Ok(());
    };

    let opts = ProviderOptions::default()
        .add_http(
            option_env!("TEST_RPC_URL")
                .unwrap_or("https://0xrpc.io/base")
                .to_string(),
        )
        .add_websocket(wss_url.to_string());
    // NOTE(review): 8453 is presumably the chain id matching the default
    // "base" RPC URL above — confirm if the endpoints are overridden.
    let settings = ProviderSettings::build(opts, 8453)?;
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(None).await;
    // `StreamExt::next` needs the stream pinned.
    pin_mut!(stream);

    // Only the first batch is inspected; the stream is dropped afterwards.
    let (from_block, to_block, decoded_events) = stream
        .next()
        .await
        .expect("event stream ended before yielding a batch")?;

    assert!(from_block > 0, "batch must start at a positive block number");
    assert!(
        to_block >= from_block,
        "batch range must not be inverted ({from_block}..={to_block})"
    );
    // A batch may contain no matching events; `all` is vacuously true then,
    // so no separate emptiness guard is needed.
    assert!(
        decoded_events
            .iter()
            .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2),
        "every decoded Transfer must have 1 body value and 2 indexed values"
    );
    Ok(())
}