// ethl 0.1.13
//
// Tools for capturing, processing, archiving, and replaying Ethereum events.
// Documentation
use alloy::json_abi::Event;
use alloy::rpc::types::Filter;
use anyhow::Result;
use ethl::rpc::config::{ProviderOptions, ProviderSettings};
use ethl::rpc::events::EventStreamer;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

/// Streams `Transfer` logs for the fixed block range 34762687..=34762688 and
/// checks the single batch the stream yields.
///
/// The RPC endpoint is taken from the compile-time `TEST_RPC_URL` environment
/// variable (via `option_env!`), falling back to `https://0xrpc.io/base`.
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_event_streamer_ranged() -> Result<()> {
    let rpc_url = option_env!("TEST_RPC_URL").unwrap_or("https://0xrpc.io/base");
    let settings = ProviderSettings::http(rpc_url)?;
    let filter = Filter::new().from_block(34762687).to_block(34762688);
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(Some(&filter)).await;
    let events = stream.collect::<Vec<_>>().await;

    // The bounded range is expected to arrive as exactly one batch; this also
    // guarantees the `events[0]` access below is in bounds.
    assert_eq!(events.len(), 1);
    let (from_block, to_block, decoded_events) = match &events[0] {
        Ok((from, to, events)) => (*from, *to, events),
        // Include the underlying error so a failing run is diagnosable.
        Err(err) => panic!("Test failed: event stream returned error: {err:?}"),
    };
    assert_eq!(from_block, 34762687);
    assert_eq!(to_block, 34762688);
    assert!(!decoded_events.is_empty());
    // Every decoded `Transfer` must carry two indexed values (`from`, `to`)
    // and one body value (`value`), matching the parsed signature.
    assert!(
        decoded_events
            .iter()
            .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2)
    );

    Ok(())
}

/// Streams `Transfer` logs without a filter and checks the first batch the
/// stream yields.
///
/// Requires the compile-time `TEST_WSS_URL` environment variable; when it is
/// not set the test prints a notice and returns `Ok(())` without running.
/// The HTTP endpoint is taken from `TEST_RPC_URL`, falling back to
/// `https://0xrpc.io/base`.
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_event_streamer_head() -> Result<()> {
    // Bind the websocket URL up front so no later `unwrap()` is needed.
    let Some(wss_test_url) = option_env!("TEST_WSS_URL") else {
        eprintln!("Skipping test_event_streamer_head because TEST_WSS_URL is not set");
        return Ok(());
    };
    let rpc_url = option_env!("TEST_RPC_URL").unwrap_or("https://0xrpc.io/base");
    let opts = ProviderOptions::default()
        .add_http(rpc_url.to_string())
        .add_websocket(wss_test_url.to_string());
    let settings = ProviderSettings::build(opts, 8453)?;
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(None).await;
    pin_mut!(stream);

    // This could be flaky if no events are emitted in the time window, chain down, etc
    let first_batch = stream
        .next()
        .await
        .expect("event stream ended before yielding a batch");
    let (from_block, to_block, decoded_events) = &first_batch?;
    assert!(*from_block > 0);
    assert!(*to_block >= *from_block);
    // `all` is vacuously true for an empty batch, so no emptiness guard is
    // needed: a batch without matching logs still passes.
    assert!(
        decoded_events
            .iter()
            .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2)
    );

    Ok(())
}