ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use std::sync::Arc;

use crate::{
    archive::{extract::EventExtractor, load::EventLoader},
    rpc::{config::ProviderSettings, heads::last_block},
    storage::store::parse_store_uri,
};
use alloy::{json_abi::Event, rpc::types::Filter};
use anyhow::Result;
use object_store::{ObjectStore, path::Path};
use tracing::info;

pub mod extract;
pub mod load;

pub struct EventArchive {
    store: Arc<dyn ObjectStore>,
    path: Path,
    settings: ProviderSettings,
    batch_size: usize,
    events: Vec<Event>,
}

impl EventArchive {
    /// Create a new EventArchive for the given store, path, RPC settings, and events.
    ///
    /// # Arguments
    ///
    /// - `store`: The object store where the events are stored.
    /// - `path`: The base path within the object store where the events are stored.
    /// - `settings`: The RPC provider settings to use for fetching events from the blockchain.
    /// - `events`: The events to extract and load
    ///
    /// # Returns
    ///
    /// An `EventArchive` instance.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        path: Path,
        settings: &ProviderSettings,
        events: Vec<Event>,
    ) -> Result<Self> {
        Ok(Self {
            store,
            path,
            batch_size: 100_000,
            events,
            settings: settings.clone(),
        })
    }

    /// Create a new EventArchive for the given base URI, RPC settings, and events.
    ///
    /// # Arguments
    /// - `base_uri`: The base URI where the events are stored (e.g. "s3://my-bucket/events" or file:///path).
    /// - `settings`: The RPC provider settings to use for fetching events from the blockchain.
    /// - `events`: The events to extract and load.
    /// # Returns
    /// An `EventArchive` instance.
    pub fn from_uri(
        uri: impl AsRef<str>,
        settings: &ProviderSettings,
        events: Vec<Event>,
    ) -> Result<Self> {
        let (store, path) = parse_store_uri(uri)?;
        Self::new(Arc::new(store), path, settings, events)
    }

    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Advance the archive to the specified block number.
    pub async fn advance_to(&self, block: u64) -> Result<u64> {
        info!("Archiving {} events to block {}", self.events.len(), block);

        let filter = Filter::default().to_block(block);

        let mut indexer = EventExtractor::new(
            self.store.clone(),
            self.path.clone(),
            self.batch_size,
            self.settings.clone(),
            self.events.clone(),
            filter,
        )
        .unwrap();
        indexer.extract().await?;

        Ok(block)
    }

    /// Advance the archive to the latest block available from the RPC providers
    pub async fn advance_to_latest(&self) -> Result<u64> {
        let block = last_block(&self.settings).await?;
        self.advance_to(block).await
    }

    pub fn loader(&self) -> Result<EventLoader> {
        EventLoader::new(self.store.clone(), self.path.clone(), &self.events)
    }
}