ethl 0.1.21

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use alloy::json_abi::Event;
use alloy::providers::{Provider, ProviderBuilder};
use alloy::rpc::types::Filter;
use alloy::transports::mock::Asserter;
use anyhow::Result;
use ethl::rpc::config::ProviderSettings;
use ethl::rpc::events::EventStreamer;
use futures_util::stream::StreamExt;

#[tokio::test]
async fn test_event_streamer_ranged() -> Result<()> {
    let logs: serde_json::Value = serde_json::from_str(include_str!(
        "fixtures/transfer_getlogs_34762687_34762688.json"
    ))?;

    let asserter = Asserter::new();
    // backfill_then_watch_logs calls eth_blockNumber before backfilling
    asserter.push_success(&serde_json::json!("0x2126fc2")); // 34762690, any block > to_block
    asserter.push_success(&logs);

    let provider = ProviderBuilder::new()
        .disable_recommended_fillers()
        .connect_mocked_client(asserter)
        .erased();
    let settings = ProviderSettings::from_mock(provider);

    let filter = Filter::new().from_block(34762687).to_block(34762688);
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(Some(&filter)).await;
    let events = stream.collect::<Vec<_>>().await;

    assert!(!events.is_empty());
    assert_eq!(events.len(), 1);
    let (from_block, to_block, decoded_events) = match &events[0] {
        Ok((from, to, events)) => (*from, *to, events),
        Err(_) => panic!("Test failed: event stream returned error"),
    };
    assert_eq!(from_block, 34762687);
    assert_eq!(to_block, 34762688);
    assert!(!decoded_events.is_empty());
    assert!(
        decoded_events
            .iter()
            .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2)
    );

    Ok(())
}

#[tokio::test]
#[ignore = "live WS subscription; not yet mockable with FIFO Asserter"]
async fn test_event_streamer_head() -> Result<()> {
    let wss_test_url = option_env!("TEST_WSS_URL");
    if wss_test_url.is_none() {
        return Ok(());
    }
    use ethl::rpc::config::ProviderOptions;
    let opts = ProviderOptions::default()
        .add_http(
            option_env!("TEST_RPC_URL")
                .unwrap_or("https://0xrpc.io/base")
                .to_string(),
        )
        .add_websocket(wss_test_url.unwrap().to_string());
    let settings = ProviderSettings::build(opts, 8453)?;
    let event =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let streamer = EventStreamer::new(settings.clone(), std::slice::from_ref(&event));
    let stream = streamer.stream(None).await;
    futures_util::pin_mut!(stream);
    let event = stream.next().await;

    let (from_block, to_block, decoded_events) = &event.unwrap()?;
    assert!(*from_block > 0);
    assert!(*to_block >= *from_block);
    if !decoded_events.is_empty() {
        assert!(
            decoded_events
                .iter()
                .all(|e| e.event.body.len() == 1 && e.event.indexed.len() == 2)
        );
    }

    Ok(())
}