// ethl 0.1.14
//
// Tools for capturing, processing, archiving, and replaying Ethereum events.
// Documentation
use std::{collections::VecDeque, sync::Arc};

use crate::storage::{
    codec::{SolEventCodec, decoder::DecodedEventWithHeader},
    store::EventStore,
};
use alloy::json_abi::Event;
use anyhow::Result;
use futures_util::{Stream, StreamExt, stream};
use object_store::{ObjectStore, path::Path};

/// Reads stored events and decodes them with a [`SolEventCodec`].
pub struct EventStoreReader {
    /// Store whose `read_range` supplies the batches to decode.
    store: EventStore,
    /// Codec built from the event ABI; its `decoder` turns batches into decoded events.
    codec: SolEventCodec,
}

impl EventStoreReader {
    /// Creates a reader from an object store, a path, and the ABI of the event to decode.
    ///
    /// A [`SolEventCodec`] is built from `event` first; the [`EventStore`] is then
    /// constructed from `store`, `path`, and a reference to that codec.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SolEventCodec::new` or `EventStore::new`.
    pub fn new(store: Arc<dyn ObjectStore>, path: Path, event: &Event) -> Result<Self> {
        let codec = SolEventCodec::new(event)?;
        let event_store = EventStore::new(store, path, &codec)?;
        Ok(Self {
            store: event_store,
            codec,
        })
    }

    /// Streams decoded events one batch at a time.
    ///
    /// Each stream item is the `Vec` of per-event decode results produced by
    /// `decode_batch_iter` for a single batch read from the store.
    ///
    /// `from_block` and `to_block` are forwarded unchanged to `EventStore::read_range`.
    /// NOTE(review): `None` presumably leaves that side of the range open — confirm
    /// against `read_range`.
    ///
    /// # Errors
    ///
    /// Returns the error from `read_range` if the range cannot be opened. Decode
    /// failures are reported per event inside each yielded `Vec`.
    pub async fn stream(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Vec<Result<DecodedEventWithHeader>>>> {
        let batches = self.store.read_range(from_block, to_block).await?;
        let decoder = &self.codec.decoder;
        Ok(batches.map(move |batch| decoder.decode_batch_iter(&batch).collect::<Vec<_>>()))
    }

    /// Streams decoded events one batch at a time as `VecDeque`s.
    ///
    /// Each stream item is whatever `par_decode_batch_deque` returns for a single
    /// batch: one `Result` covering the whole batch rather than one per event.
    /// NOTE(review): the `par_` prefix suggests the batch is decoded in parallel —
    /// confirm in the decoder.
    ///
    /// `from_block` and `to_block` are forwarded unchanged to `EventStore::read_range`.
    ///
    /// # Errors
    ///
    /// Returns the error from `read_range` if the range cannot be opened.
    pub async fn stream_deq(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Result<VecDeque<DecodedEventWithHeader>>>> {
        let batches = self.store.read_range(from_block, to_block).await?;
        let decoder = &self.codec.decoder;
        Ok(batches.map(move |batch| decoder.par_decode_batch_deque(&batch)))
    }

    /// Streams decoded events individually, flattening batch boundaries.
    ///
    /// Every batch read from the store is decoded with `decode_batch_iter` and its
    /// per-event results are emitted one by one.
    ///
    /// `from_block` and `to_block` are forwarded unchanged to `EventStore::read_range`.
    ///
    /// # Errors
    ///
    /// Returns the error from `read_range` if the range cannot be opened. Decode
    /// failures surface as `Err` items in the stream.
    pub async fn stream_flat(
        &self,
        from_block: Option<u64>,
        to_block: Option<u64>,
    ) -> Result<impl Stream<Item = Result<DecodedEventWithHeader>>> {
        let batches = self.store.read_range(from_block, to_block).await?;
        let decoder = &self.codec.decoder;
        Ok(batches.flat_map(move |batch| {
            // The decode iterator borrows `batch`, which is dropped when this closure
            // returns, so the results are materialized before being re-streamed.
            let events: Vec<_> = decoder.decode_batch_iter(&batch).collect();
            stream::iter(events)
        }))
    }
}