event_scanner/block_range_scanner/scanner.rs
1//! Block-range streaming service.
2//!
3//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
4//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
5//! certain state transitions (e.g. reorg detection).
6//!
7//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
8//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
9//! behavior.
10//!
11//! # Output stream
12//!
//! Streams returned by [`BlockRangeScanner`]'s `stream_*` methods yield [`BlockScannerResult`] items:
14//!
15//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
16//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
17//! - `Err(ScannerError)` for errors.
18//!
19//! # Ordering
20//!
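//! Range messages are streamed in chronological order within a single stream (lower block number
//! to higher block number), with one exception: [`BlockRangeScanner::stream_rewind`] emits batches
//! from newer to older blocks, although the ranges inside each batch are still ordered from lower
//! to higher block number. On reorgs, the scanner may re-emit previously-seen ranges for the
//! affected blocks.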
24//!
//! # Example
26//!
27//! ```rust,no_run
//! use alloy::{eips::BlockNumberOrTag, providers::ProviderBuilder};
//! use event_scanner::{
//!     BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, ScannerError, ScannerMessage,
//! };
//! use robust_provider::RobustProviderBuilder;
//! use tokio_stream::StreamExt;
//! use tracing::{error, info};
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//! // Initialize logging
43//! tracing_subscriber::fmt::init();
44//!
45//! // Configuration
46//! let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
47//! let robust_provider = RobustProviderBuilder::new(provider).build().await?;
48//! let block_range_scanner = BlockRangeScannerBuilder::new().connect(robust_provider).await?;
49//!
50//! let mut stream = block_range_scanner
51//! .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS)
52//! .await?;
53//!
54//! while let Some(message) = stream.next().await {
55//! match message {
56//! Ok(ScannerMessage::Data(range)) => {
57//! // process range
58//! }
59//! Ok(ScannerMessage::Notification(notification)) => {
60//! info!("Received notification: {:?}", notification);
61//! }
62//! Err(e) => {
63//! error!("Received error from subscription: {e}");
64//! match e {
65//! ScannerError::Lagged(_) => break,
66//! _ => {
67//! error!("Non-fatal error, continuing: {e}");
68//! }
69//! }
70//! }
71//! }
72//! }
73//!
74//! info!("Data processing stopped.");
75//!
76//! Ok(())
77//! }
78//! ```
79
80use robust_provider::RobustProvider;
81use std::{cmp::Ordering, fmt::Debug};
82use tokio::sync::mpsc;
83use tokio_stream::wrappers::ReceiverStream;
84
85use crate::{
86 ScannerError,
87 block_range_scanner::{
88 RingBufferCapacity,
89 common::{self, BlockScannerResult},
90 historical_range_handler::HistoricalRangeHandler,
91 reorg_handler::DefaultReorgHandler,
92 rewind_handler::RewindHandler,
93 sync_handler::SyncHandler,
94 },
95};
96
97use alloy::{
98 consensus::BlockHeader,
99 eips::BlockId,
100 network::{BlockResponse, Network},
101};
102
103/// A [`BlockRangeScanner`] connected to a provider.
104#[derive(Debug)]
105pub struct BlockRangeScanner<N: Network> {
106 provider: RobustProvider<N>,
107 max_block_range: u64,
108 past_blocks_storage_capacity: RingBufferCapacity,
109 buffer_capacity: usize,
110}
111
112impl<N: Network> BlockRangeScanner<N> {
113 /// Creates a new [`BlockRangeScanner`] with the specified configuration.
114 ///
115 /// # Arguments
116 ///
117 /// * `provider` - The robust provider to use for blockchain interactions
118 /// * `max_block_range` - Maximum number of blocks per streamed range (must be > 0)
119 /// * `past_blocks_storage_capacity` - How many past block hashes to keep for reorg detection
120 /// * `buffer_capacity` - Stream buffer capacity (must be > 0)
121 #[must_use]
122 pub fn new(
123 provider: RobustProvider<N>,
124 max_block_range: u64,
125 past_blocks_storage_capacity: RingBufferCapacity,
126 buffer_capacity: usize,
127 ) -> Self {
128 Self { provider, max_block_range, past_blocks_storage_capacity, buffer_capacity }
129 }
130
131 /// Returns the underlying [`RobustProvider`].
132 #[must_use]
133 pub fn provider(&self) -> &RobustProvider<N> {
134 &self.provider
135 }
136
137 /// Returns the stream buffer capacity.
138 #[must_use]
139 pub fn buffer_capacity(&self) -> usize {
140 self.buffer_capacity
141 }
142
143 /// Streams live blocks starting from the latest block.
144 ///
145 /// # Arguments
146 ///
147 /// * `block_confirmations` - Number of confirmations to apply once in live mode.
148 ///
149 /// # Errors
150 ///
151 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
152 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
153 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
154 pub async fn stream_live(
155 &self,
156 block_confirmations: u64,
157 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
158 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
159
160 let max_block_range = self.max_block_range;
161 let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
162 let latest = self.provider.get_block_number().await?;
163 let provider = self.provider.clone();
164
165 // the next block returned by the underlying subscription will always be `latest + 1`,
166 // because `latest` was already mined and subscription by definition only streams after new
167 // blocks have been mined
168 let start_block = (latest + 1).saturating_sub(block_confirmations);
169
170 debug!(
171 latest_block = latest,
172 start_block = start_block,
173 block_confirmations = block_confirmations,
174 max_block_range = max_block_range,
175 "Starting live block stream"
176 );
177
178 let subscription = self.provider.subscribe_blocks().await?;
179
180 tokio::spawn(async move {
181 let mut reorg_handler =
182 DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
183
184 common::stream_live_blocks(
185 start_block,
186 subscription,
187 &blocks_sender,
188 &provider,
189 block_confirmations,
190 max_block_range,
191 &mut reorg_handler,
192 false, // (notification unnecessary)
193 )
194 .await;
195
196 debug!("Live block stream ended");
197 });
198
199 Ok(ReceiverStream::new(blocks_receiver))
200 }
201
202 /// Streams a batch of historical blocks from `start_id` to `end_id`.
203 ///
204 /// # Arguments
205 ///
206 /// * `start_id` - The starting block id
207 /// * `end_id` - The ending block id
208 ///
209 /// # Errors
210 ///
211 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
212 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
213 /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
214 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
215 pub async fn stream_historical(
216 &self,
217 start_id: impl Into<BlockId>,
218 end_id: impl Into<BlockId>,
219 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
220 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
221
222 let max_block_range = self.max_block_range;
223 let provider = self.provider.clone();
224
225 let (start_block, end_block) = tokio::try_join!(
226 self.provider.get_block(start_id.into()),
227 self.provider.get_block(end_id.into())
228 )?;
229
230 let start_block_num = start_block.header().number();
231 let end_block_num = end_block.header().number();
232
233 let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
234 Ordering::Greater => (end_block_num, start_block_num),
235 _ => (start_block_num, end_block_num),
236 };
237
238 debug!(
239 from_block = start_block_num,
240 to_block = end_block_num,
241 total_blocks = end_block_num.saturating_sub(start_block_num) + 1,
242 max_block_range = max_block_range,
243 "Starting historical block stream"
244 );
245
246 let handler = HistoricalRangeHandler::new(
247 provider,
248 max_block_range,
249 start_block_num,
250 end_block_num,
251 blocks_sender,
252 );
253 handler.run();
254
255 Ok(ReceiverStream::new(blocks_receiver))
256 }
257
258 /// Streams blocks starting from `start_id` and transitions to live mode.
259 ///
260 /// # Arguments
261 ///
262 /// * `start_id` - The starting block id.
263 /// * `block_confirmations` - Number of confirmations to apply once in live mode.
264 ///
265 /// # Errors
266 ///
267 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
268 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
269 /// * [`ScannerError::BlockNotFound`] - if `start_id` cannot be resolved.
270 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
271 pub async fn stream_from(
272 &self,
273 start_id: impl Into<BlockId>,
274 block_confirmations: u64,
275 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
276 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
277
278 let start_id = start_id.into();
279 debug!(
280 start_block = ?start_id,
281 block_confirmations = block_confirmations,
282 max_block_range = self.max_block_range,
283 "Starting sync block stream"
284 );
285
286 let sync_handler = SyncHandler::new(
287 self.provider.clone(),
288 self.max_block_range,
289 start_id,
290 block_confirmations,
291 self.past_blocks_storage_capacity,
292 blocks_sender,
293 );
294
295 sync_handler.run().await?;
296
297 Ok(ReceiverStream::new(blocks_receiver))
298 }
299
300 /// Streams blocks in reverse order from `start_id` to `end_id`.
301 ///
302 /// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
303 /// Blocks are streamed in batches, where each batch is ordered from lower to higher
304 /// block numbers (chronological order within each batch), but batches themselves
305 /// progress from newer to older blocks.
306 ///
307 /// # Arguments
308 ///
309 /// * `start_id` - The starting block id (higher block number).
310 /// * `end_id` - The ending block id (lower block number).
311 ///
312 /// # Reorg Handling
313 ///
314 /// Reorg checks are only performed when the specified block range tip is above the
315 /// current finalized block height. When a reorg is detected:
316 ///
317 /// 1. A [`Notification::ReorgDetected`][reorg] is emitted with the common ancestor block
318 /// 2. The scanner fetches the new tip block at the same height
319 /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
320 /// the new tip)
321 /// 4. The reverse scan continues from where it left off
322 ///
323 /// If the range tip is at or below the finalized block, no reorg checks are
324 /// performed since finalized blocks cannot be reorganized.
325 ///
326 /// # Note
327 ///
328 /// The reason reorged blocks are streamed in chronological order is to make it easier to handle
329 /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
330 /// to the result collection, which must maintain chronological order.
331 ///
332 /// # Errors
333 ///
334 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
335 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
336 /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
337 ///
338 /// [latest mode]: crate::EventScannerBuilder::latest
339 /// [reorg]: crate::Notification::ReorgDetected
340 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
341 pub async fn stream_rewind(
342 &self,
343 start_id: impl Into<BlockId>,
344 end_id: impl Into<BlockId>,
345 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
346 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
347
348 let start_id = start_id.into();
349 let end_id = end_id.into();
350 debug!(
351 from_block = ?start_id,
352 to_block = ?end_id,
353 max_block_range = self.max_block_range,
354 "Starting rewind block stream"
355 );
356
357 let rewind_handler = RewindHandler::new(
358 self.provider.clone(),
359 self.max_block_range,
360 start_id,
361 end_id,
362 self.past_blocks_storage_capacity,
363 blocks_sender,
364 );
365
366 rewind_handler.run().await?;
367
368 Ok(ReceiverStream::new(blocks_receiver))
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        block_range_scanner::{
            BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        types::TryStream,
    };
    use alloy::{
        network::Ethereum,
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use tokio::sync::mpsc;

    /// Provider backed by a mocked RPC client built around a fresh `Asserter`.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::new(RpcClient::mocked(Asserter::new()))
    }

    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let builder = BlockRangeScannerBuilder::new();

        assert_eq!(builder.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn builder_methods_update_configuration() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(42).buffer_capacity(33);

        assert_eq!(builder.max_block_range, 42);
        assert_eq!(builder.buffer_capacity, 33);
    }

    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (tx, mut rx) = mpsc::channel::<BlockScannerResult>(1);

        let _ = tx.try_stream(ScannerError::BlockNotFound).await;
        let received = rx.recv().await;

        assert!(matches!(received, Some(Err(ScannerError::BlockNotFound))));
    }

    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let outcome =
            BlockRangeScannerBuilder::new().buffer_capacity(0).connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidBufferCapacity)));
    }

    #[tokio::test]
    async fn returns_error_with_zero_max_block_range() {
        let outcome =
            BlockRangeScannerBuilder::new().max_block_range(0).connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidMaxBlockRange)));
    }
}