use std::time::Duration;
use alloy::{
network::Ethereum,
providers::{DynProvider, Provider},
rpc::types::{Filter, Header},
};
use async_stream::stream;
use futures_util::{StreamExt, pin_mut, stream::Stream};
use tokio::time::{Instant, sleep, sleep_until};
use tracing::{debug, warn};
use crate::rpc::error_tracking::{
ErrorCategory, ProviderErrorTracker, find_active_provider, truncate_error_short,
};
use crate::rpc::{LogRange, RpcError, config::ProviderSettings};
const MAX_BLOCKS_PER_WS_REQUEST: u64 = 300;
const LIVE_HEAD_COALESCE_WINDOW: Duration = Duration::from_millis(500);
#[derive(Debug, Clone)]
struct HeadCoalescer {
block_cursor: Option<u64>,
latest_seen_head: Option<u64>,
flush_deadline: Option<Instant>,
}
impl HeadCoalescer {
fn new(block_cursor: Option<u64>) -> Self {
Self {
block_cursor,
latest_seen_head: None,
flush_deadline: None,
}
}
fn block_cursor(&self) -> Option<u64> {
self.block_cursor
}
fn observe_head(&mut self, head_number: u64, now: Instant) -> bool {
let from_block = self.block_cursor.unwrap_or(head_number);
if head_number < from_block {
return false;
}
self.latest_seen_head = Some(
self.latest_seen_head
.map_or(head_number, |latest| latest.max(head_number)),
);
if self.flush_deadline.is_none() {
self.flush_deadline = Some(now + LIVE_HEAD_COALESCE_WINDOW);
}
true
}
fn pending_range(&self) -> Option<(u64, u64)> {
let to_block = self.latest_seen_head?;
let from_block = self.block_cursor.unwrap_or(to_block);
(to_block >= from_block).then_some((from_block, to_block))
}
fn flush_deadline(&self) -> Option<Instant> {
self.pending_range()?;
self.flush_deadline
}
fn should_flush(&self, now: Instant) -> bool {
let Some((from_block, to_block)) = self.pending_range() else {
return false;
};
if to_block - from_block + 1 >= MAX_BLOCKS_PER_WS_REQUEST {
return true;
}
self.flush_deadline.is_some_and(|deadline| now >= deadline)
}
fn advance_cursor(&mut self, next_block: u64) {
self.block_cursor = Some(next_block);
if self
.latest_seen_head
.is_some_and(|latest| next_block > latest)
{
self.latest_seen_head = None;
self.flush_deadline = None;
}
}
}
fn fallback_http_index(start_index: usize, attempt: usize, num_http_providers: usize) -> usize {
(start_index + attempt) % num_http_providers
}
fn fallback_http_exhausted_error(
from_block: u64,
to_block: u64,
attempted_hosts: &[String],
last_err: &str,
) -> RpcError {
RpcError::LogFetchError(format!(
"HTTP fallback exhausted for blocks {}-{} after trying providers [{}]: {}",
from_block,
to_block,
attempted_hosts.join(", "),
last_err
))
}
async fn get_logs_with_http_fallback_retry(
providers: &ProviderSettings,
query_filter: &Filter,
start_index: usize,
from_block: u64,
to_block: u64,
) -> Result<Vec<alloy::rpc::types::Log>, RpcError> {
let num_http_providers = providers.http_providers.len();
let mut attempted_hosts: Vec<String> = Vec::with_capacity(num_http_providers);
let mut last_err = String::new();
for attempt in 0..num_http_providers {
let provider_idx = fallback_http_index(start_index, attempt, num_http_providers);
let provider_host = providers.http_settings(provider_idx).host();
attempted_hosts.push(provider_host.clone());
let http_provider = providers.connect_http(provider_idx);
match http_provider.get_logs(query_filter).await {
Ok(logs) => return Ok(logs),
Err(err) => {
last_err = err.to_string();
warn!(
"HTTP fallback {} failed for blocks {}-{}: {}",
provider_host,
from_block,
to_block,
truncate_error_short(&last_err, 100)
);
}
}
}
Err(fallback_http_exhausted_error(
from_block,
to_block,
&attempted_hosts,
&last_err,
))
}
async fn stream_heads_provider(
provider: &DynProvider<Ethereum>,
provider_host: &str,
) -> impl Stream<Item = Header> {
stream! {
let head_stream = provider.subscribe_blocks().await;
match head_stream {
Ok(heads) => {
debug!("WebSocket connected, streaming headers...");
let mut s = heads.into_stream();
while let Some(header) = s.next().await {
yield header;
}
}
Err(e) => {
warn!(
"Failed to subscribe to blocks on {}: {} - retrying",
provider_host,
truncate_error_short(e, 100)
);
}
}
}
}
pub async fn stream_heads_with_logs(
providers: &ProviderSettings,
filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
let num_providers = providers.wss_endpoints.len();
stream! {
let mut provider_trackers: Vec<ProviderErrorTracker> = (0..num_providers)
.map(|i| {
let host = providers.wss_endpoints[i]
.host_str()
.unwrap_or("unknown")
.to_string();
ProviderErrorTracker::new(host)
})
.collect();
let mut provider_index = 0;
let mut coalescer = HeadCoalescer::new(filter.and_then(|f| f.get_from_block()));
loop {
let active_provider = find_active_provider(&mut provider_trackers, provider_index);
let Some(idx) = active_provider else {
yield Err(RpcError::AllProvidersSuspended(format!(
"All {} WebSocket providers suspended. Unable to continue streaming.",
num_providers
)));
return;
};
provider_index = idx;
let provider = match providers.connect_ws(provider_index).await {
Ok(p) => {
provider_trackers[provider_index].record_success();
p
}
Err(e) => {
let suspended = provider_trackers[provider_index].record_error();
let backoff = provider_trackers[provider_index].backoff_duration();
if suspended {
warn!(
"WebSocket provider {} suspended after connection failure: {}",
provider_trackers[provider_index].identifier(),
truncate_error_short(&e, 100)
);
} else {
warn!(
"Failed to connect to WebSocket {}: {} - retrying in {:?}",
provider_trackers[provider_index].identifier(),
truncate_error_short(&e, 100),
backoff
);
sleep(backoff).await;
}
provider_index = (provider_index + 1) % num_providers;
continue;
}
};
let provider_host = provider_trackers[provider_index].identifier().to_string();
let s = stream_heads_provider(&provider, &provider_host).await;
pin_mut!(s);
let mut should_switch_provider = false;
loop {
if coalescer.should_flush(Instant::now()) {
while let Some((from_block, pending_to_block)) = coalescer.pending_range() {
let to_block =
std::cmp::min(from_block + MAX_BLOCKS_PER_WS_REQUEST - 1, pending_to_block);
let query_filter = filter
.cloned()
.unwrap_or_else(Filter::new)
.from_block(from_block)
.to_block(to_block);
let logs = provider.get_logs(&query_filter).await;
match logs {
Ok(logs) => {
provider_trackers[provider_index].record_success();
yield Ok((from_block, to_block, logs));
coalescer.advance_cursor(to_block + 1);
}
Err(e) => {
let error_msg = e.to_string();
let error_category = ErrorCategory::from_error_msg(&error_msg);
match error_category {
ErrorCategory::ResponseTooLarge => {
warn!(
"Response too large for blocks {}-{}, falling back to HTTP RPC: {}",
from_block, to_block, truncate_error_short(&error_msg, 100)
);
match get_logs_with_http_fallback_retry(
providers,
&query_filter,
provider_index,
from_block,
to_block,
)
.await
{
Ok(logs) => {
provider_trackers[provider_index].record_success();
yield Ok((from_block, to_block, logs));
coalescer.advance_cursor(to_block + 1);
}
Err(http_err) => {
warn!(
"HTTP fallback retry exhausted for blocks {}-{}: {}",
from_block,
to_block,
truncate_error_short(&http_err, 100)
);
should_switch_provider = true;
break;
}
}
}
ErrorCategory::RateLimit => {
provider_trackers[provider_index].record_error();
let backoff = provider_trackers[provider_index].backoff_duration();
warn!(
"Rate limit hit on WS provider {} for blocks {}-{}: {} - backing off {:?}",
provider_trackers[provider_index].identifier(),
from_block, to_block,
truncate_error_short(&error_msg, 100),
backoff
);
sleep(backoff).await;
continue;
}
ErrorCategory::Connection | ErrorCategory::Other => {
let suspended = provider_trackers[provider_index].record_error();
if suspended {
warn!(
"WebSocket provider {} suspended after error on blocks {}-{}: {}",
provider_trackers[provider_index].identifier(),
from_block, to_block,
truncate_error_short(&error_msg, 100)
);
} else {
let backoff = provider_trackers[provider_index].backoff_duration();
warn!(
"Error fetching logs for blocks {}-{} from {}: {} - backing off {:?}",
from_block, to_block,
provider_trackers[provider_index].identifier(),
truncate_error_short(&error_msg, 100),
backoff
);
sleep(backoff).await;
}
should_switch_provider = true;
break;
}
}
}
}
}
if should_switch_provider {
break;
}
continue;
}
let maybe_header = if let Some(deadline) = coalescer.flush_deadline() {
tokio::select! {
maybe_header = s.next() => maybe_header,
_ = sleep_until(deadline) => {
continue;
}
}
} else {
s.next().await
};
match maybe_header {
Some(header) => {
let now = Instant::now();
if !coalescer.observe_head(header.number, now) {
let from_block = coalescer.block_cursor().unwrap_or(header.number);
warn!(
"Skipping header {} as it's before the cursor {}",
header.number, from_block
);
}
}
None => break,
}
}
if !should_switch_provider {
provider_trackers[provider_index].record_error();
let backoff = provider_trackers[provider_index].backoff_duration();
warn!(
"WS connection to {} lost or stream ended - reconnecting in {:?}",
provider_trackers[provider_index].identifier(),
backoff
);
sleep(backoff).await;
}
provider_index = (provider_index + 1) % num_providers;
}
}
}
pub async fn last_block(providers: &ProviderSettings) -> Result<u64, RpcError> {
let num_providers = providers.http_providers.len();
if num_providers == 0 {
return Err(RpcError::ConnectionError(
"No HTTP providers configured for latest block fetch".to_string(),
));
}
let mut index: usize = 0;
loop {
let provider_index = index % num_providers;
let provider = providers.connect_http(provider_index);
match provider.get_block_number().await {
Ok(block_number) => return Ok(block_number),
Err(e) => {
let provider_host = providers.http_settings(provider_index).host();
if index >= 10 {
return Err(RpcError::ConnectionError(format!(
"Failed to fetch latest block number after {} attempts from HTTP provider {}: {}",
index, provider_host, e
)));
}
index += 1;
warn!(
"Failed to fetch latest block number from HTTP provider {} (index {}) - retrying",
provider_host, provider_index
);
let progressive_sleep = (5 * index).min(50) as u64;
sleep(Duration::from_secs(progressive_sleep)).await;
}
}
}
}
#[cfg(test)]
mod tests {
use super::{
HeadCoalescer, LIVE_HEAD_COALESCE_WINDOW, MAX_BLOCKS_PER_WS_REQUEST,
fallback_http_exhausted_error, fallback_http_index,
};
use std::time::Duration;
use tokio::time::Instant;
#[test]
fn fallback_http_index_wraps_round_robin() {
assert_eq!(fallback_http_index(0, 0, 3), 0);
assert_eq!(fallback_http_index(0, 1, 3), 1);
assert_eq!(fallback_http_index(0, 2, 3), 2);
assert_eq!(fallback_http_index(0, 3, 3), 0);
assert_eq!(fallback_http_index(2, 1, 3), 0);
}
#[test]
fn fallback_http_exhausted_error_includes_hosts() {
let err = fallback_http_exhausted_error(
100,
120,
&["rpc.ankr.com".to_string(), "mainnet.infura.io".to_string()],
"timeout",
)
.to_string();
assert!(err.contains("rpc.ankr.com"));
assert!(err.contains("mainnet.infura.io"));
assert!(err.contains("timeout"));
}
#[test]
fn head_coalescer_ignores_stale_heads_before_cursor() {
let now = Instant::now();
let mut coalescer = HeadCoalescer::new(Some(42));
assert!(!coalescer.observe_head(41, now));
assert_eq!(coalescer.pending_range(), None);
assert_eq!(coalescer.flush_deadline(), None);
}
#[test]
fn head_coalescer_keeps_first_flush_deadline_while_heads_accumulate() {
let now = Instant::now();
let mut coalescer = HeadCoalescer::new(Some(100));
assert!(coalescer.observe_head(100, now));
let deadline = coalescer.flush_deadline().unwrap();
assert!(coalescer.observe_head(105, now + Duration::from_millis(50)));
assert_eq!(coalescer.pending_range(), Some((100, 105)));
assert_eq!(coalescer.flush_deadline(), Some(deadline));
assert!(
!coalescer.should_flush(now + LIVE_HEAD_COALESCE_WINDOW - Duration::from_millis(1))
);
assert!(coalescer.should_flush(now + LIVE_HEAD_COALESCE_WINDOW));
}
#[test]
fn head_coalescer_flushes_immediately_when_pending_range_reaches_chunk_limit() {
let now = Instant::now();
let mut coalescer = HeadCoalescer::new(Some(1));
assert!(coalescer.observe_head(MAX_BLOCKS_PER_WS_REQUEST, now));
assert_eq!(
coalescer.pending_range(),
Some((1, MAX_BLOCKS_PER_WS_REQUEST))
);
assert!(coalescer.should_flush(now));
}
#[test]
fn head_coalescer_preserves_pending_range_across_partial_cursor_advances() {
let now = Instant::now();
let mut coalescer = HeadCoalescer::new(Some(10));
assert!(coalescer.observe_head(25, now));
coalescer.advance_cursor(20);
assert_eq!(coalescer.pending_range(), Some((20, 25)));
assert!(coalescer.flush_deadline().is_some());
coalescer.advance_cursor(26);
assert_eq!(coalescer.pending_range(), None);
assert_eq!(coalescer.flush_deadline(), None);
assert_eq!(coalescer.block_cursor(), Some(26));
}
}