1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse},
primitives::BlockHash,
};
use robust_provider::RobustProvider;
use super::ring_buffer::RingBuffer;
use crate::{ScannerError, block_range_scanner::ring_buffer::RingBufferCapacity};
/// Trait for handling chain reorganizations.
#[allow(async_fn_in_trait)]
pub trait ReorgHandler<N: Network> {
/// Checks if a block was reorged and returns the common ancestor if found.
///
/// # Arguments
///
/// * `block` - The block to check for reorg.
///
/// # Returns
///
/// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
/// * `Ok(None)` - If no reorg was detected, returns `None`.
/// * `Err(e)` - If an error occurred while checking for reorg.
async fn check(
&mut self,
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError>;
}
/// Default implementation of [`ReorgHandler`] that uses an RPC provider.
#[derive(Clone, Debug)]
pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
provider: RobustProvider<N>,
buffer: RingBuffer<BlockHash>,
}
impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
/// Checks if a block was reorged and returns the common ancestor if found.
///
/// # Arguments
///
/// * `block` - The block to check for reorg.
///
/// # Returns
///
/// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
/// * `Ok(None)` - If no reorg was detected, returns `None`.
/// * `Err(e)` - If an error occurred while checking for reorg.
///
/// # Edge Cases
///
/// * **Duplicate block detection** - If the incoming block hash matches the last buffered hash,
/// it won't be added again to prevent buffer pollution from duplicate checks.
///
/// * **Empty buffer on reorg** - If a reorg is detected but the buffer is empty (e.g., first
/// block after initialization), the function falls back to the finalized block as the common
/// ancestor.
///
/// * **Deep reorg beyond buffer capacity** - If all buffered blocks are reorged (buffer
/// exhausted), the finalized block is used as a safe fallback to prevent data loss.
///
/// * **Common ancestor beyond finalized** - This can happen if not all sequental blocks are
/// checked and stored. If the found common ancestor has a lower block number than the
/// finalized block, the finalized block is used instead and the buffer is cleared.
///
/// * **Network errors during lookup** - Non-`BlockNotFound` errors (e.g., RPC failures) are
/// propagated immediately rather than being treated as reorgs.
///
/// * **Finalized block unavailable** - If the finalized block cannot be fetched when needed as
/// a fallback, the error is propagated to the caller.
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number()))
)]
async fn check(
&mut self,
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
let block = block.header();
if !self.reorg_detected(block).await? {
let block_hash = block.hash();
// store the incoming block's hash for future reference
if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
self.buffer.push(block_hash);
trace!(
block_number = block.number(),
block_hash = %block_hash,
"Block hash added to reorg buffer"
);
}
return Ok(None);
}
debug!(
block_number = block.number(),
block_hash = %block.hash(),
"Reorg detected, searching for common ancestor"
);
while let Some(&block_hash) = self.buffer.back() {
trace!(block_hash = %block_hash, "Checking if buffered block exists on chain");
match self.provider.get_block_by_hash(block_hash).await {
Ok(common_ancestor) => {
debug!(
common_ancestor_hash = %block_hash,
common_ancestor_number = common_ancestor.header().number(),
"Found common ancestor"
);
return self.return_common_ancestor(common_ancestor).await;
}
Err(robust_provider::Error::BlockNotFound) => {
// block was reorged
trace!(block_hash = %block_hash, "Buffered block was reorged, removing from buffer");
_ = self.buffer.pop_back();
}
Err(e) => return Err(e.into()),
}
}
// return last finalized block as common ancestor
// no need to store finalized block's hash in the buffer, as it is returned by default only
// if not buffered hashes exist on-chain
info!("No common ancestors found in buffer, falling back to finalized block");
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
Ok(Some(finalized))
}
}
impl<N: Network> DefaultReorgHandler<N> {
pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
Self { provider, buffer: RingBuffer::new(capacity) }
}
async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
match self.provider.get_block_by_hash(block.hash()).await {
Ok(_) => Ok(false),
Err(robust_provider::Error::BlockNotFound) => Ok(true),
Err(e) => Err(e.into()),
}
}
async fn return_common_ancestor(
&mut self,
common_ancestor: <N as Network>::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
let common_ancestor_header = common_ancestor.header();
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
let finalized_header = finalized.header();
let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
debug!(
common_ancestor_number = common_ancestor_header.number(),
common_ancestor_hash = %common_ancestor_header.hash(),
"Returning common ancestor"
);
common_ancestor
} else {
warn!(
common_ancestor_number = common_ancestor_header.number(),
common_ancestor_hash = %common_ancestor_header.hash(),
finalized_number = finalized_header.number(),
finalized_hash = %finalized_header.hash(),
"Found common ancestor predates finalized block, falling back to finalized"
);
// all buffered blocks are finalized, so no more need to track them
self.buffer.clear();
finalized
};
Ok(Some(common_ancestor))
}
}