event_scanner/block_range_scanner/
reorg_handler.rs1use alloy::{
2 consensus::BlockHeader,
3 eips::BlockNumberOrTag,
4 network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse},
5 primitives::BlockHash,
6};
7use robust_provider::RobustProvider;
8
9use super::ring_buffer::RingBuffer;
10use crate::{ScannerError, block_range_scanner::ring_buffer::RingBufferCapacity};
11
12#[allow(async_fn_in_trait)]
14pub trait ReorgHandler<N: Network> {
15 async fn check(
27 &mut self,
28 block: &N::BlockResponse,
29 ) -> Result<Option<N::BlockResponse>, ScannerError>;
30}
31
32#[derive(Clone, Debug)]
34pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
35 provider: RobustProvider<N>,
36 buffer: RingBuffer<BlockHash>,
37}
38
39impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
40 #[cfg_attr(
74 feature = "tracing",
75 tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number()))
76 )]
77 async fn check(
78 &mut self,
79 block: &N::BlockResponse,
80 ) -> Result<Option<N::BlockResponse>, ScannerError> {
81 let block = block.header();
82
83 if !self.reorg_detected(block).await? {
84 let block_hash = block.hash();
85 if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
87 self.buffer.push(block_hash);
88 trace!(
89 block_number = block.number(),
90 block_hash = %block_hash,
91 "Block hash added to reorg buffer"
92 );
93 }
94 return Ok(None);
95 }
96
97 debug!(
98 block_number = block.number(),
99 block_hash = %block.hash(),
100 "Reorg detected, searching for common ancestor"
101 );
102
103 while let Some(&block_hash) = self.buffer.back() {
104 trace!(block_hash = %block_hash, "Checking if buffered block exists on chain");
105 match self.provider.get_block_by_hash(block_hash).await {
106 Ok(common_ancestor) => {
107 debug!(
108 common_ancestor_hash = %block_hash,
109 common_ancestor_number = common_ancestor.header().number(),
110 "Found common ancestor"
111 );
112 return self.return_common_ancestor(common_ancestor).await;
113 }
114 Err(robust_provider::Error::BlockNotFound) => {
115 trace!(block_hash = %block_hash, "Buffered block was reorged, removing from buffer");
117 _ = self.buffer.pop_back();
118 }
119 Err(e) => return Err(e.into()),
120 }
121 }
122
123 info!("No common ancestors found in buffer, falling back to finalized block");
129
130 let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
131
132 Ok(Some(finalized))
133 }
134}
135
136impl<N: Network> DefaultReorgHandler<N> {
137 pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
138 Self { provider, buffer: RingBuffer::new(capacity) }
139 }
140
141 async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
142 match self.provider.get_block_by_hash(block.hash()).await {
143 Ok(_) => Ok(false),
144 Err(robust_provider::Error::BlockNotFound) => Ok(true),
145 Err(e) => Err(e.into()),
146 }
147 }
148
149 async fn return_common_ancestor(
150 &mut self,
151 common_ancestor: <N as Network>::BlockResponse,
152 ) -> Result<Option<N::BlockResponse>, ScannerError> {
153 let common_ancestor_header = common_ancestor.header();
154
155 let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
156 let finalized_header = finalized.header();
157
158 let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
159 debug!(
160 common_ancestor_number = common_ancestor_header.number(),
161 common_ancestor_hash = %common_ancestor_header.hash(),
162 "Returning common ancestor"
163 );
164 common_ancestor
165 } else {
166 warn!(
167 common_ancestor_number = common_ancestor_header.number(),
168 common_ancestor_hash = %common_ancestor_header.hash(),
169 finalized_number = finalized_header.number(),
170 finalized_hash = %finalized_header.hash(),
171 "Found common ancestor predates finalized block, falling back to finalized"
172 );
173 self.buffer.clear();
175 finalized
176 };
177 Ok(Some(common_ancestor))
178 }
179}