event_scanner/event_scanner/builder.rs
//! Builder pattern for constructing [`EventScanner`] instances.
//!
//! This module provides [`EventScannerBuilder`], which allows configuring an event scanner
//! in different modes (historic, live, latest, or sync) with various options before connecting
//! to a provider.
6
7use alloy::{
8 eips::{BlockId, BlockNumberOrTag},
9 network::Network,
10};
11
12use crate::{
13 BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, EventScanner, RingBufferCapacity,
14 ScannerError,
15};
16
17use robust_provider::IntoRobustProvider;
18
/// Default maximum number of concurrent fetches for each scanner mode.
///
/// Every mode configuration in this module initialises its `max_concurrent_fetches` field
/// with this value.
pub const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 24;
21
/// Marker indicating that a scanner mode has not been selected yet.
///
/// This is the `Mode` of the [`EventScannerBuilder`] on which the mode-selecting constructors
/// ([`historic`](EventScannerBuilder::historic), [`live`](EventScannerBuilder::live),
/// [`sync`](EventScannerBuilder::sync) and [`latest`](EventScannerBuilder::latest)) are defined.
#[derive(Default, Debug)]
pub struct Unspecified;
25
26/// Mode marker for historical range scanning.
27///
28/// For more details on this scanner mode, see [`EventScannerBuilder::historic`].
29#[derive(Debug)]
30pub struct Historic {
31 pub(crate) from_block: BlockId,
32 pub(crate) to_block: BlockId,
33 pub(crate) max_concurrent_fetches: usize,
34}
35
/// Mode marker for live streaming.
///
/// For more details on this scanner mode, see [`EventScannerBuilder::live`].
#[derive(Debug)]
pub struct Live {
    /// Number of blocks to wait before a block is considered confirmed.
    pub(crate) block_confirmations: u64,
    /// Upper bound on concurrent fetches ([`DEFAULT_MAX_CONCURRENT_FETCHES`] by default).
    pub(crate) max_concurrent_fetches: usize,
}
44
45/// Mode marker for latest-events collection.
46///
47/// For more details on this scanner mode, see [`EventScannerBuilder::latest`].
48#[derive(Debug)]
49pub struct LatestEvents {
50 pub(crate) count: usize,
51 pub(crate) from_block: BlockId,
52 pub(crate) to_block: BlockId,
53 pub(crate) block_confirmations: u64,
54 pub(crate) max_concurrent_fetches: usize,
55}
56
/// Marker indicating that a sync mode must be selected.
///
/// This is the `Mode` of the builder returned by [`EventScannerBuilder::sync`]; it is an
/// intermediate state that still has to be narrowed down to a concrete sync mode.
#[derive(Default, Debug)]
pub struct Synchronize;
60
/// Mode marker for scanning by syncing from the specified count of latest events and then switching
/// to live mode.
///
/// For more details on this scanner mode, see
/// [`EventScannerBuilder::sync().from_latest(count)`](crate::EventScannerBuilder::from_latest).
#[derive(Debug)]
pub struct SyncFromLatestEvents {
    /// Number of latest events to sync from before switching to live mode.
    pub(crate) count: usize,
    /// Block confirmations setting ([`DEFAULT_BLOCK_CONFIRMATIONS`] by default).
    pub(crate) block_confirmations: u64,
    /// Upper bound on concurrent fetches ([`DEFAULT_MAX_CONCURRENT_FETCHES`] by default).
    pub(crate) max_concurrent_fetches: usize,
}
72
73/// Mode marker for scanning by syncing from the specified block and then switching to live mode.
74///
75/// For more details on this scanner mode, see
76/// [`EventScannerBuilder::sync().from_block(block_id)`][sync from block].
77///
78/// [sync from block]: crate::EventScannerBuilder#method.from_block-2
79#[derive(Debug)]
80pub struct SyncFromBlock {
81 pub(crate) from_block: BlockId,
82 pub(crate) block_confirmations: u64,
83 pub(crate) max_concurrent_fetches: usize,
84}
85
86impl Default for Historic {
87 fn default() -> Self {
88 Self {
89 from_block: BlockNumberOrTag::Earliest.into(),
90 to_block: BlockNumberOrTag::Latest.into(),
91 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
92 }
93 }
94}
95
96impl Default for Live {
97 fn default() -> Self {
98 Self {
99 block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
100 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
101 }
102 }
103}
104
105/// Builder for constructing an [`EventScanner`] in a particular mode.
106#[derive(Default, Debug)]
107pub struct EventScannerBuilder<Mode> {
108 pub(crate) config: Mode,
109 pub(crate) block_range_scanner: BlockRangeScannerBuilder,
110}
111
112impl EventScannerBuilder<Unspecified> {
113 /// Streams events from a historical block range.
114 ///
115 /// # Example
116 ///
117 /// ```no_run
118 /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
119 /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
120 /// # use tokio_stream::StreamExt;
121 /// # use robust_provider::RobustProviderBuilder;
122 /// #
123 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
124 /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
125 /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
126 /// # let provider = RobustProviderBuilder::new(provider).build().await?;
127 /// // Stream all events from genesis to latest block
128 /// let mut scanner = EventScannerBuilder::historic().connect(provider).await?;
129 ///
130 /// let filter = EventFilter::new().contract_address(contract_address);
131 /// let subscription = scanner.subscribe(filter);
132 /// let proof = scanner.start().await?;
133 /// let mut stream = subscription.stream(&proof);
134 ///
135 /// while let Some(Ok(Message::Data(logs))) = stream.next().await {
136 /// println!("Received {} logs", logs.len());
137 /// }
138 /// # Ok(())
139 /// # }
140 /// ```
141 ///
142 /// Specifying a custom block range:
143 ///
144 /// ```no_run
145 /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
146 /// # use event_scanner::EventScannerBuilder;
147 /// # use robust_provider::RobustProviderBuilder;
148 /// #
149 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
150 /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
151 /// # let provider = RobustProviderBuilder::new(provider).build().await?;
152 /// // Stream events between blocks [1_000_000, 2_000_000]
153 /// let mut scanner = EventScannerBuilder::historic()
154 /// .from_block(1_000_000)
155 /// .to_block(2_000_000)
156 /// .connect(provider)
157 /// .await?;
158 /// # Ok(())
159 /// # }
160 /// ```
161 ///
162 /// # How it works
163 ///
164 /// The scanner streams events in chronological order (oldest to newest) within the specified
165 /// block range. Events are delivered in batches as they are fetched from the provider, with
166 /// batch sizes controlled by the [`max_block_range`][max_block_range] configuration.
167 ///
168 /// # Key behaviors
169 ///
170 /// * **Continuous streaming**: Events are delivered in multiple messages as they are fetched
171 /// * **Chronological order**: Events are always delivered oldest to newest
172 /// * **Concurrent log fetching**: Logs are fetched concurrently to reduce the execution time.
173 /// The maximum number of concurrent RPC calls is controlled by
174 /// [`max_concurrent_fetches`][max_concurrent_fetches]
175 /// * **Default range**: By default, scans from `Earliest` to `Latest` block
176 /// * **Batch control**: Use [`max_block_range`][max_block_range] to control how many blocks are
177 /// queried per RPC call
178 /// * **Reorg handling**:
179 /// * Blocks up to the chain's `finalized` height are streamed without reorg checks.
180 /// * For the non-finalized portion of the range, the scanner takes a snapshot of the
181 /// requested end block, streams the non-finalized portion once, and then verifies that the
182 /// end block is still the same. If a reorg is detected, it emits
183 /// [`Notification::ReorgDetected`][reorg] and re-streams the non-finalized portion of the
184 /// range from the reported common ancestor.
185 ///
186 /// Consumers should be prepared to observe benign duplicate events around reorg boundaries
187 /// (e.g. by applying idempotency/deduplication).
188 /// * **Completion**: The scanner completes when the entire range has been processed.
189 ///
190 /// [max_block_range]: crate::EventScannerBuilder::max_block_range
191 /// [max_concurrent_fetches]: crate::EventScannerBuilder::max_concurrent_fetches
192 /// [reorg]: crate::types::Notification::ReorgDetected
193 #[must_use]
194 pub fn historic() -> EventScannerBuilder<Historic> {
195 EventScannerBuilder::default()
196 }
197
198 /// Streams new events as blocks are produced on-chain.
199 ///
200 /// # Example
201 ///
202 /// ```no_run
203 /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
204 /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
205 /// # use tokio_stream::StreamExt;
206 /// # use robust_provider::RobustProviderBuilder;
207 /// #
208 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
209 /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
210 /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
211 /// # let provider = RobustProviderBuilder::new(provider).build().await?;
212 /// // Stream new events as they arrive
213 /// let mut scanner = EventScannerBuilder::live()
214 /// .block_confirmations(20)
215 /// .connect(provider)
216 /// .await?;
217 ///
218 /// let filter = EventFilter::new().contract_address(contract_address);
219 /// let subscription = scanner.subscribe(filter);
220 /// let proof = scanner.start().await?;
221 /// let mut stream = subscription.stream(&proof);
222 ///
223 /// while let Some(msg) = stream.next().await {
224 /// match msg {
225 /// Ok(Message::Data(logs)) => {
226 /// println!("Received {} new events", logs.len());
227 /// }
228 /// Ok(Message::Notification(notification)) => {
229 /// println!("Notification received: {:?}", notification);
230 /// }
231 /// Err(e) => {
232 /// eprintln!("Error: {}", e);
233 /// }
234 /// }
235 /// }
236 /// # Ok(())
237 /// # }
238 /// ```
239 ///
240 /// # How it works
241 ///
242 /// The scanner subscribes to new blocks via WebSocket and streams events from confirmed
243 /// blocks. The `block_confirmations` setting determines how many blocks to wait before
244 /// considering a block confirmed, providing protection against chain reorganizations.
245 ///
246 /// # Key behaviors
247 ///
248 /// * **Real-time streaming**: Events are delivered as new blocks are confirmed
249 /// * **Reorg protection**: Waits for configured confirmations before emitting events
250 /// * **Continuous operation**: Runs indefinitely until the scanner is dropped or encounters an
251 /// error
252 /// * **Default confirmations**: By default, waits for 12 block confirmations
253 ///
254 /// # Reorg behavior
255 ///
256 /// When a reorg is detected:
257 /// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
258 /// 2. Adjusts the next confirmed range using `block_confirmations`
259 /// 3. Re-emits events from the corrected confirmed block range
260 /// 4. Continues streaming from the new chain state
261 ///
262 /// **Important**: If a reorg occurs, the scanner will only restream blocks from the new
263 /// canonical chain that have block numbers greater than or equal to the block number that was
264 /// the "latest block" at the time when the live stream was first started. Blocks with lower
265 /// block numbers will not be restreamed, even if they are part of the new canonical chain.
266 ///
267 /// [reorg]: crate::types::Notification::ReorgDetected
268 #[must_use]
269 pub fn live() -> EventScannerBuilder<Live> {
270 EventScannerBuilder::default()
271 }
272
273 /// Creates a builder for sync mode scanners that combine historical catch-up with live
274 /// streaming.
275 ///
276 /// This method returns a builder that must be further narrowed down:
277 /// ```rust,no_run
278 /// # use event_scanner::EventScannerBuilder;
279 /// // Sync from block mode
280 /// EventScannerBuilder::sync().from_block(1_000_000);
281 /// // Sync from latest events mode
282 /// EventScannerBuilder::sync().from_latest(10);
283 /// ```
284 ///
285 /// See [`from_block`](crate::EventScannerBuilder#method.from_block-2) and
286 /// [`from_latest`](crate::EventScannerBuilder#method.from_latest) for details on each mode.
287 #[must_use]
288 pub fn sync() -> EventScannerBuilder<Synchronize> {
289 EventScannerBuilder::default()
290 }
291
292 /// Streams the latest `count` matching events per registered listener.
293 ///
294 /// # Example
295 ///
296 /// ```no_run
297 /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
298 /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
299 /// # use tokio_stream::StreamExt;
300 /// # use robust_provider::RobustProviderBuilder;
301 /// #
302 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
303 /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
304 /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
305 /// # let provider = RobustProviderBuilder::new(provider).build().await?;
306 /// // Collect the latest 10 events across Earliest..=Latest
307 /// let mut scanner = EventScannerBuilder::latest(10).connect(provider).await?;
308 ///
309 /// let filter = EventFilter::new().contract_address(contract_address);
310 /// let subscription = scanner.subscribe(filter);
311 /// let proof = scanner.start().await?;
312 /// let mut stream = subscription.stream(&proof);
313 ///
314 /// // Expect a single message with up to 10 logs, then the stream ends
315 /// while let Some(Ok(Message::Data(logs))) = stream.next().await {
316 /// println!("Latest logs: {}", logs.len());
317 /// }
318 /// # Ok(())
319 /// # }
320 /// ```
321 ///
322 /// Restricting to a specific block range:
323 ///
324 /// ```no_run
325 /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
326 /// # use event_scanner::EventScannerBuilder;
327 /// # use robust_provider::RobustProviderBuilder;
328 /// #
329 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
330 /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
331 /// # let provider = RobustProviderBuilder::new(provider).build().await?;
332 /// // Collect the latest 5 events between blocks [1_000_000, 1_100_000]
333 /// let mut scanner = EventScannerBuilder::latest(5)
334 /// .from_block(1_000_000)
335 /// .to_block(1_100_000)
336 /// .connect(provider)
337 /// .await?;
338 /// # Ok(())
339 /// # }
340 /// ```
341 ///
342 /// # How it works
343 ///
344 /// The scanner performs a reverse-ordered scan (newest to oldest) within the specified block
345 /// range, collecting up to `count` events per registered listener. Once the target count is
346 /// reached or the range is exhausted, it delivers the events in chronological order (oldest to
347 /// newest) and completes.
348 ///
349 /// When using a custom block range, the scanner automatically normalizes the range boundaries.
350 /// This means you can specify `from_block` and `to_block` in any order - the scanner will
351 /// always scan from the higher block number down to the lower one, regardless of which
352 /// parameter holds which value.
353 ///
354 /// # Key behaviors
355 ///
356 /// * **Single delivery**: Each registered stream receives at most `count` logs in a single
357 /// message, chronologically ordered
358 /// * **One-shot operation**: The scanner completes after delivering messages; it does not
359 /// continue streaming
360 /// * **Concurrent log fetching**: Logs are fetched concurrently to reduce the execution time.
361 /// The maximum number of concurrent RPC calls is controlled by
362 /// [`max_concurrent_fetches`][max_concurrent_fetches]
363 /// * **Flexible count**: If fewer than `count` events exist in the range, returns all available
364 /// events
365 /// * **Default range**: By default, scans from `Earliest` to `Latest` block
366 /// * **Reorg handling**: Periodically checks the tip to detect reorgs during the scan
367 ///
368 /// # Notifications
369 ///
370 /// The scanner can emit the following notifications:
371 ///
372 /// * [`Notification::NoPastLogsFound`][no_logs]: Emitted when no matching logs are found in the
373 /// scanned range.
374 /// * [`Notification::ReorgDetected`][reorg]: Emitted when a reorg is detected during the scan.
375 ///
376 /// # Arguments
377 ///
378 /// * `count` - Maximum number of recent events to collect per listener (must be greater than 0)
379 ///
380 /// # Reorg behavior
381 ///
382 /// The scanner can detect reorgs during the scan by periodically checking that the range tip
383 /// has not changed. This is done only when the specified range tip is not a finalized
384 /// block.
385 ///
386 /// On reorg detection:
387 /// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
388 /// 2. Resets to the updated tip
389 /// 3. Reloads logs from the block range affected by the reorg
390 /// 4. Continues until `count` events are collected
391 ///
392 /// Final delivery to log listeners preserves chronological order regardless of reorgs.
393 ///
394 /// # Notes
395 ///
396 /// For continuous streaming after collecting latest events, use
397 /// [`EventScannerBuilder::sync().from_latest(count)`][sync_from_latest] instead
398 ///
399 /// [subscribe]: EventScanner::subscribe
400 /// [start]: EventScanner::start
401 /// [sync_from_latest]: EventScannerBuilder::from_latest
402 /// [reorg]: crate::Notification::ReorgDetected
403 /// [no_logs]: crate::Notification::NoPastLogsFound
404 /// [max_concurrent_fetches]: crate::EventScannerBuilder#method.max_concurrent_fetches-1
405 #[must_use]
406 pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
407 EventScannerBuilder::<LatestEvents>::new(count)
408 }
409}
410
411impl EventScannerBuilder<LatestEvents> {
412 #[must_use]
413 pub fn new(count: usize) -> Self {
414 Self {
415 config: LatestEvents {
416 count,
417 from_block: BlockNumberOrTag::Latest.into(),
418 to_block: BlockNumberOrTag::Earliest.into(),
419 block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
420 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
421 },
422 block_range_scanner: BlockRangeScannerBuilder::default(),
423 }
424 }
425}
426
427impl EventScannerBuilder<SyncFromLatestEvents> {
428 #[must_use]
429 pub fn new(count: usize) -> Self {
430 Self {
431 config: SyncFromLatestEvents {
432 count,
433 block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
434 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
435 },
436 block_range_scanner: BlockRangeScannerBuilder::default(),
437 }
438 }
439}
440
441impl EventScannerBuilder<SyncFromBlock> {
442 #[must_use]
443 pub fn new(from_block: impl Into<BlockId>) -> Self {
444 Self {
445 config: SyncFromBlock {
446 from_block: from_block.into(),
447 block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
448 max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
449 },
450 block_range_scanner: BlockRangeScannerBuilder::default(),
451 }
452 }
453}
454
455impl<Mode> EventScannerBuilder<Mode> {
456 /// Sets the maximum block range per event batch.
457 ///
458 /// Controls how the scanner splits a large block range into smaller batches for processing.
459 /// Each batch corresponds to a single RPC call to fetch logs. This prevents timeouts and
460 /// respects rate limits imposed by node providers.
461 ///
462 /// # Arguments
463 ///
464 /// * `max_block_range` - Maximum number of blocks to process per batch (must be greater than 0)
465 ///
466 /// # Example
467 ///
468 /// If scanning events from blocks 1000–1099 (100 blocks total) with `max_block_range(30)`:
469 /// * Batch 1: blocks 1000–1029 (30 blocks)
470 /// * Batch 2: blocks 1030–1059 (30 blocks)
471 /// * Batch 3: blocks 1060–1089 (30 blocks)
472 /// * Batch 4: blocks 1090–1099 (10 blocks)
473 #[must_use]
474 pub fn max_block_range(mut self, max_block_range: u64) -> Self {
475 self.block_range_scanner.max_block_range = max_block_range;
476 self
477 }
478
479 /// Sets how many of past blocks to keep in memory for reorg detection.
480 ///
481 /// IMPORTANT: If zero, reorg detection is disabled.
482 ///
483 /// # Arguments
484 ///
485 /// * `past_blocks_storage_capacity` - Maximum number of blocks to keep in memory.
486 #[must_use]
487 pub fn past_blocks_storage_capacity(
488 mut self,
489 past_blocks_storage_capacity: RingBufferCapacity,
490 ) -> Self {
491 self.block_range_scanner.past_blocks_storage_capacity = past_blocks_storage_capacity;
492 self
493 }
494
495 /// Sets the stream buffer capacity.
496 ///
497 /// Controls the maximum number of messages that can be buffered in the stream
498 /// before backpressure is applied.
499 ///
500 /// # Arguments
501 ///
502 /// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
503 #[must_use]
504 pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
505 self.block_range_scanner.buffer_capacity = buffer_capacity;
506 self
507 }
508
509 /// Builds the scanner by connecting to an existing provider.
510 ///
511 /// This is a shared method used internally by scanner-specific `connect()` methods.
512 pub(crate) async fn build<N: Network>(
513 self,
514 provider: impl IntoRobustProvider<N>,
515 ) -> Result<EventScanner<Mode, N>, ScannerError> {
516 let block_range_scanner = self.block_range_scanner.connect::<N>(provider).await?;
517 Ok(EventScanner::new(self.config, block_range_scanner))
518 }
519}
520
/// Unit tests pinning the default configuration of each builder mode and the shared setters.
#[cfg(test)]
mod tests {
    use crate::block_range_scanner::DEFAULT_STREAM_BUFFER_CAPACITY;

    use super::*;

    #[test]
    fn test_historic_scanner_config_defaults() {
        let builder = EventScannerBuilder::<Historic>::default();

        assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
        assert_eq!(builder.config.to_block, BlockNumberOrTag::Latest.into());
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn test_live_scanner_config_defaults() {
        let builder = EventScannerBuilder::<Live>::default();

        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn test_latest_scanner_config_defaults() {
        let builder = EventScannerBuilder::<LatestEvents>::new(10);

        assert_eq!(builder.config.count, 10);

        // The default range is stored newest-to-oldest.
        assert_eq!(builder.config.from_block, BlockNumberOrTag::Latest.into());
        assert_eq!(builder.config.to_block, BlockNumberOrTag::Earliest.into());
        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn test_sync_from_block_scanner_config_defaults() {
        let builder = EventScannerBuilder::<SyncFromBlock>::new(BlockNumberOrTag::Earliest);

        assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn test_sync_from_latest_scanner_config_defaults() {
        let builder = EventScannerBuilder::<SyncFromLatestEvents>::new(5);

        assert_eq!(builder.config.count, 5);
        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    #[test]
    fn test_shared_setters_update_block_range_scanner() {
        let builder = EventScannerBuilder::<Live>::default().max_block_range(30).buffer_capacity(8);

        assert_eq!(builder.block_range_scanner.max_block_range, 30);
        assert_eq!(builder.block_range_scanner.buffer_capacity, 8);
    }
}