ethl 0.1.22

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use crate::rpc::{
    LogRange, RpcError,
    backfill::backfill,
    config::{HttpRpcSettings, ProviderSettings},
    heads::{collect_window_hashes, fetch_logs_by_hash_range, last_block, stream_heads_with_logs},
};
use alloy::providers::Provider;
use alloy::rpc::types::{BlockNumberOrTag, Filter};
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};
use tracing::debug;

/// Configuration for a log stream: which endpoints to talk to, which events to
/// select, and which block range to cover.
///
/// NOTE(review): this type is not referenced elsewhere in this file; the field
/// descriptions below are inferred from the field names and should be
/// confirmed against its callers.
pub struct LogStreamConfig {
    /// Endpoint URLs, presumably WebSocket (`wss://`) RPC endpoints — TODO confirm.
    pub wss_endpoints: Vec<String>,
    /// Per-endpoint HTTP RPC settings.
    pub http_options: Vec<HttpRpcSettings>,
    /// Event identifiers to select (their exact format is not visible here).
    pub events: Vec<String>,
    /// First block of the requested range.
    pub from_block: u64,
    /// Last block of the requested range; `None` presumably means open-ended.
    pub to_block: Option<u64>,
}

/// Streams live logs by following the chain head via [`stream_heads_with_logs`].
///
/// Every item produced by the head stream — successes and errors alike — is
/// forwarded unchanged. The head stream itself is only created once the
/// returned stream is first polled, because the `stream!` body runs lazily.
pub async fn stream_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    stream! {
        let live = stream_heads_with_logs(providers, filter).await;
        // `for await` inside `stream!` pins the source stream itself.
        for await item in live {
            yield item;
        }
    }
}

/// Backfills logs by block number and, for open-ended requests, then follows
/// the chain head live.
///
/// The block range is taken from `filter`; `from_block` defaults to 0 when unset.
///
/// * **Bounded** (`to_block` set): `[from_block, to_block]` is backfilled by
///   block number and the stream then ends. There is no by-hash window and no
///   live streaming.
/// * **Unbounded** (`to_block` unset): the backfill tip is the latest block
///   reported by [`last_block`] minus two, saturating at 0.
///   - With `providers.reorg_detect_depth == 0`, backfill stops once it is
///     within 30 blocks of that tip.
///   - With `reorg_detect_depth > 0`, by-number backfill stops below
///     `tip - reorg_detect_depth`. The remaining blocks up to the (refreshed)
///     tip are fetched pinned to their canonical block hashes, found by
///     walking back from the tip block, so logs from orphaned blocks cannot
///     leak through.
///
///   In both unbounded cases the stream then continues with
///   [`stream_heads_with_logs`] from the first block not yet yielded.
///
/// # Errors
///
/// Failures of the tip lookup, the backfill, or the by-hash window fetch are
/// yielded once as `Err(_)` and end the stream. Items from the live head stream
/// (including errors) are forwarded as-is.
///
/// NOTE(review): the backfill loop relies on [`backfill`] yielding ranges that
/// advance `from_block`. If a batch ever completed without yielding anything,
/// the loop would re-request from the same starting block.
pub async fn backfill_then_watch_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    // Number of blocks the backfill tip trails the latest reported block.
    const TIP_LAG: u64 = 2;

    let init_from_block = filter.and_then(|f| f.get_from_block()).unwrap_or(0);
    let init_to_block = filter.and_then(|f| f.get_to_block());
    let depth = providers.reorg_detect_depth;

    stream! {
        let mut from_block = init_from_block;
        // `saturating_sub`: while the head is still at block 0 or 1 (e.g. a fresh
        // dev chain) `head - TIP_LAG` would underflow. That panics in debug builds
        // and wraps to ~u64::MAX in release builds. Note that `unwrap_or` evaluates
        // its argument even for bounded requests.
        let mut tip = match last_block(providers).await {
            Ok(head) => init_to_block.unwrap_or(head.saturating_sub(TIP_LAG)),
            Err(e) => {
                yield Err(e);
                return;
            }
        };

        // The reorg-exposed window boundary: blocks at or above this number are in
        // the window and must be fetched by hash to prevent orphaned data from
        // leaking through. When depth == 0, window_floor is u64::MAX (no window).
        let window_floor = if depth > 0 { tip.saturating_sub(depth) } else { u64::MAX };

        // Backfill by number until lag is resolved. When reorg detection is active
        // the loop stops at the window boundary for unbounded (live) fetches. A
        // bounded historical fetch is not the reorg-exposed live path, so its whole
        // range is fetched by number without a floor stop. When reorg detection is
        // disabled, the legacy < 30 block dormancy threshold applies. The two
        // cutoffs do not interact.
        //
        // The guard is `from_block <= tip` (not `<`) so that a bounded request
        // whose remaining range is a single block (e.g. from_block == to_block)
        // still fetches it instead of silently yielding nothing. For unbounded
        // requests this extra iteration normally hits the stop condition at once.
        while from_block <= tip {
            let stop_condition = if depth > 0 {
                // Only apply the window-floor stop for unbounded live fetches.
                // A bounded historical fetch (init_to_block.is_some()) must not stop
                // at the floor — doing so silently drops [window_floor, to_block].
                init_to_block.is_none() && from_block >= window_floor
            } else {
                // Legacy: stop when close enough to hand off to the live stream.
                // No underflow: the loop guard ensures from_block <= tip.
                tip - from_block < 30 && init_to_block.is_none()
            };
            if stop_condition {
                break;
            }

            // For unbounded live fetches: clamp below the window (by-hash tail follows).
            // For bounded historical fetches: fetch to tip without clamping.
            let backfill_to = if depth > 0 && init_to_block.is_none() {
                tip.min(window_floor.saturating_sub(1))
            } else {
                tip
            };

            if backfill_to < from_block {
                break;
            }

            debug!("Starting backfill for block range {} to {}", from_block, backfill_to);
            let batch_filter = filter
                .cloned()
                .unwrap_or_else(Filter::new)
                .from_block(from_block)
                .to_block(backfill_to);

            let stream = backfill(providers, &batch_filter).await;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok(range) => {
                        from_block = range.1 + 1;
                        yield Ok(range);
                    }
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }

            // Update tip for the next iteration (only when not bounded by an explicit to_block).
            if init_to_block.is_none() {
                match last_block(providers).await {
                    Ok(head) => tip = head.saturating_sub(TIP_LAG),
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }
        }

        // If reorg detection is active, fetch the in-window tail [from_block, tip]
        // by hash. This ensures no orphaned blocks can contaminate events in the
        // reorg-exposed zone before the live stream takes over.
        if depth > 0 && from_block <= tip && init_to_block.is_none() {
            debug!(
                "Fetching reorg-exposed window [{}, {}] by canonical block hash",
                from_block, tip
            );
            // Obtain the tip block to get its hash, then walk backward.
            let http_provider = providers.connect_http(0);
            let tip_block = match http_provider
                .get_block_by_number(BlockNumberOrTag::Number(tip))
                .await
            {
                Ok(Some(block)) => block,
                Ok(None) => {
                    yield Err(RpcError::ConnectionError(format!(
                        "tip block {} not found during window hash walk",
                        tip
                    )));
                    return;
                }
                Err(e) => {
                    yield Err(RpcError::ConnectionError(format!(
                        "failed to fetch tip block {}: {}",
                        tip, e
                    )));
                    return;
                }
            };
            let tip_hash = tip_block.header.hash;

            // Walk backward from tip to from_block collecting canonical (number, hash)
            // pairs, then reverse to ascending order (from_block → tip) so the
            // blocks are fetched sequentially.
            let pairs = match collect_window_hashes(&http_provider, tip_hash, tip, from_block).await {
                Ok(mut pairs_desc) => {
                    pairs_desc.reverse();
                    pairs_desc
                }
                Err(e) => {
                    yield Err(e);
                    return;
                }
            };

            // Fetch logs per block pinned to the canonical hash.
            match fetch_logs_by_hash_range(providers, &pairs, filter).await {
                Ok(ranges) => {
                    for range in ranges {
                        from_block = range.1 + 1;
                        yield Ok(range);
                    }
                }
                Err(e) => {
                    yield Err(e);
                    return;
                }
            }
        }

        // Only open-ended requests hand off to the live head stream.
        if init_to_block.is_none() {
            debug!("No more backfilling needed, streaming live logs from block {}", from_block);
            let live_filter = filter.cloned().unwrap_or_else(Filter::new).from_block(from_block);

            let heads = stream_heads_with_logs(providers, Some(&live_filter)).await;
            pin_mut!(heads);
            while let Some(result) = heads.next().await {
                yield result;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    //! The backfill-loop predicates of `backfill_then_watch_logs` are restated
    //! here as small pure helpers and checked directly, mirroring
    //! `window_boundary` in heads.rs. The full stream I/O is covered by
    //! integration tests.

    /// Loop stop predicate of the backfill loop.
    ///
    /// With reorg detection (`depth > 0`), only unbounded fetches stop at the
    /// window floor. Without it, unbounded fetches stop within 30 blocks of
    /// the tip. Callers must ensure `from_block <= tip`.
    fn should_stop(
        depth: u64,
        tip: u64,
        from_block: u64,
        window_floor: u64,
        to_block: Option<u64>,
    ) -> bool {
        if depth > 0 {
            to_block.is_none() && from_block >= window_floor
        } else {
            tip - from_block < 30 && to_block.is_none()
        }
    }

    /// Upper bound of one by-number backfill batch.
    ///
    /// The bound is clamped below the window only for unbounded fetches with
    /// reorg detection enabled.
    fn batch_upper_bound(depth: u64, tip: u64, window_floor: u64, to_block: Option<u64>) -> u64 {
        if depth > 0 && to_block.is_none() {
            tip.min(window_floor.saturating_sub(1))
        } else {
            tip
        }
    }

    /// A bounded historical fetch (`init_to_block.is_some()`) with `depth > 0`
    /// must neither apply the window-floor stop nor clamp `backfill_to` below
    /// the requested `to_block`. Doing so silently drops `[window_floor, to_block]`.
    #[test]
    fn bounded_fetch_with_depth_no_window_floor_stop() {
        let tip: u64 = 200;
        let depth: u64 = 10;
        let bounded: Option<u64> = Some(200);
        let window_floor = tip.saturating_sub(depth); // 190

        // At window_floor (190), a bounded fetch must NOT stop.
        let from_block = window_floor;
        assert!(
            !should_stop(depth, tip, from_block, window_floor, bounded),
            "bounded fetch must not stop at the window floor (would drop blocks)"
        );

        // backfill_to must not be clamped for a bounded fetch — fetch to tip.
        assert_eq!(
            batch_upper_bound(depth, tip, window_floor, bounded),
            tip,
            "bounded fetch must backfill to tip, not window_floor - 1"
        );

        // Unbounded fetch (to_block == None), checked with the SAME predicate,
        // still stops at the floor and clamps its batch just below it.
        assert!(
            should_stop(depth, tip, from_block, window_floor, None),
            "unbounded fetch must stop at window floor"
        );
        assert_eq!(batch_upper_bound(depth, tip, window_floor, None), window_floor - 1);
    }

    /// Without reorg detection (`depth == 0`, window floor `u64::MAX`), bounded
    /// fetches never stop early. Unbounded fetches hand off once fewer than 30
    /// blocks remain.
    #[test]
    fn legacy_mode_without_reorg_detection() {
        let tip: u64 = 200;
        let no_window = u64::MAX;

        assert!(!should_stop(0, tip, 190, no_window, Some(tip)));
        assert_eq!(batch_upper_bound(0, tip, no_window, Some(tip)), tip);

        assert!(!should_stop(0, tip, 170, no_window, None), "30 blocks behind: keep backfilling");
        assert!(should_stop(0, tip, 171, no_window, None), "29 blocks behind: hand off");
        assert_eq!(batch_upper_bound(0, tip, no_window, None), tip);
    }
}