Skip to main content

event_scanner/block_range_scanner/
scanner.rs

1//! Block-range streaming service.
2//!
3//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
4//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
5//! certain state transitions (e.g. reorg detection).
6//!
7//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
8//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
9//! behavior.
10//!
11//! # Output stream
12//!
13//! Streams returned by [`BlockRangeScanner`]'s `stream_*` methods yield [`BlockScannerResult`] items:
14//!
15//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
16//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
17//! - `Err(ScannerError)` for errors.
18//!
19//! # Ordering
20//!
21//! Range messages are streamed in chronological order within a single stream (lower block number
22//! to higher block number). On reorgs, the scanner may re-emit previously-seen ranges for the
23//! affected blocks.
24//!
25//! # Example usage:
26//!
27//! ```rust,no_run
28//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
29//! use std::ops::RangeInclusive;
30//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
31//!
32//! use alloy::providers::{Provider, ProviderBuilder};
33//! use event_scanner::{
34//!     BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, ScannerError, ScannerMessage,
35//! };
36//! use robust_provider::RobustProviderBuilder;
37//! use tokio::time::Duration;
38//! use tracing::{error, info};
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//!     // Initialize logging
43//!     tracing_subscriber::fmt::init();
44//!
45//!     // Configuration
46//!     let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
47//!     let robust_provider = RobustProviderBuilder::new(provider).build().await?;
48//!     let block_range_scanner = BlockRangeScannerBuilder::new().connect(robust_provider).await?;
49//!
50//!     let mut stream = block_range_scanner
51//!         .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS)
52//!         .await?;
53//!
54//!     while let Some(message) = stream.next().await {
55//!         match message {
56//!             Ok(ScannerMessage::Data(range)) => {
57//!                 // process range
58//!             }
59//!             Ok(ScannerMessage::Notification(notification)) => {
60//!                 info!("Received notification: {:?}", notification);
61//!             }
62//!             Err(e) => {
63//!                 error!("Received error from subscription: {e}");
64//!                 match e {
65//!                     ScannerError::Lagged(_) => break,
66//!                     _ => {
67//!                         error!("Non-fatal error, continuing: {e}");
68//!                     }
69//!                 }
70//!             }
71//!         }
72//!     }
73//!
74//!     info!("Data processing stopped.");
75//!
76//!     Ok(())
77//! }
78//! ```
79
80use robust_provider::RobustProvider;
81use std::{cmp::Ordering, fmt::Debug};
82use tokio::sync::mpsc;
83use tokio_stream::wrappers::ReceiverStream;
84
85use crate::{
86    ScannerError,
87    block_range_scanner::{
88        RingBufferCapacity,
89        common::{self, BlockScannerResult},
90        historical_range_handler::HistoricalRangeHandler,
91        reorg_handler::DefaultReorgHandler,
92        rewind_handler::RewindHandler,
93        sync_handler::SyncHandler,
94    },
95};
96
97use alloy::{
98    consensus::BlockHeader,
99    eips::BlockId,
100    network::{BlockResponse, Network},
101};
102
/// A block-range scanner connected to a provider.
///
/// Holds the provider together with the configuration shared by every stream it opens; each
/// `stream_*` call creates its own bounded channel sized by `buffer_capacity`.
#[derive(Debug)]
pub struct BlockRangeScanner<N: Network> {
    /// Provider used for the scanner's RPC calls and block subscriptions.
    provider: RobustProvider<N>,
    /// Maximum number of blocks per streamed range (expected to be > 0).
    max_block_range: u64,
    /// How many past block hashes to keep for reorg detection.
    past_blocks_storage_capacity: RingBufferCapacity,
    /// Capacity of the channel backing each returned stream (expected to be > 0).
    buffer_capacity: usize,
}
111
112impl<N: Network> BlockRangeScanner<N> {
113    /// Creates a new [`BlockRangeScanner`] with the specified configuration.
114    ///
115    /// # Arguments
116    ///
117    /// * `provider` - The robust provider to use for blockchain interactions
118    /// * `max_block_range` - Maximum number of blocks per streamed range (must be > 0)
119    /// * `past_blocks_storage_capacity` - How many past block hashes to keep for reorg detection
120    /// * `buffer_capacity` - Stream buffer capacity (must be > 0)
121    #[must_use]
122    pub fn new(
123        provider: RobustProvider<N>,
124        max_block_range: u64,
125        past_blocks_storage_capacity: RingBufferCapacity,
126        buffer_capacity: usize,
127    ) -> Self {
128        Self { provider, max_block_range, past_blocks_storage_capacity, buffer_capacity }
129    }
130
131    /// Returns the underlying [`RobustProvider`].
132    #[must_use]
133    pub fn provider(&self) -> &RobustProvider<N> {
134        &self.provider
135    }
136
137    /// Returns the stream buffer capacity.
138    #[must_use]
139    pub fn buffer_capacity(&self) -> usize {
140        self.buffer_capacity
141    }
142
143    /// Streams live blocks starting from the latest block.
144    ///
145    /// # Arguments
146    ///
147    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
148    ///
149    /// # Errors
150    ///
151    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
152    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
153    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
154    pub async fn stream_live(
155        &self,
156        block_confirmations: u64,
157    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
158        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
159
160        let max_block_range = self.max_block_range;
161        let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
162        let latest = self.provider.get_block_number().await?;
163        let provider = self.provider.clone();
164
165        // the next block returned by the underlying subscription will always be `latest + 1`,
166        // because `latest` was already mined and subscription by definition only streams after new
167        // blocks have been mined
168        let start_block = (latest + 1).saturating_sub(block_confirmations);
169
170        debug!(
171            latest_block = latest,
172            start_block = start_block,
173            block_confirmations = block_confirmations,
174            max_block_range = max_block_range,
175            "Starting live block stream"
176        );
177
178        let subscription = self.provider.subscribe_blocks().await?;
179
180        tokio::spawn(async move {
181            let mut reorg_handler =
182                DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
183
184            common::stream_live_blocks(
185                start_block,
186                subscription,
187                &blocks_sender,
188                &provider,
189                block_confirmations,
190                max_block_range,
191                &mut reorg_handler,
192                false, // (notification unnecessary)
193            )
194            .await;
195
196            debug!("Live block stream ended");
197        });
198
199        Ok(ReceiverStream::new(blocks_receiver))
200    }
201
202    /// Streams a batch of historical blocks from `start_id` to `end_id`.
203    ///
204    /// # Arguments
205    ///
206    /// * `start_id` - The starting block id
207    /// * `end_id` - The ending block id
208    ///
209    /// # Errors
210    ///
211    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
212    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
213    /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
214    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
215    pub async fn stream_historical(
216        &self,
217        start_id: impl Into<BlockId>,
218        end_id: impl Into<BlockId>,
219    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
220        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
221
222        let max_block_range = self.max_block_range;
223        let provider = self.provider.clone();
224
225        let (start_block, end_block) = tokio::try_join!(
226            self.provider.get_block(start_id.into()),
227            self.provider.get_block(end_id.into())
228        )?;
229
230        let start_block_num = start_block.header().number();
231        let end_block_num = end_block.header().number();
232
233        let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
234            Ordering::Greater => (end_block_num, start_block_num),
235            _ => (start_block_num, end_block_num),
236        };
237
238        debug!(
239            from_block = start_block_num,
240            to_block = end_block_num,
241            total_blocks = end_block_num.saturating_sub(start_block_num) + 1,
242            max_block_range = max_block_range,
243            "Starting historical block stream"
244        );
245
246        let handler = HistoricalRangeHandler::new(
247            provider,
248            max_block_range,
249            start_block_num,
250            end_block_num,
251            blocks_sender,
252        );
253        handler.run();
254
255        Ok(ReceiverStream::new(blocks_receiver))
256    }
257
258    /// Streams blocks starting from `start_id` and transitions to live mode.
259    ///
260    /// # Arguments
261    ///
262    /// * `start_id` - The starting block id.
263    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
264    ///
265    /// # Errors
266    ///
267    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
268    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
269    /// * [`ScannerError::BlockNotFound`] - if `start_id` cannot be resolved.
270    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
271    pub async fn stream_from(
272        &self,
273        start_id: impl Into<BlockId>,
274        block_confirmations: u64,
275    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
276        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
277
278        let start_id = start_id.into();
279        debug!(
280            start_block = ?start_id,
281            block_confirmations = block_confirmations,
282            max_block_range = self.max_block_range,
283            "Starting sync block stream"
284        );
285
286        let sync_handler = SyncHandler::new(
287            self.provider.clone(),
288            self.max_block_range,
289            start_id,
290            block_confirmations,
291            self.past_blocks_storage_capacity,
292            blocks_sender,
293        );
294
295        sync_handler.run().await?;
296
297        Ok(ReceiverStream::new(blocks_receiver))
298    }
299
300    /// Streams blocks in reverse order from `start_id` to `end_id`.
301    ///
302    /// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
303    /// Blocks are streamed in batches, where each batch is ordered from lower to higher
304    /// block numbers (chronological order within each batch), but batches themselves
305    /// progress from newer to older blocks.
306    ///
307    /// # Arguments
308    ///
309    /// * `start_id` - The starting block id (higher block number).
310    /// * `end_id` - The ending block id (lower block number).
311    ///
312    /// # Reorg Handling
313    ///
314    /// Reorg checks are only performed when the specified block range tip is above the
315    /// current finalized block height. When a reorg is detected:
316    ///
317    /// 1. A [`Notification::ReorgDetected`][reorg] is emitted with the common ancestor block
318    /// 2. The scanner fetches the new tip block at the same height
319    /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
320    ///    the new tip)
321    /// 4. The reverse scan continues from where it left off
322    ///
323    /// If the range tip is at or below the finalized block, no reorg checks are
324    /// performed since finalized blocks cannot be reorganized.
325    ///
326    /// # Note
327    ///
328    /// The reason reorged blocks are streamed in chronological order is to make it easier to handle
329    /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
330    /// to the result collection, which must maintain chronological order.
331    ///
332    /// # Errors
333    ///
334    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
335    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
336    /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
337    ///
338    /// [latest mode]: crate::EventScannerBuilder::latest
339    /// [reorg]: crate::Notification::ReorgDetected
340    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
341    pub async fn stream_rewind(
342        &self,
343        start_id: impl Into<BlockId>,
344        end_id: impl Into<BlockId>,
345    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
346        let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
347
348        let start_id = start_id.into();
349        let end_id = end_id.into();
350        debug!(
351            from_block = ?start_id,
352            to_block = ?end_id,
353            max_block_range = self.max_block_range,
354            "Starting rewind block stream"
355        );
356
357        let rewind_handler = RewindHandler::new(
358            self.provider.clone(),
359            self.max_block_range,
360            start_id,
361            end_id,
362            self.past_blocks_storage_capacity,
363            blocks_sender,
364        );
365
366        rewind_handler.run().await?;
367
368        Ok(ReceiverStream::new(blocks_receiver))
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        block_range_scanner::{
            BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        types::TryStream,
    };
    use alloy::{
        network::Ethereum,
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use tokio::sync::mpsc;

    /// Builds a root provider over a mocked RPC client with a fresh, empty [`Asserter`].
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let builder = BlockRangeScannerBuilder::new();

        assert_eq!(builder.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn builder_methods_update_configuration() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(42);
        let builder = builder.buffer_capacity(33);

        assert_eq!(builder.max_block_range, 42);
        assert_eq!(builder.buffer_capacity, 33);
    }

    #[tokio::test]
    async fn try_stream_forwards_errors_to_subscribers() {
        let (sender, mut receiver) = mpsc::channel::<BlockScannerResult>(1);

        let _ = sender.try_stream(ScannerError::BlockNotFound).await;

        let received = receiver.recv().await;
        assert!(matches!(received, Some(Err(ScannerError::BlockNotFound))));
    }

    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let builder = BlockRangeScannerBuilder::new().buffer_capacity(0);
        let outcome = builder.connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidBufferCapacity)));
    }

    #[tokio::test]
    async fn returns_error_with_zero_max_block_range() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(0);
        let outcome = builder.connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidMaxBlockRange)));
    }
}