ethl 0.1.14

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
//! Stream decoded events from RPC providers, with an optional filter
//! Note: The filters events field will be overridden to match the provided events
//!
//! Example usage:
//! ```no_run
//! use ethl::rpc::{events::EventStreamer, config::ProviderSettings, RpcError};
//! use alloy::json_abi::Event;
//! use alloy::rpc::types::Filter;
//! use anyhow::Result;
//! use futures_util::{pin_mut, StreamExt};
//! use tracing::info;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let providers = ProviderSettings::default();
//!     let events = vec![
//!       Event::parse("Transfer(address indexed from, address indexed to, uint256 value)")?,
//!       Event::parse("Approval(address indexed owner, address indexed spender, uint256 value)")?
//!    ];
//!     let filter = Filter::new().from_block(10000000u64);
//!     let streamer = EventStreamer::new(providers, &events);
//!     let stream = streamer.stream(Some(&filter)).await;
//!     pin_mut!(stream);
//!     while let Some(result) = stream.next().await {
//!         let (from_block, to_block, decoded_events) = result?;
//!         info!("Blocks: {} to {}, Events: {}", from_block, to_block, decoded_events.len());
//!         for event in decoded_events {
//!             info!("  Event: {:?}", event);
//!         }
//!     }
//!     Ok(())
//! }
//! ```
use std::collections::HashMap;

use alloy::{dyn_abi::EventExt, json_abi::Event, rpc::types::Filter};
use alloy_primitives::B256;
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};

use crate::{
    rpc::{
        DecodedEventRange, RpcError, config::ProviderSettings, stream::backfill_then_watch_logs,
    },
    storage::codec::decoder::DecodedEventWithHeader,
};

pub struct EventStreamer {
    providers: ProviderSettings,
    events: HashMap<B256, Event>,
    signatures: Vec<String>,
}

impl EventStreamer {
    pub fn new(providers: ProviderSettings, events: &[Event]) -> Self {
        let signatures = events.iter().map(|e| e.signature()).collect::<Vec<_>>();
        let events_map = events
            .iter()
            .map(|e| (e.selector(), e.clone()))
            .collect::<HashMap<_, _>>();
        Self {
            providers,
            events: events_map,
            signatures,
        }
    }

    pub async fn stream(
        &self,
        filter: Option<&Filter>,
    ) -> impl Stream<Item = Result<DecodedEventRange, RpcError>> {
        stream! {
            let filter = filter.cloned().unwrap_or(Filter::default()).events(&self.signatures);
            let log_stream = backfill_then_watch_logs(&self.providers, Some(&filter)).await;
            pin_mut!(log_stream);
            while let Some(result) = log_stream.next().await {
                match result {
                    Ok((from_block, to_block, logs)) => {
                        let decoded_events = logs.iter().filter_map(|log| {
                            let event = self.events.get(log.topic0()?)?.decode_log(log.data()).ok()?;
                            let log_block = log.block_number?;
                            let log_index = log.log_index? as u32;
                            let log_address = log.address();
                            Some(
                              DecodedEventWithHeader { log_block, log_index, log_address, event }
                            )
                        }).collect::<Vec<_>>();
                        yield Ok((from_block, to_block, decoded_events));
                    }
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }
        }
    }
}