Skip to main content

event_scanner/event_scanner/modes/
latest.rs

1//! Collects the most recent matching events.
2//!
3//! Performs a reverse scan to collect a specified number of the most recent matching events.
4//! See [`EventScannerBuilder::latest`] for usage details.
5
6use alloy::{
7    consensus::BlockHeader,
8    eips::BlockId,
9    network::{BlockResponse, Network},
10};
11
12use crate::{
13    ScannerError,
14    event_scanner::{
15        EventScanner, StartProof,
16        block_range_handler::{BlockRangeHandler, LatestEventsHandler},
17        builder::{EventScannerBuilder, LatestEvents},
18    },
19};
20
21use robust_provider::IntoRobustProvider;
22
23impl EventScannerBuilder<LatestEvents> {
24    /// Sets the number of confirmations required before a block is considered stable enough to
25    /// include when collecting the latest events.
26    ///
27    /// Higher values reduce the likelihood of emitting logs from blocks that are later reorged,
28    /// at the cost of potentially excluding very recent events.
29    #[must_use]
30    pub fn block_confirmations(mut self, confirmations: u64) -> Self {
31        self.config.block_confirmations = confirmations;
32        self
33    }
34
35    /// Sets the starting block for the historic scan.
36    ///
37    /// # Note
38    ///
39    /// Although passing `BlockNumberOrTag::Pending` will compile, the subsequent call to
40    /// `connect` will fail at runtime. See issue <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
41    #[must_use]
42    pub fn from_block(mut self, block_id: impl Into<BlockId>) -> Self {
43        self.config.from_block = block_id.into();
44        self
45    }
46
47    /// Sets the starting block for the historic scan.
48    ///
49    /// # Note
50    ///
51    /// Although passing `BlockNumberOrTag::Pending` will compile, the subsequent call to
52    /// `connect` will fail at runtime. See issue <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
53    #[must_use]
54    pub fn to_block(mut self, block_id: impl Into<BlockId>) -> Self {
55        self.config.to_block = block_id.into();
56        self
57    }
58
59    /// Sets the maximum number of block-range fetches to process concurrently when
60    /// collecting the latest events.
61    ///
62    /// Higher values can increase throughput by issuing multiple RPC requests
63    /// concurrently, at the expense of more load on the provider.
64    ///
65    /// **Note**: This limit applies **per listener**. With N listeners and a limit of M,
66    /// up to N × M concurrent RPC requests may be in-flight simultaneously.
67    ///
68    /// Must be greater than 0.
69    ///
70    /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default].
71    ///
72    /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES
73    #[must_use]
74    pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
75        self.config.max_concurrent_fetches = max_concurrent_fetches;
76        self
77    }
78
79    /// Connects to an existing provider.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if:
84    /// * The provider connection fails
85    /// * The event count is zero
86    /// * The max block range is zero
87    pub async fn connect<N: Network>(
88        self,
89        provider: impl IntoRobustProvider<N>,
90    ) -> Result<EventScanner<LatestEvents, N>, ScannerError> {
91        if self.config.count == 0 {
92            return Err(ScannerError::InvalidEventCount);
93        }
94        if self.config.max_concurrent_fetches == 0 {
95            return Err(ScannerError::InvalidMaxConcurrentFetches);
96        }
97
98        let scanner = self.build(provider).await?;
99
100        let provider = scanner.block_range_scanner.provider();
101        let latest_block = provider.get_block_number().await?;
102
103        let from_num = match scanner.config.from_block {
104            BlockId::Number(from_block) => {
105                if from_block.is_pending() {
106                    return Err(ScannerError::BlockExceedsLatest(
107                        "from_block",
108                        latest_block + 1,
109                        latest_block,
110                    ));
111                }
112                // can safely unwrap to 0 because any other tag < latest block
113                from_block.as_number().unwrap_or(0)
114            }
115            BlockId::Hash(from_hash) => {
116                provider.get_block_by_hash(from_hash.into()).await?.header().number()
117            }
118        };
119
120        if from_num > latest_block {
121            Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
122        }
123
124        let to_num = match scanner.config.to_block {
125            BlockId::Number(to_block) => {
126                if to_block.is_pending() {
127                    return Err(ScannerError::BlockExceedsLatest(
128                        "to_block",
129                        latest_block + 1,
130                        latest_block,
131                    ));
132                }
133                // can safely unwrap to 0 because any other tag < latest block
134                to_block.as_number().unwrap_or(0)
135            }
136            BlockId::Hash(to_hash) => {
137                provider.get_block_by_hash(to_hash.into()).await?.header().number()
138            }
139        };
140
141        if to_num > latest_block {
142            Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
143        }
144
145        Ok(scanner)
146    }
147}
148
149impl<N: Network> EventScanner<LatestEvents, N> {
150    /// Starts the scanner in [`LatestEvents`] mode.
151    ///
152    /// See [`EventScanner`] for general startup notes.
153    ///
154    /// # Errors
155    ///
156    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
157    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
158    /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
159    pub async fn start(self) -> Result<StartProof, ScannerError> {
160        info!(
161            from_block = ?self.config.from_block,
162            to_block = ?self.config.to_block,
163            count = ?self.config.count,
164            listener_count = self.listeners.len(),
165            "Starting EventScanner in LatestEvents mode"
166        );
167
168        let stream = self
169            .block_range_scanner
170            .stream_rewind(self.config.from_block, self.config.to_block)
171            .await?;
172
173        let broadcast_channel_capacity = self.buffer_capacity();
174
175        let handler = LatestEventsHandler::new(
176            self.block_range_scanner.provider().clone(),
177            self.listeners,
178            self.config.max_concurrent_fetches,
179            self.config.count,
180            broadcast_channel_capacity,
181        );
182
183        tokio::spawn(async move {
184            handler.handle(stream).await;
185        });
186
187        Ok(StartProof::new())
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use crate::{
194        block_range_scanner::{
195            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
196        },
197        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
198    };
199
200    use super::*;
201    use alloy::{
202        eips::BlockNumberOrTag,
203        network::Ethereum,
204        node_bindings::Anvil,
205        primitives::keccak256,
206        providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi, mock::Asserter},
207        rpc::client::RpcClient,
208    };
209
210    #[test]
211    fn test_latest_scanner_builder_pattern() {
212        let builder = EventScannerBuilder::latest(3)
213            .max_block_range(25)
214            .block_confirmations(5)
215            .from_block(BlockNumberOrTag::Number(50))
216            .to_block(BlockNumberOrTag::Number(150))
217            .max_concurrent_fetches(10)
218            .buffer_capacity(33);
219
220        assert_eq!(builder.block_range_scanner.max_block_range, 25);
221        assert_eq!(builder.config.block_confirmations, 5);
222        assert_eq!(builder.config.max_concurrent_fetches, 10);
223        assert_eq!(builder.config.count, 3);
224        assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(50).into());
225        assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(150).into());
226        assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
227    }
228
229    #[test]
230    fn test_latest_scanner_builder_with_default_values() {
231        let builder = EventScannerBuilder::latest(10);
232
233        assert_eq!(builder.config.from_block, BlockNumberOrTag::Latest.into());
234        assert_eq!(builder.config.to_block, BlockNumberOrTag::Earliest.into());
235        assert_eq!(builder.config.count, 10);
236        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
237        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
238        assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
239        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
240    }
241
242    #[test]
243    fn test_latest_scanner_builder_last_call_wins() {
244        let builder = EventScannerBuilder::latest(3)
245            .from_block(10)
246            .from_block(20)
247            .to_block(100)
248            .to_block(200)
249            .block_confirmations(5)
250            .block_confirmations(7)
251            .max_block_range(50)
252            .max_block_range(60)
253            .max_concurrent_fetches(10)
254            .max_concurrent_fetches(20)
255            .buffer_capacity(20)
256            .buffer_capacity(40);
257
258        assert_eq!(builder.config.count, 3);
259        assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(20).into());
260        assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
261        assert_eq!(builder.config.block_confirmations, 7);
262        assert_eq!(builder.config.max_concurrent_fetches, 20);
263        assert_eq!(builder.block_range_scanner.max_block_range, 60);
264        assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
265    }
266
267    #[tokio::test]
268    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
269        let anvil = Anvil::new().try_spawn().unwrap();
270        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
271
272        let scanner =
273            EventScannerBuilder::latest(1).block_confirmations(0).connect(provider).await?;
274
275        assert_eq!(scanner.config.block_confirmations, 0);
276
277        Ok(())
278    }
279
280    #[tokio::test]
281    async fn test_latest_returns_error_with_zero_count() {
282        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
283        let result = EventScannerBuilder::latest(0).connect(provider).await;
284
285        match result {
286            Err(ScannerError::InvalidEventCount) => {}
287            _ => panic!("Expected InvalidEventCount error"),
288        }
289    }
290
291    #[tokio::test]
292    async fn test_latest_returns_error_with_zero_max_block_range() {
293        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
294        let result = EventScannerBuilder::latest(10).max_block_range(0).connect(provider).await;
295
296        match result {
297            Err(ScannerError::InvalidMaxBlockRange) => {}
298            _ => panic!("Expected InvalidMaxBlockRange error"),
299        }
300    }
301
302    #[tokio::test]
303    async fn returns_error_with_zero_buffer_capacity() {
304        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
305        let result = EventScannerBuilder::latest(10).buffer_capacity(0).connect(provider).await;
306
307        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
308    }
309
310    #[tokio::test]
311    async fn returns_error_with_zero_max_concurrent_fetches() {
312        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
313        let result =
314            EventScannerBuilder::latest(10).max_concurrent_fetches(0).connect(provider).await;
315
316        assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
317    }
318
319    #[tokio::test]
320    async fn test_latest_scanner_with_valid_block_hash() {
321        let anvil = Anvil::new().try_spawn().unwrap();
322        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
323
324        provider.anvil_mine(Some(5), None).await.unwrap();
325
326        let block_1_hash =
327            provider.get_block_by_number(1.into()).await.unwrap().unwrap().header.hash;
328        let block_5_hash =
329            provider.get_block_by_number(5.into()).await.unwrap().unwrap().header.hash;
330
331        let result = EventScannerBuilder::latest(1)
332            .from_block(block_1_hash)
333            .to_block(block_5_hash)
334            .connect(provider.clone())
335            .await;
336
337        assert!(result.is_ok());
338    }
339
340    #[tokio::test]
341    async fn test_latest_scanner_with_invalid_to_hash() {
342        let anvil = Anvil::new().try_spawn().unwrap();
343        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
344
345        let random_hash = keccak256("Invalid Hash");
346        let result = EventScannerBuilder::latest(1).to_block(random_hash).connect(provider).await;
347
348        assert!(matches!(result, Err(ScannerError::BlockNotFound)));
349    }
350
351    #[tokio::test]
352    async fn test_latest_scanner_with_invalid_from_hash() {
353        let anvil = Anvil::new().try_spawn().unwrap();
354        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
355
356        let random_hash = keccak256("Invalid Hash");
357        let result = EventScannerBuilder::latest(1).from_block(random_hash).connect(provider).await;
358
359        assert!(matches!(result, Err(ScannerError::BlockNotFound)));
360    }
361
362    #[tokio::test]
363    async fn test_latest_scanner_with_invalid_from_and_to_hash() {
364        let anvil = Anvil::new().try_spawn().unwrap();
365        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
366
367        let random_from_hash = keccak256("Invalid From Hash");
368        let random_to_hash = keccak256("Invalid To Hash");
369
370        let result = EventScannerBuilder::latest(1)
371            .from_block(random_from_hash)
372            .to_block(random_to_hash)
373            .connect(provider)
374            .await;
375
376        // We expect it to fail on the first checked block (from_block)
377        assert!(matches!(result, Err(ScannerError::BlockNotFound)));
378    }
379
380    #[tokio::test]
381    async fn test_latest_scanner_with_mixed_block_types() {
382        let anvil = Anvil::new().try_spawn().unwrap();
383        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
384
385        provider.anvil_mine(Some(5), None).await.unwrap();
386
387        let block_1_hash =
388            provider.get_block_by_number(1.into()).await.unwrap().unwrap().header.hash;
389        let block_5_hash =
390            provider.get_block_by_number(5.into()).await.unwrap().unwrap().header.hash;
391
392        let result = EventScannerBuilder::latest(1)
393            .from_block(block_1_hash)
394            .to_block(5)
395            .connect(provider.clone())
396            .await;
397
398        assert!(result.is_ok());
399
400        let result = EventScannerBuilder::latest(1)
401            .from_block(1)
402            .to_block(block_5_hash)
403            .connect(provider)
404            .await;
405
406        assert!(result.is_ok());
407    }
408
409    #[tokio::test]
410    async fn test_from_block_above_latest_returns_error() {
411        let anvil = Anvil::new().try_spawn().unwrap();
412        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
413
414        let latest_block = provider.get_block_number().await.unwrap();
415
416        let result = EventScannerBuilder::latest(1)
417            .from_block(latest_block + 100)
418            .to_block(latest_block)
419            .connect(provider)
420            .await;
421
422        match result {
423            Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
424                assert_eq!(max, latest_block + 100);
425                assert_eq!(latest, latest_block);
426            }
427            _ => panic!("Expected BlockExceedsLatest error"),
428        }
429    }
430
431    #[tokio::test]
432    async fn test_to_block_above_latest_returns_error() {
433        let anvil = Anvil::new().try_spawn().unwrap();
434        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
435
436        let latest_block = provider.get_block_number().await.unwrap();
437
438        let result = EventScannerBuilder::latest(1)
439            .from_block(0)
440            .to_block(latest_block + 100)
441            .connect(provider)
442            .await;
443
444        match result {
445            Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => {
446                assert_eq!(max, latest_block + 100);
447                assert_eq!(latest, latest_block);
448            }
449            _ => panic!("Expected BlockExceedsLatest error"),
450        }
451    }
452
453    #[tokio::test]
454    async fn test_to_and_from_block_above_latest_returns_error() {
455        let anvil = Anvil::new().try_spawn().unwrap();
456        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
457
458        let latest_block = provider.get_block_number().await.unwrap();
459
460        let result = EventScannerBuilder::latest(1)
461            .from_block(latest_block + 50)
462            .to_block(latest_block + 100)
463            .connect(provider)
464            .await;
465
466        match result {
467            Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
468                assert_eq!(max, latest_block + 50);
469                assert_eq!(latest, latest_block);
470            }
471            _ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
472        }
473    }
474
475    #[tokio::test]
476    async fn test_from_block_pending_returns_error() {
477        let anvil = Anvil::new().try_spawn().unwrap();
478        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
479
480        let latest_block = provider.get_block_number().await.unwrap();
481
482        let result = EventScannerBuilder::latest(1)
483            .from_block(BlockNumberOrTag::Pending)
484            .to_block(latest_block)
485            .connect(provider)
486            .await;
487
488        match result {
489            Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
490                assert_eq!(max, latest_block + 1);
491                assert_eq!(latest, latest_block);
492            }
493            _ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
494        }
495    }
496
497    #[tokio::test]
498    async fn test_to_block_pending_returns_error() {
499        let anvil = Anvil::new().try_spawn().unwrap();
500        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
501
502        let latest_block = provider.get_block_number().await.unwrap();
503
504        let result = EventScannerBuilder::latest(1)
505            .from_block(0)
506            .to_block(BlockNumberOrTag::Pending)
507            .connect(provider)
508            .await;
509
510        match result {
511            Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => {
512                assert_eq!(max, latest_block + 1);
513                assert_eq!(latest, latest_block);
514            }
515            _ => panic!("Expected BlockExceedsLatest error for 'to_block'"),
516        }
517    }
518
519    #[tokio::test]
520    async fn test_from_and_to_block_pending_returns_error() {
521        let anvil = Anvil::new().try_spawn().unwrap();
522        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
523
524        let latest_block = provider.get_block_number().await.unwrap();
525
526        let result = EventScannerBuilder::latest(1)
527            .from_block(BlockNumberOrTag::Pending)
528            .to_block(BlockNumberOrTag::Pending)
529            .connect(provider)
530            .await;
531
532        // from_block is checked first
533        match result {
534            Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
535                assert_eq!(max, latest_block + 1);
536                assert_eq!(latest, latest_block);
537            }
538            _ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
539        }
540    }
541}