use std::cmp::{max, min};
use std::time::{Duration, Instant};
use alloy::{
network::Ethereum,
providers::{DynProvider, Provider},
rpc::types::{Filter, Log},
};
use async_stream::stream;
use futures_util::{Stream, future::join_all};
use tokio::time::sleep;
use tracing::{debug, info, trace, warn};
use crate::rpc::cursor::next_cursor_after_batch;
use crate::rpc::error_tracking::{
ErrorCategory, MAX_ERRORS_PER_WINDOW, ProviderErrorTracker, find_active_provider,
truncate_error,
};
use crate::rpc::{
LogRange, RpcError,
config::{HttpRpcSettings, ProviderSettings},
};
/// Minimum interval between "Backfill progress" info-log lines emitted by `backfill`.
const BACKFILL_PROGRESS_LOG_INTERVAL: Duration = Duration::from_secs(120);
/// Adaptively sizes the per-request block range so responses stay near, but
/// under, the provider's `max_logs_per_request` limit.
struct DynamicBatchSizer<'a> {
    settings: &'a HttpRpcSettings,
    // Log count aimed for per request: 90% of the provider's hard cap,
    // leaving headroom so a bursty range does not trip the limit.
    target: usize,
    // Current number of blocks covered by a single request.
    pub batch_size: u32,
}

impl<'a> DynamicBatchSizer<'a> {
    /// Starts at the configured initial batch size, targeting 90% of the
    /// provider's per-request log cap.
    fn new(settings: &'a HttpRpcSettings) -> Self {
        Self {
            settings,
            target: (settings.max_logs_per_request as f64 * 0.9) as usize,
            batch_size: settings.init_batch_size,
        }
    }

    /// Adjusts the batch size from the largest log count observed in the last
    /// round of requests: shrink sharply when over target, grow gently when
    /// well under it (multiplicative-decrease / slow-increase).
    fn update(&mut self, count: usize) {
        if count > self.target {
            // Over target: back off by 15%, but never below 10 blocks.
            self.batch_size = max(10, (self.batch_size as f64 * 0.85) as u32);
        } else if count < self.target / 2 {
            // Under half the target: grow by 2%, guaranteeing progress of at
            // least one block. Without the `+ 1` floor, any batch_size <= 49
            // truncates back to itself (e.g. 10 * 1.02 = 10.2 -> 10) and the
            // sizer stalls permanently after shrinking.
            self.batch_size = min(
                self.settings.max_batch_size,
                max(
                    self.batch_size.saturating_add(1),
                    (self.batch_size as f64 * 1.02) as u32,
                ),
            );
        }
    }

    /// Total number of blocks covered by one round of concurrent requests.
    ///
    /// Widens both factors to u64 *before* multiplying: the previous
    /// `(u32 * u32) as u64` could overflow u32 for large
    /// `max_batch_size * max_concurrency` (panic in debug, wrap in release).
    fn stride(&self) -> u64 {
        u64::from(self.batch_size) * u64::from(self.settings.max_concurrency)
    }
}
/// Streams historical logs for `filter` as `LogRange` items
/// (`(from_block, to_block, logs)`), fetching in dynamically-sized batches
/// and rotating across the configured HTTP providers on failure.
///
/// # Panics
/// Panics if `filter.to_block` is not set; a missing `from_block` defaults to 0.
pub async fn backfill(
    providers: &ProviderSettings,
    filter: &Filter,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    let init_from_block = filter.get_from_block().unwrap_or(0);
    let init_to_block = filter
        .get_to_block()
        .expect("Filter::to_block must be set for backfill");
    let num_providers = providers.http_providers.len();
    stream! {
        // One error tracker per provider; drives backoff and suspension decisions.
        let mut provider_trackers: Vec<ProviderErrorTracker> = (0..num_providers)
            .map(|i| ProviderErrorTracker::new(providers.http_settings(i).host()))
            .collect();
        let mut provider_index = 0;
        // Highest block already emitted; each batch resumes at last_block + 1.
        let mut last_block: u64 = if init_from_block == 0 { 0 } else { init_from_block - 1 };
        // Progress accounting: block 0 is counted from 1 so the subtraction
        // below cannot underflow.
        let progress_start_block = if init_from_block == 0 { 1 } else { init_from_block };
        let total_blocks = if init_to_block >= progress_start_block {
            init_to_block - progress_start_block + 1
        } else {
            0
        };
        let started_at = Instant::now();
        let mut next_progress_log = started_at + BACKFILL_PROGRESS_LOG_INTERVAL;
        // Outer loop: select an active (non-suspended) provider starting from
        // the current rotation index; end the stream when all are suspended.
        while last_block < init_to_block {
            let active_provider = find_active_provider(&mut provider_trackers, provider_index);
            let Some(idx) = active_provider else {
                yield Err(RpcError::AllProvidersSuspended(format!(
                    "All {} providers suspended at block {}. Unable to continue backfill.",
                    num_providers,
                    last_block + 1
                )));
                return;
            };
            provider_index = idx;
            let provider = providers.connect_http(provider_index);
            let settings = providers.http_settings(provider_index);
            // Batch sizing is per provider, since each has its own limits.
            let mut sizer = DynamicBatchSizer::new(settings);
            // Inner loop: keep fetching with this provider until it errors
            // (break -> rotate provider) or the backfill completes.
            loop {
                if last_block >= init_to_block {
                    break;
                }
                let from_block = last_block + 1;
                // One "stride" of blocks, split across concurrent sub-requests.
                let to_block = min(from_block + sizer.stride() - 1, init_to_block);
                // NOTE(review): concurrency is truncated to u16 below; assumes
                // max_concurrency fits in u16 — confirm against config bounds.
                let concurrency = max(1, ((to_block - from_block) + 1) / sizer.batch_size as u64);
                let batch_filter = filter.clone().from_block(from_block).to_block(to_block);
                debug!(
                    "Fetching logs from block {} to {} using provider {} (c={}, b={})",
                    from_block, to_block, settings.host(), concurrency, sizer.batch_size
                );
                let log_vector = get_logs_concurrently(&provider, batch_filter, concurrency as u16).await;
                match log_vector {
                    Ok((max_count, logs)) => {
                        debug!("Captured {} logs from blocks {} to {}", logs.len(), from_block, to_block);
                        // Tune the batch size from the fullest sub-request.
                        sizer.update(max_count);
                        if logs.is_empty() {
                            // Empty-batch guard: cross-check the provider's
                            // reported tip via next_cursor_after_batch —
                            // presumably to catch a lagging node silently
                            // returning no logs for a range it hasn't indexed
                            // yet (see crate::rpc::cursor for the exact rule).
                            match provider.get_block_number().await {
                                Ok(reported_tip) => {
                                    if let Err(e) = next_cursor_after_batch(to_block, true, reported_tip) {
                                        yield Err(e);
                                        return;
                                    }
                                }
                                Err(e) => {
                                    // Tip fetch failed: treat as a provider
                                    // error (suspend or back off), then rotate.
                                    let suspended = provider_trackers[provider_index].record_error();
                                    let error_count = provider_trackers[provider_index].error_count();
                                    if suspended {
                                        warn!(
                                            "Provider {} suspended after tip fetch failed during empty-batch guard at blocks {}-{}: {}",
                                            settings.host(), from_block, to_block, truncate_error(&e)
                                        );
                                    } else {
                                        let backoff = provider_trackers[provider_index].backoff_duration();
                                        warn!(
                                            "Tip fetch failed during empty-batch guard for blocks {}-{} from {}: {} (error {}/{}, backoff {:?})",
                                            from_block, to_block, settings.host(), truncate_error(&e), error_count, MAX_ERRORS_PER_WINDOW, backoff
                                        );
                                        sleep(backoff).await;
                                    }
                                    // last_block is NOT advanced here, so the
                                    // next provider retries this same range.
                                    break;
                                }
                            }
                        }
                        last_block = to_block;
                        // Rate-limited progress logging (at most once per
                        // BACKFILL_PROGRESS_LOG_INTERVAL).
                        let now = Instant::now();
                        if total_blocks > 0 && now >= next_progress_log {
                            let completed_blocks =
                                (last_block.saturating_sub(progress_start_block) + 1).min(total_blocks);
                            let progress_pct = completed_blocks as f64 / total_blocks as f64 * 100.0;
                            info!(
                                "Backfill progress: {} / {} blocks ({:.1}%), up to block {} in {}s",
                                completed_blocks,
                                total_blocks,
                                progress_pct,
                                last_block,
                                started_at.elapsed().as_secs()
                            );
                            next_progress_log = now + BACKFILL_PROGRESS_LOG_INTERVAL;
                        }
                        provider_trackers[provider_index].record_success();
                        yield Ok((from_block, to_block, logs));
                    }
                    Err(e) => {
                        let error_msg = e.to_string();
                        let error_category = ErrorCategory::from_error_msg(&error_msg);
                        match error_category {
                            ErrorCategory::ResponseTooLarge => {
                                // Halve the batch size (floor 5) and retry
                                // with the SAME provider (continue, not break).
                                let new_batch = max(5, (sizer.batch_size as f64 * 0.5) as u32);
                                if new_batch >= sizer.batch_size {
                                    // Already at the floor; nothing left to try.
                                    yield Err(RpcError::LogFetchError(format!(
                                        "Response too large for blocks {}-{} from {} and batch size {} cannot be reduced further",
                                        from_block, to_block, settings.host(), sizer.batch_size
                                    )));
                                    return;
                                }
                                warn!(
                                    "Response too large for blocks {}-{} from {}, reducing batch size from {} to {} (error: {})",
                                    from_block, to_block, settings.host(), sizer.batch_size, new_batch,
                                    &error_msg[..std::cmp::min(100, error_msg.len())]
                                );
                                sizer.batch_size = new_batch;
                                continue;
                            }
                            ErrorCategory::RateLimit | ErrorCategory::Connection | ErrorCategory::Other => {
                                // Any other failure: record it (possibly
                                // suspending the provider), back off if still
                                // active, then rotate to the next provider.
                                let suspended = provider_trackers[provider_index].record_error();
                                let error_count = provider_trackers[provider_index].error_count();
                                if suspended {
                                    warn!(
                                        "Provider {} suspended after error on blocks {}-{}: {}",
                                        settings.host(), from_block, to_block, e
                                    );
                                } else {
                                    let backoff = provider_trackers[provider_index].backoff_duration();
                                    warn!(
                                        "Error fetching logs for blocks {}-{} from {}: {} (error {}/{}, backoff {:?})",
                                        from_block, to_block, settings.host(), e, error_count, MAX_ERRORS_PER_WINDOW, backoff
                                    );
                                    sleep(backoff).await;
                                }
                                break;
                            }
                        }
                    }
                }
            }
            // Rotate to the next provider for the following attempt.
            provider_index = (provider_index + 1) % num_providers;
        }
    }
}
/// Splits `filter` into up to `concurrency` contiguous sub-ranges, fetches
/// them in parallel, and returns the merged logs together with the largest
/// log count observed in any single sub-range (used to tune the batch sizer).
///
/// Returns the first sub-range error encountered, in range order.
async fn get_logs_concurrently(
    provider: &DynProvider<Ethereum>,
    filter: Filter,
    concurrency: u16,
) -> Result<(usize, Vec<Log>), RpcError> {
    let outcomes = join_all(
        chunk_filter(&filter, concurrency)
            .into_iter()
            .map(|sub| get_logs_bisecting(provider, sub)),
    )
    .await;

    let mut merged: Vec<Log> = Vec::new();
    let mut largest_chunk = 0usize;
    // join_all preserves sub-range order, so merged logs stay range-ordered
    // and the first Err in range order short-circuits via `?`.
    for outcome in outcomes {
        let logs = outcome?;
        largest_chunk = max(largest_chunk, logs.len());
        merged.extend(logs);
    }

    trace!(
        "Range {:?}->{:?} logs={:?}",
        &filter.get_from_block().unwrap(),
        &filter.get_to_block().unwrap(),
        merged.len()
    );
    Ok((largest_chunk, merged))
}
/// Fetches logs for `filter`, recursively bisecting the block range whenever
/// the provider rejects the request with a result-limit error, until the
/// request succeeds or the range is a single block.
///
/// `filter` must have both `from_block` and `to_block` set.
async fn get_logs_bisecting(
    provider: &DynProvider<Ethereum>,
    filter: Filter,
) -> Result<Vec<Log>, RpcError> {
    let logs = provider.get_logs(&filter).await;
    match logs {
        Ok(logs) => Ok(logs),
        Err(e) => {
            let from_block = filter.get_from_block().unwrap();
            let to_block = filter.get_to_block().unwrap();
            // NOTE(review): keying off the substring "max" in the error text
            // is fragile — it presumably targets provider messages like
            // "query returned more than max results"; confirm it matches all
            // supported providers and cannot false-positive on other errors.
            if e.to_string().contains("max") && to_block != from_block {
                // Overflow-safe midpoint: `(from + to) / 2` can overflow u64
                // for large block numbers; `from + (to - from) / 2` cannot.
                let mid_block = from_block + (to_block - from_block) / 2;
                let left_query = filter.clone().from_block(from_block).to_block(mid_block);
                let right_query = filter.clone().from_block(mid_block + 1).to_block(to_block);
                // Box::pin is required for recursion inside an async fn.
                let left_logs = Box::pin(get_logs_bisecting(provider, left_query)).await;
                let right_logs = Box::pin(get_logs_bisecting(provider, right_query)).await;
                match (left_logs, right_logs) {
                    (Ok(left), Ok(right)) => {
                        // Halves are contiguous and in order, so a simple
                        // concatenation preserves block ordering.
                        let mut merged_logs = left;
                        merged_logs.extend(right);
                        Ok(merged_logs)
                    }
                    (Err(e), _) => Err(e),
                    (_, Err(e)) => Err(e),
                }
            } else {
                // Not a size-limit error (or already a single block): give up.
                Err(RpcError::LogFetchError(truncate_error(e)))
            }
        }
    }
}
/// Partitions `filter`'s block range into at most `num_chunks` contiguous
/// sub-filters of (up to) equal size; the final chunk may be shorter.
///
/// Ranges too small to split into chunks of more than one block are returned
/// unsplit as a single clone of `filter`.
fn chunk_filter(filter: &Filter, num_chunks: u16) -> Vec<Filter> {
    let start = filter.get_from_block().unwrap();
    let end = filter.get_to_block().unwrap();
    let span = (end - start + 1) as f64;
    let step = (span / f64::from(num_chunks)).ceil() as usize;
    // Single-block chunks are not worth fanning out.
    if step <= 1 {
        return vec![filter.clone()];
    }
    (start..=end)
        .step_by(step)
        .map(|chunk_start| {
            let chunk_end = min(chunk_start + step as u64 - 1, end);
            filter
                .clone()
                .from_block(chunk_start)
                .to_block(chunk_end)
        })
        .collect()
}
#[cfg(test)]
mod test {
    use super::*;
    use alloy::rpc::types::Filter;

    /// Shared settings fixture for the batch-sizer tests.
    fn test_settings() -> HttpRpcSettings {
        HttpRpcSettings {
            url: "http://test".parse().unwrap(),
            max_concurrency: 8,
            max_batch_size: 5000,
            init_batch_size: 1000,
            max_logs_per_request: 10_000,
        }
    }

    #[test]
    fn test_chunk_filter() {
        let base = Filter::new().from_block(0).to_block(99);
        let parts = chunk_filter(&base, 10);
        assert_eq!(parts.len(), 10);
        // First and last chunks must sit exactly on the range boundaries.
        assert_eq!(parts[0].get_from_block().unwrap(), 0);
        assert_eq!(parts[0].get_to_block().unwrap(), 9);
        assert_eq!(parts[9].get_from_block().unwrap(), 90);
        assert_eq!(parts[9].get_to_block().unwrap(), 99);
    }

    #[test]
    fn test_chunk_filter_iter() {
        let base = Filter::new().from_block(0).to_block(100);
        // For every chunk count, the last chunk ends at the range end and the
        // number of chunks never exceeds the number of blocks.
        for n in 1..101 {
            let parts = chunk_filter(&base, n);
            assert_eq!(parts.last().unwrap().get_to_block().unwrap(), 100);
            assert!(parts.len() <= 101_usize);
        }
    }

    #[test]
    fn test_chunk_filter_single_block() {
        let base = Filter::new().from_block(100).to_block(100);
        let parts = chunk_filter(&base, 10);
        // A one-block range is never split.
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].get_from_block().unwrap(), 100);
        assert_eq!(parts[0].get_to_block().unwrap(), 100);
    }

    #[test]
    fn test_dynamic_batch_sizer_new() {
        let settings = test_settings();
        let sizer = DynamicBatchSizer::new(&settings);
        assert_eq!(sizer.batch_size, 1000);
        // Target is 90% of max_logs_per_request.
        assert_eq!(sizer.target, 9000);
    }

    #[test]
    fn test_dynamic_batch_sizer_decreases_on_high_count() {
        let settings = test_settings();
        let mut sizer = DynamicBatchSizer::new(&settings);
        let before = sizer.batch_size;
        sizer.update(10_000);
        assert!(
            sizer.batch_size < before,
            "Batch size should decrease when count exceeds target"
        );
    }

    #[test]
    fn test_dynamic_batch_sizer_increases_on_low_count() {
        let settings = test_settings();
        let mut sizer = DynamicBatchSizer::new(&settings);
        let before = sizer.batch_size;
        sizer.update(1000);
        assert!(
            sizer.batch_size > before,
            "Batch size should increase when count is below target"
        );
    }

    #[test]
    fn test_dynamic_batch_sizer_respects_max() {
        let settings = test_settings();
        let mut sizer = DynamicBatchSizer::new(&settings);
        sizer.batch_size = 4900;
        sizer.update(100);
        assert!(
            sizer.batch_size <= settings.max_batch_size,
            "Batch size should not exceed max"
        );
    }

    #[test]
    fn test_dynamic_batch_sizer_stride() {
        let settings = test_settings();
        let sizer = DynamicBatchSizer::new(&settings);
        // init_batch_size (1000) * max_concurrency (8).
        assert_eq!(sizer.stride(), 8000);
    }
}