use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use alloy::{
primitives::{Address, BlockNumber, StorageKey},
providers::{Provider, RootProvider},
rpc::types::EIP1186AccountProofResponse,
transports::{
http::{Client, Http},
RpcError, TransportErrorKind,
},
};
use futures::future::join_all;
use reqwest::Url;
use thiserror::Error;
use tokio::sync::{
mpsc::{self, Sender},
RwLock,
};
use tracing::debug;
#[derive(Error, Debug)]
pub enum RpcProviderError {
#[error("Failed to send proofs with mpsc")]
MpscError(
#[from]
tokio::sync::mpsc::error::SendError<(
BlockNumber,
alloy::rpc::types::EIP1186AccountProofResponse,
)>,
),
}
#[derive(Clone)]
pub struct RpcProvider {
provider: RootProvider<Http<Client>>,
chunk_size: u64,
}
impl RpcProvider {
pub fn new(rpc_url: Url, chunk_size: u64) -> Self {
let provider = RootProvider::new_http(rpc_url);
Self {
provider,
chunk_size,
}
}
pub async fn get_account_proofs(
&self,
blocks: Vec<BlockNumber>,
address: Address,
) -> Result<HashMap<BlockNumber, EIP1186AccountProofResponse>, RpcProviderError> {
self.get_proofs(blocks, address, None).await
}
pub async fn get_storage_proofs(
&self,
block_range: Vec<BlockNumber>,
address: Address,
storage_key: StorageKey,
) -> Result<HashMap<BlockNumber, EIP1186AccountProofResponse>, RpcProviderError> {
self.get_proofs(block_range, address, Some(storage_key))
.await
}
async fn get_proofs(
&self,
blocks: Vec<BlockNumber>,
address: Address,
storage_key: Option<StorageKey>,
) -> Result<HashMap<BlockNumber, EIP1186AccountProofResponse>, RpcProviderError> {
let start_fetch = Instant::now();
let (rpc_sender, mut rx) = mpsc::channel::<(BlockNumber, EIP1186AccountProofResponse)>(32);
self.spawn_proof_fetcher(rpc_sender, blocks, address, storage_key);
let mut fetched_proofs = HashMap::new();
while let Some((block_number, proof)) = rx.recv().await {
fetched_proofs.insert(block_number, proof);
}
let duration = start_fetch.elapsed();
debug!("RPC| Time taken (Fetch): {:?}", duration);
Ok(fetched_proofs)
}
fn spawn_proof_fetcher(
&self,
rpc_sender: Sender<(BlockNumber, EIP1186AccountProofResponse)>,
blocks: Vec<BlockNumber>,
address: Address,
storage_key: Option<StorageKey>,
) {
let chunk_size = self.chunk_size;
let provider_clone = self.provider.clone();
let target_blocks_length = blocks.len();
debug!("Fetching proofs for {} chunk size: {}", address, chunk_size);
tokio::spawn(async move {
let mut try_count = 0;
let blocks_map = Arc::new(RwLock::new(HashSet::<BlockNumber>::new()));
while blocks_map.read().await.len() < target_blocks_length {
try_count += 1;
if try_count > 50 {
panic!("❗️❗️❗️ Too many retries, failed to fetch all blocks")
}
let fetched_blocks_clone = blocks_map.read().await.clone();
let blocks_to_fetch: Vec<BlockNumber> = blocks
.iter()
.filter(|block_number| !fetched_blocks_clone.contains(block_number))
.take(chunk_size as usize)
.cloned()
.collect();
let fetch_futures = blocks_to_fetch
.into_iter()
.map(|block_number| {
let fetched_blocks_clone = blocks_map.clone();
let rpc_sender = rpc_sender.clone();
let provider_clone = provider_clone.clone();
async move {
let proof =
fetch_proof(&provider_clone, address, block_number, storage_key)
.await;
handle_proof_result(
proof,
block_number,
fetched_blocks_clone,
rpc_sender,
)
.await;
}
})
.collect::<Vec<_>>();
join_all(fetch_futures).await;
}
});
}
}
async fn fetch_proof(
provider: &RootProvider<Http<Client>>,
address: Address,
block_number: BlockNumber,
storage_key: Option<StorageKey>,
) -> Result<EIP1186AccountProofResponse, RpcError<TransportErrorKind>> {
match storage_key {
Some(key) => {
provider
.get_proof(address, vec![key])
.block_id(block_number.into())
.await
}
None => {
provider
.get_proof(address, vec![])
.block_id(block_number.into())
.await
}
}
}
async fn handle_proof_result(
proof: Result<EIP1186AccountProofResponse, RpcError<TransportErrorKind>>,
block_number: BlockNumber,
blocks_map: Arc<RwLock<HashSet<BlockNumber>>>,
rpc_sender: Sender<(BlockNumber, EIP1186AccountProofResponse)>,
) {
match proof {
Ok(proof) => {
let mut blocks_identifier = blocks_map.write().await;
rpc_sender
.send((block_number, proof))
.await
.map_err(RpcProviderError::MpscError)
.unwrap();
blocks_identifier.insert(block_number);
}
Err(e) => {
if let Some(backoff) = handle_error(e) {
let mut delay = backoff;
while delay <= 4 {
tokio::time::sleep(Duration::from_nanos(delay)).await;
delay *= 2;
}
}
}
}
}
fn handle_error(e: RpcError<TransportErrorKind>) -> Option<u64> {
match e {
RpcError::Transport(TransportErrorKind::HttpError(http_error))
if http_error.status == 429 =>
{
Some(1)
}
_ => None,
}
}
#[cfg(test)]
mod tests {
use alloy::primitives::{address, b256, B256, U256};
use super::*;
const SEPOLIA_RPC_URL: &str =
"https://eth-sepolia.g.alchemy.com/v2/a-w72ZvoUS0dfMD_LBPAuRzHOlQEhi_m";
#[tokio::test]
async fn test_get_100_range_storage_with_proof_by_storage_key() {
let start_time = Instant::now();
let rpc_url = Url::parse(SEPOLIA_RPC_URL).unwrap();
let provider = RpcProvider::new(rpc_url, 100);
let block_range_start = 6127485;
let block_range_end = 6127584;
let target_block_range = (block_range_start..=block_range_end).collect::<Vec<u64>>();
let target_address = address!("75CeC1db9dCeb703200EAa6595f66885C962B920");
let target_key = b256!("3c2b98cf472a02b84793a789af8876a73167e29a1a4f8bdbcb51dbfef0a75d7b");
let result = provider
.get_storage_proofs(target_block_range, target_address, target_key)
.await;
assert!(result.is_ok());
let result = result.unwrap();
let length = result.len();
assert_eq!(length, 100);
let value = result.get(&6127485).unwrap();
assert_eq!(value.storage_proof[0].key.0, target_key);
assert_eq!(value.storage_proof[0].value, U256::from(20000000000000u64));
let duration = start_time.elapsed();
println!("Time taken (Storage Fetch): {:?}", duration);
}
#[tokio::test]
async fn test_get_100_range_storage_with_proof_by_storage_slot() {
let start_time = Instant::now();
let rpc_url = Url::parse(SEPOLIA_RPC_URL).unwrap();
let provider = RpcProvider::new(rpc_url, 100);
let block_range_start = 6127485;
let block_range_end = 6127584;
let target_block_range =
(block_range_start..=block_range_end).collect::<Vec<BlockNumber>>();
let target_address = address!("75CeC1db9dCeb703200EAa6595f66885C962B920");
let target_slot = B256::from(U256::from(1));
let result = provider
.get_storage_proofs(target_block_range, target_address, target_slot)
.await;
assert!(result.is_ok());
let result = result.unwrap();
let length = result.len();
assert_eq!(length, 100);
let value = result.get(&6127485).unwrap();
assert_eq!(value.storage_proof[0].key.0, target_slot);
assert_eq!(value.storage_proof[0].value, U256::from(20000000000000u64));
let duration = start_time.elapsed();
println!("Time taken (Storage Fetch): {:?}", duration);
}
#[tokio::test]
async fn test_get_100_range_account_with_proof() {
let start_time = Instant::now();
let rpc_url = Url::parse(SEPOLIA_RPC_URL).unwrap();
let provider = RpcProvider::new(rpc_url, 100);
let block_range_start = 6127485;
let block_range_end = 6127584;
let target_block_range =
(block_range_start..=block_range_end).collect::<Vec<BlockNumber>>();
let target_address = address!("7f2c6f930306d3aa736b3a6c6a98f512f74036d4");
let result = provider
.get_account_proofs(target_block_range, target_address)
.await;
assert!(result.is_ok());
let length = result.unwrap().len();
assert_eq!(length, 100);
let duration = start_time.elapsed();
println!("Time taken (Account Fetch): {:?}", duration);
}
}