use crate::rpc::RpcError;
/// Computes the cursor position to record after a batch ending at `to_block`.
///
/// The cursor simply moves to `to_block`, except in one case: the batch came
/// back empty (`logs_empty`) *and* its upper bound lies strictly beyond
/// `reported_tip`. Presumably an empty answer for blocks past the reported tip
/// cannot be trusted as "no logs" — confirm against the caller.
///
/// # Errors
///
/// Returns [`RpcError::CursorPastTip`] carrying both `to_block` and
/// `reported_tip` when the batch was empty and `to_block > reported_tip`.
pub(crate) fn next_cursor_after_batch(
    to_block: u64,
    logs_empty: bool,
    reported_tip: u64,
) -> Result<u64, RpcError> {
    let past_tip = to_block > reported_tip;
    match (logs_empty, past_tip) {
        // Only the "empty and past the tip" combination is refused.
        (true, true) => Err(RpcError::CursorPastTip {
            to_block,
            reported_tip,
        }),
        // Non-empty batches advance regardless of the tip; empty ones advance
        // as long as they stay at or below it.
        _ => Ok(to_block),
    }
}
/// Counts consecutive observations in which the reported chain tip failed to
/// advance, so a caller can tell "still waiting" apart from "stalled".
#[derive(Debug)]
pub(crate) struct TipGate {
    /// Highest tip observed since construction or the last [`TipGate::reset`];
    /// `None` until the first observation.
    last_tip: Option<u64>,
    /// Number of consecutive observations that did not exceed `last_tip`.
    stalled_cycles: u32,
}

/// Outcome of a single [`TipGate::observe`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TipWait {
    /// The tip advanced, or it did not but the stall bound has not been reached.
    Wait,
    /// The tip has failed to advance for the contained number of consecutive
    /// observations, which is at least the bound passed to `observe`.
    Stall(u32),
}

impl TipGate {
    /// Creates a gate that has seen no tip yet and has a zero stall counter.
    pub(crate) fn new() -> Self {
        Self {
            last_tip: None,
            stalled_cycles: 0,
        }
    }

    /// Returns the gate to its freshly constructed state: the remembered tip
    /// is forgotten and the stall counter is cleared.
    pub(crate) fn reset(&mut self) {
        *self = Self::new();
    }

    /// Records `reported_tip` and reports whether the tip is stalled.
    ///
    /// The first observation after construction or [`TipGate::reset`], and any
    /// observation strictly greater than the highest tip seen so far, stores
    /// the new tip, clears the stall counter and yields [`TipWait::Wait`].
    ///
    /// Any other observation (equal or lower tip) increments the stall
    /// counter. Once the counter reaches `max_stall_cycles` the result is
    /// [`TipWait::Stall`] carrying the counter; below that it is
    /// [`TipWait::Wait`]. A lower tip does not replace the remembered one.
    /// With `max_stall_cycles == 0` every non-advancing observation stalls.
    pub(crate) fn observe(&mut self, reported_tip: u64, max_stall_cycles: u32) -> TipWait {
        match self.last_tip {
            Some(prev) if reported_tip <= prev => {
                // Saturate rather than overflow: an unchecked `+= 1` would
                // panic in debug builds and wrap to 0 in release builds,
                // which would silently turn a long stall back into `Wait`.
                self.stalled_cycles = self.stalled_cycles.saturating_add(1);
                if self.stalled_cycles >= max_stall_cycles {
                    TipWait::Stall(self.stalled_cycles)
                } else {
                    TipWait::Wait
                }
            }
            // First observation, or the tip strictly advanced.
            _ => {
                self.last_tip = Some(reported_tip);
                self.stalled_cycles = 0;
                TipWait::Wait
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `tip` into `gate` `times` times in a row, requiring
    /// `TipWait::Wait` from every observation.
    fn expect_waits(gate: &mut TipGate, tip: u64, max_stall_cycles: u32, times: usize) {
        for _ in 0..times {
            assert!(matches!(
                gate.observe(tip, max_stall_cycles),
                TipWait::Wait
            ));
        }
    }

    #[test]
    fn tip_gate_first_observation_is_always_wait() {
        let mut gate = TipGate::new();
        expect_waits(&mut gate, 100, 3, 1);
    }

    #[test]
    fn tip_gate_strictly_advancing_tip_never_stalls() {
        let mut gate = TipGate::new();
        for tip in 0u64..20 {
            let outcome = gate.observe(tip, 3);
            assert!(
                matches!(outcome, TipWait::Wait),
                "tip={tip} should be Wait when strictly advancing"
            );
        }
    }

    #[test]
    fn tip_gate_flat_tip_stalls_at_bound() {
        let mut gate = TipGate::new();
        // One initial observation plus two flat ones stay below the bound of 3.
        expect_waits(&mut gate, 100, 3, 3);
        assert!(matches!(gate.observe(100, 3), TipWait::Stall(3)));
    }

    #[test]
    fn tip_gate_regressed_tip_counts_as_stall() {
        let mut gate = TipGate::new();
        expect_waits(&mut gate, 100, 3, 1);
        // Lower tips count against the stall budget just like flat ones.
        expect_waits(&mut gate, 90, 3, 2);
        assert!(matches!(gate.observe(50, 3), TipWait::Stall(3)));
    }

    #[test]
    fn tip_gate_advance_after_flat_resets_stall_counter() {
        let mut gate = TipGate::new();
        // Initial observation followed by two flat cycles.
        expect_waits(&mut gate, 100, 5, 3);
        // The advance to 101 clears the counter; four more flat cycles fit.
        expect_waits(&mut gate, 101, 5, 5);
        assert!(matches!(gate.observe(101, 5), TipWait::Stall(5)));
    }

    #[test]
    fn tip_gate_reset_restarts_counting() {
        let mut gate = TipGate::new();
        expect_waits(&mut gate, 100, 2, 2);
        gate.reset();
        // After the reset the same tip is treated as a first observation again.
        expect_waits(&mut gate, 100, 2, 2);
        assert!(matches!(gate.observe(100, 2), TipWait::Stall(2)));
    }

    #[test]
    fn next_cursor_advances_on_non_empty_batch_regardless_of_tip() {
        for tip in [0u64, 50, u64::MAX] {
            assert_eq!(next_cursor_after_batch(100, false, tip).unwrap(), 100);
        }
    }

    #[test]
    fn next_cursor_advances_on_empty_batch_within_tip() {
        for tip in [100u64, 101, u64::MAX] {
            assert_eq!(next_cursor_after_batch(100, true, tip).unwrap(), 100);
        }
    }

    #[test]
    fn next_cursor_rejects_empty_batch_past_tip() {
        let outcome = next_cursor_after_batch(100, true, 99);
        match outcome {
            Err(RpcError::CursorPastTip {
                to_block,
                reported_tip,
            }) => {
                assert_eq!(to_block, 100);
                assert_eq!(reported_tip, 99);
            }
            other => panic!("expected CursorPastTip, got {:?}", other),
        }

        let tip_zero = next_cursor_after_batch(100, true, 0);
        assert!(tip_zero.is_err());

        let near_max = next_cursor_after_batch(u64::MAX, true, u64::MAX - 1);
        assert!(near_max.is_err());
    }
}