event_scanner/
block_range_scanner.rs

1//! Example usage:
2//!
3//! ```rust,no_run
4//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
5//! use std::ops::RangeInclusive;
6//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7//!
8//! use alloy::transports::http::reqwest::Url;
9//! use event_scanner::{
10//!     ScannerError,
11//!     block_range_scanner::{
12//!         BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS,
13//!         DEFAULT_MAX_BLOCK_RANGE, Message,
14//!     },
15//! };
16//! use tokio::time::Duration;
17//! use tracing::{error, info};
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//!     // Initialize logging
22//!     tracing_subscriber::fmt::init();
23//!
24//!     // Configuration
25//!     let block_range_scanner = BlockRangeScanner::new()
26//!         .connect_ws::<Ethereum>(Url::parse("ws://localhost:8546").unwrap())
27//!         .await?;
28//!
29//!     // Create client to send subscribe command to block scanner
30//!     let client: BlockRangeScannerClient = block_range_scanner.run()?;
31//!
32//!     let mut stream =
33//!         client.stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS).await?;
34//!
35//!     while let Some(message) = stream.next().await {
36//!         match message {
37//!             Message::Data(range) => {
38//!                 // process range
39//!             }
40//!             Message::Error(e) => {
41//!                 error!("Received error from subscription: {e}");
42//!                 match e {
43//!                     ScannerError::ServiceShutdown => break,
44//!                     ScannerError::WebSocketConnectionFailed(_) => {
45//!                         error!(
46//!                             "WebSocket connection failed, continuing to listen for reconnection"
47//!                         );
48//!                     }
49//!                     _ => {
50//!                         error!("Non-fatal error, continuing: {e}");
51//!                     }
52//!                 }
53//!             }
54//!             Message::Status(status) => {
55//!                 info!("Received status message: {:?}", status);
56//!             }
57//!         }
58//!     }
59//!
60//!     info!("Data processing stopped.");
61//!
62//!     Ok(())
63//! }
64//! ```
65
66use std::{cmp::Ordering, ops::RangeInclusive};
67use tokio::{
68    join,
69    sync::{mpsc, oneshot},
70};
71use tokio_stream::{StreamExt, wrappers::ReceiverStream};
72
73use crate::{
74    ScannerMessage,
75    error::ScannerError,
76    types::{ScannerStatus, TryStream},
77};
78use alloy::{
79    consensus::BlockHeader,
80    eips::BlockNumberOrTag,
81    network::{BlockResponse, Network, primitives::HeaderResponse},
82    primitives::{B256, BlockNumber},
83    providers::{Provider, RootProvider},
84    pubsub::Subscription,
85    rpc::client::ClientBuilder,
86    transports::{
87        RpcError, TransportErrorKind, TransportResult, http::reqwest::Url, ws::WsConnect,
88    },
89};
90use tracing::{debug, error, info, warn};
91
/// Default maximum number of blocks covered by a single streamed range; this is the value
/// [`BlockRangeScanner::new`] starts with.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
// copied from https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
/// Default number of block confirmations (`0`, i.e. no confirmation delay).
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Capacity of the `mpsc` channels created in this module to carry [`Message`]s (both the
/// consumer-facing stream channels and the internal live-block buffer).
pub const MAX_BUFFERED_MESSAGES: usize = 50000;

// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
// is considered final)
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;

/// Message streamed to consumers of the block range scanner: an inclusive block-number range as
/// data, with [`ScannerError`] as the error payload.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>, ScannerError>;
103
104impl From<RangeInclusive<BlockNumber>> for Message {
105    fn from(logs: RangeInclusive<BlockNumber>) -> Self {
106        Message::Data(logs)
107    }
108}
109
110impl PartialEq<RangeInclusive<BlockNumber>> for Message {
111    fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
112        if let Message::Data(range) = self { range.eq(other) } else { false }
113    }
114}
115
116impl From<RpcError<TransportErrorKind>> for Message {
117    fn from(error: RpcError<TransportErrorKind>) -> Self {
118        Message::Error(error.into())
119    }
120}
121
122impl From<ScannerError> for Message {
123    fn from(error: ScannerError) -> Self {
124        Message::Error(error)
125    }
126}
127
/// Builder-style configuration for a block range scanner that has not been connected to a
/// provider yet.
///
/// Use one of the `connect*` methods to obtain a [`ConnectedBlockRangeScanner`].
#[derive(Debug, Clone, Copy)]
pub struct BlockRangeScanner {
    /// Maximum number of blocks covered by a single streamed range.
    pub max_block_range: u64,
}
132
133impl Default for BlockRangeScanner {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl BlockRangeScanner {
140    #[must_use]
141    pub fn new() -> Self {
142        Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
143    }
144
145    #[must_use]
146    pub fn max_block_range(mut self, max_block_range: u64) -> Self {
147        self.max_block_range = max_block_range;
148        self
149    }
150
151    /// Connects to the provider via WebSocket
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the connection fails
156    pub async fn connect_ws<N: Network>(
157        self,
158        ws_url: Url,
159    ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
160        let provider =
161            RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
162        Ok(self.connect(provider))
163    }
164
165    /// Connects to the provider via IPC
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the connection fails
170    pub async fn connect_ipc<N: Network>(
171        self,
172        ipc_path: String,
173    ) -> Result<ConnectedBlockRangeScanner<N>, RpcError<TransportErrorKind>> {
174        let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
175        Ok(self.connect(provider))
176    }
177
178    /// Connects to an existing provider
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the connection fails
183    #[must_use]
184    pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
185        ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
186    }
187}
188
/// A block range scanner bound to a provider.
///
/// Created through [`BlockRangeScanner::connect`] (or the `connect_ws` / `connect_ipc`
/// shortcuts); call `run` to start the background service.
pub struct ConnectedBlockRangeScanner<N: Network> {
    // Provider used for all RPC calls and block subscriptions.
    provider: RootProvider<N>,
    // Maximum number of blocks covered by a single streamed range.
    max_block_range: u64,
}
193
194impl<N: Network> ConnectedBlockRangeScanner<N> {
195    /// Returns the underlying Provider.
196    #[must_use]
197    pub fn provider(&self) -> &RootProvider<N> {
198        &self.provider
199    }
200
201    /// Starts the subscription service and returns a client for sending commands.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the subscription service fails to start.
206    pub fn run(&self) -> Result<BlockRangeScannerClient, ScannerError> {
207        let (service, cmd_tx) = Service::new(self.provider.clone(), self.max_block_range);
208        tokio::spawn(async move {
209            service.run().await;
210        });
211        Ok(BlockRangeScannerClient::new(cmd_tx))
212    }
213}
214
/// Commands understood by the background scanner service.
///
/// Every variant carries the `sender` into which the resulting [`Message`]s are streamed and a
/// one-shot `response` channel on which the service reports whether the stream could be started.
#[derive(Debug)]
pub enum Command {
    /// Stream ranges of newly mined blocks, delayed by `block_confirmations`.
    StreamLive {
        sender: mpsc::Sender<Message>,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream the blocks between `start_height` and `end_height` in ascending batches.
    StreamHistorical {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream historical blocks from `start_height` up to the confirmed tip, then continue with
    /// live blocks.
    StreamFrom {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream the blocks between `start_height` and `end_height` in batches, newest batch first.
    Rewind {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
}
241
/// Background service that receives [`Command`]s and spawns the matching streaming tasks.
struct Service<N: Network> {
    // Provider used for RPC calls and block subscriptions.
    provider: RootProvider<N>,
    // Maximum number of blocks covered by a single streamed range.
    max_block_range: u64,
    // Number of commands whose handling returned an error.
    error_count: u64,
    // Receiving half of the command channel; the service stops when it is closed.
    command_receiver: mpsc::Receiver<Command>,
    // Checked by the run loop on every iteration; initialised to `false`.
    shutdown: bool,
}
249
250impl<N: Network> Service<N> {
251    pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
252        let (cmd_tx, cmd_rx) = mpsc::channel(100);
253
254        let service = Self {
255            provider,
256            max_block_range,
257            error_count: 0,
258            command_receiver: cmd_rx,
259            shutdown: false,
260        };
261
262        (service, cmd_tx)
263    }
264
265    pub async fn run(mut self) {
266        info!("Starting subscription service");
267
268        while !self.shutdown {
269            tokio::select! {
270                cmd = self.command_receiver.recv() => {
271                    if let Some(command) = cmd {
272                        if let Err(e) = self.handle_command(command).await {
273                            error!("Command handling error: {}", e);
274                            self.error_count += 1;
275                        }
276                    } else {
277                        warn!("Command channel closed, shutting down");
278                        break;
279                    }
280                }
281            }
282        }
283
284        info!("Subscription service stopped");
285    }
286
287    async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
288        match command {
289            Command::StreamLive { sender, block_confirmations, response } => {
290                info!("Starting live stream");
291                let result = self.handle_live(block_confirmations, sender).await;
292                let _ = response.send(result);
293            }
294            Command::StreamHistorical { sender, start_height, end_height, response } => {
295                info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
296                let result = self.handle_historical(start_height, end_height, sender).await;
297                let _ = response.send(result);
298            }
299            Command::StreamFrom { sender, start_height, block_confirmations, response } => {
300                info!(start_height = ?start_height, "Starting streaming from");
301                let result = self.handle_sync(start_height, block_confirmations, sender).await;
302                let _ = response.send(result);
303            }
304            Command::Rewind { sender, start_height, end_height, response } => {
305                info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
306                let result = self.handle_rewind(start_height, end_height, sender).await;
307                let _ = response.send(result);
308            }
309        }
310        Ok(())
311    }
312
313    async fn handle_live(
314        &mut self,
315        block_confirmations: u64,
316        sender: mpsc::Sender<Message>,
317    ) -> Result<(), ScannerError> {
318        let max_block_range = self.max_block_range;
319        let provider = self.provider.clone();
320        let latest = self.provider.get_block_number().await?;
321
322        // the next block returned by the underlying subscription will always be `latest + 1`,
323        // because `latest` was already mined and subscription by definition only streams after new
324        // blocks have been mined
325        let range_start = (latest + 1).saturating_sub(block_confirmations);
326
327        tokio::spawn(async move {
328            Self::stream_live_blocks(
329                range_start,
330                provider,
331                sender,
332                block_confirmations,
333                max_block_range,
334            )
335            .await;
336        });
337
338        Ok(())
339    }
340
341    async fn handle_historical(
342        &mut self,
343        start_height: BlockNumberOrTag,
344        end_height: BlockNumberOrTag,
345        sender: mpsc::Sender<Message>,
346    ) -> Result<(), ScannerError> {
347        let max_block_range = self.max_block_range;
348
349        let (start_block, end_block) = tokio::try_join!(
350            self.provider.get_block_by_number(start_height),
351            self.provider.get_block_by_number(end_height)
352        )?;
353
354        let start_block_num =
355            start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
356        let end_block_num =
357            end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number();
358
359        let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
360            Ordering::Greater => (end_block_num, start_block_num),
361            _ => (start_block_num, end_block_num),
362        };
363
364        info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
365
366        tokio::spawn(async move {
367            Self::stream_historical_blocks(
368                start_block_num,
369                end_block_num,
370                max_block_range,
371                &sender,
372            )
373            .await;
374        });
375
376        Ok(())
377    }
378
379    async fn handle_sync(
380        &mut self,
381        start_height: BlockNumberOrTag,
382        block_confirmations: u64,
383        sender: mpsc::Sender<Message>,
384    ) -> Result<(), ScannerError> {
385        let provider = self.provider.clone();
386        let max_block_range = self.max_block_range;
387
388        let get_start_block = async || -> Result<BlockNumber, ScannerError> {
389            let block = match start_height {
390                BlockNumberOrTag::Number(num) => num,
391                block_tag => provider
392                    .get_block_by_number(block_tag)
393                    .await?
394                    .ok_or_else(|| ScannerError::BlockNotFound(block_tag))?
395                    .header()
396                    .number(),
397            };
398            Ok(block)
399        };
400
401        let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
402            let block = provider
403                .get_block_by_number(BlockNumberOrTag::Latest)
404                .await?
405                .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
406                .header()
407                .number();
408            Ok(block)
409        };
410
411        // Step 1:
412        // Fetches the starting block and end block for historical sync in parallel
413        let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?;
414
415        let confirmed_tip = latest_block.saturating_sub(block_confirmations);
416
417        // If start is beyond confirmed tip, skip historical and go straight to live
418        if start_block > confirmed_tip {
419            info!(
420                start_block = start_block,
421                confirmed_tip = confirmed_tip,
422                "Start block is beyond confirmed tip, starting live stream"
423            );
424
425            tokio::spawn(async move {
426                Self::stream_live_blocks(
427                    start_block,
428                    provider,
429                    sender,
430                    block_confirmations,
431                    max_block_range,
432                )
433                .await;
434            });
435
436            return Ok(());
437        }
438
439        info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data");
440
441        // Step 2: Setup the live streaming buffer
442        // This channel will accumulate while historical sync is running
443        let (live_block_buffer_sender, live_block_buffer_receiver) =
444            mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
445
446        // The cutoff is the last block we have synced historically
447        // Any block > cutoff will come from the live stream
448        let cutoff = confirmed_tip;
449
450        // This task runs independently, accumulating new blocks while wehistorical data is syncing
451        tokio::spawn(async move {
452            Self::stream_live_blocks(
453                cutoff + 1,
454                provider,
455                live_block_buffer_sender,
456                block_confirmations,
457                max_block_range,
458            )
459            .await;
460        });
461
462        tokio::spawn(async move {
463            // Step 4: Perform historical synchronization
464            // This processes blocks from start_block to end_block (cutoff)
465            // If this fails, we need to abort the live streaming task
466            Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender)
467                .await;
468
469            info!("Chain tip reached, switching to live");
470
471            if !sender.try_stream(ScannerStatus::SwitchingToLive).await {
472                return;
473            }
474
475            info!("Successfully transitioned from historical to live data");
476
477            // Step 5:
478            // Spawn the buffer processor task
479            // This will:
480            // 1. Process all buffered blocks, filtering out any ≤ cutoff
481            // 2. Forward blocks > cutoff to the user
482            // 3. Continue forwarding until the buffer if exhausted (waits for new blocks from live
483            //    stream)
484            Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
485        });
486
487        Ok(())
488    }
489
490    async fn handle_rewind(
491        &mut self,
492        start_height: BlockNumberOrTag,
493        end_height: BlockNumberOrTag,
494        sender: mpsc::Sender<Message>,
495    ) -> Result<(), ScannerError> {
496        let max_block_range = self.max_block_range;
497        let provider = self.provider.clone();
498
499        let (start_block, end_block) = join!(
500            self.provider.get_block_by_number(start_height),
501            self.provider.get_block_by_number(end_height),
502        );
503
504        let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?;
505        let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?;
506
507        // normalize block range
508        let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
509            Ordering::Greater => (start_block, end_block),
510            _ => (end_block, start_block),
511        };
512
513        tokio::spawn(async move {
514            Self::stream_rewind(from, to, max_block_range, &sender, &provider).await;
515        });
516
517        Ok(())
518    }
519
520    /// Streams blocks in reverse order from `from` to `to`.
521    ///
522    /// The `from` block is assumed to be greater than or equal to the `to` block.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if the stream fails
527    async fn stream_rewind(
528        from: N::BlockResponse,
529        to: N::BlockResponse,
530        max_block_range: u64,
531        sender: &mpsc::Sender<Message>,
532        provider: &RootProvider<N>,
533    ) {
534        let mut batch_count = 0;
535
536        // for checking whether reorg occurred
537        let mut tip_hash = from.header().hash();
538
539        let from = from.header().number();
540        let to = to.header().number();
541
542        // we're iterating in reverse
543        let mut batch_from = from;
544
545        while batch_from >= to {
546            let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
547
548            // stream the range regularly, i.e. from smaller block number to greater
549            if !sender.try_stream(batch_to..=batch_from).await {
550                break;
551            }
552
553            batch_count += 1;
554            if batch_count % 10 == 0 {
555                debug!(batch_count = batch_count, "Processed rewind batches");
556            }
557
558            // check early if end of stream achieved to avoid subtraction overflow when `to
559            // == 0`
560            if batch_to == to {
561                break;
562            }
563
564            let reorged = match reorg_detected(provider, tip_hash).await {
565                Ok(detected) => {
566                    info!(block_number = %from, hash = %tip_hash, "Reorg detected");
567                    detected
568                }
569                Err(e) => {
570                    error!(error = %e, "Terminal RPC call error, shutting down");
571                    _ = sender.try_stream(e);
572                    return;
573                }
574            };
575
576            if reorged {
577                info!(block_number = %from, hash = %tip_hash, "Reorg detected");
578
579                if !sender.try_stream(ScannerStatus::ReorgDetected).await {
580                    break;
581                }
582
583                // restart rewind
584                batch_from = from;
585                // store the updated end block hash
586                tip_hash = match provider.get_block_by_number(from.into()).await {
587                    Ok(block) => block
588                        .unwrap_or_else(|| {
589                            panic!("Block with number '{from}' should exist post-reorg")
590                        })
591                        .header()
592                        .hash(),
593                    Err(e) => {
594                        error!(error = %e, "Terminal RPC call error, shutting down");
595                        _ = sender.try_stream(e);
596                        return;
597                    }
598                };
599            } else {
600                // `batch_to` is always greater than `to`, so `batch_to - 1` is always a valid
601                // unsigned integer
602                batch_from = batch_to - 1;
603            }
604        }
605
606        info!(batch_count = batch_count, "Rewind completed");
607    }
608
609    async fn stream_historical_blocks(
610        start: BlockNumber,
611        end: BlockNumber,
612        max_block_range: u64,
613        sender: &mpsc::Sender<Message>,
614    ) {
615        let mut batch_count = 0;
616
617        let mut next_start_block = start;
618
619        // must be <= to include the edge case when start == end (i.e. return the single block
620        // range)
621        while next_start_block <= end {
622            let batch_end_block_number =
623                next_start_block.saturating_add(max_block_range - 1).min(end);
624
625            if !sender.try_stream(next_start_block..=batch_end_block_number).await {
626                break;
627            }
628
629            batch_count += 1;
630            if batch_count % 10 == 0 {
631                debug!(batch_count = batch_count, "Processed historical batches");
632            }
633
634            if batch_end_block_number == end {
635                break;
636            }
637
638            // Next block number always exists as we checked end block previously
639            let next_start_block_number = batch_end_block_number.saturating_add(1);
640
641            next_start_block = next_start_block_number;
642        }
643
644        info!(batch_count = batch_count, "Historical sync completed");
645    }
646
647    async fn stream_live_blocks<P: Provider<N>>(
648        mut range_start: BlockNumber,
649        provider: P,
650        sender: mpsc::Sender<Message>,
651        block_confirmations: u64,
652        max_block_range: u64,
653    ) {
654        match Self::get_block_subscription(&provider).await {
655            Ok(ws_stream) => {
656                info!("WebSocket connected for live blocks");
657
658                // ensure we start streaming only after the expected_next_block cutoff
659                let cutoff = range_start;
660                let mut stream =
661                    ws_stream.into_stream().skip_while(|header| header.number() < cutoff);
662
663                while let Some(incoming_block) = stream.next().await {
664                    let incoming_block_num = incoming_block.number();
665                    info!(block_number = incoming_block_num, "Received block header");
666
667                    if incoming_block_num < range_start {
668                        warn!("Reorg detected: sending forked range");
669                        if !sender.try_stream(ScannerStatus::ReorgDetected).await {
670                            return;
671                        }
672
673                        // Calculate the confirmed block position for the incoming block
674                        let incoming_confirmed =
675                            incoming_block_num.saturating_sub(block_confirmations);
676
677                        // updated expected block to updated confirmed
678                        range_start = incoming_confirmed;
679                    }
680
681                    let confirmed = incoming_block_num.saturating_sub(block_confirmations);
682                    if confirmed >= range_start {
683                        // NOTE: Edge case when difference between range end and range start >= max
684                        // reads
685                        let range_end =
686                            confirmed.min(range_start.saturating_add(max_block_range - 1));
687
688                        info!(
689                            range_start = range_start,
690                            range_end = range_end,
691                            "Sending live block range"
692                        );
693
694                        if !sender.try_stream(range_start..=range_end).await {
695                            return;
696                        }
697
698                        // Overflow can not realistically happen
699                        range_start = range_end + 1;
700                    }
701                }
702            }
703            Err(e) => {
704                _ = sender.try_stream(e).await;
705            }
706        }
707    }
708
709    async fn process_live_block_buffer(
710        mut buffer_rx: mpsc::Receiver<Message>,
711        sender: mpsc::Sender<Message>,
712        cutoff: BlockNumber,
713    ) {
714        let mut processed = 0;
715        let mut discarded = 0;
716
717        // Process all buffered messages
718        while let Some(data) = buffer_rx.recv().await {
719            match data {
720                Message::Data(range) => {
721                    let (start, end) = (*range.start(), *range.end());
722                    if start >= cutoff {
723                        if !sender.try_stream(range).await {
724                            break;
725                        }
726                        processed += end - start;
727                    } else if end >= cutoff {
728                        discarded += cutoff - start;
729
730                        let start = cutoff;
731                        if !sender.try_stream(start..=end).await {
732                            break;
733                        }
734                        processed += end - start;
735                    } else {
736                        discarded += end - start;
737                    }
738                }
739                other => {
740                    // Could be error or status
741                    if !sender.try_stream(other).await {
742                        break;
743                    }
744                }
745            }
746        }
747
748        info!(processed = processed, discarded = discarded, "Processed buffered messages");
749    }
750
751    async fn get_block_subscription(
752        provider: &impl Provider<N>,
753    ) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
754        let ws_stream = provider
755            .subscribe_blocks()
756            .await
757            .map_err(|_| ScannerError::WebSocketConnectionFailed(1))?;
758
759        Ok(ws_stream)
760    }
761}
762
763async fn reorg_detected<N: Network>(
764    provider: &RootProvider<N>,
765    hash_to_check: B256,
766) -> Result<bool, RpcError<TransportErrorKind>> {
767    Ok(provider.get_block_by_hash(hash_to_check).await?.is_none())
768}
769
770pub struct BlockRangeScannerClient {
771    command_sender: mpsc::Sender<Command>,
772}
773
774impl BlockRangeScannerClient {
775    /// Creates a new subscription client.
776    ///
777    /// # Arguments
778    ///
779    /// * `command_sender` - The sender for sending commands to the subscription service.
780    #[must_use]
781    pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
782        Self { command_sender }
783    }
784
785    /// Streams live blocks starting from the latest block.
786    ///
787    /// # Arguments
788    ///
789    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
790    ///
791    /// # Errors
792    ///
793    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
794    pub async fn stream_live(
795        &self,
796        block_confirmations: u64,
797    ) -> Result<ReceiverStream<Message>, ScannerError> {
798        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
799        let (response_tx, response_rx) = oneshot::channel();
800
801        let command = Command::StreamLive {
802            sender: blocks_sender,
803            block_confirmations,
804            response: response_tx,
805        };
806
807        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
808
809        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
810
811        Ok(ReceiverStream::new(blocks_receiver))
812    }
813
814    /// Streams a batch of historical blocks from `start_height` to `end_height`.
815    ///
816    /// # Arguments
817    ///
818    /// * `start_height` - The starting block number or tag.
819    /// * `end_height` - The ending block number or tag.
820    ///
821    /// # Errors
822    ///
823    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
824    pub async fn stream_historical(
825        &self,
826        start_height: impl Into<BlockNumberOrTag>,
827        end_height: impl Into<BlockNumberOrTag>,
828    ) -> Result<ReceiverStream<Message>, ScannerError> {
829        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
830        let (response_tx, response_rx) = oneshot::channel();
831
832        let command = Command::StreamHistorical {
833            sender: blocks_sender,
834            start_height: start_height.into(),
835            end_height: end_height.into(),
836            response: response_tx,
837        };
838
839        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
840
841        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
842
843        Ok(ReceiverStream::new(blocks_receiver))
844    }
845
846    /// Streams blocks starting from `start_height` and transitions to live mode.
847    ///
848    /// # Arguments
849    ///
850    /// * `start_height` - The starting block number or tag.
851    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
852    ///
853    /// # Errors
854    ///
855    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
856    pub async fn stream_from(
857        &self,
858        start_height: impl Into<BlockNumberOrTag>,
859        block_confirmations: u64,
860    ) -> Result<ReceiverStream<Message>, ScannerError> {
861        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
862        let (response_tx, response_rx) = oneshot::channel();
863
864        let command = Command::StreamFrom {
865            sender: blocks_sender,
866            start_height: start_height.into(),
867            block_confirmations,
868            response: response_tx,
869        };
870
871        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
872
873        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
874
875        Ok(ReceiverStream::new(blocks_receiver))
876    }
877
878    /// Streams blocks in reverse order from `start_height` to `end_height`.
879    ///
880    /// # Arguments
881    ///
882    /// * `start_height` - The starting block number or tag (defaults to Latest if None).
883    /// * `end_height` - The ending block number or tag (defaults to Earliest if None).
884    ///
885    /// # Errors
886    ///
887    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
888    pub async fn rewind(
889        &self,
890        start_height: impl Into<BlockNumberOrTag>,
891        end_height: impl Into<BlockNumberOrTag>,
892    ) -> Result<ReceiverStream<Message>, ScannerError> {
893        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
894        let (response_tx, response_rx) = oneshot::channel();
895
896        let command = Command::Rewind {
897            sender: blocks_sender,
898            start_height: start_height.into(),
899            end_height: end_height.into(),
900            response: response_tx,
901        };
902
903        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
904
905        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
906
907        Ok(ReceiverStream::new(blocks_receiver))
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914    use crate::{assert_closed, assert_empty, assert_next};
915    use alloy::{
916        network::Ethereum,
917        providers::{ProviderBuilder, ext::AnvilApi},
918        rpc::types::anvil::ReorgOptions,
919    };
920    use alloy_node_bindings::Anvil;
921    use tokio::sync::mpsc;
922    use tokio_stream::StreamExt;
923
924    #[test]
925    fn block_range_scanner_defaults_match_constants() {
926        let scanner = BlockRangeScanner::new();
927
928        assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
929    }
930
931    #[test]
932    fn builder_methods_update_configuration() {
933        let max_block_range = 42;
934
935        let scanner = BlockRangeScanner::new().max_block_range(max_block_range);
936
937        assert_eq!(scanner.max_block_range, max_block_range);
938    }
939
940    #[tokio::test]
941    async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyhow::Result<()> {
942        let anvil = Anvil::new().try_spawn()?;
943        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
944
945        // --- Zero block confirmations -> stream immediately ---
946
947        let client = BlockRangeScanner::new()
948            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
949            .await?
950            .run()?;
951
952        let mut stream = client.stream_live(0).await?;
953
954        provider.anvil_mine(Some(5), None).await?;
955
956        assert_next!(stream, 1..=1);
957        assert_next!(stream, 2..=2);
958        assert_next!(stream, 3..=3);
959        assert_next!(stream, 4..=4);
960        assert_next!(stream, 5..=5);
961        let mut stream = assert_empty!(stream);
962
963        provider.anvil_mine(Some(1), None).await?;
964
965        assert_next!(stream, 6..=6);
966        assert_empty!(stream);
967
968        // --- 1 block confirmation  ---
969
970        let mut stream = client.stream_live(1).await?;
971
972        provider.anvil_mine(Some(5), None).await?;
973
974        assert_next!(stream, 6..=6);
975        assert_next!(stream, 7..=7);
976        assert_next!(stream, 8..=8);
977        assert_next!(stream, 9..=9);
978        assert_next!(stream, 10..=10);
979        let mut stream = assert_empty!(stream);
980
981        provider.anvil_mine(Some(1), None).await?;
982
983        assert_next!(stream, 11..=11);
984        assert_empty!(stream);
985
986        Ok(())
987    }
988
989    #[tokio::test]
990    async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> {
991        let anvil = Anvil::new().try_spawn()?;
992
993        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
994        provider.anvil_mine(Some(20), None).await?;
995
996        let block_confirmations = 5;
997
998        let client = BlockRangeScanner::new()
999            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1000            .await?
1001            .run()?;
1002
1003        let stream = client.stream_from(BlockNumberOrTag::Latest, block_confirmations).await?;
1004
1005        let stream = assert_empty!(stream);
1006
1007        provider.anvil_mine(Some(4), None).await?;
1008
1009        let mut stream = assert_empty!(stream);
1010
1011        provider.anvil_mine(Some(1), None).await?;
1012
1013        assert_next!(stream, 20..=20);
1014        let mut stream = assert_empty!(stream);
1015
1016        provider.anvil_mine(Some(1), None).await?;
1017
1018        assert_next!(stream, 21..=21);
1019        assert_empty!(stream);
1020
1021        Ok(())
1022    }
1023
1024    #[tokio::test]
1025    #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1026    async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Result<()> {
1027        let anvil = Anvil::new().try_spawn()?;
1028
1029        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1030
1031        let block_confirmations = 5;
1032
1033        let client = BlockRangeScanner::new()
1034            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1035            .await?
1036            .run()?;
1037
1038        let mut receiver = client.stream_live(block_confirmations).await?;
1039
1040        provider.anvil_mine(Some(10), None).await?;
1041
1042        provider
1043            .anvil_reorg(ReorgOptions { depth: block_confirmations - 1, tx_block_pairs: vec![] })
1044            .await?;
1045
1046        provider.anvil_mine(Some(20), None).await?;
1047
1048        let mut block_range_start = 0;
1049
1050        let end_loop = 20;
1051        let mut i = 0;
1052        while let Some(Message::Data(range)) = receiver.next().await {
1053            if block_range_start == 0 {
1054                block_range_start = *range.start();
1055            }
1056
1057            assert_eq!(block_range_start, *range.start());
1058            assert!(range.end() >= range.start());
1059            block_range_start = *range.end() + 1;
1060            i += 1;
1061            if i == end_loop {
1062                break;
1063            }
1064        }
1065        Ok(())
1066    }
1067
1068    #[tokio::test]
1069    #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1070    async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<()> {
1071        let anvil = Anvil::new().block_time(1).try_spawn()?;
1072
1073        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1074
1075        let block_confirmations = 3;
1076
1077        let client = BlockRangeScanner::new()
1078            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1079            .await?
1080            .run()?;
1081
1082        let mut receiver = client.stream_live(block_confirmations).await?;
1083
1084        provider.anvil_mine(Some(10), None).await?;
1085
1086        provider
1087            .anvil_reorg(ReorgOptions { depth: block_confirmations + 5, tx_block_pairs: vec![] })
1088            .await?;
1089
1090        provider.anvil_mine(Some(30), None).await?;
1091        receiver.close();
1092
1093        let mut block_range_start = 0;
1094
1095        let mut block_num = vec![];
1096        let mut reorg_detected = false;
1097        while let Some(msg) = receiver.next().await {
1098            match msg {
1099                Message::Data(range) => {
1100                    if block_range_start == 0 {
1101                        block_range_start = *range.start();
1102                    }
1103                    block_num.push(range);
1104                    if block_num.len() == 15 {
1105                        break;
1106                    }
1107                }
1108                Message::Status(ScannerStatus::ReorgDetected) => {
1109                    reorg_detected = true;
1110                }
1111                _ => {
1112                    break;
1113                }
1114            }
1115        }
1116        assert!(reorg_detected, "Reorg should have been detected");
1117
1118        // Generally check that there is a reorg in the range i.e.
1119        //                                                        REORG
1120        // [0..=0, 1..=1, 2..=2, 3..=3, 4..=4, 5..=5, 6..=6, 7..=7, 3..=3, 4..=4, 5..=5, 6..=6,
1121        // 7..=7, 8..=8, 9..=9] (Less flaky to assert this way)
1122        let mut found_reorg_pattern = false;
1123        for window in block_num.windows(2) {
1124            if window[1].start() < window[0].end() {
1125                found_reorg_pattern = true;
1126                break;
1127            }
1128        }
1129        assert!(found_reorg_pattern,);
1130
1131        Ok(())
1132    }
1133
1134    #[tokio::test]
1135    #[ignore = "too flaky, un-ignore once a full local node is used: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1136    async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Result<()> {
1137        let anvil = Anvil::new().try_spawn()?;
1138        let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1139
1140        provider.anvil_mine(Some(120), None).await?;
1141
1142        let end_num = 110;
1143
1144        let client = BlockRangeScanner::new()
1145            .max_block_range(30)
1146            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1147            .await?
1148            .run()?;
1149
1150        let mut stream = client
1151            .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
1152            .await?;
1153
1154        let depth = 15;
1155        _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
1156        _ = provider.anvil_mine(Some(20), None).await;
1157
1158        assert_next!(stream, 0..=29);
1159        assert_next!(stream, 30..=59);
1160        assert_next!(stream, 60..=89);
1161        assert_next!(stream, 90..=110);
1162        assert_next!(stream, ScannerStatus::ReorgDetected);
1163        assert_next!(stream, 105..=110);
1164        assert_closed!(stream);
1165
1166        Ok(())
1167    }
1168
1169    #[tokio::test]
1170    #[ignore = "too flaky, un-ignore once a full local node is used: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1171    async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Result<()> {
1172        let anvil = Anvil::new().try_spawn()?;
1173        let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1174
1175        provider.anvil_mine(Some(120), None).await?;
1176
1177        let end_num = 120;
1178
1179        let client = BlockRangeScanner::new()
1180            .max_block_range(30)
1181            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1182            .await?
1183            .run()?;
1184
1185        let mut stream = client
1186            .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
1187            .await?;
1188
1189        let pre_reorg_mine = 20;
1190        _ = provider.anvil_mine(Some(pre_reorg_mine), None).await;
1191        let depth = pre_reorg_mine + 1;
1192        _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
1193        _ = provider.anvil_mine(Some(20), None).await;
1194
1195        assert_next!(stream, 0..=29);
1196        assert_next!(stream, 30..=59);
1197        assert_next!(stream, 60..=89);
1198        assert_next!(stream, 90..=120);
1199        assert_next!(stream, ScannerStatus::ReorgDetected);
1200        assert_next!(stream, 120..=120);
1201        assert_closed!(stream);
1202
1203        Ok(())
1204    }
1205
1206    #[tokio::test]
1207    async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1208        let anvil = Anvil::new().try_spawn()?;
1209
1210        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1211
1212        provider.anvil_mine(Some(100), None).await?;
1213
1214        let client = BlockRangeScanner::new()
1215            .max_block_range(5)
1216            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1217            .await?
1218            .run()?;
1219
1220        // ranges where each batch is of max blocks per epoch size
1221        let mut stream = client.stream_historical(0, 19).await?;
1222        assert_next!(stream, 0..=4);
1223        assert_next!(stream, 5..=9);
1224        assert_next!(stream, 10..=14);
1225        assert_next!(stream, 15..=19);
1226        assert_closed!(stream);
1227
1228        // ranges where last batch is smaller than blocks per epoch
1229        let mut stream = client.stream_historical(93, 99).await?;
1230        assert_next!(stream, 93..=97);
1231        assert_next!(stream, 98..=99);
1232        assert_closed!(stream);
1233
1234        // range where blocks per epoch is larger than the number of blocks in the range
1235        let mut stream = client.stream_historical(3, 5).await?;
1236        assert_next!(stream, 3..=5);
1237        assert_closed!(stream);
1238
1239        // single item range
1240        let mut stream = client.stream_historical(3, 3).await?;
1241        assert_next!(stream, 3..=3);
1242        assert_closed!(stream);
1243
1244        // range where blocks per epoch is larger than the number of blocks on chain
1245        let client = BlockRangeScanner::new()
1246            .max_block_range(200)
1247            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1248            .await?
1249            .run()?;
1250
1251        let mut stream = client.stream_historical(0, 20).await?;
1252        assert_next!(stream, 0..=20);
1253        assert_closed!(stream);
1254
1255        let mut stream = client.stream_historical(0, 99).await?;
1256        assert_next!(stream, 0..=99);
1257        assert_closed!(stream);
1258
1259        Ok(())
1260    }
1261
1262    #[tokio::test]
1263    async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> {
1264        let anvil = Anvil::new().try_spawn()?;
1265
1266        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1267        provider.anvil_mine(Some(11), None).await?;
1268
1269        let client = BlockRangeScanner::new()
1270            .max_block_range(5)
1271            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1272            .await?
1273            .run()?;
1274
1275        let mut stream = client.stream_historical(10, 0).await?;
1276        assert_next!(stream, 0..=4);
1277        assert_next!(stream, 5..=9);
1278        assert_next!(stream, 10..=10);
1279        assert_closed!(stream);
1280
1281        Ok(())
1282    }
1283
1284    #[tokio::test]
1285    async fn buffered_messages_after_cutoff_are_all_passed() {
1286        let cutoff = 50;
1287        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1288        buffer_tx.send(Message::Data(51..=55)).await.unwrap();
1289        buffer_tx.send(Message::Data(56..=60)).await.unwrap();
1290        buffer_tx.send(Message::Data(61..=70)).await.unwrap();
1291        drop(buffer_tx);
1292
1293        let (out_tx, out_rx) = mpsc::channel(8);
1294        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1295
1296        let mut stream = ReceiverStream::new(out_rx);
1297
1298        assert_next!(stream, 51..=55);
1299        assert_next!(stream, 56..=60);
1300        assert_next!(stream, 61..=70);
1301        assert_closed!(stream);
1302    }
1303
1304    #[tokio::test]
1305    async fn ranges_entirely_before_cutoff_are_discarded() {
1306        let cutoff = 100;
1307
1308        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1309        buffer_tx.send(Message::Data(40..=50)).await.unwrap();
1310        buffer_tx.send(Message::Data(51..=60)).await.unwrap();
1311        buffer_tx.send(Message::Data(61..=70)).await.unwrap();
1312        drop(buffer_tx);
1313
1314        let (out_tx, out_rx) = mpsc::channel(8);
1315        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1316
1317        let mut stream = ReceiverStream::new(out_rx);
1318
1319        assert_closed!(stream);
1320    }
1321
1322    #[tokio::test]
1323    async fn ranges_overlapping_cutoff_are_trimmed() {
1324        let cutoff = 75;
1325
1326        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1327        buffer_tx.send(Message::Data(60..=70)).await.unwrap();
1328        buffer_tx.send(Message::Data(71..=80)).await.unwrap();
1329        buffer_tx.send(Message::Data(81..=86)).await.unwrap();
1330        drop(buffer_tx);
1331
1332        let (out_tx, out_rx) = mpsc::channel(8);
1333        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1334
1335        let mut stream = ReceiverStream::new(out_rx);
1336
1337        assert_next!(stream, 75..=80);
1338        assert_next!(stream, 81..=86);
1339        assert_closed!(stream);
1340    }
1341
1342    #[tokio::test]
1343    async fn edge_case_range_exactly_at_cutoff() {
1344        let cutoff = 100;
1345
1346        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1347        buffer_tx.send(Message::Data(98..=98)).await.unwrap(); // Just before: discard
1348        buffer_tx.send(Message::Data(99..=100)).await.unwrap(); // Includes cutoff: trim to 100..=100
1349        buffer_tx.send(Message::Data(100..=100)).await.unwrap(); // Exactly at: forward
1350        buffer_tx.send(Message::Data(100..=101)).await.unwrap(); // Starts at cutoff: forward
1351        buffer_tx.send(Message::Data(102..=102)).await.unwrap(); // After cutoff: forward
1352        drop(buffer_tx);
1353
1354        let (out_tx, out_rx) = mpsc::channel(8);
1355        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1356
1357        let mut stream = ReceiverStream::new(out_rx);
1358
1359        assert_next!(stream, 100..=100);
1360        assert_next!(stream, 100..=100);
1361        assert_next!(stream, 100..=101);
1362        assert_next!(stream, 102..=102);
1363        assert_closed!(stream);
1364    }
1365
1366    #[tokio::test]
1367    async fn try_send_forwards_errors_to_subscribers() {
1368        let (tx, mut rx) = mpsc::channel(1);
1369
1370        _ = tx.try_stream(ScannerError::WebSocketConnectionFailed(4)).await;
1371
1372        assert!(matches!(
1373            rx.recv().await,
1374            Some(Message::Error(ScannerError::WebSocketConnectionFailed(4)))
1375        ));
1376    }
1377
1378    #[tokio::test]
1379    async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<()> {
1380        let anvil = Anvil::new().try_spawn()?;
1381
1382        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1383
1384        provider.anvil_mine(Some(150), None).await?;
1385
1386        let client = BlockRangeScanner::new()
1387            .max_block_range(100)
1388            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1389            .await?
1390            .run()?;
1391
1392        let mut stream = client.rewind(100, 150).await?;
1393
1394        // Range length is 51, epoch is 100 -> single batch [100..=150]
1395        assert_next!(stream, 100..=150);
1396        assert_closed!(stream);
1397
1398        Ok(())
1399    }
1400
1401    #[tokio::test]
1402    async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> anyhow::Result<()>
1403    {
1404        let anvil = Anvil::new().try_spawn()?;
1405
1406        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1407
1408        provider.anvil_mine(Some(15), None).await?;
1409
1410        let client = BlockRangeScanner::new()
1411            .max_block_range(5)
1412            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1413            .await?
1414            .run()?;
1415
1416        let mut stream = client.rewind(0, 14).await?;
1417
1418        // 0..=14 with epoch 5 -> [10..=14, 5..=9, 0..=4]
1419        assert_next!(stream, 10..=14);
1420        assert_next!(stream, 5..=9);
1421        assert_next!(stream, 0..=4);
1422        assert_closed!(stream);
1423
1424        Ok(())
1425    }
1426
1427    #[tokio::test]
1428    async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Result<()> {
1429        let anvil = Anvil::new().try_spawn()?;
1430
1431        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1432
1433        provider.anvil_mine(Some(15), None).await?;
1434
1435        let client = BlockRangeScanner::new()
1436            .max_block_range(4)
1437            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1438            .await?
1439            .run()?;
1440
1441        let mut stream = client.rewind(3, 12).await?;
1442
1443        // 3..=12 with epoch 4 -> ends: 12,8,4 -> batches: [9..=12, 5..=8, 3..=4]
1444        assert_next!(stream, 9..=12);
1445        assert_next!(stream, 5..=8);
1446        assert_next!(stream, 3..=4);
1447        assert_closed!(stream);
1448
1449        Ok(())
1450    }
1451
1452    #[tokio::test]
1453    async fn rewind_single_block_range() -> anyhow::Result<()> {
1454        let anvil = Anvil::new().try_spawn()?;
1455
1456        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1457
1458        provider.anvil_mine(Some(15), None).await?;
1459
1460        let client = BlockRangeScanner::new()
1461            .max_block_range(5)
1462            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1463            .await?
1464            .run()?;
1465
1466        let mut stream = client.rewind(7, 7).await?;
1467
1468        assert_next!(stream, 7..=7);
1469        assert_closed!(stream);
1470
1471        Ok(())
1472    }
1473
1474    #[tokio::test]
1475    async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Result<()> {
1476        let anvil = Anvil::new().try_spawn()?;
1477
1478        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1479
1480        provider.anvil_mine(Some(15), None).await?;
1481
1482        let client = BlockRangeScanner::new()
1483            .max_block_range(1)
1484            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1485            .await?
1486            .run()?;
1487
1488        let mut stream = client.rewind(5, 8).await?;
1489
1490        // 5..=8 with epoch 1 -> [8..=8, 7..=7, 6..=6, 5..=5]
1491        assert_next!(stream, 8..=8);
1492        assert_next!(stream, 7..=7);
1493        assert_next!(stream, 6..=6);
1494        assert_next!(stream, 5..=5);
1495        assert_closed!(stream);
1496
1497        Ok(())
1498    }
1499
1500    #[tokio::test]
1501    async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyhow::Result<()> {
1502        let anvil = Anvil::new().try_spawn()?;
1503
1504        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1505        // Mine 20 blocks, so the total number of blocks is 21 (including 0th block)
1506        provider.anvil_mine(Some(20), None).await?;
1507
1508        let client = BlockRangeScanner::new()
1509            .max_block_range(7)
1510            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1511            .await?
1512            .run()?;
1513
1514        let mut stream =
1515            client.rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
1516
1517        assert_next!(stream, 14..=20);
1518        assert_next!(stream, 7..=13);
1519        assert_next!(stream, 0..=6);
1520        assert_closed!(stream);
1521
1522        Ok(())
1523    }
1524
1525    #[tokio::test]
1526    async fn command_rewind_handles_start_and_end_in_any_order() -> anyhow::Result<()> {
1527        let anvil = Anvil::new().try_spawn()?;
1528
1529        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1530        // Ensure blocks at 3 and 15 exist
1531        provider.anvil_mine(Some(16), None).await?;
1532
1533        let client = BlockRangeScanner::new()
1534            .max_block_range(5)
1535            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1536            .await?
1537            .run()?;
1538
1539        let mut stream = client.rewind(15, 3).await?;
1540
1541        assert_next!(stream, 11..=15);
1542        assert_next!(stream, 6..=10);
1543        assert_next!(stream, 3..=5);
1544        assert_closed!(stream);
1545
1546        let mut stream = client.rewind(3, 15).await?;
1547
1548        assert_next!(stream, 11..=15);
1549        assert_next!(stream, 6..=10);
1550        assert_next!(stream, 3..=5);
1551        assert_closed!(stream);
1552
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> {
1558        let anvil = Anvil::new().try_spawn()?;
1559
1560        // Do not mine up to 999 so start won't exist
1561        let client = BlockRangeScanner::new()
1562            .max_block_range(5)
1563            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1564            .await?
1565            .run()?;
1566
1567        let stream = client.rewind(0, 999).await;
1568
1569        assert!(matches!(stream, Err(ScannerError::BlockNotFound(BlockNumberOrTag::Number(999)))));
1570
1571        Ok(())
1572    }
1573}