ethl 0.1.22

Tools for capturing, processing, archiving, and replaying Ethereum events
Documentation
use crate::rpc::RpcError;

/// Decides whether the cursor may advance to `to_block` after a batch response.
///
/// A non-empty batch implies the provider has indexed the queried range; the
/// cursor advances unconditionally. An empty batch is ambiguous — some providers
/// (notably dev nodes) return `Ok([])` for ranges past their actual tip instead
/// of erroring — so advancement requires explicit confirmation from the
/// provider's reported `eth_blockNumber`.
pub(crate) fn next_cursor_after_batch(
    to_block: u64,
    logs_empty: bool,
    reported_tip: u64,
) -> Result<u64, RpcError> {
    if logs_empty && to_block > reported_tip {
        return Err(RpcError::CursorPastTip {
            to_block,
            reported_tip,
        });
    }
    Ok(to_block)
}

/// Tracks whether a lagging provider tip is advancing or stalled across polling cycles.
///
/// Created once per stream before the fetch loop. Call `reset` after each successful
/// yield so the next batch starts with a clean stall counter.
pub(crate) struct TipGate {
    last_tip: Option<u64>,
    stalled_cycles: u32,
}

/// Decision returned by `TipGate::observe` for a single polling cycle.
pub(crate) enum TipWait {
    /// Tip has advanced (or this is the first observation) — keep waiting.
    Wait,
    /// Tip has not advanced for this many consecutive cycles — terminate.
    Stall(u32),
}

impl TipGate {
    pub(crate) fn new() -> Self {
        Self {
            last_tip: None,
            stalled_cycles: 0,
        }
    }

    pub(crate) fn reset(&mut self) {
        self.last_tip = None;
        self.stalled_cycles = 0;
    }

    /// Called once per polling cycle while a batch is past the provider tip
    /// (`to_block > reported_tip`). An advancing tip resets the stall counter
    /// and returns `Wait`; a flat or regressed tip increments the counter and
    /// returns `Stall(n)` once `n >= max_stall_cycles`.
    pub(crate) fn observe(&mut self, reported_tip: u64, max_stall_cycles: u32) -> TipWait {
        if self.last_tip.is_none_or(|prev| reported_tip > prev) {
            self.last_tip = Some(reported_tip);
            self.stalled_cycles = 0;
            TipWait::Wait
        } else {
            self.stalled_cycles += 1;
            if self.stalled_cycles >= max_stall_cycles {
                TipWait::Stall(self.stalled_cycles)
            } else {
                TipWait::Wait
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // ==================== TipGate tests ====================

    #[test]
    fn tip_gate_first_observation_is_always_wait() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
    }

    #[test]
    fn tip_gate_strictly_advancing_tip_never_stalls() {
        let mut gate = TipGate::new();
        for tip in 0u64..20 {
            assert!(
                matches!(gate.observe(tip, 3), TipWait::Wait),
                "tip={tip} should be Wait when strictly advancing"
            );
        }
    }

    #[test]
    fn tip_gate_flat_tip_stalls_at_bound() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // seeds last_tip
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(100, 3), TipWait::Stall(3))); // stalled_cycles=3 >= 3
    }

    #[test]
    fn tip_gate_regressed_tip_counts_as_stall() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        assert!(matches!(gate.observe(90, 3), TipWait::Wait)); // regression: stalled_cycles=1
        assert!(matches!(gate.observe(90, 3), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(50, 3), TipWait::Stall(3)));
    }

    #[test]
    fn tip_gate_advance_after_flat_resets_stall_counter() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 5), TipWait::Wait));
        assert!(matches!(gate.observe(100, 5), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 5), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // advances: resets
        // After reset, a fresh flat run must count from zero.
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=3
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=4
        assert!(matches!(gate.observe(101, 5), TipWait::Stall(5))); // stalled_cycles=5
    }

    #[test]
    fn tip_gate_reset_restarts_counting() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // stalled_cycles=1
        gate.reset();
        // After reset, the same tip is treated as a fresh first observation.
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // seeds again
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 2), TipWait::Stall(2)));
    }

    #[test]
    fn next_cursor_advances_on_non_empty_batch_regardless_of_tip() {
        // Non-empty batches imply the provider indexed the range; the reported
        // tip is not consulted.
        assert_eq!(next_cursor_after_batch(100, false, 0).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, 50).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, u64::MAX).unwrap(), 100);
    }

    #[test]
    fn next_cursor_advances_on_empty_batch_within_tip() {
        assert_eq!(next_cursor_after_batch(100, true, 100).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, 101).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, u64::MAX).unwrap(), 100);
    }

    #[test]
    fn next_cursor_rejects_empty_batch_past_tip() {
        match next_cursor_after_batch(100, true, 99) {
            Err(RpcError::CursorPastTip {
                to_block,
                reported_tip,
            }) => {
                assert_eq!(to_block, 100);
                assert_eq!(reported_tip, 99);
            }
            other => panic!("expected CursorPastTip, got {:?}", other),
        }
        assert!(next_cursor_after_batch(100, true, 0).is_err());
        assert!(next_cursor_after_batch(u64::MAX, true, u64::MAX - 1).is_err());
    }
}