Skip to main content

event_scanner/block_range_scanner/
reorg_handler.rs

1use alloy::{
2    consensus::BlockHeader,
3    eips::BlockNumberOrTag,
4    network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse},
5    primitives::BlockHash,
6};
7use robust_provider::RobustProvider;
8
9use super::ring_buffer::RingBuffer;
10use crate::{ScannerError, block_range_scanner::ring_buffer::RingBufferCapacity};
11
12/// Trait for handling chain reorganizations.
13#[allow(async_fn_in_trait)]
14pub trait ReorgHandler<N: Network> {
15    /// Checks if a block was reorged and returns the common ancestor if found.
16    ///
17    /// # Arguments
18    ///
19    /// * `block` - The block to check for reorg.
20    ///
21    /// # Returns
22    ///
23    /// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
24    /// * `Ok(None)` - If no reorg was detected, returns `None`.
25    /// * `Err(e)` - If an error occurred while checking for reorg.
26    async fn check(
27        &mut self,
28        block: &N::BlockResponse,
29    ) -> Result<Option<N::BlockResponse>, ScannerError>;
30}
31
32/// Default implementation of [`ReorgHandler`] that uses an RPC provider.
33#[derive(Clone, Debug)]
34pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
35    provider: RobustProvider<N>,
36    buffer: RingBuffer<BlockHash>,
37}
38
39impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
40    /// Checks if a block was reorged and returns the common ancestor if found.
41    ///
42    /// # Arguments
43    ///
44    /// * `block` - The block to check for reorg.
45    ///
46    /// # Returns
47    ///
48    /// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
49    /// * `Ok(None)` - If no reorg was detected, returns `None`.
50    /// * `Err(e)` - If an error occurred while checking for reorg.
51    ///
52    /// # Edge Cases
53    ///
54    /// * **Duplicate block detection** - If the incoming block hash matches the last buffered hash,
55    ///   it won't be added again to prevent buffer pollution from duplicate checks.
56    ///
57    /// * **Empty buffer on reorg** - If a reorg is detected but the buffer is empty (e.g., first
58    ///   block after initialization), the function falls back to the finalized block as the common
59    ///   ancestor.
60    ///
61    /// * **Deep reorg beyond buffer capacity** - If all buffered blocks are reorged (buffer
62    ///   exhausted), the finalized block is used as a safe fallback to prevent data loss.
63    ///
64    /// * **Common ancestor beyond finalized** - This can happen if not all sequental blocks are
65    ///   checked and stored. If the found common ancestor has a lower block number than the
66    ///   finalized block, the finalized block is used instead and the buffer is cleared.
67    ///
68    /// * **Network errors during lookup** - Non-`BlockNotFound` errors (e.g., RPC failures) are
69    ///   propagated immediately rather than being treated as reorgs.
70    ///
71    /// * **Finalized block unavailable** - If the finalized block cannot be fetched when needed as
72    ///   a fallback, the error is propagated to the caller.
73    #[cfg_attr(
74        feature = "tracing",
75        tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number()))
76    )]
77    async fn check(
78        &mut self,
79        block: &N::BlockResponse,
80    ) -> Result<Option<N::BlockResponse>, ScannerError> {
81        let block = block.header();
82
83        if !self.reorg_detected(block).await? {
84            let block_hash = block.hash();
85            // store the incoming block's hash for future reference
86            if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
87                self.buffer.push(block_hash);
88                trace!(
89                    block_number = block.number(),
90                    block_hash = %block_hash,
91                    "Block hash added to reorg buffer"
92                );
93            }
94            return Ok(None);
95        }
96
97        debug!(
98            block_number = block.number(),
99            block_hash = %block.hash(),
100            "Reorg detected, searching for common ancestor"
101        );
102
103        while let Some(&block_hash) = self.buffer.back() {
104            trace!(block_hash = %block_hash, "Checking if buffered block exists on chain");
105            match self.provider.get_block_by_hash(block_hash).await {
106                Ok(common_ancestor) => {
107                    debug!(
108                        common_ancestor_hash = %block_hash,
109                        common_ancestor_number = common_ancestor.header().number(),
110                        "Found common ancestor"
111                    );
112                    return self.return_common_ancestor(common_ancestor).await;
113                }
114                Err(robust_provider::Error::BlockNotFound) => {
115                    // block was reorged
116                    trace!(block_hash = %block_hash, "Buffered block was reorged, removing from buffer");
117                    _ = self.buffer.pop_back();
118                }
119                Err(e) => return Err(e.into()),
120            }
121        }
122
123        // return last finalized block as common ancestor
124
125        // no need to store finalized block's hash in the buffer, as it is returned by default only
126        // if not buffered hashes exist on-chain
127
128        info!("No common ancestors found in buffer, falling back to finalized block");
129
130        let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
131
132        Ok(Some(finalized))
133    }
134}
135
136impl<N: Network> DefaultReorgHandler<N> {
137    pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
138        Self { provider, buffer: RingBuffer::new(capacity) }
139    }
140
141    async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
142        match self.provider.get_block_by_hash(block.hash()).await {
143            Ok(_) => Ok(false),
144            Err(robust_provider::Error::BlockNotFound) => Ok(true),
145            Err(e) => Err(e.into()),
146        }
147    }
148
149    async fn return_common_ancestor(
150        &mut self,
151        common_ancestor: <N as Network>::BlockResponse,
152    ) -> Result<Option<N::BlockResponse>, ScannerError> {
153        let common_ancestor_header = common_ancestor.header();
154
155        let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
156        let finalized_header = finalized.header();
157
158        let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
159            debug!(
160                common_ancestor_number = common_ancestor_header.number(),
161                common_ancestor_hash = %common_ancestor_header.hash(),
162                "Returning common ancestor"
163            );
164            common_ancestor
165        } else {
166            warn!(
167                common_ancestor_number = common_ancestor_header.number(),
168                common_ancestor_hash = %common_ancestor_header.hash(),
169                finalized_number = finalized_header.number(),
170                finalized_hash = %finalized_header.hash(),
171                "Found common ancestor predates finalized block, falling back to finalized"
172            );
173            // all buffered blocks are finalized, so no more need to track them
174            self.buffer.clear();
175            finalized
176        };
177        Ok(Some(common_ancestor))
178    }
179}