event_scanner/
block_range_scanner.rs

1//! Example usage:
2//!
3//! ```rust,no_run
4//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
5//! use std::ops::RangeInclusive;
6//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7//!
8//! use alloy::transports::http::reqwest::Url;
9//! use event_scanner::block_range_scanner::{
10//!     BlockRangeMessage, BlockRangeScanner, BlockRangeScannerClient, BlockRangeScannerError,
11//! };
12//! use tokio::time::Duration;
13//! use tracing::{error, info};
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//!     // Initialize logging
18//!     tracing_subscriber::fmt::init();
19//!
20//!     // Configuration
21//!     let block_range_scanner = BlockRangeScanner::new()
22//!         .with_blocks_read_per_epoch(1000)
23//!         .with_reorg_rewind_depth(5)
24//!         .with_block_confirmations(5)
25//!         .connect_ws::<Ethereum>(Url::parse("ws://localhost:8546").unwrap())
26//!         .await?;
27//!
28//!     // Create client to send subscribe command to block scanner
29//!     let client: BlockRangeScannerClient = block_range_scanner.run()?;
30//!
31//!     let mut stream = client.stream_live().await?;
32//!
33//!     while let Some(message) = stream.next().await {
34//!         match message {
35//!             BlockRangeMessage::Data(range) => {
36//!                 // process range
37//!             }
38//!             BlockRangeMessage::Error(e) => {
39//!                 error!("Received error from subscription: {e}");
40//!
41//!                 // Decide whether to continue or break based on error type
42//!                 match e {
43//!                     BlockRangeScannerError::ServiceShutdown => break,
44//!                     BlockRangeScannerError::WebSocketConnectionFailed(_) => {
45//!                         // Maybe implement backoff and retry logic here
46//!                         error!(
47//!                             "WebSocket connection failed, continuing to listen for reconnection"
48//!                         );
49//!                     }
50//!                     _ => {
51//!                         // Continue processing for other errors
52//!                         error!("Non-fatal error, continuing: {e}");
53//!                     }
54//!                 }
55//!             }
56//!             BlockRangeMessage::Status(status) => {
57//!                 info!("Received status message: {:?}", status);
58//!             }
59//!         }
60//!     }
61//!
62//!     info!("Data processing stopped.");
63//!
64//!     Ok(())
65//! }
66//! ```
67
68use std::{ops::RangeInclusive, sync::Arc};
69
70use tokio::sync::{mpsc, oneshot};
71use tokio_stream::{StreamExt, wrappers::ReceiverStream};
72
73use crate::types::{ScannerMessage, ScannerStatus};
74use alloy::{
75    consensus::BlockHeader,
76    eips::BlockNumberOrTag,
77    network::{BlockResponse, Network, primitives::HeaderResponse},
78    primitives::{BlockHash, BlockNumber},
79    providers::{Provider, RootProvider},
80    pubsub::Subscription,
81    rpc::client::ClientBuilder,
82    transports::{
83        RpcError, TransportErrorKind, TransportResult,
84        http::reqwest::{self, Url},
85        ws::WsConnect,
86    },
87};
88use thiserror::Error;
89use tracing::{debug, error, info, warn};
90
/// Default value of the `blocks_read_per_epoch` setting used by [`BlockRangeScanner::new`]; the
/// setting caps how many blocks a single historical batch range may span.
pub const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
// copied from https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
/// Default value of the `block_confirmations` setting used by [`BlockRangeScanner::new`]; the
/// live stream subtracts this amount from each incoming block number before emitting a range.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
// const BACK_OFF_MAX_RETRIES: u64 = 5;

/// Capacity of the bounded channels that carry [`BlockRangeMessage`]s, both the per-subscriber
/// channels created by the client and the live-block buffer used while a historical sync runs.
pub const MAX_BUFFERED_MESSAGES: usize = 50000;

/// Default value of the `reorg_rewind_depth` setting used by [`BlockRangeScanner::new`]; the
/// setting is the number of blocks stepped back when the historical sync detects a reorg.
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;

// // State sync aware retry settings
// const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
// const STATE_SYNC_MAX_RETRIES: u64 = 12;

/// Message delivered to subscribers: an inclusive block-number range to process, an error, or a
/// status notification.
pub type BlockRangeMessage = ScannerMessage<RangeInclusive<BlockNumber>, BlockRangeScannerError>;
105
/// Errors reported by the block range scanner service and its client.
///
/// The wrapped third-party errors are stored behind [`Arc`], presumably so that the enum can
/// derive `Clone` (errors are cloned when they are both returned and sent to the subscriber).
#[derive(Error, Debug, Clone)]
pub enum BlockRangeScannerError {
    /// A `reqwest` HTTP error.
    #[error("HTTP request failed: {0}")]
    HttpError(Arc<reqwest::Error>),

    // #[error("WebSocket error: {0}")]
    // WebSocketError(#[from] tokio_tungstenite::tungstenite::Error),
    /// A `serde_json` (de)serialization error.
    #[error("Serialization error: {0}")]
    SerializationError(Arc<serde_json::Error>),

    /// An RPC/transport error returned by the provider.
    #[error("RPC error: {0}")]
    RpcError(Arc<RpcError<TransportErrorKind>>),

    /// A channel send failed.
    #[error("Channel send error")]
    ChannelError,

    /// The service is no longer reachable: the client returns this when the command channel is
    /// closed or the service drops the response channel; the stream handlers also return it when
    /// no subscriber is registered.
    #[error("Service is shutting down")]
    ServiceShutdown,

    /// A stream was requested while another subscriber is still registered.
    #[error("Only one subscriber allowed at a time")]
    MultipleSubscribers,

    /// No subscriber has been registered for streaming.
    #[error("No subscriber set for streaming")]
    NoSubscriber,

    /// The historical synchronization failed; the payload describes the cause.
    #[error("Historical sync failed: {0}")]
    HistoricalSyncError(String),

    /// Subscribing to new blocks failed; the payload is the number of attempts made.
    #[error("WebSocket connection failed after {0} attempts")]
    WebSocketConnectionFailed(usize),

    /// The requested block does not exist on the node.
    #[error("Block not found, block number: {0}")]
    BlockNotFound(BlockNumberOrTag),
}
140
141impl From<reqwest::Error> for BlockRangeScannerError {
142    fn from(error: reqwest::Error) -> Self {
143        BlockRangeScannerError::HttpError(Arc::new(error))
144    }
145}
146
147impl From<serde_json::Error> for BlockRangeScannerError {
148    fn from(error: serde_json::Error) -> Self {
149        BlockRangeScannerError::SerializationError(Arc::new(error))
150    }
151}
152
153impl From<RpcError<TransportErrorKind>> for BlockRangeScannerError {
154    fn from(error: RpcError<TransportErrorKind>) -> Self {
155        BlockRangeScannerError::RpcError(Arc::new(error))
156    }
157}
158
/// Commands sent from [`BlockRangeScannerClient`] to the background service.
///
/// Every variant carries a one-shot `response` channel on which the service reports whether the
/// command succeeded.
#[derive(Debug)]
pub enum Command {
    /// Start streaming ranges for newly mined blocks.
    StreamLive {
        /// Channel on which the block range messages are delivered.
        sender: mpsc::Sender<BlockRangeMessage>,
        /// Channel on which the outcome of the command is reported.
        response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
    },
    /// Stream the ranges between `start_height` and `end_height`, then close the stream.
    StreamHistorical {
        /// Channel on which the block range messages are delivered.
        sender: mpsc::Sender<BlockRangeMessage>,
        /// First block of the historical range.
        start_height: BlockNumberOrTag,
        /// Last block of the historical range.
        end_height: BlockNumberOrTag,
        /// Channel on which the outcome of the command is reported.
        response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
    },
    /// Stream historical ranges starting at `start_height`, then continue with live blocks.
    StreamFrom {
        /// Channel on which the block range messages are delivered.
        sender: mpsc::Sender<BlockRangeMessage>,
        /// First block to stream.
        start_height: BlockNumberOrTag,
        /// Channel on which the outcome of the command is reported.
        response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
    },
    /// Drop the currently registered subscriber, if any.
    Unsubscribe {
        /// Channel on which the outcome of the command is reported.
        response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
    },
    /// Drop the current subscriber and stop the service loop.
    Shutdown {
        /// Channel on which the outcome of the command is reported.
        response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
    },
}
183
/// Hash and number of a block, recorded together so that the hash can later be used to check
/// whether the block is still known to the node.
#[derive(Default, Debug, Clone)]
pub struct BlockHashAndNumber {
    /// Hash of the block.
    pub hash: BlockHash,
    /// Number (height) of the block.
    pub number: BlockNumber,
}
189
190impl BlockHashAndNumber {
191    fn from_header<N: Network>(header: &N::HeaderResponse) -> Self {
192        Self { hash: header.hash(), number: header.number() }
193    }
194}
195
/// Settings copied from the [`BlockRangeScanner`] builder and handed to the background service.
#[derive(Clone)]
struct Config {
    /// Maximum number of blocks covered by a single range emitted during historical sync.
    blocks_read_per_epoch: usize,
    /// Number of blocks to step back from the current position when a reorg is detected during
    /// historical sync.
    reorg_rewind_depth: u64,
    /// Number of confirmations subtracted from incoming block numbers before a range is emitted.
    // NOTE(review): this field is read by the live and sync handlers in this file, so the
    // `dead_code` allowance below looks stale — confirm and remove.
    #[allow(
        dead_code,
        reason = "Will be used in reorg mechanism: https://github.com/OpenZeppelin/Event-Scanner/issues/5"
    )]
    block_confirmations: u64,
}
206
/// Builder holding the scanner settings until a provider connection is established.
///
/// Configure it with the `with_*` methods, then call one of the `connect_*` methods to obtain a
/// [`ConnectedBlockRangeScanner`].
pub struct BlockRangeScanner {
    /// Maximum number of blocks covered by a single historical range.
    blocks_read_per_epoch: usize,
    /// Number of blocks to step back when a reorg is detected.
    reorg_rewind_depth: u64,
    /// Number of confirmations required before a block is included in an emitted range.
    block_confirmations: u64,
}
212
213impl Default for BlockRangeScanner {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl BlockRangeScanner {
220    #[must_use]
221    pub fn new() -> Self {
222        Self {
223            blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
224            reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH,
225            block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
226        }
227    }
228
229    #[must_use]
230    pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
231        self.blocks_read_per_epoch = blocks_read_per_epoch;
232        self
233    }
234
235    #[must_use]
236    pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
237        self.reorg_rewind_depth = reorg_rewind_depth;
238        self
239    }
240
241    #[must_use]
242    pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
243        self.block_confirmations = block_confirmations;
244        self
245    }
246
247    /// Connects to the provider via WebSocket
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the connection fails
252    pub async fn connect_ws<N: Network>(
253        self,
254        ws_url: Url,
255    ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
256        let provider =
257            RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
258        self.connect_provider(provider)
259    }
260
261    /// Connects to the provider via IPC
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the connection fails
266    pub async fn connect_ipc<N: Network>(
267        self,
268        ipc_path: String,
269    ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
270        let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
271        self.connect_provider(provider)
272    }
273
274    /// Connects to an existing provider
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the connection fails
279    pub fn connect_provider<N: Network>(
280        self,
281        provider: RootProvider<N>,
282    ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
283        Ok(ConnectedBlockRangeScanner {
284            provider,
285            config: Config {
286                blocks_read_per_epoch: self.blocks_read_per_epoch,
287                reorg_rewind_depth: self.reorg_rewind_depth,
288                block_confirmations: self.block_confirmations,
289            },
290        })
291    }
292}
293
/// A scanner that holds a provider connection and is ready to spawn the background service.
pub struct ConnectedBlockRangeScanner<N: Network> {
    /// Provider used for all RPC calls and block subscriptions.
    provider: RootProvider<N>,
    /// Settings collected by the [`BlockRangeScanner`] builder.
    config: Config,
}
298
299impl<N: Network> ConnectedBlockRangeScanner<N> {
300    /// Returns the underlying Provider.
301    #[must_use]
302    pub fn provider(&self) -> &RootProvider<N> {
303        &self.provider
304    }
305
306    /// Starts the subscription service and returns a client for sending commands.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the subscription service fails to start.
311    pub fn run(&self) -> Result<BlockRangeScannerClient, BlockRangeScannerError> {
312        let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone());
313        tokio::spawn(async move {
314            service.run().await;
315        });
316        Ok(BlockRangeScannerClient::new(cmd_tx))
317    }
318}
319
/// Background actor that owns the provider and executes the [`Command`]s sent by the client.
struct Service<N: Network> {
    /// Scanner settings.
    config: Config,
    /// Provider used for RPC calls and block subscriptions.
    provider: RootProvider<N>,
    /// Channel of the currently registered subscriber; `None` when nobody is subscribed.
    subscriber: Option<mpsc::Sender<BlockRangeMessage>>,
    /// First block of the next historical batch; its hash is used to detect reorgs.
    next_start_block: BlockHashAndNumber,
    /// Reset to `false` when the subscriber goes away.
    // NOTE(review): never read or set to `true` in the visible part of this file.
    websocket_connected: bool,
    /// Number of messages successfully delivered through `send_to_subscriber`.
    processed_count: u64,
    /// Number of commands whose handling returned an error.
    error_count: u64,
    /// Receiving end of the command channel.
    command_receiver: mpsc::Receiver<Command>,
    /// Set by the `Shutdown` command to stop the service loop.
    shutdown: bool,
}
331
332impl<N: Network> Service<N> {
333    pub fn new(config: Config, provider: RootProvider<N>) -> (Self, mpsc::Sender<Command>) {
334        let (cmd_tx, cmd_rx) = mpsc::channel(100);
335
336        let service = Self {
337            config,
338            provider,
339            subscriber: None,
340            next_start_block: BlockHashAndNumber::default(),
341            websocket_connected: false,
342            processed_count: 0,
343            error_count: 0,
344            command_receiver: cmd_rx,
345            shutdown: false,
346        };
347
348        (service, cmd_tx)
349    }
350
351    pub async fn run(mut self) {
352        info!("Starting subscription service");
353
354        while !self.shutdown {
355            tokio::select! {
356                cmd = self.command_receiver.recv() => {
357                    if let Some(command) = cmd {
358                        if let Err(e) = self.handle_command(command).await {
359                            error!("Command handling error: {}", e);
360                            self.error_count += 1;
361                        }
362                    } else {
363                        info!("Command channel closed, shutting down");
364                        break;
365                    }
366                }
367            }
368        }
369
370        info!("Subscription service stopped");
371    }
372
373    async fn handle_command(&mut self, command: Command) -> Result<(), BlockRangeScannerError> {
374        match command {
375            Command::StreamLive { sender, response } => {
376                self.ensure_no_subscriber()?;
377                info!("Starting live stream");
378                self.subscriber = Some(sender);
379                let result = self.handle_live().await;
380                let _ = response.send(result);
381            }
382            Command::StreamHistorical { sender, start_height, end_height, response } => {
383                self.ensure_no_subscriber()?;
384                info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
385                self.subscriber = Some(sender);
386                let result = self.handle_historical(start_height, end_height).await;
387                let _ = response.send(result);
388            }
389            Command::StreamFrom { sender, start_height, response } => {
390                self.ensure_no_subscriber()?;
391                self.subscriber = Some(sender);
392                info!(start_height = ?start_height, "Starting streaming from");
393                let result = self.handle_sync(start_height).await;
394                let _ = response.send(result);
395            }
396            Command::Unsubscribe { response } => {
397                self.handle_unsubscribe();
398                let _ = response.send(Ok(()));
399            }
400            Command::Shutdown { response } => {
401                self.shutdown = true;
402                self.handle_unsubscribe();
403                let _ = response.send(Ok(()));
404            }
405        }
406        Ok(())
407    }
408
409    async fn handle_live(&mut self) -> Result<(), BlockRangeScannerError> {
410        let Some(sender) = self.subscriber.clone() else {
411            return Err(BlockRangeScannerError::ServiceShutdown);
412        };
413
414        let block_confirmations = self.config.block_confirmations;
415        let provider = self.provider.clone();
416        let latest = self.provider.get_block_number().await?;
417
418        // the next block returned by the underlying subscription will always be `latest + 1`,
419        // because `latest` was already mined and subscription by definition only streams after new
420        // blocks have been mined
421        let range_start = (latest + 1).saturating_sub(block_confirmations);
422
423        tokio::spawn(async move {
424            Self::stream_live_blocks(range_start, provider, sender, block_confirmations).await;
425        });
426
427        Ok(())
428    }
429
430    async fn handle_historical(
431        &mut self,
432        start_height: BlockNumberOrTag,
433        end_height: BlockNumberOrTag,
434    ) -> Result<(), BlockRangeScannerError> {
435        let start_block = self.provider.get_block_by_number(start_height).await?.ok_or(
436            BlockRangeScannerError::HistoricalSyncError(format!(
437                "Start block {start_height:?} not found"
438            )),
439        )?;
440        let end_block = self.provider.get_block_by_number(end_height).await?.ok_or(
441            BlockRangeScannerError::HistoricalSyncError(format!(
442                "End block {end_height:?} not found"
443            )),
444        )?;
445
446        if end_block.header().number() < start_block.header().number() {
447            return Err(BlockRangeScannerError::HistoricalSyncError(format!(
448                "End block {end_height:?} is lower than start block {start_height:?}"
449            )));
450        }
451
452        info!(
453            start_block = start_block.header().number(),
454            end_block = end_block.header().number(),
455            "Syncing historical data"
456        );
457
458        self.sync_historical_data(start_block, end_block).await?;
459
460        _ = self.subscriber.take();
461
462        info!("Successfully synced historical data, closing the stream");
463
464        Ok(())
465    }
466
467    async fn handle_sync(
468        &mut self,
469        start_height: BlockNumberOrTag,
470    ) -> Result<(), BlockRangeScannerError> {
471        // Step 1:
472        // Fetches the starting block and end block for historical sync
473        let start_block = self.provider.get_block_by_number(start_height).await?.ok_or(
474            BlockRangeScannerError::HistoricalSyncError(format!(
475                "Start block {start_height:?} not found"
476            )),
477        )?;
478
479        let latest_block =
480            self.provider.get_block_by_number(BlockNumberOrTag::Latest).await?.ok_or(
481                BlockRangeScannerError::HistoricalSyncError("Latest block not found".to_string()),
482            )?;
483
484        let block_confirmations = self.config.block_confirmations;
485        let confirmed_tip_num = latest_block.header().number().saturating_sub(block_confirmations);
486
487        // If start is beyond confirmed tip, skip historical and go straight to live
488        if start_block.header().number() > confirmed_tip_num {
489            info!(
490                start_block = start_block.header().number(),
491                confirmed_tip = confirmed_tip_num,
492                "Start block is beyond confirmed tip, starting live stream"
493            );
494
495            let Some(sender) = self.subscriber.clone() else {
496                return Err(BlockRangeScannerError::ServiceShutdown);
497            };
498
499            let provider = self.provider.clone();
500            let expected_next = start_block.header().number();
501            tokio::spawn(async move {
502                Self::stream_live_blocks(expected_next, provider, sender, block_confirmations)
503                    .await;
504            });
505
506            return Ok(());
507        }
508
509        let end_block = self.provider.get_block_by_number(confirmed_tip_num.into()).await?.ok_or(
510            BlockRangeScannerError::HistoricalSyncError(format!(
511                "Confirmed tip block {confirmed_tip_num} not found"
512            )),
513        )?;
514
515        info!(
516            start_block = start_block.header().number(),
517            end_block = end_block.header().number(),
518            "Syncing historical data"
519        );
520
521        // Step 2: Setup the live streaming buffer
522        // This channel will accumulate while historical sync is running
523        let (live_block_buffer_sender, live_block_buffer_receiver) =
524            mpsc::channel::<BlockRangeMessage>(MAX_BUFFERED_MESSAGES);
525
526        let provider = self.provider.clone();
527
528        // The cutoff is the last block we have synced historically
529        // Any block > cutoff will come from the live stream
530        let cutoff = end_block.header().number();
531
532        // This task runs independently, accumulating new blocks while wehistorical data is syncing
533        let live_subscription_task = tokio::spawn(async move {
534            Self::stream_live_blocks(
535                cutoff + 1,
536                provider,
537                live_block_buffer_sender,
538                block_confirmations,
539            )
540            .await;
541        });
542
543        // Step 4: Perform historical synchronization
544        // This processes blocks from start_block to end_block (cutoff)
545        // If this fails, we need to abort the live streaming task
546        if let Err(e) = self.sync_historical_data(start_block, end_block).await {
547            warn!("aborting live_subscription_task");
548            live_subscription_task.abort();
549            return Err(BlockRangeScannerError::HistoricalSyncError(e.to_string()));
550        }
551
552        self.send_to_subscriber(ScannerMessage::Status(ScannerStatus::ChainTipReached)).await;
553
554        let Some(sender) = self.subscriber.clone() else {
555            return Err(BlockRangeScannerError::ServiceShutdown);
556        };
557        // Step 5:
558        // Spawn the buffer processor task
559        // This will:
560        // 1. Process all buffered blocks, filtering out any ≤ cutoff
561        // 2. Forward blocks > cutoff to the user
562        // 3. Continue forwarding until the buffer if exhausted (waits for new blocks from live
563        //    stream)
564        tokio::spawn(async move {
565            Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
566        });
567
568        info!("Successfully transitioned from historical to live data");
569        Ok(())
570    }
571
572    async fn sync_historical_data(
573        &mut self,
574        start: N::BlockResponse,
575        end: N::BlockResponse,
576    ) -> Result<(), BlockRangeScannerError> {
577        let mut batch_count = 0;
578
579        self.next_start_block = BlockHashAndNumber::from_header::<N>(start.header());
580
581        // must be <= to include the edge case when start == end (i.e. return the single block
582        // range)
583        while self.next_start_block.number <= end.header().number() {
584            self.ensure_current_not_reorged().await?;
585
586            let batch_end_block_number = self
587                .next_start_block
588                .number
589                .saturating_add(self.config.blocks_read_per_epoch as u64 - 1)
590                .min(end.header().number());
591
592            self.send_to_subscriber(BlockRangeMessage::Data(
593                self.next_start_block.number..=batch_end_block_number,
594            ))
595            .await;
596
597            batch_count += 1;
598            if batch_count % 10 == 0 {
599                debug!(batch_count = batch_count, "Processed historical batches");
600            }
601
602            if batch_end_block_number == end.header().number() {
603                break;
604            }
605
606            let next_start_block_number = (batch_end_block_number + 1).into();
607            let next_start_block =
608                match self.provider.get_block_by_number(next_start_block_number).await {
609                    Ok(block) => {
610                        block.expect("block number is less than 'end', so it should exist")
611                    }
612                    Err(e) => {
613                        error!(error = %e, "Failed to get block by number");
614                        let e: BlockRangeScannerError = e.into();
615                        self.send_to_subscriber(BlockRangeMessage::Error(e.clone())).await;
616                        return Err(e);
617                    }
618                };
619            self.next_start_block = BlockHashAndNumber::from_header::<N>(next_start_block.header());
620        }
621
622        info!(batch_count = batch_count, "Historical sync completed");
623
624        Ok(())
625    }
626
627    async fn stream_live_blocks<P: Provider<N>>(
628        mut range_start: BlockNumber,
629        provider: P,
630        sender: mpsc::Sender<BlockRangeMessage>,
631        block_confirmations: u64,
632    ) {
633        match Self::get_block_subscription(&provider).await {
634            Ok(ws_stream) => {
635                info!("WebSocket connected for live blocks");
636
637                // ensure we start streaming only after the expected_next_block cutoff
638                let cutoff = range_start;
639                let mut stream =
640                    ws_stream.into_stream().skip_while(|header| header.number() < cutoff);
641
642                while let Some(incoming_block) = stream.next().await {
643                    let incoming_block_num = incoming_block.number();
644                    info!(block_number = incoming_block_num, "Received block header");
645
646                    if incoming_block_num < range_start {
647                        warn!("Reorg detected: sending forked range");
648                        if sender
649                            .send(BlockRangeMessage::Status(ScannerStatus::ReorgDetected))
650                            .await
651                            .is_err()
652                        {
653                            warn!("Downstream channel closed, stopping live blocks task");
654                            return;
655                        }
656
657                        // Calculate the confirmed block position for the incoming block
658                        let incoming_confirmed =
659                            incoming_block_num.saturating_sub(block_confirmations);
660
661                        // updated expected block to updated confirmed
662                        range_start = incoming_confirmed;
663                    }
664
665                    let confirmed = incoming_block_num.saturating_sub(block_confirmations);
666                    if confirmed >= range_start {
667                        if sender
668                            .send(BlockRangeMessage::Data(range_start..=confirmed))
669                            .await
670                            .is_err()
671                        {
672                            warn!("Downstream channel closed, stopping live blocks task");
673                            return;
674                        }
675
676                        // Overflow can not realistically happen
677                        range_start = confirmed + 1;
678                    }
679                }
680            }
681            Err(e) => {
682                if sender.send(BlockRangeMessage::Error(e)).await.is_err() {
683                    warn!("Downstream channel closed, stopping live blocks task");
684                }
685            }
686        }
687    }
688
689    async fn process_live_block_buffer(
690        mut buffer_rx: mpsc::Receiver<BlockRangeMessage>,
691        sender: mpsc::Sender<BlockRangeMessage>,
692        cutoff: BlockNumber,
693    ) {
694        let mut processed = 0;
695        let mut discarded = 0;
696
697        // Process all buffered messages
698        while let Some(data) = buffer_rx.recv().await {
699            match data {
700                BlockRangeMessage::Data(range) => {
701                    let (start, end) = (*range.start(), *range.end());
702                    if start >= cutoff {
703                        if sender.send(BlockRangeMessage::Data(range)).await.is_err() {
704                            warn!("Subscriber channel closed, cleaning up");
705                            return;
706                        }
707                        processed += end - start;
708                    } else if end > cutoff {
709                        discarded += cutoff - start;
710
711                        let start = cutoff;
712                        if sender.send(BlockRangeMessage::Data(start..=end)).await.is_err() {
713                            warn!("Subscriber channel closed, cleaning up");
714                            return;
715                        }
716                        processed += end - start;
717                    } else {
718                        discarded += end - start;
719                    }
720                }
721                _ => {
722                    // Could be error or status
723                    if sender.send(data).await.is_err() {
724                        warn!("Subscriber channel closed, cleaning up");
725                        return;
726                    }
727                }
728            }
729        }
730
731        info!(processed = processed, discarded = discarded, "Processed buffered messages");
732    }
733
734    async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockRangeScannerError> {
735        let current_block = self.provider.get_block_by_hash(self.next_start_block.hash).await?;
736        if current_block.is_some() {
737            return Ok(());
738        }
739
740        self.rewind_on_reorg_detected().await
741    }
742
743    async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockRangeScannerError> {
744        let mut new_current_height =
745            self.next_start_block.number.saturating_sub(self.config.reorg_rewind_depth);
746
747        let head = self.provider.get_block_number().await?;
748        if head < new_current_height {
749            new_current_height = head;
750        }
751
752        let current = self
753            .provider
754            .get_block_by_number(new_current_height.into())
755            .await?
756            .map(|block| BlockHashAndNumber::from_header::<N>(block.header()))
757            .ok_or(BlockRangeScannerError::HistoricalSyncError(format!(
758                "Block {new_current_height} not found during rewind",
759            )))?;
760
761        info!(
762            old_current = self.next_start_block.number,
763            new_current = current.number,
764            "Rewind on reorg detected"
765        );
766
767        self.next_start_block = current;
768
769        Ok(())
770    }
771
772    async fn get_block_subscription(
773        provider: &impl Provider<N>,
774    ) -> Result<Subscription<N::HeaderResponse>, BlockRangeScannerError> {
775        let ws_stream = provider
776            .subscribe_blocks()
777            .await
778            .map_err(|_| BlockRangeScannerError::WebSocketConnectionFailed(1))?;
779
780        Ok(ws_stream)
781    }
782
783    async fn send_to_subscriber(&mut self, message: BlockRangeMessage) {
784        if let Some(ref sender) = self.subscriber {
785            if let Err(err) = sender.send(message).await {
786                warn!(error = %err, "Downstream channel closed, failed sending the message to subscriber");
787                self.subscriber = None;
788                self.websocket_connected = false;
789            } else {
790                self.processed_count += 1;
791            }
792        }
793    }
794
795    fn handle_unsubscribe(&mut self) {
796        if self.subscriber.take().is_some() {
797            info!("Unsubscribing current subscriber");
798            self.websocket_connected = false;
799        }
800    }
801
802    fn ensure_no_subscriber(&self) -> Result<(), BlockRangeScannerError> {
803        if self.subscriber.is_some() {
804            return Err(BlockRangeScannerError::MultipleSubscribers);
805        }
806        Ok(())
807    }
808}
809
/// Handle used to send [`Command`]s to the background service.
pub struct BlockRangeScannerClient {
    /// Sending half of the service's command channel.
    command_sender: mpsc::Sender<Command>,
}
813
814impl BlockRangeScannerClient {
815    /// Creates a new subscription client.
816    ///
817    /// # Arguments
818    ///
819    /// * `command_sender` - The sender for sending commands to the subscription service.
820    #[must_use]
821    pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
822        Self { command_sender }
823    }
824
825    /// Streams live blocks starting from the latest block.
826    ///
827    /// # Errors
828    ///
829    /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
830    pub async fn stream_live(
831        &self,
832    ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
833        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
834        let (response_tx, response_rx) = oneshot::channel();
835
836        let command = Command::StreamLive { sender: blocks_sender, response: response_tx };
837
838        self.command_sender
839            .send(command)
840            .await
841            .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
842
843        response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
844
845        Ok(ReceiverStream::new(blocks_receiver))
846    }
847
848    /// Streams a batch of historical blocks from `start_height` to `end_height`.
849    ///
850    /// # Arguments
851    ///
852    /// * `start_height` - The starting block number or tag.
853    /// * `end_height` - The ending block number or tag.
854    ///
855    /// # Errors
856    ///
857    /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
858    pub async fn stream_historical<N: Into<BlockNumberOrTag>>(
859        &self,
860        start_height: N,
861        end_height: N,
862    ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
863        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
864        let (response_tx, response_rx) = oneshot::channel();
865
866        let command = Command::StreamHistorical {
867            sender: blocks_sender,
868            start_height: start_height.into(),
869            end_height: end_height.into(),
870            response: response_tx,
871        };
872
873        self.command_sender
874            .send(command)
875            .await
876            .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
877
878        response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
879
880        Ok(ReceiverStream::new(blocks_receiver))
881    }
882
883    /// Streams blocks starting from `start_height` and transitions to live mode.
884    ///
885    /// # Arguments
886    ///
887    /// * `start_height` - The starting block number or tag.
888    ///
889    /// # Errors
890    ///
891    /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
892    pub async fn stream_from(
893        &self,
894        start_height: impl Into<BlockNumberOrTag>,
895    ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
896        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
897        let (response_tx, response_rx) = oneshot::channel();
898
899        let command = Command::StreamFrom {
900            sender: blocks_sender,
901            start_height: start_height.into(),
902            response: response_tx,
903        };
904
905        self.command_sender
906            .send(command)
907            .await
908            .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
909
910        response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
911
912        Ok(ReceiverStream::new(blocks_receiver))
913    }
914
915    /// Unsubscribes the current subscriber.
916    ///
917    /// # Errors
918    ///
919    /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
920    pub async fn unsubscribe(&self) -> Result<(), BlockRangeScannerError> {
921        let (response_tx, response_rx) = oneshot::channel();
922
923        let command = Command::Unsubscribe { response: response_tx };
924
925        self.command_sender
926            .send(command)
927            .await
928            .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
929
930        response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)?
931    }
932
933    /// Shuts down the subscription service and unsubscribes the current subscriber.
934    ///
935    /// # Errors
936    ///
937    /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
938    pub async fn shutdown(&self) -> Result<(), BlockRangeScannerError> {
939        let (response_tx, response_rx) = oneshot::channel();
940
941        let command = Command::Shutdown { response: response_tx };
942
943        self.command_sender
944            .send(command)
945            .await
946            .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
947
948        response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)?
949    }
950}
951
952#[cfg(test)]
953mod tests {
954    use std::time::Duration;
955
956    use alloy::{
957        network::Ethereum,
958        primitives::{B256, keccak256},
959        providers::{ProviderBuilder, ext::AnvilApi},
960        rpc::{
961            client::RpcClient,
962            types::{Block as RpcBlock, Header, Transaction, anvil::ReorgOptions},
963        },
964        transports::mock::Asserter,
965    };
966    use alloy_node_bindings::Anvil;
967    use serde_json::{Value, json};
968    use tokio::{
969        sync::mpsc::{self, Receiver},
970        time::timeout,
971    };
972    use tokio_stream::StreamExt;
973
974    use super::*;
975
    /// Test-only abstraction over receiver types that yield [`BlockRangeMessage`]s, so the same
    /// assertions can be written against a `ReceiverStream` and a plain `Receiver`.
    trait RangeReceiver {
        /// Awaits the next item from the underlying receiver and returns whatever it yields.
        async fn next_range(&mut self) -> Option<BlockRangeMessage>;
    }
980
981    impl RangeReceiver for ReceiverStream<BlockRangeMessage> {
982        async fn next_range(&mut self) -> Option<BlockRangeMessage> {
983            self.next().await
984        }
985    }
986
987    impl RangeReceiver for Receiver<BlockRangeMessage> {
988        async fn next_range(&mut self) -> Option<BlockRangeMessage> {
989            self.recv().await
990        }
991    }
992
993    macro_rules! assert_next_range {
994        ($recv:expr, None) => {
995            let next = $recv.next_range().await;
996            assert!(next.is_none());
997        };
998        ($recv:expr, $range:expr) => {
999            let next = $recv.next_range().await;
1000            if let Some(BlockRangeMessage::Data(range)) = next {
1001                assert_eq!($range, range);
1002            } else {
1003                panic!("expected block range, got: {next:?}");
1004            }
1005        };
1006    }
1007
1008    fn test_config() -> Config {
1009        Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
1010    }
1011
1012    fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
1013        RootProvider::new(RpcClient::mocked(asserter))
1014    }
1015
1016    fn mock_block(number: u64, hash: B256) -> RpcBlock<Transaction, Header> {
1017        let mut block: RpcBlock<Transaction, Header> = RpcBlock::default();
1018        block.header.hash = hash;
1019        block.header.number = number;
1020        block
1021    }
1022
1023    #[test]
1024    fn block_range_scanner_defaults_match_constants() {
1025        let scanner = BlockRangeScanner::new();
1026
1027        assert_eq!(scanner.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH);
1028        assert_eq!(scanner.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH);
1029        assert_eq!(scanner.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
1030    }
1031
1032    #[test]
1033    fn builder_methods_update_configuration() {
1034        let blocks_read_per_epoch = 42;
1035        let reorg_rewind_depth = 12;
1036        let block_confirmations = 7;
1037
1038        let scanner = BlockRangeScanner::new()
1039            .with_blocks_read_per_epoch(blocks_read_per_epoch)
1040            .with_reorg_rewind_depth(reorg_rewind_depth)
1041            .with_block_confirmations(block_confirmations);
1042
1043        assert_eq!(scanner.blocks_read_per_epoch, blocks_read_per_epoch);
1044        assert_eq!(scanner.block_confirmations, block_confirmations);
1045    }
1046
1047    #[tokio::test]
1048    async fn send_to_subscriber_increments_processed_count() -> anyhow::Result<()> {
1049        let asserter = Asserter::new();
1050        let provider = mocked_provider(asserter);
1051        let (mut service, _cmd) = Service::new(test_config(), provider);
1052
1053        let (tx, mut rx) = mpsc::channel(1);
1054        service.subscriber = Some(tx);
1055
1056        let expected_range = 10..=11;
1057        service.send_to_subscriber(BlockRangeMessage::Data(expected_range.clone())).await;
1058
1059        assert_eq!(service.processed_count, 1);
1060        assert!(service.subscriber.is_some());
1061
1062        let BlockRangeMessage::Data(received) = rx.recv().await.expect("range received") else {
1063            panic!("expected BlockRange message")
1064        };
1065        assert_eq!(received, expected_range);
1066
1067        Ok(())
1068    }
1069
1070    #[tokio::test]
1071    async fn send_to_subscriber_removes_closed_channel() -> anyhow::Result<()> {
1072        let asserter = Asserter::new();
1073        let provider = mocked_provider(asserter);
1074        let (mut service, _cmd) = Service::new(test_config(), provider);
1075
1076        let (tx, rx) = mpsc::channel(1);
1077        service.websocket_connected = true;
1078        service.subscriber = Some(tx);
1079        // channel is closed
1080        drop(rx);
1081
1082        service.send_to_subscriber(BlockRangeMessage::Data(15..=15)).await;
1083
1084        assert!(service.subscriber.is_none());
1085        assert!(!service.websocket_connected);
1086        assert_eq!(service.processed_count, 0);
1087
1088        Ok(())
1089    }
1090
1091    #[test]
1092    fn handle_unsubscribe_clears_subscriber() {
1093        let asserter = Asserter::new();
1094        let provider = mocked_provider(asserter);
1095        let (mut service, _cmd) = Service::new(test_config(), provider);
1096
1097        let (tx, _rx) = mpsc::channel(1);
1098        service.websocket_connected = true;
1099        service.subscriber = Some(tx);
1100
1101        service.handle_unsubscribe();
1102
1103        assert!(service.subscriber.is_none());
1104        assert!(!service.websocket_connected);
1105    }
1106
1107    #[tokio::test]
1108    async fn live_mode_processes_all_blocks() -> anyhow::Result<()> {
1109        let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?;
1110
1111        let client = BlockRangeScanner::new()
1112            .with_block_confirmations(1)
1113            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1114            .await?
1115            .run()?;
1116
1117        let expected_blocks = 10;
1118
1119        let mut receiver = client.stream_live().await?.take(expected_blocks);
1120
1121        let mut block_range_start = 0;
1122
1123        while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1124            info!("Received block range: [{range:?}]");
1125            if block_range_start == 0 {
1126                block_range_start = *range.start();
1127            }
1128
1129            assert_eq!(block_range_start, *range.start());
1130            assert!(range.end() >= range.start());
1131            block_range_start = *range.end() + 1;
1132        }
1133
1134        Ok(())
1135    }
1136
1137    #[tokio::test]
1138    async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> {
1139        let anvil = Anvil::new().try_spawn()?;
1140
1141        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1142        provider.anvil_mine(Option::Some(20), Option::None).await?;
1143
1144        let block_confirmations = 5;
1145
1146        let client = BlockRangeScanner::new()
1147            .with_block_confirmations(block_confirmations)
1148            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1149            .await?
1150            .run()?;
1151
1152        let expected_blocks = 10;
1153        let mut receiver =
1154            client.stream_from(BlockNumberOrTag::Latest).await?.take(expected_blocks);
1155
1156        let latest_head = provider.get_block_number().await?;
1157        provider.anvil_mine(Option::Some(20), Option::None).await?;
1158
1159        let mut expected_range_start = latest_head;
1160
1161        while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1162            assert_eq!(expected_range_start, *range.start());
1163            assert_eq!(range.end(), range.start());
1164            expected_range_start += 1;
1165        }
1166
1167        // verify that the final block number (range.end) was of the latest block with the expected
1168        // block confirmations
1169        assert_eq!(expected_range_start, latest_head + expected_blocks as u64);
1170
1171        Ok(())
1172    }
1173
1174    #[tokio::test]
1175    async fn live_mode_respects_block_confirmations() -> anyhow::Result<()> {
1176        let anvil = Anvil::new().try_spawn()?;
1177
1178        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1179        provider.anvil_mine(Option::Some(20), Option::None).await?;
1180
1181        let block_confirmations = 5;
1182
1183        let client = BlockRangeScanner::new()
1184            .with_block_confirmations(block_confirmations)
1185            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1186            .await?
1187            .run()?;
1188
1189        let expected_blocks = 10;
1190
1191        let mut receiver = client.stream_live().await?.take(expected_blocks);
1192        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1193        let latest_head = provider.get_block_number().await?;
1194        provider.anvil_mine(Option::Some(expected_blocks as u64), Option::None).await?;
1195
1196        let mut expected_range_start = latest_head.saturating_sub(block_confirmations) + 1;
1197
1198        while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1199            assert_eq!(expected_range_start, *range.start());
1200            assert_eq!(range.end(), range.start());
1201            expected_range_start += 1;
1202        }
1203
1204        // we add 1 to the right side, because we're expecting the number of the _next_ block to be
1205        // mined
1206        assert_eq!(
1207            expected_range_start,
1208            latest_head + expected_blocks as u64 + 1 - block_confirmations
1209        );
1210
1211        Ok(())
1212    }
1213
1214    #[tokio::test]
1215    async fn live_mode_respects_block_confirmations_on_new_chain() -> anyhow::Result<()> {
1216        let anvil = Anvil::new().try_spawn()?;
1217
1218        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1219
1220        let block_confirmations = 5;
1221
1222        let client = BlockRangeScanner::new()
1223            .with_block_confirmations(block_confirmations)
1224            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1225            .await?
1226            .run()?;
1227
1228        let mut receiver = client.stream_live().await?;
1229
1230        provider.anvil_mine(Option::Some(6), Option::None).await?;
1231
1232        let next = receiver.next().await;
1233        if let Some(BlockRangeMessage::Data(range)) = next {
1234            assert_eq!(0, *range.start());
1235            assert_eq!(0, *range.end());
1236        } else {
1237            panic!("expected range, got: {next:?}");
1238        }
1239
1240        let next = receiver.next().await;
1241        if let Some(BlockRangeMessage::Data(range)) = next {
1242            assert_eq!(1, *range.start());
1243            assert_eq!(1, *range.end());
1244        } else {
1245            panic!("expected range, got: {next:?}");
1246        }
1247
1248        // assert no new pending confirmed block ranges
1249        assert!(
1250            timeout(Duration::from_secs(1), async move { receiver.next().await }).await.is_err()
1251        );
1252
1253        Ok(())
1254    }
1255
1256    #[tokio::test]
1257    #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1258    async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Result<()> {
1259        let anvil = Anvil::new().try_spawn()?;
1260
1261        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1262
1263        let block_confirmations = 5;
1264
1265        let client = BlockRangeScanner::new()
1266            .with_block_confirmations(block_confirmations)
1267            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1268            .await?
1269            .run()?;
1270
1271        let mut receiver = client.stream_live().await?;
1272
1273        provider.anvil_mine(Option::Some(10), Option::None).await?;
1274
1275        provider
1276            .anvil_reorg(ReorgOptions { depth: block_confirmations - 1, tx_block_pairs: vec![] })
1277            .await?;
1278
1279        provider.anvil_mine(Option::Some(20), Option::None).await?;
1280
1281        let mut block_range_start = 0;
1282
1283        let end_loop = 20;
1284        let mut i = 0;
1285        while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1286            if block_range_start == 0 {
1287                block_range_start = *range.start();
1288            }
1289
1290            assert_eq!(block_range_start, *range.start());
1291            assert!(range.end() >= range.start());
1292            block_range_start = *range.end() + 1;
1293            i += 1;
1294            if i == end_loop {
1295                break;
1296            }
1297        }
1298        Ok(())
1299    }
1300
1301    #[tokio::test]
1302    #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1303    async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<()> {
1304        let anvil = Anvil::new().block_time(1).try_spawn()?;
1305
1306        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1307
1308        let block_confirmations = 3;
1309
1310        let client = BlockRangeScanner::new()
1311            .with_block_confirmations(block_confirmations)
1312            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1313            .await?
1314            .run()?;
1315
1316        let mut receiver = client.stream_live().await?;
1317
1318        provider.anvil_mine(Option::Some(10), Option::None).await?;
1319
1320        provider
1321            .anvil_reorg(ReorgOptions { depth: block_confirmations + 5, tx_block_pairs: vec![] })
1322            .await?;
1323
1324        provider.anvil_mine(Option::Some(30), Option::None).await?;
1325        receiver.close();
1326
1327        let mut block_range_start = 0;
1328
1329        let mut block_num = vec![];
1330        let mut reorg_detected = false;
1331        while let Some(msg) = receiver.next().await {
1332            match msg {
1333                BlockRangeMessage::Data(range) => {
1334                    if block_range_start == 0 {
1335                        block_range_start = *range.start();
1336                    }
1337                    block_num.push(range);
1338                    if block_num.len() == 15 {
1339                        break;
1340                    }
1341                }
1342                BlockRangeMessage::Status(ScannerStatus::ReorgDetected) => {
1343                    reorg_detected = true;
1344                }
1345                _ => {
1346                    break;
1347                }
1348            }
1349        }
1350        assert!(reorg_detected, "Reorg should have been detected");
1351
1352        // Generally check that there is a reorg in the range i.e.
1353        //                                                        REORG
1354        // [0..=0, 1..=1, 2..=2, 3..=3, 4..=4, 5..=5, 6..=6, 7..=7, 3..=3, 4..=4, 5..=5, 6..=6,
1355        // 7..=7, 8..=8, 9..=9] (Less flaky to assert this way)
1356        let mut found_reorg_pattern = false;
1357        for window in block_num.windows(2) {
1358            if window[1].start() < window[0].end() {
1359                found_reorg_pattern = true;
1360                break;
1361            }
1362        }
1363        assert!(found_reorg_pattern,);
1364
1365        Ok(())
1366    }
1367
1368    #[tokio::test]
1369    async fn rewinds_on_detected_reorg() -> anyhow::Result<()> {
1370        let asserter = Asserter::new();
1371        let provider = mocked_provider(asserter.clone());
1372
1373        let mut config = test_config();
1374        config.reorg_rewind_depth = 6;
1375        let (mut service, _cmd) = Service::new(config.clone(), provider);
1376
1377        let original_height = 10;
1378        let original_hash = keccak256(b"original block");
1379        let original_block = mock_block(original_height, original_hash);
1380        service.next_start_block =
1381            BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
1382
1383        let expected_rewind_height = original_height - config.reorg_rewind_depth;
1384        let expected_rewind_hash = keccak256(b"rewound block");
1385        let rewound_block = mock_block(expected_rewind_height, expected_rewind_hash);
1386
1387        // Mock provider responses for reorg detection and rewind:
1388        // 1. get_block_by_hash(original_hash) -> None (block not found = reorg detected)
1389        asserter.push_success(&Value::Null);
1390        // 2. get_block_number() -> 12 (current chain head is at 12)
1391        asserter.push_success(&json!(format!("0x{:x}", original_height + 2)));
1392        // 3. get_block_by_number(expected_rewind_height) -> rewound_block
1393        asserter.push_success(&rewound_block);
1394
1395        service.ensure_current_not_reorged().await?;
1396
1397        let current = service.next_start_block;
1398        assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth");
1399        assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height");
1400
1401        Ok(())
1402    }
1403
1404    #[tokio::test]
1405    async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1406        let anvil = Anvil::new().try_spawn()?;
1407
1408        let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1409
1410        provider.anvil_mine(Option::Some(100), Option::None).await?;
1411
1412        let client = BlockRangeScanner::new()
1413            .with_blocks_read_per_epoch(5)
1414            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1415            .await?
1416            .run()?;
1417
1418        // ranges where each batch is of max blocks per epoch size
1419        let mut stream = client.stream_historical(0, 19).await?;
1420        assert_next_range!(stream, 0..=4);
1421        assert_next_range!(stream, 5..=9);
1422        assert_next_range!(stream, 10..=14);
1423        assert_next_range!(stream, 15..=19);
1424        assert_next_range!(stream, None);
1425
1426        // ranges where last batch is smaller than blocks per epoch
1427        let mut stream = client.stream_historical(93, 99).await?;
1428        assert_next_range!(stream, 93..=97);
1429        assert_next_range!(stream, 98..=99);
1430        assert_next_range!(stream, None);
1431
1432        // range where blocks per epoch is larger than the number of blocks in the range
1433        let mut stream = client.stream_historical(3, 5).await?;
1434        assert_next_range!(stream, 3..=5);
1435        assert_next_range!(stream, None);
1436
1437        // single item range
1438        let mut stream = client.stream_historical(3, 3).await?;
1439        assert_next_range!(stream, 3..=3);
1440        assert_next_range!(stream, None);
1441
1442        // range where blocks per epoch is larger than the number of blocks on chain
1443        let client = BlockRangeScanner::new()
1444            .with_blocks_read_per_epoch(200)
1445            .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1446            .await?
1447            .run()?;
1448
1449        let mut stream = client.stream_historical(0, 20).await?;
1450        assert_next_range!(stream, 0..=20);
1451        assert_next_range!(stream, None);
1452
1453        let mut stream = client.stream_historical(0, 99).await?;
1454        assert_next_range!(stream, 0..=99);
1455        assert_next_range!(stream, None);
1456
1457        Ok(())
1458    }
1459
1460    #[tokio::test]
1461    async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> {
1462        let cutoff = 50;
1463        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1464        buffer_tx.send(BlockRangeMessage::Data(51..=55)).await.unwrap();
1465        buffer_tx.send(BlockRangeMessage::Data(56..=60)).await.unwrap();
1466        buffer_tx.send(BlockRangeMessage::Data(61..=70)).await.unwrap();
1467        drop(buffer_tx);
1468
1469        let (out_tx, mut out_rx) = mpsc::channel(8);
1470        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1471
1472        let mut forwarded = Vec::new();
1473        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1474            forwarded.push(range);
1475        }
1476
1477        // All ranges should be forwarded as-is since they're after cutoff
1478        assert_eq!(forwarded, vec![51..=55, 56..=60, 61..=70]);
1479        Ok(())
1480    }
1481
1482    #[tokio::test]
1483    async fn ranges_entirely_before_cutoff_are_discarded() -> anyhow::Result<()> {
1484        let cutoff = 100;
1485
1486        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1487        buffer_tx.send(BlockRangeMessage::Data(40..=50)).await.unwrap();
1488        buffer_tx.send(BlockRangeMessage::Data(51..=60)).await.unwrap();
1489        buffer_tx.send(BlockRangeMessage::Data(61..=70)).await.unwrap();
1490        drop(buffer_tx);
1491
1492        let (out_tx, mut out_rx) = mpsc::channel(8);
1493        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1494
1495        let mut forwarded = Vec::new();
1496        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1497            forwarded.push(range);
1498        }
1499
1500        // All ranges should be discarded since they're before cutoff
1501        assert_eq!(forwarded, vec![]);
1502        Ok(())
1503    }
1504
1505    #[tokio::test]
1506    async fn ranges_overlapping_cutoff_are_trimmed() -> anyhow::Result<()> {
1507        let cutoff = 75;
1508
1509        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1510        buffer_tx.send(BlockRangeMessage::Data(70..=80)).await.unwrap();
1511        buffer_tx.send(BlockRangeMessage::Data(60..=80)).await.unwrap();
1512        buffer_tx.send(BlockRangeMessage::Data(74..=76)).await.unwrap();
1513        drop(buffer_tx);
1514
1515        let (out_tx, mut out_rx) = mpsc::channel(8);
1516        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1517
1518        let mut forwarded = Vec::new();
1519        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1520            forwarded.push(range);
1521        }
1522
1523        // All ranges should be trimmed to start at cutoff (75)
1524        assert_eq!(forwarded, vec![75..=80, 75..=80, 75..=76]);
1525        Ok(())
1526    }
1527
1528    #[tokio::test]
1529    async fn mixed_ranges_are_handled_correctly() -> anyhow::Result<()> {
1530        let cutoff = 50;
1531
1532        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1533        buffer_tx.send(BlockRangeMessage::Data(30..=45)).await.unwrap(); // Before cutoff: discard
1534        buffer_tx.send(BlockRangeMessage::Data(46..=55)).await.unwrap(); // Overlaps: trim to 50..=55
1535        buffer_tx.send(BlockRangeMessage::Data(56..=65)).await.unwrap(); // After cutoff: forward as-is
1536        buffer_tx.send(BlockRangeMessage::Data(40..=49)).await.unwrap(); // Before cutoff: discard
1537        buffer_tx.send(BlockRangeMessage::Data(49..=51)).await.unwrap(); // Overlaps: trim to 50..=51
1538        buffer_tx.send(BlockRangeMessage::Data(51..=100)).await.unwrap(); // After cutoff: forward as-is
1539        drop(buffer_tx);
1540
1541        let (out_tx, mut out_rx) = mpsc::channel(8);
1542        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1543
1544        let mut forwarded = Vec::new();
1545        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1546            forwarded.push(range);
1547        }
1548
1549        assert_eq!(forwarded, vec![50..=55, 56..=65, 50..=51, 51..=100]);
1550        Ok(())
1551    }
1552
1553    #[tokio::test]
1554    async fn edge_case_range_exactly_at_cutoff() -> anyhow::Result<()> {
1555        let cutoff = 100;
1556
1557        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1558        buffer_tx.send(BlockRangeMessage::Data(99..=99)).await.unwrap(); // Just before: discard
1559        buffer_tx.send(BlockRangeMessage::Data(100..=100)).await.unwrap(); // Exactly at: forward
1560        buffer_tx.send(BlockRangeMessage::Data(99..=100)).await.unwrap(); // Includes cutoff: trim to 100..=100
1561        buffer_tx.send(BlockRangeMessage::Data(100..=101)).await.unwrap(); // Starts at cutoff: forward
1562        drop(buffer_tx);
1563
1564        let (out_tx, mut out_rx) = mpsc::channel(8);
1565        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1566
1567        let mut forwarded = Vec::new();
1568        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1569            forwarded.push(range);
1570        }
1571
1572        // ensure no duplicates
1573        assert_eq!(forwarded, vec![100..=100, 100..=101]);
1574        Ok(())
1575    }
1576
1577    #[tokio::test]
1578    async fn cutoff_at_zero_handles_all_ranges() -> anyhow::Result<()> {
1579        let cutoff = 0;
1580
1581        let (buffer_tx, buffer_rx) = mpsc::channel(8);
1582        buffer_tx.send(BlockRangeMessage::Data(0..=5)).await.unwrap();
1583        buffer_tx.send(BlockRangeMessage::Data(6..=10)).await.unwrap();
1584        buffer_tx.send(BlockRangeMessage::Data(11..=25)).await.unwrap();
1585        drop(buffer_tx);
1586
1587        let (out_tx, mut out_rx) = mpsc::channel(8);
1588        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1589
1590        let mut forwarded = Vec::new();
1591        while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1592            forwarded.push(range);
1593        }
1594
1595        // All ranges should be forwarded since they're all >= 0
1596        assert_eq!(forwarded, vec![0..=5, 6..=10, 11..=25]);
1597        Ok(())
1598    }
1599
1600    #[tokio::test]
1601    async fn forwards_errors_to_subscribers() -> anyhow::Result<()> {
1602        let asserter = Asserter::new();
1603        let provider = mocked_provider(asserter);
1604        let (mut service, _cmd) = Service::new(test_config(), provider);
1605
1606        let (tx, mut rx) = mpsc::channel(1);
1607        service.subscriber = Some(tx);
1608
1609        service
1610            .send_to_subscriber(BlockRangeMessage::Error(
1611                BlockRangeScannerError::WebSocketConnectionFailed(4),
1612            ))
1613            .await;
1614
1615        match rx.recv().await.expect("subscriber should stay open") {
1616            BlockRangeMessage::Error(BlockRangeScannerError::WebSocketConnectionFailed(
1617                attempts,
1618            )) => {
1619                assert_eq!(attempts, 4);
1620            }
1621            other => panic!("unexpected message: {other:?}"),
1622        }
1623
1624        Ok(())
1625    }
1626}