use std::collections::VecDeque;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde_json::{Value, json};
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::error::Error as TungError;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use tracing::{debug, error, info, warn};
use crate::IngestCfg;
use crate::backfill::persist_backfilled;
use crate::metrics::{self, UpstreamOutcome};
use crate::storage::Storage;
/// Websocket stream type used by this module: a tungstenite stream over a
/// `MaybeTlsStream`-wrapped TCP connection.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Sink (write) half obtained by splitting a [`WsStream`]; carries outgoing [`Message`]s.
type WsTx = SplitSink<WsStream, Message>;
/// Stream (read) half obtained by splitting a [`WsStream`].
type WsRx = SplitStream<WsStream>;
/// Browser-style `User-Agent` string. Within this file it is attached to the
/// websocket handshake request built in `connect_ws`.
pub(crate) const BROWSER_UA: &str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 \
(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36";
/// A block notification decoded from an `eth_subscription` message by `classify_frame`.
#[derive(Debug)]
enum WsEvent {
/// The notification payload had no `transactions` field; only the block
/// number and hash are known, so the body must be fetched separately.
NewHead { height: u64, hash: String },
/// The notification payload carried a `transactions` field; `block` is a
/// copy of the whole payload object.
NewBlock {
height: u64,
hash: String,
block: Value,
},
}
/// One decoded websocket message of interest to the ingest loop.
#[derive(Debug)]
enum WsFrame {
/// A subscription notification.
Event(WsEvent),
/// A JSON-RPC reply identified by its numeric `id`. `result` is `None`
/// when the reply had no `result` field or the field was JSON `null`.
Reply { id: u64, result: Option<Value> },
}
pub(crate) async fn ingest(storage: Storage, http: reqwest::Client, cfg: IngestCfg) -> Result<()> {
if cfg.subscribe_blocks {
if let Err(e) = bootstrap_via_oldblocks(&storage, &http, &cfg).await {
warn!(error = ?e, "oldBlocks bootstrap incomplete; backfill will fill the remainder");
}
cfg.bootstrap_done.notify_one();
}
let mut attempt: u32 = 0;
loop {
match run_session(&storage, &cfg).await {
Ok(()) => {
info!("websocket session ended cleanly, reconnecting");
attempt = 0;
}
Err(e) => {
warn!(error = ?e, attempt, "websocket session failed");
attempt = attempt.saturating_add(1);
}
}
metrics::ws_reconnect();
let backoff_ms = 500u64.saturating_mul(1u64 << attempt.min(6)).min(30_000);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
}
async fn run_session(storage: &Storage, cfg: &IngestCfg) -> Result<()> {
let (mut tx, mut rx) = connect_and_subscribe(cfg).await?;
let mut pending: VecDeque<WsEvent> = VecDeque::new();
let mut next_id: u64 = 2;
loop {
let event = if let Some(ev) = pending.pop_front() {
ev
} else {
match tokio::time::timeout(cfg.ws_idle_timeout, next_frame(&mut tx, &mut rx)).await {
Ok(Some(WsFrame::Event(ev))) => ev,
Ok(Some(WsFrame::Reply { .. })) => continue,
Ok(None) => break,
Err(_elapsed) => {
metrics::ws_idle_timeout();
return Err(anyhow!(
"no newHeads within {}s idle timeout; reconnecting",
cfg.ws_idle_timeout.as_secs(),
));
}
}
};
let (height, hash, block) = match event {
WsEvent::NewBlock {
height,
hash,
block,
} => {
debug!(height, %hash, "new block (full)");
(height, hash, block)
}
WsEvent::NewHead { height, hash } => {
debug!(height, %hash, "new head");
let id = next_id;
next_id = next_id.wrapping_add(1);
match fetch_block_over_ws(&mut tx, &mut rx, &mut pending, id, height, cfg).await? {
Some(block) => (height, hash, block),
None => continue,
}
}
};
persist_block(storage, height, &hash, &block, &cfg.blocks).await?;
}
Ok(())
}
/// Requests the full block at `height` over the open websocket and waits for its reply.
///
/// Sends `eth_getBlockByNumber` (second parameter `true`) tagged with `id`, then reads frames
/// until the reply carrying that `id` shows up. Subscription events received in the meantime
/// are appended to `pending` for the caller; replies with any other id are dropped.
///
/// The wait is bounded by one `cfg.ws_idle_timeout` deadline measured from the moment the
/// request was sent. Interleaved subscription traffic does not extend it, so a lost reply
/// cannot keep this function (and the growth of `pending`) going indefinitely.
///
/// # Returns
/// * `Ok(Some(block))` - the reply's non-null `result`.
/// * `Ok(None)` - the reply had no usable result, or the frame stream ended first.
///
/// # Errors
/// Fails when the request cannot be sent or no matching reply arrives before the deadline.
async fn fetch_block_over_ws(
    tx: &mut WsTx,
    rx: &mut WsRx,
    pending: &mut VecDeque<WsEvent>,
    id: u64,
    height: u64,
    cfg: &IngestCfg,
) -> Result<Option<Value>> {
    let req = json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": "eth_getBlockByNumber",
        "params": [format!("0x{height:x}"), true],
    });
    // `started` feeds the latency metric and therefore includes the send itself.
    let started = Instant::now();
    tx.send(Message::Text(req.to_string().into()))
        .await
        .context("sending eth_getBlockByNumber over websocket")?;

    // The reply deadline starts once the request is on the wire.
    let sent_at = Instant::now();
    loop {
        let remaining = cfg.ws_idle_timeout.saturating_sub(sent_at.elapsed());
        // An exhausted budget is treated as a timeout even if a frame happens to be ready,
        // otherwise a steady stream of events could still outlive the deadline.
        let polled = if remaining.is_zero() {
            None
        } else {
            tokio::time::timeout(remaining, next_frame(tx, rx))
                .await
                .ok()
        };
        let frame = match polled {
            Some(Some(frame)) => frame,
            Some(None) => return Ok(None),
            None => {
                metrics::ws_idle_timeout();
                return Err(anyhow!(
                    "no getBlockByNumber reply within {}s idle timeout; reconnecting",
                    cfg.ws_idle_timeout.as_secs(),
                ));
            }
        };

        match frame {
            WsFrame::Reply { id: rid, result } if rid == id => {
                let outcome = if result.is_some() {
                    UpstreamOutcome::Ok
                } else {
                    UpstreamOutcome::Empty
                };
                metrics::upstream_request(outcome, started.elapsed().as_secs_f64());
                if result.is_none() {
                    debug!(
                        height,
                        "ws getBlockByNumber returned null; leaving for backfill"
                    );
                }
                return Ok(result);
            }
            // Keep notifications in arrival order for the session loop.
            WsFrame::Event(ev) => pending.push_back(ev),
            // Reply to some other request id: nothing is waiting on it.
            WsFrame::Reply { .. } => {}
        }
    }
}
async fn connect_ws(cfg: &IngestCfg) -> Result<(WsTx, WsRx)> {
info!(url = %cfg.ws_url, "connecting websocket");
let mut req = cfg.ws_url.as_str().into_client_request()?;
req.headers_mut().insert(
"User-Agent",
BROWSER_UA
.parse()
.context("BROWSER_UA is not a valid header value")?,
);
let ws = match connect_async(req).await {
Ok((ws, _)) => ws,
Err(TungError::Http(resp))
if resp.status() == http::StatusCode::TOO_MANY_REQUESTS
|| resp.status() == http::StatusCode::SERVICE_UNAVAILABLE =>
{
let retry_after = retry_after_from_headers(resp.headers()).unwrap_or(5);
handle_throttle(
cfg,
"websocket connect",
retry_after,
resp.status().as_u16(),
)
.await;
return Err(anyhow!("ws throttled (slept {retry_after}s, retrying)"));
}
Err(e) => return Err(anyhow::Error::from(e).context("connecting websocket")),
};
Ok(ws.split())
}
/// Connects the websocket and sends the live `eth_subscribe` request (id 1).
///
/// Subscribes to `newBlocks` when `cfg.subscribe_blocks` is set and to `newHeads` otherwise.
/// The subscription reply is not awaited here. The connected metric is recorded once the
/// request has been sent.
async fn connect_and_subscribe(cfg: &IngestCfg) -> Result<(WsTx, WsRx)> {
    let (mut tx, rx) = connect_ws(cfg).await?;

    let kind = if cfg.subscribe_blocks {
        "newBlocks"
    } else {
        "newHeads"
    };
    let subscribe = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": [kind],
    });
    tx.send(Message::Text(subscribe.to_string().into())).await?;

    metrics::upstream_connected();
    Ok((tx, rx))
}
/// Streams the historical range missing from local storage via an `oldBlocks` subscription.
///
/// The range starts at the larger of `cfg.backfill_floor` (0 when unset) and one past
/// `storage.max_contiguous_height()`, and ends at the height reported by the upstream
/// `/health` endpoint. If the start is already beyond that target nothing is done.
///
/// Every `NewBlock` event received is stored through `persist_backfilled`; all other frames
/// are ignored. The function returns once a block at or above the target has been stored.
///
/// # Errors
/// Fails when the target cannot be fetched, the websocket cannot be opened or written to,
/// the stream ends or stays idle for `cfg.ws_idle_timeout` before the target is reached, or
/// persisting a block fails.
async fn bootstrap_via_oldblocks(
    storage: &Storage,
    http: &reqwest::Client,
    cfg: &IngestCfg,
) -> Result<()> {
    let floor = cfg.backfill_floor.unwrap_or(0);
    let have = storage.max_contiguous_height().await;
    let from = floor.max(have.saturating_add(1));
    let target = fetch_upstream_contiguous(http, &cfg.rpc_url).await?;

    if from > target {
        info!(
            from,
            target, "oldBlocks bootstrap: already current with upstream"
        );
        return Ok(());
    }
    info!(
        from,
        to = target,
        count = target.saturating_sub(from).saturating_add(1),
        "oldBlocks bootstrap: streaming historical range",
    );

    let (mut tx, mut rx) = connect_ws(cfg).await?;
    let subscribe = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": ["oldBlocks", format!("0x{from:x}"), format!("0x{target:x}")],
    });
    tx.send(Message::Text(subscribe.to_string().into())).await?;

    loop {
        let polled =
            tokio::time::timeout(cfg.ws_idle_timeout, next_frame(&mut tx, &mut rx)).await;
        let frame = match polled {
            Err(_elapsed) => {
                metrics::ws_idle_timeout();
                bail!(
                    "oldBlocks bootstrap idle for {}s before reaching target {target}",
                    cfg.ws_idle_timeout.as_secs(),
                );
            }
            Ok(None) => bail!("oldBlocks stream ended before reaching target {target}"),
            Ok(Some(frame)) => frame,
        };

        // Only full-block notifications matter here; replies and bare heads are skipped.
        if let WsFrame::Event(WsEvent::NewBlock { height, block, .. }) = frame {
            persist_backfilled(storage, height, &block).await?;
            if height >= target {
                info!(target, "oldBlocks bootstrap complete");
                return Ok(());
            }
        }
    }
}
/// Reads `blocks.max_contiguous_height` from the upstream's `GET {base}/health` response.
///
/// Trailing slashes on `base` are removed before `/health` is appended.
///
/// # Errors
/// Fails when the request fails, the status is not a success, the body is not JSON, or the
/// field is missing or not an unsigned integer.
async fn fetch_upstream_contiguous(http: &reqwest::Client, base: &str) -> Result<u64> {
    let root = base.trim_end_matches('/');
    let url = format!("{root}/health");

    let resp = http
        .get(&url)
        .send()
        .await
        .with_context(|| format!("GET {url}"))?;
    if !resp.status().is_success() {
        bail!("upstream /health returned HTTP {}", resp.status());
    }

    let v: Value = resp.json().await.context("decode /health body")?;
    let height = v
        .get("blocks")
        .and_then(|b| b.get("max_contiguous_height"))
        .and_then(Value::as_u64);
    match height {
        Some(h) => Ok(h),
        None => Err(anyhow!(
            "/health missing blocks.max_contiguous_height (is the upstream a neve?)"
        )),
    }
}
/// Reads websocket messages until one decodes into a [`WsFrame`].
///
/// Text and binary messages are parsed as JSON (binary is converted lossily to UTF-8) and
/// passed to `classify_frame`; messages that are not valid JSON or not recognised are skipped.
/// Pings are answered with a pong on `tx`, ignoring any send failure.
///
/// Returns `None` when the stream ends, a read error occurs, or a close message is received.
async fn next_frame(tx: &mut WsTx, rx: &mut WsRx) -> Option<WsFrame> {
    loop {
        // `?` ends the function with `None` once the stream is exhausted.
        let msg = match rx.next().await? {
            Ok(m) => m,
            Err(e) => {
                warn!(error = %e, "websocket error");
                return None;
            }
        };

        let text = match msg {
            Message::Text(t) => t.to_string(),
            Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
            Message::Ping(p) => {
                // Best effort: a failed pong is not treated as fatal here.
                let _ = tx.send(Message::Pong(p)).await;
                continue;
            }
            Message::Close(_) => {
                info!("server closed connection");
                return None;
            }
            _ => continue,
        };

        match serde_json::from_str::<Value>(&text) {
            Ok(v) => {
                if let Some(frame) = classify_frame(&v) {
                    return Some(frame);
                }
            }
            Err(_) => warn!("bad json"),
        }
    }
}
/// Turns a decoded JSON message into a [`WsFrame`], or `None` if it is neither a usable
/// subscription notification nor a reply with a numeric id.
///
/// * `method == "eth_subscription"`: `params.result` must contain string `number` (hex) and
///   `hash` fields, otherwise `None` is returned. A payload with a `transactions` field becomes
///   `NewBlock` (payload cloned), anything else `NewHead`.
/// * Otherwise, a message with an unsigned-integer `id` becomes `Reply`. An `error` member is
///   logged, and a missing or null `result` is reported as `None`.
fn classify_frame(v: &Value) -> Option<WsFrame> {
    let is_notification = v.get("method").and_then(Value::as_str) == Some("eth_subscription");
    if is_notification {
        let head = v.get("params")?.get("result")?;
        let number_hex = head.get("number")?.as_str()?;
        let hash = head.get("hash")?.as_str()?.to_owned();
        let height = u64::from_str_radix(number_hex.trim_start_matches("0x"), 16).ok()?;

        let event = match head.get("transactions") {
            Some(_) => WsEvent::NewBlock {
                height,
                hash,
                block: head.clone(),
            },
            None => WsEvent::NewHead { height, hash },
        };
        return Some(WsFrame::Event(event));
    }

    let id = v.get("id")?.as_u64()?;
    if let Some(err) = v.get("error") {
        warn!(%err, id, "websocket rpc error reply");
    }
    let result = match v.get("result") {
        Some(r) if !r.is_null() => Some(r.clone()),
        _ => None,
    };
    Some(WsFrame::Reply { id, result })
}
/// Fetches the block at `height` over HTTP JSON-RPC via `eth_getBlockByNumber`, passing `true`
/// as the second parameter.
///
/// Delegates to `fetch_rpc`; returns `None` when that yields no result.
pub(crate) async fn fetch_full_block(
    http: &reqwest::Client,
    height: u64,
    cfg: &IngestCfg,
) -> Option<Value> {
    let block_tag = format!("0x{height:x}");
    let params = json!([block_tag, true]);
    fetch_rpc(http, height, "eth_getBlockByNumber", params, cfg).await
}
/// Number of attempts `fetch_rpc` makes for a single request before giving up.
const RPC_MAX_ATTEMPTS: u32 = 3;
/// Base delay, in milliseconds, that `fetch_rpc` sleeps after an attempt without a result;
/// it is multiplied by `2^attempt`.
const RPC_RETRY_BACKOFF_MS: u64 = 25;
async fn fetch_rpc(
http: &reqwest::Client,
height: u64,
method: &str,
params: Value,
cfg: &IngestCfg,
) -> Option<Value> {
let body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
});
for attempt in 0..RPC_MAX_ATTEMPTS {
let started = Instant::now();
let resp = match http.post(&cfg.rpc_url).json(&body).send().await {
Ok(r) => r,
Err(e) => {
metrics::upstream_request(UpstreamOutcome::Error, started.elapsed().as_secs_f64());
warn!(error = %e, height, "rpc request failed");
return None;
}
};
let status = resp.status();
if status == reqwest::StatusCode::TOO_MANY_REQUESTS
|| status == reqwest::StatusCode::SERVICE_UNAVAILABLE
{
metrics::upstream_request(status, started.elapsed().as_secs_f64());
let retry_after = retry_after_secs(&resp).unwrap_or(5);
handle_throttle(cfg, method, retry_after, status.as_u16()).await;
continue;
}
match resp.json::<Value>().await {
Ok(mut parsed) => {
let result = parsed
.get_mut("result")
.map(Value::take)
.filter(|r| !r.is_null());
let outcome = if !status.is_success() {
UpstreamOutcome::Error
} else if result.is_some() {
UpstreamOutcome::Ok
} else {
UpstreamOutcome::Empty
};
metrics::upstream_request(outcome, started.elapsed().as_secs_f64());
if let Some(result) = result {
return Some(result);
}
}
Err(e) => {
metrics::upstream_request(UpstreamOutcome::Error, started.elapsed().as_secs_f64());
warn!(error = %e, height, "decode rpc response");
return None;
}
}
let backoff = RPC_RETRY_BACKOFF_MS.saturating_mul(1u64 << attempt.min(10));
tokio::time::sleep(Duration::from_millis(backoff)).await;
}
debug!(
height,
method, "block not available within retry budget; leaving for backfill"
);
None
}
/// Reacts to an upstream throttle response that asked for a `retry_after`-second pause.
///
/// The requested delay is always recorded as a metric. If it fits within `cfg.max_wait` the
/// task sleeps for it and returns. Otherwise the condition is logged as an error,
/// `cfg.fatal` is notified, and the future then waits on a never-completing future, so it
/// does not return to the caller.
///
/// `what` and `status` are used as log fields only.
async fn handle_throttle(cfg: &IngestCfg, what: &str, retry_after: u64, status: u16) {
    metrics::upstream_retry_after(retry_after);

    let wait = Duration::from_secs(retry_after);
    if wait <= cfg.max_wait {
        warn!(what, status, retry_after, "throttled by upstream, sleeping");
        tokio::time::sleep(wait).await;
        return;
    }

    error!(
        what,
        status,
        retry_after,
        max_wait_secs = cfg.max_wait.as_secs(),
        "upstream throttled longer than --max-wait; shutting down",
    );
    cfg.fatal.notify_one();
    // Park this future for good after signalling `cfg.fatal`.
    std::future::pending::<()>().await;
}
/// Returns the `Retry-After` value of an HTTP response as whole seconds, if present and numeric.
fn retry_after_secs(resp: &reqwest::Response) -> Option<u64> {
    let headers = resp.headers();
    retry_after_from_headers(headers)
}
/// Parses the `Retry-After` header as an unsigned number of seconds.
///
/// Returns `None` when the header is absent, is not valid visible ASCII, or does not parse as
/// a `u64`.
fn retry_after_from_headers(headers: &http::HeaderMap) -> Option<u64> {
    let raw = headers.get(http::header::RETRY_AFTER)?;
    let text = raw.to_str().ok()?;
    text.parse::<u64>().ok()
}
/// Stores a live block and publishes it to subscribers.
///
/// The block is skipped (with a warning, returning `Ok`) when its own `hash` field differs
/// from `expected_hash`, or when `expected_hash` does not decode to 32 bytes. Otherwise the
/// block is serialised and written via `storage.put` together with its transaction hashes,
/// the persisted-block metric is recorded, and - if the block has a hex `timestamp` - the
/// last-block-timestamp metric is updated. Finally the block is sent on `blocks` when at
/// least one receiver exists; a failed send is ignored.
///
/// # Errors
/// Fails when serialisation or the storage write fails.
async fn persist_block(
    storage: &Storage,
    height: u64,
    expected_hash: &str,
    block: &Value,
    blocks: &broadcast::Sender<Value>,
) -> Result<()> {
    let body_hash = block
        .get("hash")
        .and_then(Value::as_str)
        .unwrap_or_default();
    if body_hash != expected_hash {
        warn!(height, head = %expected_hash, body = %body_hash, "hash mismatch (fork?)");
        return Ok(());
    }

    let hash_bytes = match decode_hash(expected_hash) {
        Err(e) => {
            warn!(error = %e, "bad hash on newHead");
            return Ok(());
        }
        Ok(h) => h,
    };

    let tx_hashes = extract_tx_hashes(block);
    let bytes = serde_json::to_vec(block)?;
    // Captured up front because `bytes` is moved into the store call.
    let block_len = bytes.len();
    storage.put(height, hash_bytes, &tx_hashes, bytes).await?;
    metrics::block_persisted(metrics::BlockSource::Live);

    let timestamp = block
        .get("timestamp")
        .and_then(Value::as_str)
        .and_then(|s| u64::from_str_radix(s.trim_start_matches("0x"), 16).ok());
    if let Some(ts) = timestamp {
        metrics::last_block_timestamp(ts);
    }

    debug!(
        height,
        bytes = block_len,
        txs = tx_hashes.len(),
        "stored block",
    );

    // Skip the clone entirely when nobody is listening.
    if blocks.receiver_count() != 0 {
        let _ = blocks.send(block.clone());
    }
    Ok(())
}
/// Collects the decoded `hash` of every entry in the block's `transactions` array.
///
/// Entries without a string `hash`, or whose hash fails `decode_hash`, are skipped. A block
/// with no `transactions` array yields an empty vector.
pub(crate) fn extract_tx_hashes(block: &Value) -> Vec<[u8; 32]> {
    block
        .get("transactions")
        .and_then(Value::as_array)
        .map(|txs| {
            txs.iter()
                .filter_map(|tx| {
                    let hash = tx.get("hash")?.as_str()?;
                    decode_hash(hash).ok()
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Queries `eth_chainId` over HTTP JSON-RPC and returns the chain id.
///
/// On HTTP 429/503 the call sleeps for the `Retry-After` value (5 seconds when absent or
/// unparsable) and tries again, for as long as each requested wait stays within `max_wait`.
///
/// # Errors
/// Fails when the request fails, a throttle asks for more than `max_wait`, the status is not
/// a success, the body cannot be decoded, `result` is missing or not a string, or the value
/// is not valid hex.
pub(crate) async fn fetch_chain_id(
    http: &reqwest::Client,
    rpc_url: &str,
    max_wait: Duration,
) -> Result<u64> {
    let body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_chainId",
        "params": [],
    });

    loop {
        let resp = http
            .post(rpc_url)
            .json(&body)
            .send()
            .await
            .with_context(|| format!("eth_chainId request to {rpc_url} failed"))?;
        let status = resp.status();

        let throttled = [
            reqwest::StatusCode::TOO_MANY_REQUESTS,
            reqwest::StatusCode::SERVICE_UNAVAILABLE,
        ]
        .contains(&status);

        if !throttled {
            if !status.is_success() {
                bail!("eth_chainId returned HTTP {status}");
            }
            let v: Value = resp.json().await.context("eth_chainId response decode")?;
            let s = v
                .get("result")
                .and_then(Value::as_str)
                .ok_or_else(|| anyhow!("eth_chainId: missing 'result' string"))?;
            return u64::from_str_radix(s.trim_start_matches("0x"), 16)
                .context("eth_chainId: malformed hex");
        }

        // Throttled: honour Retry-After unless it exceeds the caller's budget.
        let retry_after = retry_after_secs(&resp).unwrap_or(5);
        let wait = Duration::from_secs(retry_after);
        if wait > max_wait {
            bail!(
                "eth_chainId throttled by upstream (status {status}, \
                 retry_after {retry_after}s exceeds --max-wait {}s); \
                 not waiting",
                max_wait.as_secs(),
            );
        }
        warn!(%status, retry_after, "eth_chainId throttled, sleeping");
        tokio::time::sleep(wait).await;
    }
}
/// Decodes a hex-encoded 32-byte hash, with or without a single leading `0x`.
///
/// Only one `0x` prefix is removed. `trim_start_matches` would strip the prefix repeatedly and
/// so accept malformed input such as `0x0x…`; that input is now rejected by the hex decoder.
///
/// # Errors
/// Fails when the remaining text is not valid hex or does not decode to exactly 32 bytes.
pub(crate) fn decode_hash(s: &str) -> Result<[u8; 32]> {
    let digits = s.strip_prefix("0x").unwrap_or(s);
    let raw = hex::decode(digits)?;
    raw.as_slice()
        .try_into()
        .map_err(|_| anyhow!("hash must be 32 bytes"))
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Wraps `result` in an `eth_subscription` notification envelope.
    fn notification(result: Value) -> Value {
        json!({
            "jsonrpc": "2.0",
            "method": "eth_subscription",
            "params": { "subscription": "0x1", "result": result },
        })
    }

    /// Builds a JSON-RPC reply with the given `id` and `result`.
    fn reply(id: u64, result: Value) -> Value {
        json!({ "jsonrpc": "2.0", "id": id, "result": result })
    }

    /// A payload without `transactions` is classified as a bare head.
    #[test]
    fn classify_newheads_notification() {
        let v = notification(json!({
            "number": "0x10",
            "hash": "0xabc",
        }));
        match classify_frame(&v) {
            Some(WsFrame::Event(WsEvent::NewHead { height, hash })) => {
                assert_eq!(height, 0x10);
                assert_eq!(hash, "0xabc");
            }
            other => panic!("expected NewHead, got {other:?}"),
        }
    }

    /// A payload with `transactions` is classified as a full block and keeps its body.
    #[test]
    fn classify_newblocks_notification_has_full_block() {
        let v = notification(json!({
            "number": "0x10",
            "hash": "0xabc",
            "transactions": [],
        }));
        match classify_frame(&v) {
            Some(WsFrame::Event(WsEvent::NewBlock {
                height,
                hash,
                block,
            })) => {
                assert_eq!(height, 0x10);
                assert_eq!(hash, "0xabc");
                assert!(block.get("transactions").is_some());
            }
            other => panic!("expected NewBlock, got {other:?}"),
        }
    }

    /// A reply with an object result surfaces that object.
    #[test]
    fn classify_call_reply_with_result() {
        let v = reply(2, json!({ "hash": "0xabc" }));
        match classify_frame(&v) {
            Some(WsFrame::Reply { id, result }) => {
                assert_eq!(id, 2);
                assert_eq!(result.unwrap().get("hash").unwrap(), "0xabc");
            }
            other => panic!("expected Reply, got {other:?}"),
        }
    }

    /// A reply whose result is JSON null is reported with `result == None`.
    #[test]
    fn classify_null_reply_yields_none_result() {
        let v = reply(2, Value::Null);
        match classify_frame(&v) {
            Some(WsFrame::Reply { id, result }) => {
                assert_eq!(id, 2);
                assert!(result.is_none());
            }
            other => panic!("expected Reply, got {other:?}"),
        }
    }
}