//! ethl — tools for capturing, processing, archiving, and replaying
//! Ethereum events.
use std::sync::Arc;

use crate::storage::{codec::SolEventCodec, store::EventStore};
use alloy::{json_abi::Event, rpc::types::Log};
use alloy_primitives::B256;
use anyhow::Result;
use object_store::{ObjectStore, path::Path};
use tracing::debug;

/// This struct is used to archive events to an object store.
/// It allows for appending logs and flushing them to the store in batches.
/// It is designed to work with a specific event signature.
pub struct EventStoreWriter {
    /// Underlying object-store-backed event store that batches are written to.
    store: EventStore,
    /// Number of buffered records that triggers a flush in
    /// `append_and_maybe_flush`.
    batch_size: usize,
    /// Encoder/decoder bound to one specific Solidity event signature.
    codec: SolEventCodec,
    /// Start of the block range covered by the currently buffered (un-flushed)
    /// records; `None` when the buffer is empty.
    buffered_from_block: Option<u64>,
    /// End of the block range covered by the currently buffered records;
    /// advanced on every `append`.
    buffered_to_block: Option<u64>,
    /// Cached event selector (topic0) of the event this writer handles.
    selector: B256,
}

impl EventStoreWriter {
    /// Creates a writer for `event`, persisting batches to `store` under `path`.
    ///
    /// # Errors
    /// Returns an error if the codec cannot be built for `event` or the
    /// underlying [`EventStore`] cannot be created.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        path: Path,
        batch_size: usize,
        event: Event,
    ) -> Result<Self> {
        let codec = SolEventCodec::new(&event)?;
        let store = EventStore::new(store, path, &codec)?;

        Ok(EventStoreWriter {
            store,
            batch_size,
            codec,
            buffered_from_block: None,
            buffered_to_block: None,
            selector: event.selector(),
        })
    }

    /// Selector (topic0) of the event this writer handles.
    pub fn selector(&self) -> B256 {
        self.selector
    }

    /// ABI event definition this writer encodes.
    pub fn event(&self) -> &Event {
        &self.codec.event
    }

    /// Event name, used in log messages.
    fn name(&self) -> String {
        self.codec.event.name.to_string()
    }

    /// Buffers `logs` covering the block range `[from_block, to_block]`.
    ///
    /// The start of the oldest un-flushed range is retained; the end of the
    /// range is advanced on every call so `flush` can record exactly which
    /// blocks the written batch covers.
    pub fn append<'a>(
        &mut self,
        logs: impl Iterator<Item = &'a Log>,
        from_block: u64,
        to_block: u64,
    ) -> Result<()> {
        self.codec.encoder.append_iter(logs)?;
        // Keep the first range start seen since the last flush.
        self.buffered_from_block.get_or_insert(from_block);
        self.buffered_to_block = Some(to_block);
        Ok(())
    }

    /// Highest block number already persisted in the store.
    pub async fn last_block(&self) -> Result<u64> {
        self.store
            .stored_range()
            .await
            .map(|(_, to_block)| to_block)
    }

    /// Writes all buffered records to the store and clears the buffered range.
    ///
    /// No-op when nothing has been appended since the last flush.
    pub async fn flush(&mut self) -> Result<()> {
        // Both bounds are set together in `append`, so `zip` is `Some` iff
        // anything is buffered; this replaces the original unwrap()s.
        let Some(range) = self.buffered_from_block.zip(self.buffered_to_block) else {
            debug!("No buffered events to flush for {}", self.name());
            return Ok(()); // Nothing to flush
        };

        debug!(
            "flushing {:08} records in block range ({:08}, {:08}) for {}",
            self.codec.encoder.len(),
            range.0,
            range.1,
            self.name(),
        );

        let records = self.codec.encoder.finish()?;
        self.store.write_records(range, records).await?;
        // Fix: reset BOTH bounds. The original cleared only
        // `buffered_from_block`, leaving `buffered_to_block` as a stale
        // `Some` that worked only by accident of the flush guard.
        self.buffered_from_block = None;
        self.buffered_to_block = None;
        Ok(())
    }

    /// Appends `logs` and flushes if the buffer reached `batch_size`.
    ///
    /// Returns `Ok(true)` when a flush occurred, `Ok(false)` otherwise.
    pub async fn append_and_maybe_flush<'a>(
        &mut self,
        logs: impl Iterator<Item = &'a Log>,
        from_block: u64,
        to_block: u64,
    ) -> Result<bool> {
        self.append(logs, from_block, to_block)?;
        if self.codec.encoder.len() >= self.batch_size {
            self.flush().await?;
            Ok(true)
        } else {
            Ok(false)
        }
    }
}