event_scanner/block_range_scanner/
scanner.rs

1//! Block-range streaming service.
2//!
3//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
4//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
5//! certain state transitions (e.g. reorg detection).
6//!
7//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
8//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
9//! behavior.
10//!
11//! # Output stream
12//!
//! Streams returned by [`BlockRangeScanner`] yield [`BlockScannerResult`] items:
14//!
15//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
16//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
17//! - `Err(ScannerError)` for errors.
18//!
19//! # Ordering
20//!
21//! Range messages are streamed in chronological order within a single stream (lower block number
22//! to higher block number). On reorgs, the scanner may re-emit previously-seen ranges for the
23//! affected blocks.
24//!
25//! # Example usage:
26//!
27//! ```rust,no_run
28//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
29//! use std::ops::RangeInclusive;
30//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
31//!
32//! use alloy::providers::{Provider, ProviderBuilder};
33//! use event_scanner::{
34//!     BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, ScannerError, ScannerMessage,
35//! };
36//! use robust_provider::RobustProviderBuilder;
37//! use tokio::time::Duration;
38//! use tracing::{error, info};
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//!     // Initialize logging
43//!     tracing_subscriber::fmt::init();
44//!
45//!     // Configuration
46//!     let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
47//!     let robust_provider = RobustProviderBuilder::new(provider).build().await?;
48//!     let block_range_scanner = BlockRangeScannerBuilder::new().connect(robust_provider).await?;
49//!
50//!     let mut stream = block_range_scanner
51//!         .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS)
52//!         .await?;
53//!
54//!     while let Some(message) = stream.next().await {
55//!         match message {
56//!             Ok(ScannerMessage::Data(range)) => {
57//!                 // process range
58//!             }
59//!             Ok(ScannerMessage::Notification(notification)) => {
60//!                 info!("Received notification: {:?}", notification);
61//!             }
62//!             Err(e) => {
63//!                 error!("Received error from subscription: {e}");
64//!                 match e {
65//!                     ScannerError::Lagged(_) => break,
66//!                     _ => {
67//!                         error!("Non-fatal error, continuing: {e}");
68//!                     }
69//!                 }
70//!             }
71//!         }
72//!     }
73//!
74//!     info!("Data processing stopped.");
75//!
76//!     Ok(())
77//! }
78//! ```
79
80use robust_provider::RobustProvider;
81use std::{cmp::Ordering, fmt::Debug};
82use tokio::sync::mpsc;
83use tokio_stream::wrappers::ReceiverStream;
84
85use crate::{
86    ScannerError,
87    block_range_scanner::{
88        RingBufferCapacity,
89        common::{self, BlockScannerResult},
90        reorg_handler::ReorgHandler,
91        rewind_handler::RewindHandler,
92        sync_handler::SyncHandler,
93    },
94};
95
96use alloy::{
97    consensus::BlockHeader,
98    eips::BlockId,
99    network::{BlockResponse, Network},
100};
101
102/// A [`BlockRangeScanner`] connected to a provider.
103#[derive(Debug)]
104pub struct BlockRangeScanner<N: Network> {
105    provider: RobustProvider<N>,
106    max_block_range: u64,
107    past_blocks_storage_capacity: RingBufferCapacity,
108    buffer_capacity: usize,
109}
110
111impl<N: Network> BlockRangeScanner<N> {
112    /// Creates a new [`BlockRangeScanner`] with the specified configuration.
113    ///
114    /// # Arguments
115    ///
116    /// * `provider` - The robust provider to use for blockchain interactions
117    /// * `max_block_range` - Maximum number of blocks per streamed range (must be > 0)
118    /// * `past_blocks_storage_capacity` - How many past block hashes to keep for reorg detection
119    /// * `buffer_capacity` - Stream buffer capacity (must be > 0)
120    #[must_use]
121    pub fn new(
122        provider: RobustProvider<N>,
123        max_block_range: u64,
124        past_blocks_storage_capacity: RingBufferCapacity,
125        buffer_capacity: usize,
126    ) -> Self {
127        Self { provider, max_block_range, past_blocks_storage_capacity, buffer_capacity }
128    }
129
130    /// Returns the underlying [`RobustProvider`].
131    #[must_use]
132    pub fn provider(&self) -> &RobustProvider<N> {
133        &self.provider
134    }
135
136    /// Returns the stream buffer capacity.
137    #[must_use]
138    pub fn buffer_capacity(&self) -> usize {
139        self.buffer_capacity
140    }
141
142    /// Streams live blocks starting from the latest block.
143    ///
144    /// # Arguments
145    ///
146    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
147    ///
148    /// # Errors
149    ///
150    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
151    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
152    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
153    pub async fn stream_live(
154        &self,
155        block_confirmations: u64,
156    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
157        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
158
159        let max_block_range = self.max_block_range;
160        let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
161        let latest = self.provider.get_block_number().await?;
162        let provider = self.provider.clone();
163
164        // the next block returned by the underlying subscription will always be `latest + 1`,
165        // because `latest` was already mined and subscription by definition only streams after new
166        // blocks have been mined
167        let start_block = (latest + 1).saturating_sub(block_confirmations);
168
169        debug!(
170            latest_block = latest,
171            start_block = start_block,
172            block_confirmations = block_confirmations,
173            max_block_range = max_block_range,
174            "Starting live block stream"
175        );
176
177        let subscription = self.provider.subscribe_blocks().await?;
178
179        tokio::spawn(async move {
180            let mut reorg_handler =
181                ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
182
183            common::stream_live_blocks(
184                start_block,
185                subscription,
186                &blocks_sender,
187                &provider,
188                block_confirmations,
189                max_block_range,
190                &mut reorg_handler,
191                false, // (notification unnecessary)
192            )
193            .await;
194
195            debug!("Live block stream ended");
196        });
197
198        Ok(ReceiverStream::new(blocks_receiver))
199    }
200
201    /// Streams a batch of historical blocks from `start_id` to `end_id`.
202    ///
203    /// # Arguments
204    ///
205    /// * `start_id` - The starting block id
206    /// * `end_id` - The ending block id
207    ///
208    /// # Errors
209    ///
210    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
211    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
212    /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
213    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
214    pub async fn stream_historical(
215        &self,
216        start_id: impl Into<BlockId>,
217        end_id: impl Into<BlockId>,
218    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
219        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
220
221        let max_block_range = self.max_block_range;
222        let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
223        let provider = self.provider.clone();
224
225        let (start_block, end_block) = tokio::try_join!(
226            self.provider.get_block(start_id.into()),
227            self.provider.get_block(end_id.into())
228        )?;
229
230        let start_block_num = start_block.header().number();
231        let end_block_num = end_block.header().number();
232
233        let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
234            Ordering::Greater => (end_block_num, start_block_num),
235            _ => (start_block_num, end_block_num),
236        };
237
238        let total_blocks = end_block_num.saturating_sub(start_block_num) + 1;
239        debug!(
240            from_block = start_block_num,
241            to_block = end_block_num,
242            total_blocks = total_blocks,
243            max_block_range = max_block_range,
244            "Starting historical block stream"
245        );
246
247        tokio::spawn(async move {
248            let mut reorg_handler =
249                ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
250
251            _ = common::stream_historical_range(
252                start_block_num,
253                end_block_num,
254                max_block_range,
255                &blocks_sender,
256                &provider,
257                &mut reorg_handler,
258            )
259            .await;
260
261            debug!("Historical block stream completed");
262        });
263
264        Ok(ReceiverStream::new(blocks_receiver))
265    }
266
267    /// Streams blocks starting from `start_id` and transitions to live mode.
268    ///
269    /// # Arguments
270    ///
271    /// * `start_id` - The starting block id.
272    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
273    ///
274    /// # Errors
275    ///
276    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
277    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
278    /// * [`ScannerError::BlockNotFound`] - if `start_id` cannot be resolved.
279    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
280    pub async fn stream_from(
281        &self,
282        start_id: impl Into<BlockId>,
283        block_confirmations: u64,
284    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
285        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
286
287        let start_id = start_id.into();
288        debug!(
289            start_block = ?start_id,
290            block_confirmations = block_confirmations,
291            max_block_range = self.max_block_range,
292            "Starting sync block stream"
293        );
294
295        let sync_handler = SyncHandler::new(
296            self.provider.clone(),
297            self.max_block_range,
298            start_id,
299            block_confirmations,
300            self.past_blocks_storage_capacity,
301            blocks_sender,
302        );
303
304        sync_handler.run().await?;
305
306        Ok(ReceiverStream::new(blocks_receiver))
307    }
308
309    /// Streams blocks in reverse order from `start_id` to `end_id`.
310    ///
311    /// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
312    /// Blocks are streamed in batches, where each batch is ordered from lower to higher
313    /// block numbers (chronological order within each batch), but batches themselves
314    /// progress from newer to older blocks.
315    ///
316    /// # Arguments
317    ///
318    /// * `start_id` - The starting block id (higher block number).
319    /// * `end_id` - The ending block id (lower block number).
320    ///
321    /// # Reorg Handling
322    ///
323    /// Reorg checks are only performed when the specified block range tip is above the
324    /// current finalized block height. When a reorg is detected:
325    ///
326    /// 1. A [`Notification::ReorgDetected`][reorg] is emitted with the common ancestor block
327    /// 2. The scanner fetches the new tip block at the same height
328    /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
329    ///    the new tip)
330    /// 4. The reverse scan continues from where it left off
331    ///
332    /// If the range tip is at or below the finalized block, no reorg checks are
333    /// performed since finalized blocks cannot be reorganized.
334    ///
335    /// # Note
336    ///
337    /// The reason reorged blocks are streamed in chronological order is to make it easier to handle
338    /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
339    /// to the result collection, which must maintain chronological order.
340    ///
341    /// # Errors
342    ///
343    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
344    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
345    /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
346    ///
347    /// [latest mode]: crate::EventScannerBuilder::latest
348    /// [reorg]: crate::Notification::ReorgDetected
349    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
350    pub async fn stream_rewind(
351        &self,
352        start_id: impl Into<BlockId>,
353        end_id: impl Into<BlockId>,
354    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
355        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
356
357        let start_id = start_id.into();
358        let end_id = end_id.into();
359        debug!(
360            from_block = ?start_id,
361            to_block = ?end_id,
362            max_block_range = self.max_block_range,
363            "Starting rewind block stream"
364        );
365
366        let rewind_handler = RewindHandler::new(
367            self.provider.clone(),
368            self.max_block_range,
369            start_id,
370            end_id,
371            self.past_blocks_storage_capacity,
372            blocks_sender,
373        );
374
375        rewind_handler.run().await?;
376
377        Ok(ReceiverStream::new(blocks_receiver))
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        block_range_scanner::{
            BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        types::TryStream,
    };
    use alloy::{
        network::Ethereum,
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use tokio::sync::mpsc;

    /// Creates a provider on top of a mocked RPC client with a fresh, empty [`Asserter`].
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::new(RpcClient::mocked(Asserter::new()))
    }

    /// A freshly created builder carries the crate's default range size and buffer capacity.
    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let builder = BlockRangeScannerBuilder::new();

        assert_eq!(builder.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    /// The builder setters overwrite the corresponding settings.
    #[test]
    fn builder_methods_update_configuration() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(42).buffer_capacity(33);

        assert_eq!(builder.max_block_range, 42);
        assert_eq!(builder.buffer_capacity, 33);
    }

    /// An error pushed through `try_stream` arrives at the receiving end as an `Err` item.
    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (tx, mut rx) = mpsc::channel::<BlockScannerResult>(1);

        // The return value of `try_stream` is irrelevant here; only the received item matters.
        _ = tx.try_stream(ScannerError::BlockNotFound).await;

        assert!(matches!(rx.recv().await, Some(Err(ScannerError::BlockNotFound))));
    }

    /// Connecting with a buffer capacity of zero is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let builder = BlockRangeScannerBuilder::new().buffer_capacity(0);
        let result = builder.connect(mocked_provider()).await;

        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
    }

    /// Connecting with a maximum block range of zero is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_max_block_range() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(0);
        let result = builder.connect(mocked_provider()).await;

        assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
    }
}