Crate ethl

The ethl crate provides tools for capturing, processing, archiving, and replaying Ethereum events. The impetus for this crate was to create an opinionated way to archive Ethereum logs to Parquet files that is efficient for storage and querying, and that supports replaying deserialized data for downstream processing.

§Storage Model

Event data is stored in Parquet files, where the schema is derived from the Solidity event definition. Each event's data is stored in its own Parquet file group, which allows for efficient querying and retrieval of specific event types and makes it easy to add new event types without impacting existing data.

Core assumptions:

  • Events are stored in immutable Parquet files where the filename includes the block range covered by the file
  • Events are written in block number order, and within a block, in log index order
  • Events are stored in a directory structure that is compatible with S3 and local filesystem backends
  • Event parquet files are stored in a directory named after the event name along with the first 4 bytes of the event signature hash (see the naming sketch below)
  • Within a directory, parquet files are named based on the block range they cover (e.g. 000000000001-000000000200.parquet)
  • The schema for each event is derived from the Solidity event definition, and includes all indexed and non-indexed parameters
  • The schema also includes metadata columns for block number, log index, and address
  • Schemas favor human-friendly types (e.g. strings for addresses and uint256) over compact types (the size delta was negligible, perf unknown)

Example directory structure:

/path/to/archive/
├── transfer_0xdeadbeef/
│   ├── 000000000001-000000000200.parquet
│   ├── 000000000201-000000000400.parquet
│   └── ...
├── approval_0xbeeffeee/
│   ├── 000000000001-000000000200.parquet
│   ├── 000000000201-000000000400.parquet
│   └── ...
└── ...
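
The directory name can be derived from the parsed event definition. Below is a minimal sketch of how such a name might be computed with alloy; the exact formatting ethl uses is not specified here, so the output is illustrative only:

use alloy::json_abi::Event;
use anyhow::Result;

fn main() -> Result<()> {
  let event = Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?;
  // The event signature hash (topic0) is the keccak256 of the canonical signature
  let topic0 = event.selector();
  // Illustrative naming only: lowercased event name plus the first 4 bytes of the hash
  let dir = format!("{}_0x{}", event.name.to_lowercase(), alloy::hex::encode(&topic0[..4]));
  println!("{dir}"); // transfer_0xddf252ad
  Ok(())
}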

Example schema for a swap event:

message arrow_schema {
  REQUIRED INT64 log_block (INTEGER(64,false));
  REQUIRED INT32 log_index (INTEGER(32,false));
  REQUIRED BYTE_ARRAY log_address (STRING);
  REQUIRED BYTE_ARRAY sender (STRING);
  REQUIRED BYTE_ARRAY recipient (STRING);
  REQUIRED BYTE_ARRAY amount0 (STRING);
  REQUIRED BYTE_ARRAY amount1 (STRING);
  REQUIRED BYTE_ARRAY sqrtPriceX96 (STRING);
  REQUIRED BYTE_ARRAY liquidity (STRING);
  REQUIRED INT32 tick;
  REQUIRED BYTE_ARRAY protocolFeesToken0 (STRING);
  REQUIRED BYTE_ARRAY protocolFeesToken1 (STRING);
}
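
Because archived files are ordinary Parquet, they can also be inspected with standard Parquet tooling outside of ethl. A minimal sketch using the parquet and arrow crates directly (the local path is hypothetical and follows the layout described above):

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use anyhow::Result;

fn main() -> Result<()> {
  // Hypothetical local copy of one archived file
  let file = File::open("transfer_0xddf252ad/000000000001-000000000200.parquet")?;
  let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
  for batch in reader {
    let batch = batch?;
    // Metadata columns (log_block, log_index, log_address) plus one column per event parameter
    let columns: Vec<_> = batch.schema().fields().iter().map(|f| f.name().clone()).collect();
    println!("{} rows, columns: {:?}", batch.num_rows(), columns);
  }
  Ok(())
}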

§Archiving and Loading Events

The main entry point for archiving and loading events is the EventArchive struct, which provides methods to advance the archive to a specific block or to the latest block. It also provides a method to create an EventLoader for reading archived events.

Example usage:

use ethl::{archive::EventArchive, rpc::config::ProviderSettings};
use alloy::json_abi::Event;
use anyhow::Result;
use tracing::info;
use futures_util::{StreamExt, pin_mut};

#[tokio::main]
async fn main() -> Result<()> {
  let providers = ProviderSettings::default();
  let events = vec![
    Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
    Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?
  ];
  let archive = EventArchive::from_uri("s3://my-bucket/events", &providers, events)?;
  let latest_block = archive.advance_to_latest().await?;
  info!("Archived events to block {}", latest_block);

  let loader = archive.loader()?;
  let stream = loader.load_all().await;
  pin_mut!(stream);
  while let Some(block_events) = stream.next().await {
    info!("Block: {}", block_events.block_number);
    for event in block_events.events {
      info!("  Event: {:?}", event);
    }
  }
  Ok(())
}

§Extracting Events from RPC Nodes

Events are extracted from RPC nodes and written to Parquet files via EventExtractor, which automatically generates the schemas for the decoded events.

Example usage:

use ethl::{archive::extract::EventExtractor, rpc::config::ProviderSettings};
use alloy::json_abi::Event;
use alloy::rpc::types::Filter;
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let providers = ProviderSettings::default();

    // This can also be captured from the SolEvent::abi() method if using alloy-json-abi
    let events = vec![
        Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
        Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?,
    ];

    // EventExtractor automatically adds the events to the filter, and sets to_block if not already set
    let filter = Filter::new()
      .from_block(10000000u64);
    let mut extractor = EventExtractor::from_uri(
        "s3://my-bucket/events",
        200_000,
        providers,
        events,
        filter,
    )?;
    extractor.extract().await?;
    Ok(())
}

§Reading Events

Archived events are read via EventLoader, which merges events from the stored Parquet files by block, sorted by log index within each block.

Example usage:

use ethl::archive::load::EventLoader;
use alloy::json_abi::Event;
use anyhow::Result;
use futures_util::{StreamExt, pin_mut};
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
    let events = vec![
        Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
        Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?,
    ];
    let loader = EventLoader::from_uri("s3://my-bucket/events", &events)?;
    let stream = loader.load_all().await;
    pin_mut!(stream);
    while let Some(block_events) = stream.next().await {
        info!("Block: {}", block_events.block_number);
        for event in block_events.events {
            info!("  Event: {:?}", event);
        }
    }
    Ok(())
}

Modules§

archive
rpc
storage