ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use std::{collections::VecDeque, sync::Arc};

use alloy::json_abi::Event;
use anyhow::Result;
use async_stream::stream;
use futures_util::{Stream, StreamExt, stream::LocalBoxStream};
use object_store::{ObjectStore, path::Path};

use crate::storage::{
    codec::decoder::DecodedEventWithHeader, reader::EventStoreReader, store::parse_store_uri,
};

/// Loads decoded events from an object store, merging the per-event-type streams into a single
/// stream grouped by block (see `load_range`).
pub struct EventLoader {
    /// One reader per event passed to the constructor, all sharing the same store and base path.
    readers: Vec<EventStoreReader>,
}

/// All events loaded for a single block, as yielded by `EventLoader::load_range`.
pub struct BlockEvents {
    /// The block the events belong to.
    pub block_number: u64,
    /// The block's events from every loaded event type, sorted by log index.
    pub events: Vec<DecodedEventWithHeader>,
}

/// Read cursor over one reader's stream of event chunks.
///
/// The stream yields chunks (`VecDeque`s) of events. The cursor buffers one chunk at a time and
/// hands out the events of a requested block, pulling further chunks when a block's events
/// continue past the end of the current one.
///
/// NOTE(review): assumes the stream yields events in non-decreasing block order; events for a
/// block that was already requested and passed are left at the front of the queue.
struct Cursor<'a> {
    /// Source of event chunks.
    stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>,
    /// Events of the current chunk that have not been handed out yet.
    queue: VecDeque<DecodedEventWithHeader>,
    /// Set once `stream` has returned `None`; the queue is always empty at that point because
    /// chunks are only pulled when it is empty.
    eof: bool,
}

impl<'a> Cursor<'a> {
    /// Wraps `stream` in a cursor with an empty buffer.
    pub fn new(stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>) -> Self {
        Cursor {
            stream,
            queue: VecDeque::new(),
            eof: false,
        }
    }

    /// Block number of the next buffered event, without pulling from the stream.
    pub fn next_enqueued_block(&self) -> Option<u64> {
        self.queue.front().map(|e| e.log_block)
    }

    /// Block number of the next event, pulling chunks from the stream as needed.
    ///
    /// Returns `Ok(None)` only once the stream is exhausted. Empty chunks are skipped rather than
    /// being mistaken for the end of the stream.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by the underlying stream.
    async fn next_block(&mut self) -> Result<Option<u64>> {
        while self.queue.is_empty() {
            // Never poll the stream again after it has reported exhaustion.
            if self.eof || !self.buffer().await? {
                return Ok(None);
            }
        }

        Ok(self.next_enqueued_block())
    }

    /// Pulls the next chunk from the stream into the (empty) queue.
    ///
    /// Returns `Ok(true)` if a chunk was received (it may be empty), or `Ok(false)` and sets
    /// `eof` when the stream is exhausted.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by the underlying stream.
    async fn buffer(&mut self) -> Result<bool> {
        // Replacing a non-empty queue would silently drop unread events.
        debug_assert!(self.queue.is_empty());
        match self.stream.next().await {
            Some(Ok(events)) => {
                self.queue = events;
                Ok(true)
            }
            Some(Err(e)) => Err(e),
            None => {
                self.eof = true;
                Ok(false)
            }
        }
    }

    /// Takes all events for `block_number` from the front of the stream.
    ///
    /// Returns:
    /// - `Ok(None)` if the stream is exhausted or its next event belongs to a later block;
    /// - `Ok(Some(events))` otherwise. `events` is empty if the next event belongs to an earlier
    ///   block; such events stay queued.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by the underlying stream.
    pub async fn block_events(
        &mut self,
        block_number: u64,
    ) -> Result<Option<Vec<DecodedEventWithHeader>>> {
        let Some(next_block_number) = self.next_block().await? else {
            return Ok(None); // No more events to read
        };

        // Early terminate: nothing for this block yet.
        if next_block_number > block_number {
            return Ok(None);
        }

        let mut events = Vec::new();
        // A block's events may span several chunks; `next_block` pulls the next chunk as needed.
        while let Some(block) = self.next_block().await?
            && block == block_number
        {
            events.push(
                self.queue
                    .pop_front()
                    .expect("next_block returned Some, so the queue is non-empty"),
            );
        }

        Ok(Some(events))
    }
}

impl EventLoader {
    /// Create a new EventLoader for the given store and path
    ///
    /// # Arguments
    ///
    /// - `store`: The object store where the events are stored.
    /// - `path`: The base path within the object store where the events are stored.
    /// - `events`: The events to load.
    ///
    /// # Returns
    ///
    /// An `EventLoader` instance.
    pub fn new(store: Arc<dyn ObjectStore>, path: Path, events: &[Event]) -> Result<Self> {
        let readers = events
            .iter()
            .map(|event| EventStoreReader::new(store.clone(), path.clone(), event))
            .collect::<Result<Vec<_>>>()?;

        Ok(EventLoader { readers })
    }

    /// Create a new EventLoader for the given base URI and events.
    ///
    /// # Arguments
    ///
    /// - `base_uri`: The base URI where the events are stored (e.g. "s3://my-bucket/events" or file:///path).
    /// - `events`: The events to load.
    ///
    /// # Returns
    ///
    /// An `EventLoader` instance.
    pub fn from_uri(base_uri: impl AsRef<str>, events: &[Event]) -> Result<Self> {
        let (store, path) = parse_store_uri(base_uri)?;
        Self::new(Arc::new(store), path, events)
    }

    /// Load events in the specified block range, merging events by block and sorting by log index.
    ///
    /// # Arguments
    ///
    /// - `from`: The starting block number (inclusive). If `None`, starts from the earliest block.
    /// - `to`: The ending block number (inclusive). If `None`, goes to the latest block.
    ///
    /// # Returns
    ///
    /// A stream of `BlockEvents`, each containing the block number and a vector of decoded events for that block.
    pub async fn load_range(
        &self,
        from: Option<u64>,
        to: Option<u64>,
    ) -> impl Stream<Item = BlockEvents> {
        stream! {
          let mut block_cursor = 0;
          let mut cursors: Vec<Cursor> = vec![];

          for reader in &self.readers {
              let stream = reader.stream_deq(from, to).await.expect("Failed to get block events");
              cursors.push(Cursor::new(stream.boxed_local()));
          }

          while !cursors.is_empty() {
              let mut block_events = Vec::new();

              for cursor in &mut cursors {
                  let events = cursor.block_events(block_cursor).await.expect("Failed to get block events");
                  if let Some(events) = events {
                      block_events.extend(events);
                  }
              }
              block_events.sort_by_key(|e| e.log_index);

              cursors.retain(|c| !c.eof);


              // Sparse block event optimization (consider removing)
              if block_events.is_empty() {
                  // no block event, advance to min block cursor
                  let min_blocks = cursors.iter()
                      .filter_map(|c| c.next_enqueued_block())
                      .collect::<Vec<_>>();

                  if let Some(min_block) = min_blocks.iter().min() && min_blocks.len() == cursors.len() {
                      block_cursor = *min_block;
                  } else {
                     block_cursor += 1;
                  }

                  continue;
              }

              yield BlockEvents {
                  block_number: block_cursor,
                  events: block_events,
              };

              block_cursor += 1;
          }
        }
    }

    /// Load all events, merging events by block and sorting by log index. See `load_range` for details.
    pub async fn load_all(&self) -> impl Stream<Item = BlockEvents> {
        self.load_range(None, None).await
    }
}

#[cfg(test)]
mod tests {
    use alloy::dyn_abi::DecodedEvent;
    use alloy_primitives::Address;

    use super::*;

    /// Builds one chunk of `count` placeholder events for `block_number`, with log indices
    /// `0..count`.
    fn block_events(block_number: u64, count: u64) -> VecDeque<DecodedEventWithHeader> {
        (0..count)
            .map(|idx| DecodedEventWithHeader {
                log_block: block_number,
                log_index: idx as u32,
                log_address: Address::ZERO,
                event: DecodedEvent {
                    selector: None,
                    indexed: vec![],
                    body: vec![],
                },
            })
            .collect()
    }

    /// Walks blocks 0..9 over a chunked stream, checking that blocks split across chunks are
    /// merged and that blocks without events are reported as gaps.
    #[tokio::test]
    async fn test_cursor() {
        let chunks = vec![
            Ok(block_events(1, 2)),
            Ok(block_events(2, 2)),
            Ok(block_events(2, 2)),
            Ok(block_events(3, 3)),
            Ok(block_events(7, 2)),
        ];
        let mut cursor = Cursor::new(futures_util::stream::iter(chunks).boxed_local());

        let mut seen_blocks = Vec::<u64>::new();
        let mut empty_blocks = Vec::<u64>::new();
        for block in 0..9 {
            match cursor.block_events(block).await.unwrap() {
                Some(events) => seen_blocks.extend(events.iter().map(|e| e.log_block)),
                None => empty_blocks.push(block),
            }
        }

        assert!(cursor.eof);
        assert_eq!(vec![0, 4, 5, 6, 8], empty_blocks);
        assert_eq!(vec![1, 1, 2, 2, 2, 2, 3, 3, 3, 7, 7], seen_blocks);
    }
}