use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse},
primitives::{BlockHash, BlockNumber},
};
use robust_provider::RobustProvider;
use super::ring_buffer::RingBuffer;
use crate::{ScannerError, block_range_scanner::ring_buffer::RingBufferCapacity};
#[allow(async_fn_in_trait)]
pub trait ReorgHandler<N: Network> {
async fn check(
&mut self,
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError>;
}
#[derive(Clone, Debug)]
pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
provider: RobustProvider<N>,
buffer: RingBuffer<(BlockNumber, BlockHash)>,
}
impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number()))
)]
async fn check(
&mut self,
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
let block = block.header();
if !self.reorg_detected(block).await? {
let block_number = block.number();
let block_hash = block.hash();
if !matches!(self.buffer.back(), Some(&(_, hash)) if hash == block_hash) {
self.buffer.push((block_number, block_hash));
trace!(
block_number,
block_hash = %block_hash,
"Block hash added to reorg buffer"
);
}
return Ok(None);
}
debug!(
block_number = block.number(),
block_hash = %block.hash(),
"Reorg detected, searching for common ancestor"
);
while let Some(&(candidate_number, candidate_hash)) = self.buffer.back() {
trace!(
block_number = candidate_number,
block_hash = %candidate_hash,
"Checking if buffered block is canonical"
);
match self.provider.get_block_by_number(candidate_number.into()).await {
Ok(canonical) if canonical.header().hash() == candidate_hash => {
debug!(
common_ancestor_hash = %candidate_hash,
common_ancestor_number = candidate_number,
"Found common ancestor"
);
return self.return_common_ancestor(canonical).await;
}
Ok(_) | Err(robust_provider::Error::BlockNotFound) => {
trace!(
block_number = candidate_number,
block_hash = %candidate_hash,
"Buffered block is no longer canonical, removing from buffer"
);
_ = self.buffer.pop_back();
}
Err(e) => return Err(e.into()),
}
}
info!("No common ancestors found in buffer, falling back to finalized block");
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
Ok(Some(finalized))
}
}
impl<N: Network> DefaultReorgHandler<N> {
pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
Self { provider, buffer: RingBuffer::new(capacity) }
}
async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
match self.provider.get_block_by_number(block.number().into()).await {
Ok(canonical_block) => Ok(canonical_block.header().hash() != block.hash()),
Err(robust_provider::Error::BlockNotFound) => Ok(true),
Err(e) => Err(e.into()),
}
}
async fn return_common_ancestor(
&mut self,
common_ancestor: <N as Network>::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
let common_ancestor_header = common_ancestor.header();
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
let finalized_header = finalized.header();
let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
debug!(
common_ancestor_number = common_ancestor_header.number(),
common_ancestor_hash = %common_ancestor_header.hash(),
"Returning common ancestor"
);
common_ancestor
} else {
warn!(
common_ancestor_number = common_ancestor_header.number(),
common_ancestor_hash = %common_ancestor_header.hash(),
finalized_number = finalized_header.number(),
finalized_hash = %finalized_header.hash(),
"Found common ancestor predates finalized block, falling back to finalized"
);
self.buffer.clear();
finalized
};
Ok(Some(common_ancestor))
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy::{
consensus::Header as ConsensusHeader,
network::Ethereum,
providers::{RootProvider, mock::Asserter},
rpc::{client::RpcClient, types::Block},
};
use robust_provider::RobustProviderBuilder;
fn block(number: u64, hash: BlockHash) -> Block {
Block::empty(alloy::rpc::types::Header {
hash,
inner: ConsensusHeader { number, ..Default::default() },
total_difficulty: None,
size: None,
})
}
#[tokio::test]
async fn detects_reorg_when_old_hash_is_available_but_not_canonical() -> anyhow::Result<()> {
let parent_hash = BlockHash::repeat_byte(0x09);
let old_hash = BlockHash::repeat_byte(0x0a);
let new_hash = BlockHash::repeat_byte(0x0b);
let parent_block = block(9, parent_hash);
let old_block = block(10, old_hash);
let new_block = block(10, new_hash);
let asserter = Asserter::new();
asserter.push_success(&Some(new_block.clone()));
asserter.push_success(&Some(new_block));
asserter.push_success(&Some(parent_block.clone()));
asserter.push_success(&Some(parent_block.clone()));
asserter.push_success(&Some(parent_block));
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(asserter));
let provider = RobustProviderBuilder::fragile(provider).build().await?;
let mut handler = DefaultReorgHandler::new(provider, RingBufferCapacity::Limited(10));
handler.buffer.push((9, parent_hash));
handler.buffer.push((10, old_hash));
let common_ancestor = handler.check(&old_block).await?.expect("reorg should be detected");
assert_eq!(common_ancestor.header().number(), 9);
assert_eq!(common_ancestor.header().hash(), parent_hash);
Ok(())
}
}