// ethl 0.1.23
//
// Tools for capturing, processing, archiving, and replaying Ethereum events.

use std::future::Future;
use std::time::Duration;

use tokio::time::{Instant, sleep};
use tracing::{error, warn};

use crate::rpc::RpcError;
use crate::rpc::config::ProviderSettings;

/// Computes the cursor position to adopt once a batch covering blocks up to
/// `to_block` has come back from the provider.
///
/// * A batch that contains logs is taken as proof that the provider has indexed
///   the range, so `to_block` is accepted without consulting `reported_tip`.
/// * A batch with no logs proves nothing: some providers (dev nodes in
///   particular) answer `Ok([])` for ranges beyond their real tip rather than
///   failing. Such a batch is accepted only when the provider's own
///   `eth_blockNumber` (`reported_tip`) is at or beyond `to_block`.
///
/// # Errors
///
/// Returns `RpcError::CursorPastTip` when the batch is empty and `to_block`
/// lies beyond `reported_tip`.
pub(crate) fn next_cursor_after_batch(
    to_block: u64,
    logs_empty: bool,
    reported_tip: u64,
) -> Result<u64, RpcError> {
    // Either the batch carried logs, or the provider's tip covers the range.
    let range_confirmed = !logs_empty || reported_tip >= to_block;
    if range_confirmed {
        Ok(to_block)
    } else {
        Err(RpcError::CursorPastTip {
            to_block,
            reported_tip,
        })
    }
}

/// Tracks whether a lagging provider tip is advancing or stalled across polling cycles.
///
/// Created once per stream before the fetch loop. Call `reset` after each successful
/// yield so the next batch starts with a clean stall counter.
#[derive(Debug, Default)]
pub(crate) struct TipGate {
    /// Highest tip observed since construction or the last `reset`; `None`
    /// until the first observation. Never lowered by a regressed tip.
    last_tip: Option<u64>,
    /// Number of observations since `last_tip` was last raised.
    stalled_cycles: u32,
}

/// Decision returned by `TipGate::observe` for a single polling cycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TipWait {
    /// The stall threshold has not been reached — keep waiting. Returned for
    /// the first observation, for an advancing tip, and for a flat or
    /// regressed tip that is still below the threshold.
    Wait,
    /// Tip has not advanced for this many consecutive cycles — terminate.
    Stall(u32),
}

impl TipGate {
    /// Creates a gate with no recorded tip and a zeroed stall counter.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Forgets the recorded tip and zeroes the stall counter, so the next
    /// `observe` call is treated as a first observation.
    pub(crate) fn reset(&mut self) {
        *self = Self::default();
    }

    /// Called once per polling cycle while a batch is past the provider tip
    /// (`to_block > reported_tip`). An advancing tip resets the stall counter
    /// and returns `Wait`; a flat or regressed tip increments the counter and
    /// returns `Stall(n)` once `n >= max_stall_cycles`.
    ///
    /// "Advancing" means strictly greater than the highest tip recorded so
    /// far: after a regression, the tip must exceed the earlier high-water
    /// mark before it counts as progress again.
    pub(crate) fn observe(&mut self, reported_tip: u64, max_stall_cycles: u32) -> TipWait {
        if self.last_tip.is_none_or(|prev| reported_tip > prev) {
            self.last_tip = Some(reported_tip);
            self.stalled_cycles = 0;
            return TipWait::Wait;
        }

        // Saturate rather than overflow if a caller keeps observing a gate
        // whose counter has already reached `u32::MAX`.
        self.stalled_cycles = self.stalled_cycles.saturating_add(1);
        if self.stalled_cycles >= max_stall_cycles {
            TipWait::Stall(self.stalled_cycles)
        } else {
            TipWait::Wait
        }
    }
}

/// Outcome returned by `wait_for_tip` after one tip-wait episode.
///
/// Derives `Debug` and equality so outcomes can be logged and compared
/// directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TipWaitOutcome {
    /// Provider tip reached `to_block` — caller re-fetches the same range without
    /// advancing the cursor.
    CaughtUp,
    /// Episode terminated: either the tip was flat for `tip_wait_max_stall_cycles`
    /// consecutive cycles, or the wall-clock budget (`tip_wait_max_total_secs`) was
    /// exhausted while the tip was still advancing. Caller yields `ProviderStalled`.
    Terminal {
        /// Upper bound of the range the caller was trying to fetch.
        to_block: u64,
        /// Last tip the provider reported before the episode ended.
        reported_tip: u64,
        /// Stall count on the flat-tip path; total wait cycles on the
        /// wall-clock-budget path.
        stalled_cycles: u32,
    },
    /// Tip re-poll failed. Carries the error as a string so the caller can log
    /// it with site-specific context (`from_block`, provider identity) in one place.
    RepollFailed(String),
}

/// Runs one tip-wait episode: repeatedly re-polls the provider tip until it
/// reaches `to_block`, the stall gate trips, the wall-clock budget runs out,
/// or a re-poll fails.
///
/// Intended for the empty-batch guard sites, after `next_cursor_after_batch`
/// has returned `CursorPastTip`. Each cycle consults `gate`, checks the
/// budget, sleeps for `tip_wait_backoff_secs`, and then calls `get_tip`.
/// The gate is not reset here; the caller resets it after a later successful
/// yield.
///
/// * `to_block` — block the provider tip has to reach.
/// * `initial_tip` — tip value that triggered the wait; fed to the gate first.
/// * `gate` — stall tracker shared across the stream.
/// * `providers` — supplies `tip_wait_max_stall_cycles`,
///   `tip_wait_max_total_secs` (a value of `0` disables the budget check) and
///   `tip_wait_backoff_secs`.
/// * `host` — provider label used only in log lines.
/// * `get_tip` — produces a future resolving to the provider's current tip.
pub(crate) async fn wait_for_tip<F, Fut, E>(
    to_block: u64,
    initial_tip: u64,
    gate: &mut TipGate,
    providers: &ProviderSettings,
    host: &str,
    get_tip: F,
) -> TipWaitOutcome
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<u64, E>>,
    E: std::fmt::Display,
{
    let episode_start = Instant::now();
    let mut current_tip = initial_tip;
    let mut cycles: u32 = 0;

    loop {
        // Flat-tip termination: the gate decides when the tip has stalled.
        match gate.observe(current_tip, providers.tip_wait_max_stall_cycles) {
            TipWait::Wait => {}
            TipWait::Stall(stalled_cycles) => {
                error!(
                    "provider stalled: tip {} did not reach block {} after {} cycles at {}",
                    current_tip, to_block, stalled_cycles, host
                );
                return TipWaitOutcome::Terminal {
                    to_block,
                    reported_tip: current_tip,
                    stalled_cycles,
                };
            }
        }

        // The cycle is counted before the budget check, so a budget
        // termination reports the cycle that was about to start.
        cycles += 1;

        // Wall-clock termination: the tip is still moving, but too slowly.
        let budget_enabled = providers.tip_wait_max_total_secs > 0;
        if budget_enabled
            && episode_start.elapsed() >= Duration::from_secs(providers.tip_wait_max_total_secs)
        {
            error!(
                "provider advancing but not converging: tip {} did not reach block {} after {} s ({} cycles) at {}",
                current_tip,
                to_block,
                episode_start.elapsed().as_secs(),
                cycles,
                host
            );
            return TipWaitOutcome::Terminal {
                to_block,
                reported_tip: current_tip,
                stalled_cycles: cycles,
            };
        }

        warn!(
            "provider tip {} behind block {} — waiting (cycle {}) at {}",
            current_tip, to_block, cycles, host
        );
        sleep(Duration::from_secs(providers.tip_wait_backoff_secs)).await;

        // Re-poll: finish on catch-up or failure, otherwise loop with the new tip.
        current_tip = match get_tip().await {
            Ok(tip) if tip >= to_block => return TipWaitOutcome::CaughtUp,
            Ok(tip) => tip,
            Err(e) => return TipWaitOutcome::RepollFailed(e.to_string()),
        };
    }
}

/// Unit tests for the cursor guard (`next_cursor_after_batch`), the stall
/// tracker (`TipGate`) and the tip-wait driver (`wait_for_tip`).
#[cfg(test)]
mod tests {
    use super::*;

    // ==================== TipGate tests ====================

    /// With no previous tip recorded, the first observation can never stall.
    #[test]
    fn tip_gate_first_observation_is_always_wait() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
    }

    /// A tip that grows on every observation keeps the stall counter at zero.
    #[test]
    fn tip_gate_strictly_advancing_tip_never_stalls() {
        let mut gate = TipGate::new();
        for tip in 0u64..20 {
            assert!(
                matches!(gate.observe(tip, 3), TipWait::Wait),
                "tip={tip} should be Wait when strictly advancing"
            );
        }
    }

    /// A flat tip stalls exactly when the counter reaches `max_stall_cycles`.
    #[test]
    fn tip_gate_flat_tip_stalls_at_bound() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // seeds last_tip
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 3), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(100, 3), TipWait::Stall(3))); // stalled_cycles=3 >= 3
    }

    /// A tip lower than the recorded one is counted the same as a flat tip.
    #[test]
    fn tip_gate_regressed_tip_counts_as_stall() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        assert!(matches!(gate.observe(90, 3), TipWait::Wait)); // regression: stalled_cycles=1
        assert!(matches!(gate.observe(90, 3), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(50, 3), TipWait::Stall(3)));
    }

    /// An advance in the middle of a flat run zeroes the counter.
    #[test]
    fn tip_gate_advance_after_flat_resets_stall_counter() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 5), TipWait::Wait));
        assert!(matches!(gate.observe(100, 5), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 5), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // advances: resets
        // After reset, a fresh flat run must count from zero.
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=2
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=3
        assert!(matches!(gate.observe(101, 5), TipWait::Wait)); // stalled_cycles=4
        assert!(matches!(gate.observe(101, 5), TipWait::Stall(5))); // stalled_cycles=5
    }

    /// `reset` clears both the recorded tip and the stall counter.
    #[test]
    fn tip_gate_reset_restarts_counting() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // stalled_cycles=1
        gate.reset();
        // After reset, the same tip is treated as a fresh first observation.
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // seeds again
        assert!(matches!(gate.observe(100, 2), TipWait::Wait)); // stalled_cycles=1
        assert!(matches!(gate.observe(100, 2), TipWait::Stall(2)));
    }

    // ==================== next_cursor_after_batch tests ====================

    /// A batch with logs advances the cursor whatever the reported tip is.
    #[test]
    fn next_cursor_advances_on_non_empty_batch_regardless_of_tip() {
        // Non-empty batches imply the provider indexed the range; the reported
        // tip is not consulted.
        assert_eq!(next_cursor_after_batch(100, false, 0).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, 50).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, u64::MAX).unwrap(), 100);
    }

    /// An empty batch advances the cursor when the tip is at or past `to_block`.
    #[test]
    fn next_cursor_advances_on_empty_batch_within_tip() {
        assert_eq!(next_cursor_after_batch(100, true, 100).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, 101).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, u64::MAX).unwrap(), 100);
    }

    /// An empty batch beyond the tip is rejected, and the error carries both values.
    #[test]
    fn next_cursor_rejects_empty_batch_past_tip() {
        match next_cursor_after_batch(100, true, 99) {
            Err(RpcError::CursorPastTip {
                to_block,
                reported_tip,
            }) => {
                assert_eq!(to_block, 100);
                assert_eq!(reported_tip, 99);
            }
            other => panic!("expected CursorPastTip, got {:?}", other),
        }
        assert!(next_cursor_after_batch(100, true, 0).is_err());
        assert!(next_cursor_after_batch(u64::MAX, true, u64::MAX - 1).is_err());
    }

    // ==================== wait_for_tip tests ====================
    //
    // These tests drive wait_for_tip directly via a pre-loaded tip queue, bypassing
    // the provider stack entirely. Each test exercises a distinct termination path;
    // removing the corresponding code branch causes a different outcome (wrong variant
    // or panic) — none of these tests can pass vacuously.

    use crate::rpc::config::ProviderSettings;
    use crate::rpc::cursor::{TipWaitOutcome, wait_for_tip};
    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    /// Build a closure that pops sequential tip values from `responses`.
    /// Panics if called after the queue is empty — deliberate, so a test that drives
    /// too many cycles fails loudly rather than silently succeeding on a stale value.
    fn tip_queue(
        responses: impl IntoIterator<Item = Result<u64, &'static str>>,
    ) -> impl Fn() -> std::future::Ready<Result<u64, &'static str>> {
        let q = Arc::new(Mutex::new(responses.into_iter().collect::<VecDeque<_>>()));
        move || {
            let val = q
                .lock()
                .unwrap()
                .pop_front()
                .expect("tip_queue exhausted — test drove more re-polls than expected");
            std::future::ready(val)
        }
    }

    /// Tip catches up on the first re-poll → `CaughtUp`.
    ///
    /// Non-vacuous: removing the `if reported_tip >= to_block { return CaughtUp }` branch
    /// causes the queue to exhaust → panic, not `CaughtUp`.
    #[tokio::test]
    async fn wait_for_tip_catches_up() {
        let settings = ProviderSettings::default().with_tip_wait_backoff_secs(0);
        let mut gate = TipGate::new();
        // initial_tip=99 < to_block=100; first re-poll returns 100 → caught up.
        let outcome =
            wait_for_tip(100, 99, &mut gate, &settings, "host", tip_queue([Ok(100)])).await;
        assert!(
            matches!(outcome, TipWaitOutcome::CaughtUp),
            "expected CaughtUp, got non-CaughtUp"
        );
    }

    /// Tip stays flat for `max_stall_cycles` → `Terminal` via the flat-tip gate.
    ///
    /// Non-vacuous: removing `TipWait::Stall` → `Terminal` causes the loop to spin
    /// past the stall threshold into more re-polls, exhausting the queue → panic.
    #[tokio::test]
    async fn wait_for_tip_flat_tip_stalls() {
        // max_stall_cycles=3: seed observe(99)→Wait, then 3 flat re-polls → Stall(3).
        let settings = ProviderSettings::default()
            .with_tip_wait_backoff_secs(0)
            .with_tip_wait_max_stall_cycles(3);
        let mut gate = TipGate::new();
        let outcome = wait_for_tip(
            100,
            99,
            &mut gate,
            &settings,
            "host",
            // 3 re-polls all return 99 (flat). A 4th call would panic — proves stall fired at 3.
            tip_queue([Ok(99), Ok(99), Ok(99)]),
        )
        .await;
        assert!(
            matches!(
                outcome,
                TipWaitOutcome::Terminal {
                    to_block: 100,
                    reported_tip: 99,
                    stalled_cycles: 3,
                }
            ),
            "expected Terminal(stalled_cycles=3), got non-Terminal"
        );
    }

    /// Tip advances every cycle but never converges → `Terminal` via wall-clock budget.
    ///
    /// Uses `start_paused = true`: each `sleep(1s)` auto-advances virtual time by 1s,
    /// so the test is deterministic and runs near-instantly.
    ///
    /// Non-vacuous: removing the budget check causes the loop to consume all queued
    /// tips without returning `Terminal`, then the queue exhausts → panic. The flat-tip
    /// gate cannot fire here because the tip advances on every re-poll.
    #[tokio::test(start_paused = true)]
    async fn wait_for_tip_budget_exceeded_with_advancing_tip() {
        // backoff=1s so sleep suspends and virtual time auto-advances; budget=2s fires
        // after 2 sleeps. max_stall_cycles=10 ensures the flat-tip gate cannot trigger
        // (tip advances 95→96→97, never flat).
        let settings = ProviderSettings::default()
            .with_tip_wait_backoff_secs(1)
            .with_tip_wait_max_total_secs(2)
            .with_tip_wait_max_stall_cycles(10);
        let mut gate = TipGate::new();
        // 2 re-polls: 96 (cycle 1, T→1s), 97 (cycle 2, T→2s). The budget fires at the
        // start of cycle 3, before a 3rd re-poll; the queue holds only two values, so
        // a 3rd re-poll would panic and expose a budget check that did not fire.
        let outcome = wait_for_tip(
            100,
            95,
            &mut gate,
            &settings,
            "host",
            tip_queue([Ok(96), Ok(97)]),
        )
        .await;
        assert!(
            matches!(
                outcome,
                TipWaitOutcome::Terminal {
                    to_block: 100,
                    reported_tip: 97,
                    stalled_cycles: 3,
                }
            ),
            "expected Terminal from wall-clock budget (tip advancing), got non-Terminal"
        );
    }

    /// Re-poll error → `RepollFailed`.
    ///
    /// Non-vacuous: removing the `Err(e) => return RepollFailed` arm would cause the
    /// function to silently ignore the error and loop, exhausting the queue → panic.
    #[tokio::test]
    async fn wait_for_tip_repoll_failure() {
        let settings = ProviderSettings::default().with_tip_wait_backoff_secs(0);
        let mut gate = TipGate::new();
        let outcome = wait_for_tip(
            100,
            99,
            &mut gate,
            &settings,
            "host",
            tip_queue([Err("transport error")]),
        )
        .await;
        // Exhaustive match so a failure names the variant actually returned,
        // without requiring `TipWaitOutcome` to implement `Debug`.
        match outcome {
            TipWaitOutcome::RepollFailed(msg) => assert!(
                msg.contains("transport error"),
                "RepollFailed carried unexpected error text: {msg}"
            ),
            TipWaitOutcome::CaughtUp => {
                panic!("expected RepollFailed carrying error text, got CaughtUp")
            }
            TipWaitOutcome::Terminal {
                to_block,
                reported_tip,
                stalled_cycles,
            } => panic!(
                "expected RepollFailed carrying error text, got Terminal \
                 (to_block={to_block}, reported_tip={reported_tip}, stalled_cycles={stalled_cycles})"
            ),
        }
    }
}