use std::future::Future;
use std::time::Duration;
use tokio::time::{Instant, sleep};
use tracing::{error, warn};
use crate::rpc::RpcError;
use crate::rpc::config::ProviderSettings;
/// Decides where the cursor lands after a batch that covered blocks up to
/// `to_block`.
///
/// A batch that returned logs (`logs_empty == false`) always moves the cursor
/// to `to_block`, whatever the provider reported as its tip. An empty batch
/// moves the cursor only when `to_block` does not exceed `reported_tip`.
///
/// # Errors
///
/// Returns [`RpcError::CursorPastTip`] when the batch was empty and
/// `to_block` is greater than `reported_tip`.
pub(crate) fn next_cursor_after_batch(
    to_block: u64,
    logs_empty: bool,
    reported_tip: u64,
) -> Result<u64, RpcError> {
    let past_tip = to_block > reported_tip;
    match (logs_empty, past_tip) {
        // Nothing came back and the range ends beyond the reported tip.
        (true, true) => Err(RpcError::CursorPastTip {
            to_block,
            reported_tip,
        }),
        _ => Ok(to_block),
    }
}
/// Detects a provider whose reported tip has stopped advancing.
///
/// The gate remembers the highest tip it has accepted and counts how many
/// consecutive observations failed to exceed it.
#[derive(Debug, Default)]
pub(crate) struct TipGate {
    /// Highest tip accepted so far; `None` before the first observation and
    /// after [`TipGate::reset`].
    last_tip: Option<u64>,
    /// Consecutive observations that did not advance past `last_tip`.
    stalled_cycles: u32,
}

/// Verdict returned by [`TipGate::observe`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TipWait {
    /// The stall bound has not been reached.
    Wait,
    /// The tip failed to advance for the carried number of consecutive
    /// observations, which is at least the configured bound.
    Stall(u32),
}

impl TipGate {
    /// Creates a gate with no recorded tip and a zero stall counter.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Forgets the recorded tip and clears the stall counter.
    pub(crate) fn reset(&mut self) {
        *self = Self::default();
    }

    /// Records one tip observation and reports whether the stall bound was hit.
    ///
    /// The first observation after construction or [`TipGate::reset`], and
    /// any observation strictly greater than the recorded tip, is stored,
    /// clears the stall counter and yields [`TipWait::Wait`]. An equal or
    /// lower tip increments the counter without changing the recorded tip,
    /// and yields [`TipWait::Stall`] once the counter reaches
    /// `max_stall_cycles`.
    pub(crate) fn observe(&mut self, reported_tip: u64, max_stall_cycles: u32) -> TipWait {
        let advanced = match self.last_tip {
            None => true,
            Some(prev) => reported_tip > prev,
        };
        if advanced {
            self.last_tip = Some(reported_tip);
            self.stalled_cycles = 0;
            return TipWait::Wait;
        }

        // Saturate rather than overflow: wrapping would panic in debug builds
        // and silently restart the count at zero in release builds.
        self.stalled_cycles = self.stalled_cycles.saturating_add(1);
        if self.stalled_cycles >= max_stall_cycles {
            TipWait::Stall(self.stalled_cycles)
        } else {
            TipWait::Wait
        }
    }
}
/// Result of [`wait_for_tip`].
///
/// Derives `Debug` and `PartialEq` so callers and tests can print and compare
/// outcomes directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TipWaitOutcome {
    /// A re-polled tip reached or passed the target block.
    CaughtUp,
    /// Waiting was abandoned: either the gate reported a stall or the
    /// wall-clock budget ran out.
    Terminal {
        /// Block the tip was expected to reach.
        to_block: u64,
        /// Last tip seen before giving up.
        reported_tip: u64,
        /// The gate's stall count on a stall; the total number of wait
        /// cycles when the wall-clock budget was exceeded.
        stalled_cycles: u32,
    },
    /// Re-polling the tip failed; carries the error's `Display` text.
    RepollFailed(String),
}
/// Re-polls the provider tip until it reaches `to_block`, the gate reports a
/// stall, the wall-clock budget is spent, or a re-poll fails.
///
/// Every cycle feeds the most recent tip to `gate`. Unless a terminal
/// condition is hit, the function then sleeps for
/// `providers.tip_wait_backoff_secs` seconds and calls `get_tip` again.
///
/// * `to_block` – block height the tip has to reach.
/// * `initial_tip` – tip already known to the caller; it is shown to the gate
///   before the first re-poll.
/// * `gate` – stall detector; it is mutated through `observe` and never reset
///   here.
/// * `providers` – supplies `tip_wait_max_stall_cycles`,
///   `tip_wait_max_total_secs` (`0` disables the wall-clock budget) and
///   `tip_wait_backoff_secs`.
/// * `host` – used only in log messages.
/// * `get_tip` – called once per cycle to fetch the current tip.
///
/// Returns [`TipWaitOutcome::CaughtUp`] once a re-polled tip is at least
/// `to_block`, [`TipWaitOutcome::Terminal`] on a stall or exhausted budget,
/// and [`TipWaitOutcome::RepollFailed`] with the error text when `get_tip`
/// fails.
pub(crate) async fn wait_for_tip<F, Fut, E>(
    to_block: u64,
    initial_tip: u64,
    gate: &mut TipGate,
    providers: &ProviderSettings,
    host: &str,
    get_tip: F,
) -> TipWaitOutcome
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<u64, E>>,
    E: std::fmt::Display,
{
    let started_at = Instant::now();
    let backoff = Duration::from_secs(providers.tip_wait_backoff_secs);
    // A zero budget means there is no wall-clock limit.
    let budget = (providers.tip_wait_max_total_secs > 0)
        .then(|| Duration::from_secs(providers.tip_wait_max_total_secs));

    let mut reported_tip = initial_tip;
    let mut wait_cycle: u32 = 0;

    loop {
        // The stall check runs first so a frozen tip is reported as a stall
        // even when the budget would also have expired.
        if let TipWait::Stall(stalled_cycles) =
            gate.observe(reported_tip, providers.tip_wait_max_stall_cycles)
        {
            error!(
                "provider stalled: tip {} did not reach block {} after {} cycles at {}",
                reported_tip, to_block, stalled_cycles, host
            );
            return TipWaitOutcome::Terminal {
                to_block,
                reported_tip,
                stalled_cycles,
            };
        }

        wait_cycle += 1;

        if budget.is_some_and(|limit| started_at.elapsed() >= limit) {
            error!(
                "provider advancing but not converging: tip {} did not reach block {} after {} s ({} cycles) at {}",
                reported_tip,
                to_block,
                started_at.elapsed().as_secs(),
                wait_cycle,
                host
            );
            // The cycle count is reported in the `stalled_cycles` slot.
            return TipWaitOutcome::Terminal {
                to_block,
                reported_tip,
                stalled_cycles: wait_cycle,
            };
        }

        warn!(
            "provider tip {} behind block {} — waiting (cycle {}) at {}",
            reported_tip, to_block, wait_cycle, host
        );
        sleep(backoff).await;

        reported_tip = match get_tip().await {
            Ok(tip) => tip,
            Err(e) => return TipWaitOutcome::RepollFailed(e.to_string()),
        };
        if reported_tip >= to_block {
            return TipWaitOutcome::CaughtUp;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rpc::config::ProviderSettings;
    use crate::rpc::cursor::{TipWaitOutcome, wait_for_tip};
    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    /// Renders an outcome for assertion messages, so a failing test shows
    /// what was actually returned without requiring `Debug` on the enum.
    fn describe(outcome: &TipWaitOutcome) -> String {
        match outcome {
            TipWaitOutcome::CaughtUp => "CaughtUp".to_owned(),
            TipWaitOutcome::Terminal {
                to_block,
                reported_tip,
                stalled_cycles,
            } => format!(
                "Terminal {{ to_block: {to_block}, reported_tip: {reported_tip}, stalled_cycles: {stalled_cycles} }}"
            ),
            TipWaitOutcome::RepollFailed(msg) => format!("RepollFailed({msg:?})"),
        }
    }

    /// Builds a `get_tip` closure that replays `responses` in order and
    /// panics if it is polled more often than there are responses.
    fn tip_queue(
        responses: impl IntoIterator<Item = Result<u64, &'static str>>,
    ) -> impl Fn() -> std::future::Ready<Result<u64, &'static str>> {
        let q = Arc::new(Mutex::new(responses.into_iter().collect::<VecDeque<_>>()));
        move || {
            let val = q
                .lock()
                .unwrap()
                .pop_front()
                .expect("tip_queue exhausted — test drove more re-polls than expected");
            std::future::ready(val)
        }
    }

    #[test]
    fn tip_gate_first_observation_is_always_wait() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
    }

    #[test]
    fn tip_gate_strictly_advancing_tip_never_stalls() {
        let mut gate = TipGate::new();
        for tip in 0u64..20 {
            assert!(
                matches!(gate.observe(tip, 3), TipWait::Wait),
                "tip={tip} should be Wait when strictly advancing"
            );
        }
    }

    #[test]
    fn tip_gate_flat_tip_stalls_at_bound() {
        let mut gate = TipGate::new();
        // First observation only seeds the recorded tip.
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        // Stalled cycles 1 and 2 are still below the bound.
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        // Third stalled cycle reaches the bound.
        assert!(matches!(gate.observe(100, 3), TipWait::Stall(3)));
    }

    #[test]
    fn tip_gate_regressed_tip_counts_as_stall() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 3), TipWait::Wait));
        // Tips below the recorded 100 count as stalled cycles 1 and 2.
        assert!(matches!(gate.observe(90, 3), TipWait::Wait));
        assert!(matches!(gate.observe(90, 3), TipWait::Wait));
        // A further regression is the third stalled cycle.
        assert!(matches!(gate.observe(50, 3), TipWait::Stall(3)));
    }

    #[test]
    fn tip_gate_advance_after_flat_resets_stall_counter() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 5), TipWait::Wait));
        // Two stalled cycles at 100.
        assert!(matches!(gate.observe(100, 5), TipWait::Wait));
        assert!(matches!(gate.observe(100, 5), TipWait::Wait));
        // Advancing to 101 clears the counter.
        assert!(matches!(gate.observe(101, 5), TipWait::Wait));
        // Four stalled cycles at 101 stay below the bound of five.
        assert!(matches!(gate.observe(101, 5), TipWait::Wait));
        assert!(matches!(gate.observe(101, 5), TipWait::Wait));
        assert!(matches!(gate.observe(101, 5), TipWait::Wait));
        assert!(matches!(gate.observe(101, 5), TipWait::Wait));
        // The fifth stalled cycle trips the gate.
        assert!(matches!(gate.observe(101, 5), TipWait::Stall(5)));
    }

    #[test]
    fn tip_gate_reset_restarts_counting() {
        let mut gate = TipGate::new();
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        // One stalled cycle, then reset before the bound is reached.
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        gate.reset();
        // After reset the same tip seeds the gate again.
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        assert!(matches!(gate.observe(100, 2), TipWait::Wait));
        assert!(matches!(gate.observe(100, 2), TipWait::Stall(2)));
    }

    #[test]
    fn next_cursor_advances_on_non_empty_batch_regardless_of_tip() {
        assert_eq!(next_cursor_after_batch(100, false, 0).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, 50).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, false, u64::MAX).unwrap(), 100);
    }

    #[test]
    fn next_cursor_advances_on_empty_batch_within_tip() {
        assert_eq!(next_cursor_after_batch(100, true, 100).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, 101).unwrap(), 100);
        assert_eq!(next_cursor_after_batch(100, true, u64::MAX).unwrap(), 100);
    }

    #[test]
    fn next_cursor_rejects_empty_batch_past_tip() {
        match next_cursor_after_batch(100, true, 99) {
            Err(RpcError::CursorPastTip {
                to_block,
                reported_tip,
            }) => {
                assert_eq!(to_block, 100);
                assert_eq!(reported_tip, 99);
            }
            other => panic!("expected CursorPastTip, got {:?}", other),
        }
        assert!(next_cursor_after_batch(100, true, 0).is_err());
        assert!(next_cursor_after_batch(u64::MAX, true, u64::MAX - 1).is_err());
    }

    #[tokio::test]
    async fn wait_for_tip_catches_up() {
        let settings = ProviderSettings::default().with_tip_wait_backoff_secs(0);
        let mut gate = TipGate::new();
        let outcome =
            wait_for_tip(100, 99, &mut gate, &settings, "host", tip_queue([Ok(100)])).await;
        assert!(
            matches!(outcome, TipWaitOutcome::CaughtUp),
            "expected CaughtUp, got {}",
            describe(&outcome)
        );
    }

    #[tokio::test]
    async fn wait_for_tip_flat_tip_stalls() {
        let settings = ProviderSettings::default()
            .with_tip_wait_backoff_secs(0)
            .with_tip_wait_max_stall_cycles(3);
        let mut gate = TipGate::new();
        // The initial tip seeds the gate; three flat re-polls reach the bound.
        let outcome = wait_for_tip(
            100,
            99,
            &mut gate,
            &settings,
            "host",
            tip_queue([Ok(99), Ok(99), Ok(99)]),
        )
        .await;
        assert!(
            matches!(
                outcome,
                TipWaitOutcome::Terminal {
                    to_block: 100,
                    reported_tip: 99,
                    stalled_cycles: 3,
                }
            ),
            "expected Terminal(stalled_cycles=3), got {}",
            describe(&outcome)
        );
    }

    #[tokio::test(start_paused = true)]
    async fn wait_for_tip_budget_exceeded_with_advancing_tip() {
        let settings = ProviderSettings::default()
            .with_tip_wait_backoff_secs(1)
            .with_tip_wait_max_total_secs(2)
            .with_tip_wait_max_stall_cycles(10);
        let mut gate = TipGate::new();
        // Two one-second backoffs use up the two-second budget; the budget
        // check at the start of the third cycle ends the wait.
        let outcome = wait_for_tip(
            100,
            95,
            &mut gate,
            &settings,
            "host",
            tip_queue([Ok(96), Ok(97)]),
        )
        .await;
        assert!(
            matches!(
                outcome,
                TipWaitOutcome::Terminal {
                    to_block: 100,
                    reported_tip: 97,
                    stalled_cycles: 3,
                }
            ),
            "expected Terminal from wall-clock budget (tip advancing), got {}",
            describe(&outcome)
        );
    }

    #[tokio::test]
    async fn wait_for_tip_repoll_failure() {
        let settings = ProviderSettings::default().with_tip_wait_backoff_secs(0);
        let mut gate = TipGate::new();
        let outcome = wait_for_tip(
            100,
            99,
            &mut gate,
            &settings,
            "host",
            tip_queue([Err("transport error")]),
        )
        .await;
        assert!(
            matches!(outcome, TipWaitOutcome::RepollFailed(ref e) if e.contains("transport error")),
            "expected RepollFailed carrying error text, got {}",
            describe(&outcome)
        );
    }
}