ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events

use std::sync::Arc;

use crate::{
    rpc::{config::ProviderSettings, stream::backfill_then_watch_logs},
    storage::{store::parse_store_uri, writer::EventStoreWriter},
};

use alloy::{json_abi::Event, rpc::types::Filter};
use alloy_primitives::B256;
use anyhow::Result;
use futures_util::{StreamExt, pin_mut};
use object_store::{ObjectStore, path::Path};
use tracing::info;

pub struct EventExtractor {
    writers: Vec<EventStoreWriter>,
    settings: ProviderSettings,
    filter: Filter,
}

impl EventExtractor {
    /// Create a new EventExtractor for the given object store, path, batch size, RPC settings, events, and filter.
    ///
    /// # Arguments
    ///
    /// - `store`: The object store where events will be archived.
    /// - `path`: The base path within the object store for storing event data.
    /// - `batch_size`: The number of events to buffer before flushing to storage.
    /// - `settings`: The RPC provider settings to use for fetching events.
    /// - `events`: The events to extract and store.
    /// - `filter`: The filter to use for fetching events. The filter's `events` field will be overridden to match the provided events.
    ///
    /// # Returns
    ///
    /// An `EventExtractor` instance.
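    ///
    /// # Example
    ///
    /// A minimal sketch assuming an in-memory store; the event signature,
    /// block number, and `settings` value are illustrative placeholders,
    /// not a prescribed configuration.
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// use alloy::{json_abi::Event, rpc::types::Filter};
    /// use object_store::{memory::InMemory, path::Path};
    ///
    /// // Human-readable event signature (here, the ERC-20 Transfer event)
    /// let transfer = Event::parse(
    ///     "event Transfer(address indexed from, address indexed to, uint256 value)",
    /// )?;
    ///
    /// let mut extractor = EventExtractor::new(
    ///     Arc::new(InMemory::new()),  // any `ObjectStore` implementation
    ///     Path::from("events"),       // base path within the store
    ///     1_000,                      // flush every 1,000 buffered events
    ///     settings,                   // `ProviderSettings` built elsewhere
    ///     vec![transfer],
    ///     Filter::new().from_block(19_000_000u64),
    /// )?;
    /// extractor.extract().await?;
    /// ```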
    pub fn new(
        store: Arc<dyn ObjectStore>,
        path: Path,
        batch_size: usize,
        settings: ProviderSettings,
        events: Vec<Event>,
        filter: Filter,
    ) -> Result<Self> {
        // Override the filter's events to match the provided event signatures
        let filter = filter.events(events.iter().map(|e| e.signature()).collect::<Vec<_>>());
        let writers = events
            .into_iter()
            .map(|event| {
                let writer =
                    EventStoreWriter::new(store.clone(), path.clone(), batch_size, event)?;
                Ok(writer)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(EventExtractor {
            writers,
            settings,
            filter,
        })
    }

    /// Create a new EventExtractor for the given base URI, batch size, RPC settings, events, and filter.
    ///
    /// # Arguments
    ///
    /// - `base_uri`: The base URI where the events are stored (e.g. "s3://my-bucket/events" or "file:///path").
    /// - `batch_size`: The number of events to buffer before flushing to storage.
    /// - `settings`: The RPC provider settings to use for fetching events.
    /// - `events`: The events to extract and store.
    /// - `filter`: The filter to use for fetching events. The filter's `events` field will be overridden to match the provided events.
    ///
    /// # Returns
    ///
    /// An `EventExtractor` instance.
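    ///
    /// # Example
    ///
    /// A minimal sketch; the URI, `settings`, and `events` values are
    /// illustrative placeholders.
    ///
    /// ```ignore
    /// // A local filesystem URI; "s3://my-bucket/events" works the same way
    /// let mut extractor = EventExtractor::from_uri(
    ///     "file:///tmp/events",
    ///     1_000,
    ///     settings,
    ///     events,
    ///     Filter::new(),
    /// )?;
    /// ```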
    pub fn from_uri(
        base_uri: impl AsRef<str>,
        batch_size: usize,
        settings: ProviderSettings,
        events: Vec<Event>,
        filter: Filter,
    ) -> Result<Self> {
        let (store, path) = parse_store_uri(base_uri)?;
        Self::new(Arc::new(store), path, batch_size, settings, events, filter)
    }

    /// Extract events based on the provided filter, writing to the configured storage.
    /// This will ensure all event stores are aligned to the same block before starting multi-event extraction.
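    ///
    /// A typical run (sketch): stream logs until the source ends, then flush
    /// every writer.
    ///
    /// ```ignore
    /// extractor.extract().await?;
    /// ```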
    pub async fn extract(&mut self) -> Result<()> {
        let aligned_block = self.ensure_alignment().await?;

        let filter = self.inferred_filter(aligned_block)?;
        info!(
            "Indexing {} events for range {} to {}",
            self.writers.len(),
            filter.get_from_block().unwrap_or(0),
            filter
                .get_to_block()
                .map(|v| v.to_string())
                .unwrap_or_else(|| "live".to_string())
        );

        let stream = backfill_then_watch_logs(&self.settings, Some(&filter)).await;
        pin_mut!(stream);

        // Route each log to the matching writer by its event signature (topic0)
        while let Some(result) = stream.next().await {
            let (from, to, logs) = result?;
            for writer in self.writers.iter_mut() {
                let selector = writer.selector();
                let events = logs
                    .iter()
                    .filter(|log| log.topics().first().is_some_and(|t| *t == selector));
                writer.append_and_maybe_flush(events, from, to).await?;
            }
        }

        info!("Indexing complete, flushing all archivers");

        // Final flush for any buffered events (performed even when the buffer is empty)
        for writer in self.writers.iter_mut() {
            writer.flush().await?;
        }

        Ok(())
    }

    // Ensure all stores are aligned to the same last_block
    async fn ensure_alignment(&mut self) -> Result<u64> {
        let heights = self.writer_heights().await?;
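        // `writer_heights` returns (selector, height) pairs sorted ascending,
        // so the last entry holds the maximum height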
        let max_height = heights.last().map(|(_, h)| *h).unwrap_or(0);

        for (selector, height) in heights.into_iter() {
            if height != max_height {
                self.align_writer(selector, height + 1, max_height).await?;
            }
        }

        Ok(max_height)
    }

    async fn align_writer(&mut self, selector: B256, from_block: u64, to_block: u64) -> Result<()> {
        let writer = self
            .writers
            .iter_mut()
            .find(|w| w.selector() == selector)
            .expect("Writer should exist");

        info!(
            "Aligning event {} from block {} to {}",
            writer.event().signature(),
            from_block,
            to_block
        );

        let filter = self
            .filter
            .clone()
            .events(vec![writer.event().signature()])
            .from_block(from_block)
            .to_block(to_block);

        let stream = backfill_then_watch_logs(&self.settings, Some(&filter)).await;
        pin_mut!(stream);

        // Append every log from the stream; the filter already restricts it to this writer's event
        while let Some(result) = stream.next().await {
            let (from, to, logs) = result?;
            writer.append_and_maybe_flush(logs.iter(), from, to).await?;
        }

        // Final flush for any buffered events (performed even when the buffer is empty)
        writer.flush().await?;

        Ok(())
    }

    // Get the (selector, last_block) pairs for all writers, sorted ascending by height
    async fn writer_heights(&self) -> Result<Vec<(B256, u64)>> {
        let heights_futs = self
            .writers
            .iter()
            .map(|writer| writer.last_block())
            .collect::<Vec<_>>();

        let heights = futures_util::future::join_all(heights_futs).await;
        let heights: Vec<u64> = heights.into_iter().collect::<Result<Vec<_>, _>>()?;

        let mut tuples = self
            .writers
            .iter()
            .map(|w| w.selector())
            .zip(heights)
            .collect::<Vec<_>>();

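        // Sort ascending by height; `ensure_alignment` relies on the last
        // entry being the maximum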
        tuples.sort_by_key(|&(_, height)| height);

        Ok(tuples)
    }

    fn inferred_filter(&self, last_block: u64) -> Result<Filter> {
        // Re-indexing blocks that are already stored isn't supported
        if let Some(to_block) = self.filter.get_to_block()
            && to_block <= last_block
        {
            return Err(anyhow::anyhow!("No new events to index"));
        }

        let user_from = self.filter.get_from_block();

        // On a fresh store (nothing indexed yet), honor the user's from_block
        if last_block == 0 && user_from.is_some() {
            return Ok(self.filter.clone());
        } else if user_from.is_some_and(|b| b != last_block + 1) {
            return Err(anyhow::anyhow!(
                "from_block must be {} to continue from the stored data",
                last_block + 1
            ));
        }

        Ok(self.filter.clone().from_block(last_block + 1))
    }
}