ethl 0.1.22

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use std::{collections::VecDeque, sync::Arc};

use crate::storage::{
    codec::{SolEventCodec, decoder::DecodedEventWithHeader},
    store::EventStore,
};
use alloy::json_abi::Event;
use anyhow::Result;
use futures_util::{Stream, StreamExt, stream};
use object_store::{ObjectStore, path::Path};

pub struct EventStoreReader {
    store: EventStore,
    codec: SolEventCodec,
}

impl EventStoreReader {
    pub fn new(store: Arc<dyn ObjectStore>, path: Path, event: &Event) -> Result<Self> {
        let codec = SolEventCodec::new(event)?;
        let store = EventStore::new(store, path, &codec)?;
        Ok(EventStoreReader { store, codec })
    }

    pub async fn stream(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Vec<Result<DecodedEventWithHeader>>>> {
        let raw = self.store.read_range(from_block, to_block).await?;
        Ok(raw.map(|batch_result| match batch_result {
            Ok(batch) => self
                .codec
                .decoder
                .decode_batch_iter(&batch)
                .collect::<Vec<_>>(),
            Err(e) => vec![Err(anyhow::Error::from(e))],
        }))
    }

    pub async fn stream_deq(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Result<VecDeque<DecodedEventWithHeader>>>> {
        let raw = self.store.read_range(from_block, to_block).await?;
        Ok(raw.map(|batch_result| {
            let batch = batch_result.map_err(anyhow::Error::from)?;
            self.codec.decoder.par_decode_batch_deque(&batch)
        }))
    }

    pub async fn stream_flat(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Result<DecodedEventWithHeader>>> {
        let raw = self.store.read_range(from_block, to_block).await?;
        Ok(raw.flat_map(|batch_result| match batch_result {
            Ok(batch) => {
                let iter = self
                    .codec
                    .decoder
                    .decode_batch_iter(&batch)
                    .collect::<Vec<_>>();
                stream::iter(iter)
            }
            Err(e) => stream::iter(vec![Err(anyhow::Error::from(e))]),
        }))
    }
}