event_scanner/event_scanner/modes/
latest.rs1use alloy::{
7 consensus::BlockHeader,
8 eips::BlockId,
9 network::{BlockResponse, Network},
10};
11
12use crate::{
13 ScannerError,
14 event_scanner::{
15 EventScanner, StartProof,
16 block_range_handler::{BlockRangeHandler, LatestEventsHandler},
17 builder::{EventScannerBuilder, LatestEvents},
18 },
19};
20
21use robust_provider::IntoRobustProvider;
22
23impl EventScannerBuilder<LatestEvents> {
24 #[must_use]
30 pub fn block_confirmations(mut self, confirmations: u64) -> Self {
31 self.config.block_confirmations = confirmations;
32 self
33 }
34
35 #[must_use]
42 pub fn from_block(mut self, block_id: impl Into<BlockId>) -> Self {
43 self.config.from_block = block_id.into();
44 self
45 }
46
47 #[must_use]
54 pub fn to_block(mut self, block_id: impl Into<BlockId>) -> Self {
55 self.config.to_block = block_id.into();
56 self
57 }
58
59 #[must_use]
74 pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
75 self.config.max_concurrent_fetches = max_concurrent_fetches;
76 self
77 }
78
79 pub async fn connect<N: Network>(
88 self,
89 provider: impl IntoRobustProvider<N>,
90 ) -> Result<EventScanner<LatestEvents, N>, ScannerError> {
91 if self.config.count == 0 {
92 return Err(ScannerError::InvalidEventCount);
93 }
94 if self.config.max_concurrent_fetches == 0 {
95 return Err(ScannerError::InvalidMaxConcurrentFetches);
96 }
97
98 let scanner = self.build(provider).await?;
99
100 let provider = scanner.block_range_scanner.provider();
101 let latest_block = provider.get_block_number().await?;
102
103 let from_num = match scanner.config.from_block {
104 BlockId::Number(from_block) => {
105 if from_block.is_pending() {
106 return Err(ScannerError::BlockExceedsLatest(
107 "from_block",
108 latest_block + 1,
109 latest_block,
110 ));
111 }
112 from_block.as_number().unwrap_or(0)
114 }
115 BlockId::Hash(from_hash) => {
116 provider.get_block_by_hash(from_hash.into()).await?.header().number()
117 }
118 };
119
120 if from_num > latest_block {
121 Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
122 }
123
124 let to_num = match scanner.config.to_block {
125 BlockId::Number(to_block) => {
126 if to_block.is_pending() {
127 return Err(ScannerError::BlockExceedsLatest(
128 "to_block",
129 latest_block + 1,
130 latest_block,
131 ));
132 }
133 to_block.as_number().unwrap_or(0)
135 }
136 BlockId::Hash(to_hash) => {
137 provider.get_block_by_hash(to_hash.into()).await?.header().number()
138 }
139 };
140
141 if to_num > latest_block {
142 Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
143 }
144
145 Ok(scanner)
146 }
147}
148
149impl<N: Network> EventScanner<LatestEvents, N> {
150 pub async fn start(self) -> Result<StartProof, ScannerError> {
160 info!(
161 from_block = ?self.config.from_block,
162 to_block = ?self.config.to_block,
163 count = ?self.config.count,
164 listener_count = self.listeners.len(),
165 "Starting EventScanner in LatestEvents mode"
166 );
167
168 let stream = self
169 .block_range_scanner
170 .stream_rewind(self.config.from_block, self.config.to_block)
171 .await?;
172
173 let broadcast_channel_capacity = self.buffer_capacity();
174
175 let handler = LatestEventsHandler::new(
176 self.block_range_scanner.provider().clone(),
177 self.listeners,
178 self.config.max_concurrent_fetches,
179 self.config.count,
180 broadcast_channel_capacity,
181 );
182
183 tokio::spawn(async move {
184 handler.handle(stream).await;
185 });
186
187 Ok(StartProof::new())
188 }
189}
190
#[cfg(test)]
mod tests {
    use crate::{
        block_range_scanner::{
            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
    };

    use super::*;
    use alloy::{
        eips::BlockNumberOrTag,
        network::Ethereum,
        node_bindings::Anvil,
        primitives::keccak256,
        providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi, mock::Asserter},
        rpc::client::RpcClient,
    };

    // Every setter stores its argument in the expected configuration field.
    #[test]
    fn test_latest_scanner_builder_pattern() {
        let builder = EventScannerBuilder::latest(3)
            .max_block_range(25)
            .block_confirmations(5)
            .from_block(BlockNumberOrTag::Number(50))
            .to_block(BlockNumberOrTag::Number(150))
            .max_concurrent_fetches(10)
            .buffer_capacity(33);

        let config = &builder.config;
        let range_scanner = &builder.block_range_scanner;

        assert_eq!(range_scanner.max_block_range, 25);
        assert_eq!(config.block_confirmations, 5);
        assert_eq!(config.max_concurrent_fetches, 10);
        assert_eq!(config.count, 3);
        assert_eq!(config.from_block, BlockNumberOrTag::Number(50).into());
        assert_eq!(config.to_block, BlockNumberOrTag::Number(150).into());
        assert_eq!(range_scanner.buffer_capacity, 33);
    }

    // A freshly created builder carries the documented defaults.
    #[test]
    fn test_latest_scanner_builder_with_default_values() {
        let builder = EventScannerBuilder::latest(10);

        let config = &builder.config;
        let range_scanner = &builder.block_range_scanner;

        assert_eq!(config.from_block, BlockNumberOrTag::Latest.into());
        assert_eq!(config.to_block, BlockNumberOrTag::Earliest.into());
        assert_eq!(config.count, 10);
        assert_eq!(config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    // Calling a setter twice keeps only the value from the second call.
    #[test]
    fn test_latest_scanner_builder_last_call_wins() {
        let builder = EventScannerBuilder::latest(3)
            .from_block(10)
            .from_block(20)
            .to_block(100)
            .to_block(200)
            .block_confirmations(5)
            .block_confirmations(7)
            .max_block_range(50)
            .max_block_range(60)
            .max_concurrent_fetches(10)
            .max_concurrent_fetches(20)
            .buffer_capacity(20)
            .buffer_capacity(40);

        let config = &builder.config;
        let range_scanner = &builder.block_range_scanner;

        assert_eq!(config.count, 3);
        assert_eq!(config.from_block, BlockNumberOrTag::Number(20).into());
        assert_eq!(config.to_block, BlockNumberOrTag::Number(200).into());
        assert_eq!(config.block_confirmations, 7);
        assert_eq!(config.max_concurrent_fetches, 20);
        assert_eq!(range_scanner.max_block_range, 60);
        assert_eq!(range_scanner.buffer_capacity, 40);
    }

    // Zero confirmations is a valid setting and survives `connect`.
    #[tokio::test]
    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let builder = EventScannerBuilder::latest(1).block_confirmations(0);
        let scanner = builder.connect(provider).await?;

        assert_eq!(scanner.config.block_confirmations, 0);

        Ok(())
    }

    // A zero event count is rejected.
    #[tokio::test]
    async fn test_latest_returns_error_with_zero_count() {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
        let outcome = EventScannerBuilder::latest(0).connect(provider).await;

        assert!(
            matches!(outcome, Err(ScannerError::InvalidEventCount)),
            "Expected InvalidEventCount error"
        );
    }

    // A zero max block range is rejected.
    #[tokio::test]
    async fn test_latest_returns_error_with_zero_max_block_range() {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
        let outcome = EventScannerBuilder::latest(10).max_block_range(0).connect(provider).await;

        assert!(
            matches!(outcome, Err(ScannerError::InvalidMaxBlockRange)),
            "Expected InvalidMaxBlockRange error"
        );
    }

    // A zero buffer capacity is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
        let outcome = EventScannerBuilder::latest(10).buffer_capacity(0).connect(provider).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidBufferCapacity)));
    }

    // A zero concurrent-fetch limit is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_max_concurrent_fetches() {
        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
        let builder = EventScannerBuilder::latest(10).max_concurrent_fetches(0);
        let outcome = builder.connect(provider).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidMaxConcurrentFetches)));
    }

    // Hashes of existing blocks are accepted for both bounds.
    #[tokio::test]
    async fn test_latest_scanner_with_valid_block_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        provider.anvil_mine(Some(5), None).await.unwrap();

        let first_block = provider.get_block_by_number(1.into()).await.unwrap().unwrap();
        let block_1_hash = first_block.header.hash;
        let fifth_block = provider.get_block_by_number(5.into()).await.unwrap().unwrap();
        let block_5_hash = fifth_block.header.hash;

        let outcome = EventScannerBuilder::latest(1)
            .from_block(block_1_hash)
            .to_block(block_5_hash)
            .connect(provider.clone())
            .await;

        assert!(outcome.is_ok());
    }

    // An unknown `to_block` hash surfaces as `BlockNotFound`.
    #[tokio::test]
    async fn test_latest_scanner_with_invalid_to_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let unknown_hash = keccak256("Invalid Hash");
        let outcome =
            EventScannerBuilder::latest(1).to_block(unknown_hash).connect(provider).await;

        assert!(matches!(outcome, Err(ScannerError::BlockNotFound)));
    }

    // An unknown `from_block` hash surfaces as `BlockNotFound`.
    #[tokio::test]
    async fn test_latest_scanner_with_invalid_from_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let unknown_hash = keccak256("Invalid Hash");
        let outcome =
            EventScannerBuilder::latest(1).from_block(unknown_hash).connect(provider).await;

        assert!(matches!(outcome, Err(ScannerError::BlockNotFound)));
    }

    // Two unknown hashes still surface as `BlockNotFound`.
    #[tokio::test]
    async fn test_latest_scanner_with_invalid_from_and_to_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let unknown_from_hash = keccak256("Invalid From Hash");
        let unknown_to_hash = keccak256("Invalid To Hash");

        let outcome = EventScannerBuilder::latest(1)
            .from_block(unknown_from_hash)
            .to_block(unknown_to_hash)
            .connect(provider)
            .await;

        assert!(matches!(outcome, Err(ScannerError::BlockNotFound)));
    }

    // A hash bound may be combined with a numeric bound, in either position.
    #[tokio::test]
    async fn test_latest_scanner_with_mixed_block_types() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        provider.anvil_mine(Some(5), None).await.unwrap();

        let first_block = provider.get_block_by_number(1.into()).await.unwrap().unwrap();
        let block_1_hash = first_block.header.hash;
        let fifth_block = provider.get_block_by_number(5.into()).await.unwrap().unwrap();
        let block_5_hash = fifth_block.header.hash;

        let hash_then_number = EventScannerBuilder::latest(1)
            .from_block(block_1_hash)
            .to_block(5)
            .connect(provider.clone())
            .await;

        assert!(hash_then_number.is_ok());

        let number_then_hash = EventScannerBuilder::latest(1)
            .from_block(1)
            .to_block(block_5_hash)
            .connect(provider)
            .await;

        assert!(number_then_hash.is_ok());
    }

    // A numeric `from_block` beyond the chain head is reported with its value.
    #[tokio::test]
    async fn test_from_block_above_latest_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(latest_block + 100)
            .to_block(latest_block)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error");
        };
        assert_eq!(max, latest_block + 100);
        assert_eq!(latest, latest_block);
    }

    // A numeric `to_block` beyond the chain head is reported with its value.
    #[tokio::test]
    async fn test_to_block_above_latest_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(0)
            .to_block(latest_block + 100)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error");
        };
        assert_eq!(max, latest_block + 100);
        assert_eq!(latest, latest_block);
    }

    // When both bounds exceed the head, `from_block` is the one reported.
    #[tokio::test]
    async fn test_to_and_from_block_above_latest_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(latest_block + 50)
            .to_block(latest_block + 100)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error for 'from_block'");
        };
        assert_eq!(max, latest_block + 50);
        assert_eq!(latest, latest_block);
    }

    // The pending tag as `from_block` is reported as `latest + 1`.
    #[tokio::test]
    async fn test_from_block_pending_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(BlockNumberOrTag::Pending)
            .to_block(latest_block)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error for 'from_block'");
        };
        assert_eq!(max, latest_block + 1);
        assert_eq!(latest, latest_block);
    }

    // The pending tag as `to_block` is reported as `latest + 1`.
    #[tokio::test]
    async fn test_to_block_pending_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(0)
            .to_block(BlockNumberOrTag::Pending)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error for 'to_block'");
        };
        assert_eq!(max, latest_block + 1);
        assert_eq!(latest, latest_block);
    }

    // With both bounds pending, `from_block` is the one reported.
    #[tokio::test]
    async fn test_from_and_to_block_pending_returns_error() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let latest_block = provider.get_block_number().await.unwrap();

        let outcome = EventScannerBuilder::latest(1)
            .from_block(BlockNumberOrTag::Pending)
            .to_block(BlockNumberOrTag::Pending)
            .connect(provider)
            .await;

        let Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) = outcome else {
            panic!("Expected BlockExceedsLatest error for 'from_block'");
        };
        assert_eq!(max, latest_block + 1);
        assert_eq!(latest, latest_block);
    }
}