ethl 0.1.13

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use std::sync::Arc;

use alloy::json_abi::Event;
use alloy::rpc::types::Filter;
use anyhow::Result;
use futures_util::stream::StreamExt;
use tempfile::TempDir;

use ethl::archive::extract::EventExtractor;
use ethl::rpc::config::ProviderSettings;
use ethl::storage::{
    codec::SolEventCodec,
    store::{EventStore, parse_store_uri},
};

/// End-to-end test: extracts `Transfer` logs for blocks 34762687..=34762688 into a
/// temporary `file://` store, checks that the expected parquet file was written, and
/// reads the data back through [`EventStore`].
///
/// The RPC endpoint comes from `TEST_RPC_URL` (resolved at compile time by
/// `option_env!`), falling back to `https://0xrpc.io/base`, so the test needs network
/// access. It is aborted by `ntest::timeout` after 10000 ms.
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_backfill_and_extract() -> Result<()> {
    // Scratch directory that backs the file store for the duration of the test.
    let scratch = TempDir::new()?;
    let root = scratch.path().to_string_lossy().to_string();
    let store_uri = format!("file://{}", root);

    // Codec for the event signature we want to extract.
    let transfer_abi =
        Event::parse("event Transfer(address indexed from, address indexed to, uint256 value)")?;
    let codec = SolEventCodec::new(&transfer_abi)?;

    let rpc_url = option_env!("TEST_RPC_URL").unwrap_or("https://0xrpc.io/base");
    let settings = ProviderSettings::http(rpc_url)?;
    let block_range = Filter::new().from_block(34762687).to_block(34762688);

    // Run the extraction against the freshly created store.
    let (backend, prefix) = parse_store_uri(&store_uri)?;
    let events = vec![codec.event.clone()];
    let mut extractor = EventExtractor::new(
        Arc::new(backend),
        prefix,
        10000,
        settings,
        events,
        block_range,
    )?;
    extractor.extract().await?;

    // The extractor is expected to have written one parquet file named after the
    // requested block range, under a per-event directory.
    let expected_file = format!(
        "{}/transfer-0xddf252ad/000034762687-000034762688.parquet",
        root
    );
    assert!(std::path::Path::new(&expected_file).exists());

    // Read everything back through the public store API.
    let reader = EventStore::from_uri(&store_uri, &codec)?;
    let batch_stream = reader.read_all().await?;
    let batches = batch_stream.collect::<Vec<_>>().await;

    assert!(!batches.is_empty());
    assert_eq!(1, batches.len());
    assert_eq!(1510, batches[0].num_rows());

    Ok(())
}