ethl 0.1.22

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use std::{collections::VecDeque, sync::Arc};

use alloy::json_abi::Event;
use anyhow::Result;
use async_stream::stream;
use futures_util::{Stream, StreamExt, stream::LocalBoxStream};
use object_store::{ObjectStore, path::Path};
use parquet::errors::ParquetError;
use thiserror::Error;

use crate::storage::{
    codec::decoder::DecodedEventWithHeader, reader::EventStoreReader, store::parse_store_uri,
};

/// Errors yielded by the event stream returned from [`EventLoader::load_range`].
///
/// NOTE(review): each variant both formats its inner error (`{0}`) and marks it `#[source]`;
/// error-chain reporters may therefore print the inner message twice.
#[derive(Debug, Error)]
pub enum ArchiveError {
    /// A storage/reader error that downcast to a [`ParquetError`].
    #[error("Parquet read error: {0}")]
    ParquetRead(#[source] ParquetError),
    /// Any other storage/reader error (not necessarily a decoding failure), kept as the
    /// original `anyhow::Error`.
    #[error("Decode error: {0}")]
    Decode(#[source] anyhow::Error),
}

/// Loads archived events for a fixed set of event types and merges them by block.
pub struct EventLoader {
    /// One reader per event passed to [`EventLoader::new`], all over the same store and path.
    readers: Vec<EventStoreReader>,
}

/// All loaded events belonging to a single block, as yielded by [`EventLoader::load_range`].
pub struct BlockEvents {
    /// Block number shared by every event in `events`.
    pub block_number: u64,
    /// Events from every reader for this block, sorted by log index.
    pub events: Vec<DecodedEventWithHeader>,
}

/// Buffered read position over one reader's stream of event batches.
struct Cursor<'a> {
    /// Source of event batches; each `Ok` batch replaces `queue` once it has been drained.
    stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>,
    /// Events from the current batch that have not yet been handed out.
    queue: VecDeque<DecodedEventWithHeader>,
    /// Set once `stream` has returned `None`; the cursor is then exhausted.
    eof: bool,
}

impl<'a> Cursor<'a> {
    /// Wrap a stream of event batches in a cursor with an initially empty queue.
    pub fn new(stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>) -> Self {
        Cursor {
            stream,
            queue: VecDeque::new(),
            eof: false,
        }
    }

    /// Block number of the event at the front of the buffered queue.
    ///
    /// Never pulls from the underlying stream; returns `None` whenever the queue is empty.
    pub fn next_enqueued_block(&self) -> Option<u64> {
        self.queue.front().map(|e| e.log_block)
    }

    /// Block number of the next unconsumed event, pulling batches from the stream as needed.
    ///
    /// Returns `Ok(None)` only once the stream is exhausted. Stream errors are propagated.
    async fn next_block(&mut self) -> Result<Option<u64>> {
        if self.eof {
            return Ok(None);
        }
        // A batch may be empty. Keep pulling until an event is available or the stream
        // ends, so an empty batch is never mistaken for "no more events". Otherwise a block
        // split across batches would be cut short and its tail left stranded in the queue.
        while self.queue.is_empty() {
            if !self.buffer().await? {
                return Ok(None);
            }
        }

        Ok(self.next_enqueued_block())
    }

    /// Pull the next batch from the stream into `queue`.
    ///
    /// Returns `Ok(true)` when a batch was received (it may be empty). Returns `Ok(false)`
    /// and sets `eof` when the stream is exhausted, and propagates stream errors. The
    /// queue is replaced wholesale, so this is only called once the queue has been drained.
    async fn buffer(&mut self) -> Result<bool> {
        match self.stream.next().await {
            Some(Ok(events)) => {
                self.queue = events;
                Ok(true)
            }
            Some(Err(e)) => Err(e),
            None => {
                self.eof = true;
                Ok(false)
            }
        }
    }

    /// Consume and return every event belonging to `block_number`.
    ///
    /// # Returns
    ///
    /// - `Ok(None)` if the stream is exhausted, or if its next event belongs to a later block.
    ///   Nothing is consumed in that case.
    /// - `Ok(Some(events))` otherwise. `events` is empty if the next event belongs to an
    ///   earlier block, and that event is left in place.
    ///   NOTE(review): callers are presumably expected to request ascending block numbers.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by the underlying stream.
    pub async fn block_events(
        &mut self,
        block_number: u64,
    ) -> Result<Option<Vec<DecodedEventWithHeader>>> {
        let Some(next_block_number) = self.next_block().await? else {
            return Ok(None); // No more events to read
        };

        // Early terminate: this stream has nothing at `block_number`.
        if next_block_number > block_number {
            return Ok(None);
        }

        let mut events = Vec::new();
        while let Some(block) = self.next_block().await?
            && block == block_number
        {
            events.push(
                self.queue
                    .pop_front()
                    .expect("next_block returned Some, so the queue is non-empty"),
            );
        }

        Ok(Some(events))
    }
}

impl EventLoader {
    /// Create a new `EventLoader` for the given store and path.
    ///
    /// One [`EventStoreReader`] is constructed per entry in `events`, all sharing `store`
    /// and `path`.
    ///
    /// # Arguments
    ///
    /// - `store`: The object store where the events are stored.
    /// - `path`: The base path within the object store where the events are stored.
    /// - `events`: The events to load.
    ///
    /// # Returns
    ///
    /// An `EventLoader` instance.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while constructing a reader.
    pub fn new(store: Arc<dyn ObjectStore>, path: Path, events: &[Event]) -> Result<Self> {
        let readers = events
            .iter()
            .map(|event| EventStoreReader::new(Arc::clone(&store), path.clone(), event))
            .collect::<Result<Vec<_>>>()?;

        Ok(EventLoader { readers })
    }

    /// Create a new EventLoader for the given base URI and events.
    ///
    /// # Arguments
    ///
    /// - `base_uri`: The base URI where the events are stored (e.g. "s3://my-bucket/events" or file:///path).
    /// - `events`: The events to load.
    ///
    /// # Returns
    ///
    /// An `EventLoader` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the URI cannot be parsed into a store, or if reader construction
    /// fails.
    pub fn from_uri(base_uri: impl AsRef<str>, events: &[Event]) -> Result<Self> {
        let (store, path) = parse_store_uri(base_uri)?;
        Self::new(Arc::new(store), path, events)
    }

    /// Load events in the specified block range, merging events by block and sorting by log index.
    ///
    /// All work happens lazily inside the returned stream: readers are opened and batches are
    /// pulled only when the stream is polled. Blocks with no events are skipped.
    ///
    /// # Arguments
    ///
    /// - `from`: The starting block number (inclusive). If `None`, starts from the earliest block.
    /// - `to`: The ending block number (inclusive). If `None`, goes to the latest block.
    ///
    /// # Returns
    ///
    /// A stream of `Result<BlockEvents, ArchiveError>`. Per-file parquet errors and decode errors
    /// are surfaced as `Err` items rather than panicking; the stream terminates after the first error.
    ///
    /// # Errors
    ///
    /// Errors raised while opening a reader or while reading from one are mapped the same
    /// way. Parquet errors become [`ArchiveError::ParquetRead`]; everything else becomes
    /// [`ArchiveError::Decode`].
    pub async fn load_range(
        &self,
        from: Option<u64>,
        to: Option<u64>,
    ) -> impl Stream<Item = Result<BlockEvents, ArchiveError>> {
        stream! {
            let mut block_cursor: u64 = 0;
            let mut cursors: Vec<Cursor> = Vec::with_capacity(self.readers.len());

            for reader in &self.readers {
                match reader.stream_deq(from, to).await {
                    Ok(stream) => cursors.push(Cursor::new(stream.boxed_local())),
                    Err(e) => {
                        // Classify like mid-stream errors so parquet failures at open time
                        // surface as `ParquetRead` instead of being lumped into `Decode`.
                        yield Err(classify_read_error(e));
                        return;
                    }
                }
            }

            while !cursors.is_empty() {
                let mut block_events = Vec::new();
                let mut stream_error: Option<ArchiveError> = None;

                for cursor in &mut cursors {
                    match cursor.block_events(block_cursor).await {
                        Ok(Some(events)) => block_events.extend(events),
                        Ok(None) => {}
                        Err(e) => {
                            stream_error = Some(classify_read_error(e));
                            break;
                        }
                    }
                }

                if let Some(err) = stream_error {
                    yield Err(err);
                    return;
                }

                // Stable sort: events sharing a log index keep reader order.
                block_events.sort_by_key(|e| e.log_index);
                cursors.retain(|c| !c.eof);

                // Sparse block event optimization (consider removing): when every remaining
                // cursor has a buffered front block, jump straight to the smallest one;
                // otherwise step forward a single block.
                if block_events.is_empty() {
                    let next_blocks: Option<Vec<u64>> = cursors
                        .iter()
                        .map(|c| c.next_enqueued_block())
                        .collect();

                    match next_blocks.and_then(|blocks| blocks.into_iter().min()) {
                        Some(min_block) => block_cursor = min_block,
                        None => block_cursor += 1,
                    }

                    continue;
                }

                yield Ok(BlockEvents {
                    block_number: block_cursor,
                    events: block_events,
                });

                block_cursor += 1;
            }
        }
    }
}

/// Convert a storage/reader-layer `anyhow::Error` into an [`ArchiveError`].
///
/// If the error downcasts to a [`ParquetError`], it is kept as `ParquetRead`. Any other
/// error is wrapped unchanged in `Decode`.
fn classify_read_error(e: anyhow::Error) -> ArchiveError {
    e.downcast::<ParquetError>()
        .map_or_else(ArchiveError::Decode, ArchiveError::ParquetRead)
}

#[cfg(test)]
mod tests {
    // TODO(ethl-53a): add an integration test that writes a valid archive, corrupts one parquet
    // file, then asserts load_range yields ArchiveError::ParquetRead. Blocked on a test harness
    // that can inject per-file corruption at the EventLoader level (planned for tokindex #53a).

    use alloy::dyn_abi::DecodedEvent;
    use alloy_primitives::Address;

    use super::*;

    /// Build `count` placeholder events for `block_number`, with log indices `0..count`.
    fn block_events(block_number: u64, count: u64) -> VecDeque<DecodedEventWithHeader> {
        (0..count)
            .map(|idx| DecodedEventWithHeader {
                log_block: block_number,
                log_index: idx as u32,
                log_address: Address::ZERO,
                event: DecodedEvent {
                    selector: None,
                    indexed: vec![],
                    body: vec![],
                },
            })
            .collect()
    }

    /// Walk blocks 0..9 through a cursor whose batches include a block split across two
    /// batches (block 2) and a run of empty blocks (4..=6).
    #[tokio::test]
    async fn test_cursor() {
        let batches = [(1, 2), (2, 2), (2, 2), (3, 3), (7, 2)]
            .into_iter()
            .map(|(block, count)| Ok(block_events(block, count)));
        let mut cursor = Cursor::new(futures_util::stream::iter(batches).boxed_local());

        let mut event_blocks: Vec<u64> = Vec::new();
        let mut gaps: Vec<u64> = Vec::new();
        for block in 0..9 {
            match cursor.block_events(block).await.unwrap() {
                Some(events) => event_blocks.extend(events.iter().map(|e| e.log_block)),
                None => gaps.push(block),
            }
        }

        assert!(cursor.eof);
        assert_eq!(vec![0, 4, 5, 6, 8], gaps);
        assert_eq!(vec![1, 1, 2, 2, 2, 2, 3, 3, 3, 7, 7], event_blocks);
    }
}