use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use anyhow::{Result, anyhow};
use serde_json::{Value, json};
use tracing::{Level, debug, info, warn};
use crate::IngestCfg;
use crate::metrics;
use crate::storage::Storage;
use crate::subscribe::{decode_hash, extract_tx_hashes, fetch_full_block};
/// Emits a `tracing` event whose severity is chosen at runtime.
///
/// The first argument is an expression yielding a `Level`; everything after
/// the first comma is forwarded untouched to the matching `tracing` macro.
/// Only `Level::DEBUG` and `Level::WARN` are dispatched specially — every
/// other level is emitted through `info!`.
macro_rules! event_at {
    ($severity:expr, $($fields:tt)*) => {{
        // Bind once so the level expression is evaluated a single time.
        let severity: Level = $severity;
        if severity == Level::DEBUG {
            debug!($($fields)*)
        } else if severity == Level::WARN {
            warn!($($fields)*)
        } else {
            info!($($fields)*)
        }
    }};
}
/// Bookkeeping for a single backfill run, used to emit start / progress /
/// caught-up log events. The `Default` value means "no run in progress".
#[derive(Debug, Default)]
struct BackfillProgress {
/// Contiguous height at which the current run started; `None` when idle.
start_height: Option<u64>,
/// Instant at which the current run started; `None` when idle.
start_time: Option<std::time::Instant>,
/// Contiguous height at which the last progress event was logged.
last_logged: u64,
/// How many blocks behind the target the run was when it started; used to
/// pick the log level of the "caught up" event.
start_behind: u64,
}
impl BackfillProgress {
    /// Starts tracking a run that begins at `contiguous` while `behind`
    /// blocks short of the target. The start time is captured here.
    fn new(contiguous: u64, behind: u64) -> Self {
        let started_at = std::time::Instant::now();
        Self {
            start_height: Some(contiguous),
            start_time: Some(started_at),
            last_logged: contiguous,
            start_behind: behind,
        }
    }

    /// Records one loop iteration of an active backfill.
    ///
    /// Returns `true` only on the call that starts a new run (state was
    /// idle); that call logs "backfill starting" at a level derived from
    /// `behind`. On later calls it logs "backfill progress" whenever the
    /// contiguous height has advanced by at least `BACKFILL_LOG_EVERY` since
    /// the last progress log, and returns `false`.
    fn observe(&mut self, contiguous: u64, target: u64, behind: u64) -> bool {
        if self.start_height.is_some() {
            let advanced = contiguous.saturating_sub(self.last_logged);
            if advanced >= BACKFILL_LOG_EVERY {
                self.last_logged = contiguous;
                let (blocks_per_sec, remaining) = self.eta(contiguous, behind);
                info!(
                    contiguous,
                    target,
                    behind,
                    bps = format_args!("{blocks_per_sec:.2}"),
                    eta = %remaining.map(format_secs).unwrap_or_else(|| "?".to_owned()),
                    "backfill progress",
                );
            }
            return false;
        }

        // Idle -> running: reset all bookkeeping and announce the run.
        *self = Self::new(contiguous, behind);
        event_at!(
            behind_level(behind),
            contiguous,
            target,
            behind,
            "backfill starting"
        );
        true
    }

    /// Marks the backfill as caught up at `contiguous`.
    ///
    /// If a run was being tracked, logs "backfill caught up" (level derived
    /// from how far behind the run started) with the number of blocks filled
    /// and the elapsed time. Always resets the state to idle.
    fn caught_up(&mut self, contiguous: u64) {
        if let Some((first_height, began)) = self.start_height.zip(self.start_time) {
            event_at!(
                behind_level(self.start_behind),
                blocks = contiguous.saturating_sub(first_height),
                elapsed = %format_secs(began.elapsed().as_secs()),
                "backfill caught up",
            );
        }
        *self = Self::default();
    }

    /// Returns `(blocks_per_second, eta_seconds)` for the current run.
    ///
    /// The rate is the average since the run started. Yields `(0.0, None)`
    /// when no run is tracked, no time has elapsed, or no block has been
    /// filled yet, because no meaningful rate exists in those cases.
    // The casts are intentional: heights are converted to `f64` for the rate
    // and the rounded, non-negative ETA is converted back to whole seconds.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_sign_loss,
        clippy::cast_possible_truncation
    )]
    fn eta(&self, contiguous: u64, behind: u64) -> (f64, Option<u64>) {
        let (first_height, began) = match (self.start_height, self.start_time) {
            (Some(height), Some(instant)) => (height, instant),
            _ => return (0.0, None),
        };
        let seconds = began.elapsed().as_secs_f64();
        let advanced = contiguous.saturating_sub(first_height);
        if advanced == 0 || seconds <= 0.0 {
            return (0.0, None);
        }
        let rate = advanced as f64 / seconds;
        let remaining_secs = (behind as f64 / rate).round() as u64;
        (rate, Some(remaining_secs))
    }
}
/// Maps how many blocks we are behind to a log severity:
/// 0–2 → `DEBUG`, 3–20 → `INFO`, anything larger → `WARN`.
const fn behind_level(behind: u64) -> Level {
    if behind <= 2 {
        Level::DEBUG
    } else if behind <= 20 {
        Level::INFO
    } else {
        Level::WARN
    }
}
/// Minimum advance of the contiguous height between two "backfill progress"
/// log events.
const BACKFILL_LOG_EVERY: u64 = 300;
/// Delay before the first summary is logged; later summaries use the
/// caller-supplied period.
const SUMMARY_FIRST_DELAY: Duration = Duration::from_secs(5);
/// Periodically logs an ingest summary; never returns.
///
/// The first summary is logged after `SUMMARY_FIRST_DELAY` and tagged
/// "summary (startup)"; subsequent ones are logged every `period` and also
/// report how many blocks the high-water mark gained since the previous
/// sample and the resulting blocks-per-second rate.
///
/// `backfill_count` is read-and-reset on every tick, so each summary reports
/// the count accumulated since the previous one.
pub(crate) async fn summary_loop(
    storage: Storage,
    period: Duration,
    backfill_count: Arc<AtomicU64>,
) {
    let mut last_sample: Option<(u64, std::time::Instant)> = None;
    let mut wait = SUMMARY_FIRST_DELAY;
    loop {
        tokio::time::sleep(wait).await;
        wait = period;

        let high_water = storage.high_water().await;
        let contiguous = storage.max_contiguous_height().await;
        let sampled_at = std::time::Instant::now();
        let backfills = backfill_count.swap(0, Ordering::Relaxed);
        let behind = high_water.saturating_sub(contiguous);

        if let Some((prev_high_water, prev_at)) = last_sample {
            let secs = sampled_at.duration_since(prev_at).as_secs_f64();
            let added = high_water.saturating_sub(prev_high_water);
            #[allow(clippy::cast_precision_loss)]
            let rate = if secs > 0.0 { added as f64 / secs } else { 0.0 };
            info!(
                block = high_water,
                contiguous,
                behind,
                new = added,
                bps = format_args!("{rate:.2}"),
                backfill = backfills,
                "summary",
            );
        } else {
            // No previous sample yet, so no delta or rate can be reported.
            info!(
                block = high_water,
                contiguous,
                behind,
                backfill = backfills,
                "summary (startup)",
            );
        }

        last_sample = Some((high_water, sampled_at));
    }
}
/// Renders a duration in whole seconds as a short human-readable string.
///
/// * `0` → `"<1s"`
/// * under a minute → `"{s}s"`
/// * under an hour → `"{m}m{ss}s"` (seconds zero-padded to two digits)
/// * an hour or more → `"{h}h{mm}m"` (minutes zero-padded; seconds dropped)
fn format_secs(s: u64) -> String {
    let hours = s / 3600;
    let minutes = (s % 3600) / 60;
    let seconds = s % 60;
    match (hours, minutes, seconds) {
        (0, 0, 0) => "<1s".to_owned(),
        (0, 0, sec) => format!("{sec}s"),
        (0, min, sec) => format!("{min}m{sec:02}s"),
        (hr, min, _) => format!("{hr}h{min:02}m"),
    }
}
/// Not referenced in this file; presumably the default (in milliseconds) for
/// `IngestCfg::backfill_inter_fetch`, the pause between backfill fetches —
/// TODO confirm against the config code.
pub(crate) const BACKFILL_INTER_FETCH_MS: u64 = 40;
/// How long the backfill loop sleeps between checks once it has caught up.
const BACKFILL_CAUGHT_UP_POLL: Duration = Duration::from_secs(1);
/// Drives gap-filling of stored blocks; never returns.
///
/// After waiting for bootstrap, each iteration:
/// 1. reads the local high-water mark (and idles 500 ms while nothing has
///    been stored and no `backfill_floor` is configured),
/// 2. asks the upstream node for its head — a failed query counts as `0`,
///    so the target falls back to the local high-water mark,
/// 3. computes the effective contiguous height, clamped up to
///    `backfill_floor - 1` so heights below the floor are never fetched,
/// 4. publishes height metrics and the `behind_tip` gauge,
/// 5. either sleeps `BACKFILL_CAUGHT_UP_POLL` (caught up) or fetches the next
///    missing height.
///
/// `backfill_count` is incremented once per backfill *run* (when
/// `BackfillProgress::observe` reports a new run), not once per block.
pub(crate) async fn backfill_loop(
    storage: Storage,
    http: reqwest::Client,
    cfg: IngestCfg,
    backfill_count: Arc<AtomicU64>,
    behind_tip: Arc<AtomicU64>,
) {
    wait_for_bootstrap(&cfg).await;
    let mut progress = BackfillProgress::default();
    loop {
        let hw = storage.high_water().await;
        if cfg.backfill_floor.is_none() && hw == 0 {
            // Nothing stored yet and no floor to start from: wait for data.
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        }

        let upstream = upstream_block_number(&http, &cfg).await.unwrap_or(0);
        let target = upstream.max(hw);
        let floor = cfg.backfill_floor.unwrap_or(0);
        let stored_contiguous = storage.max_contiguous_height().await;
        let contiguous = stored_contiguous.max(floor.saturating_sub(1));
        let behind = target.saturating_sub(contiguous);

        // The head metric uses a fresh high-water read, never below the
        // (possibly floor-clamped) contiguous height.
        let head = storage.high_water().await.max(contiguous);
        metrics::ingest_heights(head, contiguous, behind);

        if target <= contiguous {
            behind_tip.store(0, Ordering::Relaxed);
            progress.caught_up(contiguous);
            tokio::time::sleep(BACKFILL_CAUGHT_UP_POLL).await;
            continue;
        }

        behind_tip.store(behind, Ordering::Relaxed);
        let run_started = progress.observe(contiguous, target, behind);
        if run_started {
            backfill_count.fetch_add(1, Ordering::Relaxed);
        }
        let next_height = contiguous.saturating_add(1);
        backfill_next_block(&storage, &http, &cfg, next_height).await;
    }
}
/// Fetches and persists the block at height `next`, unless it is already
/// stored.
///
/// * Already stored → returns immediately (no sleep).
/// * Fetch yields nothing → sleeps 1 s and returns, leaving the retry to the
///   caller's next iteration.
/// * Persist fails → logs a warning, sleeps 1 s and returns.
/// * Success → sleeps `cfg.backfill_inter_fetch` if it is non-zero.
async fn backfill_next_block(
    storage: &Storage,
    http: &reqwest::Client,
    cfg: &IngestCfg,
    next: u64,
) {
    let already_stored = matches!(storage.get_by_height(next).await, Ok(Some(_)));
    if already_stored {
        return;
    }

    let block = match fetch_full_block(http, next, cfg).await {
        Some(fetched) => fetched,
        None => {
            tokio::time::sleep(Duration::from_secs(1)).await;
            return;
        }
    };

    match persist_backfilled(storage, next, &block).await {
        Err(e) => {
            warn!(height = next, error = %e, "backfill persist failed");
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        Ok(()) => {
            let pause = cfg.backfill_inter_fetch;
            if !pause.is_zero() {
                tokio::time::sleep(pause).await;
            }
        }
    }
}
/// Waits for `cfg.bootstrap_done` to be notified when block subscription is
/// enabled; returns immediately when it is not.
///
/// NOTE(review): if `bootstrap_done` is a `tokio::sync::Notify` signalled via
/// `notify_waiters()`, a signal sent before this call would be missed and this
/// would wait forever — confirm the sender stores a permit (`notify_one`).
async fn wait_for_bootstrap(cfg: &IngestCfg) {
    if !cfg.subscribe_blocks {
        return;
    }
    cfg.bootstrap_done.notified().await;
}
/// Queries the upstream node's current head via JSON-RPC `eth_blockNumber`.
///
/// Returns `None` on any failure: transport error, a non-JSON body, a
/// response without a string `result`, or a `result` that is not valid hex.
/// This is deliberately best-effort; the caller substitutes a fallback.
async fn upstream_block_number(http: &reqwest::Client, cfg: &IngestCfg) -> Option<u64> {
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_blockNumber",
        "params": [],
    });
    let response = http.post(&cfg.rpc_url).json(&request).send().await.ok()?;
    let reply = response.json::<Value>().await.ok()?;
    let quantity = reply.get("result")?.as_str()?;
    // Strip at most ONE "0x" prefix. `trim_start_matches` would strip it
    // repeatedly and so accept malformed values such as "0x0x1f". Un-prefixed
    // hex is still accepted, as before.
    let digits = quantity.strip_prefix("0x").unwrap_or(quantity);
    u64::from_str_radix(digits, 16).ok()
}
/// Stores a backfilled block under `height`.
///
/// Reads the block's `"hash"` string field, decodes it, collects the
/// transaction hashes, serializes the whole block to JSON bytes and writes
/// everything through `Storage::put`. On success it bumps the
/// backfill-source persisted-block metric and emits a debug event.
///
/// # Errors
///
/// Fails when the block has no string `"hash"` field, when the hash cannot be
/// decoded, when serialization fails, or when the storage write fails.
pub(crate) async fn persist_backfilled(
    storage: &Storage,
    height: u64,
    block: &Value,
) -> Result<()> {
    let Some(hash_hex) = block.get("hash").and_then(Value::as_str) else {
        return Err(anyhow!("backfilled block missing hash"));
    };
    let hash_bytes = decode_hash(hash_hex)?;
    let tx_hashes = extract_tx_hashes(block);
    let encoded = serde_json::to_vec(block)?;
    // Capture the size before `encoded` is moved into storage.
    let encoded_len = encoded.len();

    storage.put(height, hash_bytes, &tx_hashes, encoded).await?;
    metrics::block_persisted(metrics::BlockSource::Backfill);
    debug!(
        height,
        bytes = encoded_len,
        txs = tx_hashes.len(),
        "backfilled block",
    );
    Ok(())
}
// Unit tests for the pure helpers: duration formatting and ETA arithmetic.
#[cfg(test)]
mod tests {
use super::*;
// Covers every output shape of `format_secs`: the zero case, seconds only,
// minutes + seconds, and hours + minutes (seconds are dropped).
#[test]
fn format_secs_buckets() {
assert_eq!(format_secs(0), "<1s");
assert_eq!(format_secs(5), "5s");
assert_eq!(format_secs(59), "59s");
assert_eq!(format_secs(60), "1m00s");
assert_eq!(format_secs(125), "2m05s");
assert_eq!(format_secs(3600), "1h00m");
assert_eq!(format_secs(3 * 3600 + 12 * 60 + 7), "3h12m");
}
// A default (idle) tracker has no start height/time, so `eta` must report a
// zero rate and an unknown ETA.
#[test]
fn eta_idle_when_no_progress() {
let p = BackfillProgress::default();
let (rate, eta) = p.eta(100, 50);
assert!(rate.abs() < f64::EPSILON, "rate {rate} should be 0");
assert_eq!(eta, None, "no progress yet → ETA unknown");
}
// Simulates a run that started ~2 s ago and filled 20 blocks (≈10 blocks/s);
// with 80 blocks remaining the ETA should be ≈8 s. Bounds are loose because
// the elapsed time comes from the real clock.
#[test]
fn eta_math_from_known_rate() {
let start_time = std::time::Instant::now()
.checked_sub(Duration::from_secs(2))
.expect("clock can subtract 2s");
let p = BackfillProgress {
start_height: Some(1000),
start_time: Some(start_time),
last_logged: 0,
start_behind: 0,
};
let (rate, eta) = p.eta(1020, 80);
let eta = eta.expect("known rate yields a concrete ETA");
assert!((rate - 10.0).abs() < 1.5, "rate {rate} not near 10");
assert!((6..=10).contains(&eta), "eta {eta} not near 8");
}
}