use alloy_provider::Provider;
use alloy_rpc_types::{Filter, Log};
use tracing::debug;
use crate::errors::EventProcessingError;
use crate::MaxBlockRange;
/// Fetches logs matching `filter` in windows of at most `chunk_size` blocks,
/// concatenating the results in block order.
///
/// The filter must carry explicit `from_block` / `to_block` bounds and
/// `chunk_size` must be non-zero. Each window is issued as an independent
/// `get_logs` call, so providers that cap the allowed block range per
/// request can still be queried over a wide span.
///
/// # Errors
/// * `EventProcessingError::InvalidInput` — `chunk_size` is zero, or the
///   filter is missing a numeric block bound.
/// * RPC failure — any single chunk request failing aborts the whole fetch.
pub async fn fetch_logs_chunked<P: Provider>(
    provider: &P,
    filter: Filter,
    chunk_size: u64,
) -> Result<Vec<Log>, EventProcessingError> {
    if chunk_size == 0 {
        return Err(EventProcessingError::invalid_input(
            "chunk_size must be greater than 0",
        ));
    }

    // Both bounds are required up front; chunking an open-ended range is
    // not meaningful.
    let Some(from) = filter.get_from_block() else {
        return Err(EventProcessingError::invalid_input(
            "Filter must have from_block set",
        ));
    };
    let Some(to) = filter.get_to_block() else {
        return Err(EventProcessingError::invalid_input(
            "Filter must have to_block set",
        ));
    };

    let splitter = MaxBlockRange::new(chunk_size);
    debug!(
        start_block = from,
        end_block = to,
        chunk_size,
        num_chunks = splitter.chunks_needed(from, to),
        "Starting chunked log fetch"
    );

    let mut collected: Vec<Log> = Vec::new();
    for (chunk_start, chunk_end) in splitter.chunk_range(from, to) {
        debug!(chunk_start, chunk_end, "Fetching logs for chunk");
        // Narrow a clone of the caller's filter to this window; all other
        // filter criteria (addresses, topics) are preserved.
        let chunk_filter = filter.clone().from_block(chunk_start).to_block(chunk_end);
        let logs = provider.get_logs(&chunk_filter).await.map_err(|e| {
            EventProcessingError::rpc_failed(format!(
                "Failed to fetch logs for blocks {chunk_start}-{chunk_end}: {e}"
            ))
        })?;
        debug!(logs_count = logs.len(), "Fetched logs for chunk");
        collected.extend(logs);
    }

    debug!(total_logs = collected.len(), "Finished chunked log fetch");
    Ok(collected)
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_provider::ProviderBuilder;

    /// Provider aimed at an unroutable local port. The validation tests
    /// below fail before any network call, so it is never actually queried.
    fn dummy_provider() -> impl Provider {
        ProviderBuilder::new().connect_http("http://localhost:1".parse().unwrap())
    }

    /// Runs `fetch_logs_chunked` and asserts it is rejected with an
    /// `InvalidInput` error whose message contains `needle`.
    async fn assert_invalid_input(filter: Filter, chunk_size: u64, needle: &str) {
        let provider = dummy_provider();
        let err = fetch_logs_chunked(&provider, filter, chunk_size)
            .await
            .expect_err("expected fetch_logs_chunked to be rejected");
        assert!(
            matches!(err, EventProcessingError::InvalidInput { .. }),
            "Expected InvalidInput error, got: {err:?}"
        );
        assert!(err.to_string().contains(needle));
    }

    #[test]
    fn test_uses_max_block_range_chunk_iterator() {
        // Blocks 0..=99 split into windows of 30: three full chunks plus
        // a final partial one.
        let ranges: Vec<_> = MaxBlockRange::new(30).chunk_range(0, 99).collect();
        assert_eq!(ranges, vec![(0, 29), (30, 59), (60, 89), (90, 99)]);
    }

    #[tokio::test]
    async fn test_chunk_size_zero_returns_error() {
        assert_invalid_input(Filter::new().from_block(0).to_block(100), 0, "chunk_size").await;
    }

    #[tokio::test]
    async fn test_missing_from_block_returns_error() {
        assert_invalid_input(Filter::new().to_block(100), 500, "from_block").await;
    }

    #[tokio::test]
    async fn test_missing_to_block_returns_error() {
        assert_invalid_input(Filter::new().from_block(0), 500, "to_block").await;
    }
}