// ethl 0.1.14
//
// Tools for capturing, processing, archiving, and replaying Ethereum events.
// Documentation:
//! The ethl crate provides tools for capturing, processing, archiving, and replaying Ethereum events. The impetus for this
//! crate was to create an opinionated way to archive Ethereum logs to Parquet files in a way that is efficient for
//! storage, querying, and replaying deserialized data for downstream processing.
//!
//! # Storage Model
//!
//! Event data is stored in Parquet files, where the schema is derived from the Solidity event definition. Each event's data
//! is stored in its own Parquet file group, which allows for efficient querying and retrieval of specific event types, and makes it easy to
//! add new event types without impacting existing data.
//!
//! Core assumptions:
//! - Events are stored in immutable Parquet files where the filename includes the block range covered by the file
//! - Events are written in block number order, and within a block, in log index order
//! - Events are stored in a directory structure that is compatible with S3 and local filesystem backends
//! - Event parquet files are stored in a directory named after the event name along with the first 4 bytes of the event signature hash
//! - Within a directory, parquet files are named based on the block range they cover (eg: 000000000001-000000000200.parquet)
//! - The schema for each event is derived from the Solidity event definition, and includes all indexed and non-indexed parameters
//! - The schema also includes metadata columns for block number, log index, and address
//! - Schemas favor human-friendly types (eg: strings for addresses and uint256) over compact types (size delta was negligible, perf unknown)
//!
//! Example directory structure:
//! ```text
//! /path/to/archive/
//! ├── transfer_0xdeadbeef/
//! │   ├── 000000000001-000000000200.parquet
//! │   ├── 000000000201-000000000400.parquet
//! │   └── ...
//! ├── approval_0xbeeffeee/
//! │   ├── 000000000001-000000000200.parquet
//! │   ├── 000000000201-000000000400.parquet
//! │   └── ...
//! └── ...
//! ```
//!
//! Example schema for a swap event:
//! ```text
//! message arrow_schema {
//!   REQUIRED INT64 log_block (INTEGER(64,false));
//!   REQUIRED INT32 log_index (INTEGER(32,false));
//!   REQUIRED BYTE_ARRAY log_address (STRING);
//!   REQUIRED BYTE_ARRAY sender (STRING);
//!   REQUIRED BYTE_ARRAY recipient (STRING);
//!   REQUIRED BYTE_ARRAY amount0 (STRING);
//!   REQUIRED BYTE_ARRAY amount1 (STRING);
//!   REQUIRED BYTE_ARRAY sqrtPriceX96 (STRING);
//!   REQUIRED BYTE_ARRAY liquidity (STRING);
//!   REQUIRED INT32 tick;
//!   REQUIRED BYTE_ARRAY protocolFeesToken0 (STRING);
//!   REQUIRED BYTE_ARRAY protocolFeesToken1 (STRING);
//! }
//! ```
//!
//! # Archiving Events and Loading Events
//!
//! The main entry point for archiving and loading events is the `EventArchive` struct, which provides methods to advance the archive to a specific block or to the latest block.
//! It also provides a method to create an `EventLoader` for reading archived events.
//!
//! Example usage:
//! ```no_run
//! use ethl::{archive::EventArchive, rpc::config::ProviderSettings};
//! use alloy::json_abi::Event;
//! use anyhow::Result;
//! use tracing::info;
//! use futures_util::{StreamExt, pin_mut};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!   let providers = ProviderSettings::default();
//!   let events = vec![
//!     Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
//!     Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?
//!   ];
//!   let archive = EventArchive::from_uri("s3://my-bucket/events", &providers, events)?;
//!   let latest_block = archive.advance_to_latest().await?;
//!   info!("Archived events to block {}", latest_block);
//!
//!   let loader = archive.loader()?;
//!   let stream = loader.load_all().await;
//!   pin_mut!(stream);
//!   while let Some(block_events) = stream.next().await {
//!     info!("Block: {}", block_events.block_number);
//!     for event in block_events.events {
//!       info!("  Event: {:?}", event);
//!     }
//!   }
//!   Ok(())
//! }
//! ```
//!
//!
//! # Extracting Events from RPC Nodes
//!
//! Events are extracted from RPC nodes and written to Parquet files via `EventExtractor`, which automatically generates the schemas for decoded events
//!
//! Example usage:
//! ```no_run
//! use ethl::{archive::extract::EventExtractor, rpc::config::ProviderSettings};
//! use alloy::json_abi::Event;
//! use alloy::rpc::types::Filter;
//! use anyhow::Result;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let providers = ProviderSettings::default();
//!
//!     // This can also be captured from the SolEvent::abi() method if using alloy-json-abi
//!     let events = vec![
//!       Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
//!       Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?
//!    ];
//!
//!     // EventExtractor will automatically add events and to_block (if not set) to the filter
//!     let filter = Filter::new()
//!       .from_block(10000000u64);
//!     let mut extractor = EventExtractor::from_uri(
//!         "s3://my-bucket/events",
//!         200_000,
//!         providers,
//!         events,
//!         filter,
//!     )?;
//!     extractor.extract().await?;
//!     Ok(())
//! }
//! ```
//!
//! # Reading Events
//! Archived events are read via `EventLoader`, which merges events by block, sorted by log index from stored Parquet files.
//!
//! Example usage:
//! ```no_run
//! use ethl::archive::load::EventLoader;
//! use alloy::json_abi::Event;
//! use anyhow::Result;
//! use futures_util::{StreamExt, pin_mut};
//! use tracing::info;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let events = vec![
//!       Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
//!       Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?
//!    ];
//!     let loader = EventLoader::from_uri("s3://my-bucket/events", &events)?;
//!     let stream = loader.load_all().await;
//!     pin_mut!(stream);
//!     while let Some(block_events) = stream.next().await {
//!        info!("Block: {}", block_events.block_number);
//!        for event in block_events.events {
//!           info!("  Event: {:?}", event);
//!        }
//!    }
//!     Ok(())
//! }
//! ```
//!

/// Event archiving and replay: home of `EventArchive` (advance the archive to a
/// block / create a loader), `extract::EventExtractor` (pull logs from RPC and
/// write Parquet), and `load::EventLoader` (stream archived events merged by
/// block, sorted by log index).
pub mod archive;
/// RPC node access and configuration — e.g. `config::ProviderSettings` used by
/// the archive and extractor entry points.
pub mod rpc;
/// Storage backends for the Parquet archive; the directory layout is designed
/// to work on both S3 and local filesystems (see crate-level docs).
pub mod storage;