Skip to main content

event_scanner/event_scanner/
builder.rs

1//! Builder pattern for constructing [`EventScanner`] instances.
2//!
3//! This module provides [`EventScannerBuilder`] which allows configuring an event scanner
4//! in different modes (historic, live, latest, or sync) with various options before connecting
5//! to a provider.
6
7use alloy::{
8    eips::{BlockId, BlockNumberOrTag},
9    network::Network,
10};
11
12use crate::{
13    BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, EventScanner, RingBufferCapacity,
14    ScannerError,
15};
16
17use robust_provider::IntoRobustProvider;
18
19/// Default number of maximum concurrent fetches for each scanner mode.
20pub const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 24;
21
22/// Marker indicating that a scanner mode has not been selected yet.
23#[derive(Default, Debug)]
24pub struct Unspecified;
25
26/// Mode marker for historical range scanning.
27///
28/// For more details on this scanner mode, see [`EventScannerBuilder::historic`].
29#[derive(Debug)]
30pub struct Historic {
31    pub(crate) from_block: BlockId,
32    pub(crate) to_block: BlockId,
33    pub(crate) max_concurrent_fetches: usize,
34}
35
36/// Mode marker for live streaming.
37///
38/// For more details on this scanner mode, see [`EventScannerBuilder::live`].
39#[derive(Debug)]
40pub struct Live {
41    pub(crate) block_confirmations: u64,
42    pub(crate) max_concurrent_fetches: usize,
43}
44
45/// Mode marker for latest-events collection.
46///
47/// For more details on this scanner mode, see [`EventScannerBuilder::latest`].
48#[derive(Debug)]
49pub struct LatestEvents {
50    pub(crate) count: usize,
51    pub(crate) from_block: BlockId,
52    pub(crate) to_block: BlockId,
53    pub(crate) block_confirmations: u64,
54    pub(crate) max_concurrent_fetches: usize,
55}
56
57/// Marker indicating that a sync mode must be selected.
58#[derive(Default, Debug)]
59pub struct Synchronize;
60
61/// Mode marker for scanning by syncing from the specified count of latest events and then switching
62/// to live mode.
63///
64/// For more details on this scanner mode, see
65/// [`EventScannerBuilder::sync().from_latest(count)`](crate::EventScannerBuilder::from_latest).
66#[derive(Debug)]
67pub struct SyncFromLatestEvents {
68    pub(crate) count: usize,
69    pub(crate) block_confirmations: u64,
70    pub(crate) max_concurrent_fetches: usize,
71}
72
73/// Mode marker for scanning by syncing from the specified block and then switching to live mode.
74///
75/// For more details on this scanner mode, see
76/// [`EventScannerBuilder::sync().from_block(block_id)`][sync from block].
77///
78/// [sync from block]: crate::EventScannerBuilder#method.from_block-2
79#[derive(Debug)]
80pub struct SyncFromBlock {
81    pub(crate) from_block: BlockId,
82    pub(crate) block_confirmations: u64,
83    pub(crate) max_concurrent_fetches: usize,
84}
85
86impl Default for Historic {
87    fn default() -> Self {
88        Self {
89            from_block: BlockNumberOrTag::Earliest.into(),
90            to_block: BlockNumberOrTag::Latest.into(),
91            max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
92        }
93    }
94}
95
96impl Default for Live {
97    fn default() -> Self {
98        Self {
99            block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
100            max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
101        }
102    }
103}
104
105/// Builder for constructing an [`EventScanner`] in a particular mode.
106#[derive(Default, Debug)]
107pub struct EventScannerBuilder<Mode> {
108    pub(crate) config: Mode,
109    pub(crate) block_range_scanner: BlockRangeScannerBuilder,
110}
111
112impl EventScannerBuilder<Unspecified> {
113    /// Streams events from a historical block range.
114    ///
115    /// # Example
116    ///
117    /// ```no_run
118    /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
119    /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
120    /// # use tokio_stream::StreamExt;
121    /// # use robust_provider::RobustProviderBuilder;
122    /// #
123    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
124    /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
125    /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
126    /// # let provider = RobustProviderBuilder::new(provider).build().await?;
127    /// // Stream all events from genesis to latest block
128    /// let mut scanner = EventScannerBuilder::historic().connect(provider).await?;
129    ///
130    /// let filter = EventFilter::new().contract_address(contract_address);
131    /// let subscription = scanner.subscribe(filter);
132    /// let proof = scanner.start().await?;
133    /// let mut stream = subscription.stream(&proof);
134    ///
135    /// while let Some(Ok(Message::Data(logs))) = stream.next().await {
136    ///     println!("Received {} logs", logs.len());
137    /// }
138    /// # Ok(())
139    /// # }
140    /// ```
141    ///
142    /// Specifying a custom block range:
143    ///
144    /// ```no_run
145    /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
146    /// # use event_scanner::EventScannerBuilder;
147    /// # use robust_provider::RobustProviderBuilder;
148    /// #
149    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
150    /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
151    /// # let provider = RobustProviderBuilder::new(provider).build().await?;
152    /// // Stream events between blocks [1_000_000, 2_000_000]
153    /// let mut scanner = EventScannerBuilder::historic()
154    ///     .from_block(1_000_000)
155    ///     .to_block(2_000_000)
156    ///     .connect(provider)
157    ///     .await?;
158    /// # Ok(())
159    /// # }
160    /// ```
161    ///
162    /// # How it works
163    ///
164    /// The scanner streams events in chronological order (oldest to newest) within the specified
165    /// block range. Events are delivered in batches as they are fetched from the provider, with
166    /// batch sizes controlled by the [`max_block_range`][max_block_range] configuration.
167    ///
168    /// # Key behaviors
169    ///
170    /// * **Continuous streaming**: Events are delivered in multiple messages as they are fetched
171    /// * **Chronological order**: Events are always delivered oldest to newest
172    /// * **Concurrent log fetching**: Logs are fetched concurrently to reduce the execution time.
173    ///   The maximum number of concurrent RPC calls is controlled by
174    ///   [`max_concurrent_fetches`][max_concurrent_fetches]
175    /// * **Default range**: By default, scans from `Earliest` to `Latest` block
176    /// * **Batch control**: Use [`max_block_range`][max_block_range] to control how many blocks are
177    ///   queried per RPC call
178    /// * **Reorg handling**:
179    ///   * Blocks up to the chain's `finalized` height are streamed without reorg checks.
180    ///   * For the non-finalized portion of the range, the scanner takes a snapshot of the
181    ///     requested end block, streams the non-finalized portion once, and then verifies that the
182    ///     end block is still the same. If a reorg is detected, it emits
183    ///     [`Notification::ReorgDetected`][reorg] and re-streams the non-finalized portion of the
184    ///     range from the reported common ancestor.
185    ///
186    ///   Consumers should be prepared to observe benign duplicate events around reorg boundaries
187    ///   (e.g. by applying idempotency/deduplication).
188    /// * **Completion**: The scanner completes when the entire range has been processed.
189    ///
190    /// [max_block_range]: crate::EventScannerBuilder::max_block_range
191    /// [max_concurrent_fetches]: crate::EventScannerBuilder::max_concurrent_fetches
192    /// [reorg]: crate::types::Notification::ReorgDetected
193    #[must_use]
194    pub fn historic() -> EventScannerBuilder<Historic> {
195        EventScannerBuilder::default()
196    }
197
198    /// Streams new events as blocks are produced on-chain.
199    ///
200    /// # Example
201    ///
202    /// ```no_run
203    /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
204    /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
205    /// # use tokio_stream::StreamExt;
206    /// # use robust_provider::RobustProviderBuilder;
207    /// #
208    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
209    /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
210    /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
211    /// # let provider = RobustProviderBuilder::new(provider).build().await?;
212    /// // Stream new events as they arrive
213    /// let mut scanner = EventScannerBuilder::live()
214    ///     .block_confirmations(20)
215    ///     .connect(provider)
216    ///     .await?;
217    ///
218    /// let filter = EventFilter::new().contract_address(contract_address);
219    /// let subscription = scanner.subscribe(filter);
220    /// let proof = scanner.start().await?;
221    /// let mut stream = subscription.stream(&proof);
222    ///
223    /// while let Some(msg) = stream.next().await {
224    ///     match msg {
225    ///         Ok(Message::Data(logs)) => {
226    ///             println!("Received {} new events", logs.len());
227    ///         }
228    ///         Ok(Message::Notification(notification)) => {
229    ///             println!("Notification received: {:?}", notification);
230    ///         }
231    ///         Err(e) => {
232    ///             eprintln!("Error: {}", e);
233    ///         }
234    ///     }
235    /// }
236    /// # Ok(())
237    /// # }
238    /// ```
239    ///
240    /// # How it works
241    ///
242    /// The scanner subscribes to new blocks via WebSocket and streams events from confirmed
243    /// blocks. The `block_confirmations` setting determines how many blocks to wait before
244    /// considering a block confirmed, providing protection against chain reorganizations.
245    ///
246    /// # Key behaviors
247    ///
248    /// * **Real-time streaming**: Events are delivered as new blocks are confirmed
249    /// * **Reorg protection**: Waits for configured confirmations before emitting events
250    /// * **Continuous operation**: Runs indefinitely until the scanner is dropped or encounters an
251    ///   error
252    /// * **Default confirmations**: By default, waits for 12 block confirmations
253    ///
254    /// # Reorg behavior
255    ///
256    /// When a reorg is detected:
257    /// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
258    /// 2. Adjusts the next confirmed range using `block_confirmations`
259    /// 3. Re-emits events from the corrected confirmed block range
260    /// 4. Continues streaming from the new chain state
261    ///
262    /// **Important**: If a reorg occurs, the scanner will only restream blocks from the new
263    /// canonical chain that have block numbers greater than or equal to the block number that was
264    /// the "latest block" at the time when the live stream was first started. Blocks with lower
265    /// block numbers will not be restreamed, even if they are part of the new canonical chain.
266    ///
267    /// [reorg]: crate::types::Notification::ReorgDetected
268    #[must_use]
269    pub fn live() -> EventScannerBuilder<Live> {
270        EventScannerBuilder::default()
271    }
272
273    /// Creates a builder for sync mode scanners that combine historical catch-up with live
274    /// streaming.
275    ///
276    /// This method returns a builder that must be further narrowed down:
277    /// ```rust,no_run
278    /// # use event_scanner::EventScannerBuilder;
279    /// // Sync from block mode
280    /// EventScannerBuilder::sync().from_block(1_000_000);
281    /// // Sync from latest events mode
282    /// EventScannerBuilder::sync().from_latest(10);
283    /// ```
284    ///
285    /// See [`from_block`](crate::EventScannerBuilder#method.from_block-2) and
286    /// [`from_latest`](crate::EventScannerBuilder#method.from_latest) for details on each mode.
287    #[must_use]
288    pub fn sync() -> EventScannerBuilder<Synchronize> {
289        EventScannerBuilder::default()
290    }
291
292    /// Streams the latest `count` matching events per registered listener.
293    ///
294    /// # Example
295    ///
296    /// ```no_run
297    /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
298    /// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
299    /// # use tokio_stream::StreamExt;
300    /// # use robust_provider::RobustProviderBuilder;
301    /// #
302    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
303    /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045");
304    /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
305    /// # let provider = RobustProviderBuilder::new(provider).build().await?;
306    /// // Collect the latest 10 events across Earliest..=Latest
307    /// let mut scanner = EventScannerBuilder::latest(10).connect(provider).await?;
308    ///
309    /// let filter = EventFilter::new().contract_address(contract_address);
310    /// let subscription = scanner.subscribe(filter);
311    /// let proof = scanner.start().await?;
312    /// let mut stream = subscription.stream(&proof);
313    ///
314    /// // Expect a single message with up to 10 logs, then the stream ends
315    /// while let Some(Ok(Message::Data(logs))) = stream.next().await {
316    ///     println!("Latest logs: {}", logs.len());
317    /// }
318    /// # Ok(())
319    /// # }
320    /// ```
321    ///
322    /// Restricting to a specific block range:
323    ///
324    /// ```no_run
325    /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}};
326    /// # use event_scanner::EventScannerBuilder;
327    /// # use robust_provider::RobustProviderBuilder;
328    /// #
329    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
330    /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
331    /// # let provider = RobustProviderBuilder::new(provider).build().await?;
332    /// // Collect the latest 5 events between blocks [1_000_000, 1_100_000]
333    /// let mut scanner = EventScannerBuilder::latest(5)
334    ///     .from_block(1_000_000)
335    ///     .to_block(1_100_000)
336    ///     .connect(provider)
337    ///     .await?;
338    /// # Ok(())
339    /// # }
340    /// ```
341    ///
342    /// # How it works
343    ///
344    /// The scanner performs a reverse-ordered scan (newest to oldest) within the specified block
345    /// range, collecting up to `count` events per registered listener. Once the target count is
346    /// reached or the range is exhausted, it delivers the events in chronological order (oldest to
347    /// newest) and completes.
348    ///
349    /// When using a custom block range, the scanner automatically normalizes the range boundaries.
350    /// This means you can specify `from_block` and `to_block` in any order - the scanner will
351    /// always scan from the higher block number down to the lower one, regardless of which
352    /// parameter holds which value.
353    ///
354    /// # Key behaviors
355    ///
356    /// * **Single delivery**: Each registered stream receives at most `count` logs in a single
357    ///   message, chronologically ordered
358    /// * **One-shot operation**: The scanner completes after delivering messages; it does not
359    ///   continue streaming
360    /// * **Concurrent log fetching**: Logs are fetched concurrently to reduce the execution time.
361    ///   The maximum number of concurrent RPC calls is controlled by
362    ///   [`max_concurrent_fetches`][max_concurrent_fetches]
363    /// * **Flexible count**: If fewer than `count` events exist in the range, returns all available
364    ///   events
365    /// * **Default range**: By default, scans from `Earliest` to `Latest` block
366    /// * **Reorg handling**: Periodically checks the tip to detect reorgs during the scan
367    ///
368    /// # Notifications
369    ///
370    /// The scanner can emit the following notifications:
371    ///
372    /// * [`Notification::NoPastLogsFound`][no_logs]: Emitted when no matching logs are found in the
373    ///   scanned range.
374    /// * [`Notification::ReorgDetected`][reorg]: Emitted when a reorg is detected during the scan.
375    ///
376    /// # Arguments
377    ///
378    /// * `count` - Maximum number of recent events to collect per listener (must be greater than 0)
379    ///
380    /// # Reorg behavior
381    ///
382    /// The scanner can detect reorgs during the scan by periodically checking that the range tip
383    /// has not changed. This is done only when the specified range tip is not a finalized
384    /// block.
385    ///
386    /// On reorg detection:
387    /// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
388    /// 2. Resets to the updated tip
389    /// 3. Reloads logs from the block range affected by the reorg
390    /// 4. Continues until `count` events are collected
391    ///
392    /// Final delivery to log listeners preserves chronological order regardless of reorgs.
393    ///
394    /// # Notes
395    ///
396    /// For continuous streaming after collecting latest events, use
397    /// [`EventScannerBuilder::sync().from_latest(count)`][sync_from_latest] instead
398    ///
399    /// [subscribe]: EventScanner::subscribe
400    /// [start]: EventScanner::start
401    /// [sync_from_latest]: EventScannerBuilder::from_latest
402    /// [reorg]: crate::Notification::ReorgDetected
403    /// [no_logs]: crate::Notification::NoPastLogsFound
404    /// [max_concurrent_fetches]: crate::EventScannerBuilder#method.max_concurrent_fetches-1
405    #[must_use]
406    pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
407        EventScannerBuilder::<LatestEvents>::new(count)
408    }
409}
410
411impl EventScannerBuilder<LatestEvents> {
412    #[must_use]
413    pub fn new(count: usize) -> Self {
414        Self {
415            config: LatestEvents {
416                count,
417                from_block: BlockNumberOrTag::Latest.into(),
418                to_block: BlockNumberOrTag::Earliest.into(),
419                block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
420                max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
421            },
422            block_range_scanner: BlockRangeScannerBuilder::default(),
423        }
424    }
425}
426
427impl EventScannerBuilder<SyncFromLatestEvents> {
428    #[must_use]
429    pub fn new(count: usize) -> Self {
430        Self {
431            config: SyncFromLatestEvents {
432                count,
433                block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
434                max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
435            },
436            block_range_scanner: BlockRangeScannerBuilder::default(),
437        }
438    }
439}
440
441impl EventScannerBuilder<SyncFromBlock> {
442    #[must_use]
443    pub fn new(from_block: impl Into<BlockId>) -> Self {
444        Self {
445            config: SyncFromBlock {
446                from_block: from_block.into(),
447                block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
448                max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES,
449            },
450            block_range_scanner: BlockRangeScannerBuilder::default(),
451        }
452    }
453}
454
455impl<Mode> EventScannerBuilder<Mode> {
456    /// Sets the maximum block range per event batch.
457    ///
458    /// Controls how the scanner splits a large block range into smaller batches for processing.
459    /// Each batch corresponds to a single RPC call to fetch logs. This prevents timeouts and
460    /// respects rate limits imposed by node providers.
461    ///
462    /// # Arguments
463    ///
464    /// * `max_block_range` - Maximum number of blocks to process per batch (must be greater than 0)
465    ///
466    /// # Example
467    ///
468    /// If scanning events from blocks 1000–1099 (100 blocks total) with `max_block_range(30)`:
469    /// * Batch 1: blocks 1000–1029 (30 blocks)
470    /// * Batch 2: blocks 1030–1059 (30 blocks)
471    /// * Batch 3: blocks 1060–1089 (30 blocks)
472    /// * Batch 4: blocks 1090–1099 (10 blocks)
473    #[must_use]
474    pub fn max_block_range(mut self, max_block_range: u64) -> Self {
475        self.block_range_scanner.max_block_range = max_block_range;
476        self
477    }
478
479    /// Sets how many of past blocks to keep in memory for reorg detection.
480    ///
481    /// IMPORTANT: If zero, reorg detection is disabled.
482    ///
483    /// # Arguments
484    ///
485    /// * `past_blocks_storage_capacity` - Maximum number of blocks to keep in memory.
486    #[must_use]
487    pub fn past_blocks_storage_capacity(
488        mut self,
489        past_blocks_storage_capacity: RingBufferCapacity,
490    ) -> Self {
491        self.block_range_scanner.past_blocks_storage_capacity = past_blocks_storage_capacity;
492        self
493    }
494
495    /// Sets the stream buffer capacity.
496    ///
497    /// Controls the maximum number of messages that can be buffered in the stream
498    /// before backpressure is applied.
499    ///
500    /// # Arguments
501    ///
502    /// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
503    #[must_use]
504    pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
505        self.block_range_scanner.buffer_capacity = buffer_capacity;
506        self
507    }
508
509    /// Builds the scanner by connecting to an existing provider.
510    ///
511    /// This is a shared method used internally by scanner-specific `connect()` methods.
512    pub(crate) async fn build<N: Network>(
513        self,
514        provider: impl IntoRobustProvider<N>,
515    ) -> Result<EventScanner<Mode, N>, ScannerError> {
516        let block_range_scanner = self.block_range_scanner.connect::<N>(provider).await?;
517        Ok(EventScanner::new(self.config, block_range_scanner))
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use crate::block_range_scanner::DEFAULT_STREAM_BUFFER_CAPACITY;
524
525    use super::*;
526
527    #[test]
528    fn test_historic_scanner_config_defaults() {
529        let builder = EventScannerBuilder::<Historic>::default();
530
531        assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
532        assert_eq!(builder.config.to_block, BlockNumberOrTag::Latest.into());
533        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
534    }
535
536    #[test]
537    fn test_live_scanner_config_defaults() {
538        let builder = EventScannerBuilder::<Live>::default();
539
540        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
541        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
542    }
543
544    #[test]
545    fn test_latest_scanner_config_defaults() {
546        let builder = EventScannerBuilder::<LatestEvents>::new(10);
547
548        assert_eq!(builder.config.count, 10);
549
550        assert_eq!(builder.config.from_block, BlockNumberOrTag::Latest.into());
551        assert_eq!(builder.config.to_block, BlockNumberOrTag::Earliest.into());
552        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
553        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
554    }
555
556    #[test]
557    fn sync_scanner_config_defaults() {
558        let builder = EventScannerBuilder::<SyncFromBlock>::new(BlockNumberOrTag::Earliest);
559
560        assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
561        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
562        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
563    }
564}