//! ethl 0.1.14
//!
//! Tools for capturing, processing, archiving, and replaying Ethereum events.
//! See the crate-level documentation for details.
use crate::rpc::{
    LogRange, RpcError,
    backfill::backfill,
    config::{HttpRpcSettings, ProviderSettings},
    heads::{last_block, stream_heads_with_logs},
};
use alloy::rpc::types::Filter;
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};
use tracing::debug;

/// Configuration bundle for building a log stream.
///
/// NOTE(review): this struct is not referenced by the functions visible in this
/// file; field semantics below are inferred from names — confirm against callers.
pub struct LogStreamConfig {
    /// WebSocket (`wss://`) RPC endpoint URLs, presumably for live head subscriptions.
    pub wss_endpoints: Vec<String>,
    /// Per-endpoint HTTP RPC settings (see [`HttpRpcSettings`]).
    pub http_options: Vec<HttpRpcSettings>,
    /// Event identifiers — presumably event signatures used to build a log filter;
    /// TODO confirm expected format (signature string vs. topic hash).
    pub events: Vec<String>,
    /// First block (inclusive) to fetch logs from.
    pub from_block: u64,
    /// Optional last block (inclusive); `None` presumably means "follow the chain head".
    pub to_block: Option<u64>,
}

/// Streams live log ranges as new chain heads arrive.
///
/// Thin forwarding wrapper around [`stream_heads_with_logs`]. The underlying
/// head subscription is opened inside the `stream!` body, i.e. only once the
/// returned stream is first polled — not when this function is awaited.
pub async fn stream_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    stream! {
        // Open the live head/log subscription lazily, on first poll.
        let live = stream_heads_with_logs(providers, filter).await;
        pin_mut!(live);
        // Forward every item (ranges and errors alike) to the consumer.
        while let Some(item) = live.next().await {
            yield item;
        }
    }
}

/// Backfills historical logs from the filter's `from_block` (default 0) up to
/// the filter's `to_block` — or, when no upper bound is set, up to just behind
/// the chain head — and then, in the unbounded case, switches to streaming
/// live logs from where the backfill left off.
///
/// The chain head is tracked with a 2-block safety margin (`head - 2`),
/// presumably to reduce exposure to shallow reorgs — confirm against the
/// project's reorg policy.
///
/// Errors from `last_block`, `backfill`, or the live stream are yielded to the
/// consumer; a yielded `Err` from the setup/backfill phase terminates the stream.
pub async fn backfill_then_watch_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    // Once the remaining lag drops below this many blocks (and no explicit
    // upper bound was requested), stop backfilling and go live.
    const LIVE_SWITCH_LAG: u64 = 30;

    let init_from_block = filter.and_then(|f| f.get_from_block()).unwrap_or(0);
    let init_to_block = filter.and_then(|f| f.get_to_block());

    stream! {
      let mut from_block = init_from_block;
      let mut to_block = match last_block(providers).await {
          // Stay 2 blocks behind the tip. `saturating_sub` guards against a
          // chain height of 0 or 1, where a plain `b - 2` would underflow
          // (panicking in debug builds).
          Ok(b) => init_to_block.unwrap_or(b.saturating_sub(2)),
          Err(e) => {
              yield Err(e);
              return;
          }
      };

      // Loop backfills until lag is resolved (the head keeps advancing while
      // we backfill, so each pass may leave a fresh, smaller gap).
      while to_block > from_block {
          if to_block - from_block < LIVE_SWITCH_LAG && init_to_block.is_none() {
              break; // Close enough to the head; hand off to live streaming.
          }

          debug!("Starting backfill for block range {} to {}", from_block, to_block);
          let filter = filter.cloned().unwrap_or_else(Filter::new).from_block(from_block).to_block(to_block);

          let stream = backfill(providers, &filter).await;
          pin_mut!(stream);
          while let Some(result) = stream.next().await {
              match result {
                  // `range.1` is the last block covered; resume just after it.
                  Ok(range) => {
                      from_block = range.1 + 1;
                      yield Ok(range);
                  }
                  Err(e) => {
                      yield Err(e);
                      return;
                  }
              }
          }

          // No explicit upper bound: re-read the head so the next pass can
          // close the gap that opened while this pass ran.
          if init_to_block.is_none() {
              match last_block(providers).await {
                  Ok(b) => to_block = b.saturating_sub(2),
                  Err(e) => {
                      yield Err(e);
                      return;
                  }
              }
          }
      }

      // Only follow the live chain when the caller did not pin an upper bound.
      if init_to_block.is_none() {
        debug!("No more backfilling needed, streaming live logs from block {}", from_block);
        let filter = filter.cloned().unwrap_or_else(Filter::new).from_block(from_block);

        let heads = stream_heads_with_logs(providers, Some(&filter)).await;
        pin_mut!(heads);
        while let Some(result) = heads.next().await {
            yield result;
        }
      }
    }
}