event_scanner/block_range_scanner/scanner.rs
1//! Block-range streaming service.
2//!
3//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
4//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
5//! certain state transitions (e.g. reorg detection).
6//!
7//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
8//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
9//! behavior.
10//!
11//! # Output stream
12//!
//! Streams returned by [`BlockRangeScanner`] yield [`BlockScannerResult`] items:
14//!
15//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
16//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
17//! - `Err(ScannerError)` for errors.
18//!
19//! # Ordering
20//!
//! Range messages are streamed in chronological order within a single stream (lower block number
//! to higher block number). The exception is [`BlockRangeScanner::stream_rewind`], whose batches
//! progress from newer to older blocks. On reorgs, the scanner may re-emit previously-seen ranges
//! for the affected blocks.
24//!
25//! # Example usage:
26//!
27//! ```rust,no_run
28//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
29//! use std::ops::RangeInclusive;
30//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
31//!
32//! use alloy::providers::{Provider, ProviderBuilder};
33//! use event_scanner::{
34//! BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, ScannerError, ScannerMessage,
35//! };
36//! use robust_provider::RobustProviderBuilder;
37//! use tokio::time::Duration;
38//! use tracing::{error, info};
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//! // Initialize logging
43//! tracing_subscriber::fmt::init();
44//!
45//! // Configuration
46//! let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
47//! let robust_provider = RobustProviderBuilder::new(provider).build().await?;
48//! let block_range_scanner = BlockRangeScannerBuilder::new().connect(robust_provider).await?;
49//!
50//! let mut stream = block_range_scanner
51//! .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS)
52//! .await?;
53//!
54//! while let Some(message) = stream.next().await {
55//! match message {
56//! Ok(ScannerMessage::Data(range)) => {
57//! // process range
58//! }
59//! Ok(ScannerMessage::Notification(notification)) => {
60//! info!("Received notification: {:?}", notification);
61//! }
62//! Err(e) => {
63//! error!("Received error from subscription: {e}");
64//! match e {
65//! ScannerError::Lagged(_) => break,
66//! _ => {
67//! error!("Non-fatal error, continuing: {e}");
68//! }
69//! }
70//! }
71//! }
72//! }
73//!
74//! info!("Data processing stopped.");
75//!
76//! Ok(())
77//! }
78//! ```
79
80use robust_provider::RobustProvider;
81use std::{cmp::Ordering, fmt::Debug};
82use tokio::sync::mpsc;
83use tokio_stream::wrappers::ReceiverStream;
84
85use crate::{
86 ScannerError,
87 block_range_scanner::{
88 RingBufferCapacity,
89 common::{self, BlockScannerResult},
90 reorg_handler::ReorgHandler,
91 rewind_handler::RewindHandler,
92 sync_handler::SyncHandler,
93 },
94};
95
96use alloy::{
97 consensus::BlockHeader,
98 eips::BlockId,
99 network::{BlockResponse, Network},
100};
101
102/// A [`BlockRangeScanner`] connected to a provider.
103#[derive(Debug)]
104pub struct BlockRangeScanner<N: Network> {
105 provider: RobustProvider<N>,
106 max_block_range: u64,
107 past_blocks_storage_capacity: RingBufferCapacity,
108 buffer_capacity: usize,
109}
110
111impl<N: Network> BlockRangeScanner<N> {
112 /// Creates a new [`BlockRangeScanner`] with the specified configuration.
113 ///
114 /// # Arguments
115 ///
116 /// * `provider` - The robust provider to use for blockchain interactions
117 /// * `max_block_range` - Maximum number of blocks per streamed range (must be > 0)
118 /// * `past_blocks_storage_capacity` - How many past block hashes to keep for reorg detection
119 /// * `buffer_capacity` - Stream buffer capacity (must be > 0)
120 #[must_use]
121 pub fn new(
122 provider: RobustProvider<N>,
123 max_block_range: u64,
124 past_blocks_storage_capacity: RingBufferCapacity,
125 buffer_capacity: usize,
126 ) -> Self {
127 Self { provider, max_block_range, past_blocks_storage_capacity, buffer_capacity }
128 }
129
130 /// Returns the underlying [`RobustProvider`].
131 #[must_use]
132 pub fn provider(&self) -> &RobustProvider<N> {
133 &self.provider
134 }
135
136 /// Returns the stream buffer capacity.
137 #[must_use]
138 pub fn buffer_capacity(&self) -> usize {
139 self.buffer_capacity
140 }
141
142 /// Streams live blocks starting from the latest block.
143 ///
144 /// # Arguments
145 ///
146 /// * `block_confirmations` - Number of confirmations to apply once in live mode.
147 ///
148 /// # Errors
149 ///
150 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
151 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
152 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
153 pub async fn stream_live(
154 &self,
155 block_confirmations: u64,
156 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
157 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
158
159 let max_block_range = self.max_block_range;
160 let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
161 let latest = self.provider.get_block_number().await?;
162 let provider = self.provider.clone();
163
164 // the next block returned by the underlying subscription will always be `latest + 1`,
165 // because `latest` was already mined and subscription by definition only streams after new
166 // blocks have been mined
167 let start_block = (latest + 1).saturating_sub(block_confirmations);
168
169 debug!(
170 latest_block = latest,
171 start_block = start_block,
172 block_confirmations = block_confirmations,
173 max_block_range = max_block_range,
174 "Starting live block stream"
175 );
176
177 let subscription = self.provider.subscribe_blocks().await?;
178
179 tokio::spawn(async move {
180 let mut reorg_handler =
181 ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
182
183 common::stream_live_blocks(
184 start_block,
185 subscription,
186 &blocks_sender,
187 &provider,
188 block_confirmations,
189 max_block_range,
190 &mut reorg_handler,
191 false, // (notification unnecessary)
192 )
193 .await;
194
195 debug!("Live block stream ended");
196 });
197
198 Ok(ReceiverStream::new(blocks_receiver))
199 }
200
201 /// Streams a batch of historical blocks from `start_id` to `end_id`.
202 ///
203 /// # Arguments
204 ///
205 /// * `start_id` - The starting block id
206 /// * `end_id` - The ending block id
207 ///
208 /// # Errors
209 ///
210 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
211 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
212 /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
213 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
214 pub async fn stream_historical(
215 &self,
216 start_id: impl Into<BlockId>,
217 end_id: impl Into<BlockId>,
218 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
219 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
220
221 let max_block_range = self.max_block_range;
222 let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
223 let provider = self.provider.clone();
224
225 let (start_block, end_block) = tokio::try_join!(
226 self.provider.get_block(start_id.into()),
227 self.provider.get_block(end_id.into())
228 )?;
229
230 let start_block_num = start_block.header().number();
231 let end_block_num = end_block.header().number();
232
233 let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
234 Ordering::Greater => (end_block_num, start_block_num),
235 _ => (start_block_num, end_block_num),
236 };
237
238 let total_blocks = end_block_num.saturating_sub(start_block_num) + 1;
239 debug!(
240 from_block = start_block_num,
241 to_block = end_block_num,
242 total_blocks = total_blocks,
243 max_block_range = max_block_range,
244 "Starting historical block stream"
245 );
246
247 tokio::spawn(async move {
248 let mut reorg_handler =
249 ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
250
251 _ = common::stream_historical_range(
252 start_block_num,
253 end_block_num,
254 max_block_range,
255 &blocks_sender,
256 &provider,
257 &mut reorg_handler,
258 )
259 .await;
260
261 debug!("Historical block stream completed");
262 });
263
264 Ok(ReceiverStream::new(blocks_receiver))
265 }
266
267 /// Streams blocks starting from `start_id` and transitions to live mode.
268 ///
269 /// # Arguments
270 ///
271 /// * `start_id` - The starting block id.
272 /// * `block_confirmations` - Number of confirmations to apply once in live mode.
273 ///
274 /// # Errors
275 ///
276 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
277 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
278 /// * [`ScannerError::BlockNotFound`] - if `start_id` cannot be resolved.
279 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
280 pub async fn stream_from(
281 &self,
282 start_id: impl Into<BlockId>,
283 block_confirmations: u64,
284 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
285 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
286
287 let start_id = start_id.into();
288 debug!(
289 start_block = ?start_id,
290 block_confirmations = block_confirmations,
291 max_block_range = self.max_block_range,
292 "Starting sync block stream"
293 );
294
295 let sync_handler = SyncHandler::new(
296 self.provider.clone(),
297 self.max_block_range,
298 start_id,
299 block_confirmations,
300 self.past_blocks_storage_capacity,
301 blocks_sender,
302 );
303
304 sync_handler.run().await?;
305
306 Ok(ReceiverStream::new(blocks_receiver))
307 }
308
309 /// Streams blocks in reverse order from `start_id` to `end_id`.
310 ///
311 /// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
312 /// Blocks are streamed in batches, where each batch is ordered from lower to higher
313 /// block numbers (chronological order within each batch), but batches themselves
314 /// progress from newer to older blocks.
315 ///
316 /// # Arguments
317 ///
318 /// * `start_id` - The starting block id (higher block number).
319 /// * `end_id` - The ending block id (lower block number).
320 ///
321 /// # Reorg Handling
322 ///
323 /// Reorg checks are only performed when the specified block range tip is above the
324 /// current finalized block height. When a reorg is detected:
325 ///
326 /// 1. A [`Notification::ReorgDetected`][reorg] is emitted with the common ancestor block
327 /// 2. The scanner fetches the new tip block at the same height
328 /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
329 /// the new tip)
330 /// 4. The reverse scan continues from where it left off
331 ///
332 /// If the range tip is at or below the finalized block, no reorg checks are
333 /// performed since finalized blocks cannot be reorganized.
334 ///
335 /// # Note
336 ///
337 /// The reason reorged blocks are streamed in chronological order is to make it easier to handle
338 /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
339 /// to the result collection, which must maintain chronological order.
340 ///
341 /// # Errors
342 ///
343 /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
344 /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
345 /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved.
346 ///
347 /// [latest mode]: crate::EventScannerBuilder::latest
348 /// [reorg]: crate::Notification::ReorgDetected
349 #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
350 pub async fn stream_rewind(
351 &self,
352 start_id: impl Into<BlockId>,
353 end_id: impl Into<BlockId>,
354 ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
355 let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
356
357 let start_id = start_id.into();
358 let end_id = end_id.into();
359 debug!(
360 from_block = ?start_id,
361 to_block = ?end_id,
362 max_block_range = self.max_block_range,
363 "Starting rewind block stream"
364 );
365
366 let rewind_handler = RewindHandler::new(
367 self.provider.clone(),
368 self.max_block_range,
369 start_id,
370 end_id,
371 self.past_blocks_storage_capacity,
372 blocks_sender,
373 );
374
375 rewind_handler.run().await?;
376
377 Ok(ReceiverStream::new(blocks_receiver))
378 }
379}
380
#[cfg(test)]
mod tests {
    use crate::{
        block_range_scanner::{
            BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        types::TryStream,
    };

    use super::*;
    use alloy::{
        network::Ethereum,
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use tokio::sync::mpsc;

    /// Creates a provider over a mocked RPC client with a fresh, empty [`Asserter`].
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    /// A freshly created builder carries the crate's default range and buffer settings.
    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let builder = BlockRangeScannerBuilder::new();

        assert_eq!(builder.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    /// The builder's setters overwrite the corresponding configuration fields.
    #[test]
    fn builder_methods_update_configuration() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(42).buffer_capacity(33);

        assert_eq!(builder.max_block_range, 42);
        assert_eq!(builder.buffer_capacity, 33);
    }

    /// An error passed to `try_stream` is delivered to the receiving side as `Err`.
    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (tx, mut rx) = mpsc::channel::<BlockScannerResult>(1);

        // Only the value observed by the receiver matters here, so the send outcome is ignored.
        let _ = tx.try_stream(ScannerError::BlockNotFound).await;

        assert!(matches!(rx.recv().await, Some(Err(ScannerError::BlockNotFound))));
    }

    /// Connecting with a zero buffer capacity is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let provider = mocked_provider();
        let result = BlockRangeScannerBuilder::new().buffer_capacity(0).connect(provider).await;

        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
    }

    /// Connecting with a zero maximum block range is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_max_block_range() {
        let provider = mocked_provider();
        let result = BlockRangeScannerBuilder::new().max_block_range(0).connect(provider).await;

        assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
    }
}
438}