event_scanner/
block_range_scanner.rs

1//! Example usage:
2//!
3//! ```rust,no_run
4//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
5//! use std::ops::RangeInclusive;
6//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7//!
8//! use alloy::providers::{Provider, ProviderBuilder};
9//! use event_scanner::{
10//!     ScannerError,
11//!     block_range_scanner::{
12//!         BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS,
13//!         DEFAULT_MAX_BLOCK_RANGE, Message,
14//!     },
15//!     robust_provider::RobustProviderBuilder,
16//! };
17//! use tokio::time::Duration;
18//! use tracing::{error, info};
19//!
20//! #[tokio::main]
21//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
22//!     // Initialize logging
23//!     tracing_subscriber::fmt::init();
24//!
25//!     // Configuration
26//!     let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
27//!     let robust_provider = RobustProviderBuilder::new(provider).build().await?;
28//!     let block_range_scanner = BlockRangeScanner::new().connect(robust_provider).await?;
29//!
30//!     // Create client to send subscribe command to block scanner
31//!     let client: BlockRangeScannerClient = block_range_scanner.run()?;
32//!
33//!     let mut stream =
34//!         client.stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS).await?;
35//!
36//!     while let Some(message) = stream.next().await {
37//!         match message {
38//!             Message::Data(range) => {
39//!                 // process range
40//!             }
41//!             Message::Error(e) => {
42//!                 error!("Received error from subscription: {e}");
43//!                 match e {
44//!                     ScannerError::ServiceShutdown => break,
45//!                     _ => {
46//!                         error!("Non-fatal error, continuing: {e}");
47//!                     }
48//!                 }
49//!             }
50//!             Message::Status(status) => {
51//!                 info!("Received status message: {:?}", status);
52//!             }
53//!         }
54//!     }
55//!
56//!     info!("Data processing stopped.");
57//!
58//!     Ok(())
59//! }
60//! ```
61
62use std::{cmp::Ordering, ops::RangeInclusive};
63use tokio::{
64    sync::{mpsc, oneshot},
65    try_join,
66};
67use tokio_stream::{StreamExt, wrappers::ReceiverStream};
68
69use crate::{
70    ScannerMessage,
71    error::ScannerError,
72    robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider},
73    types::{ScannerStatus, TryStream},
74};
75use alloy::{
76    consensus::BlockHeader,
77    eips::BlockNumberOrTag,
78    network::{BlockResponse, Network, primitives::HeaderResponse},
79    primitives::{B256, BlockNumber},
80    pubsub::Subscription,
81    transports::{RpcError, TransportErrorKind},
82};
83use tracing::{debug, error, info, warn};
84
85pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
// copied from https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
87pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
88
89pub const MAX_BUFFERED_MESSAGES: usize = 50000;
90
91// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
92// is considered final)
93pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;
94
95pub type Message = ScannerMessage<RangeInclusive<BlockNumber>, ScannerError>;
96
97impl From<RangeInclusive<BlockNumber>> for Message {
98    fn from(logs: RangeInclusive<BlockNumber>) -> Self {
99        Message::Data(logs)
100    }
101}
102
103impl PartialEq<RangeInclusive<BlockNumber>> for Message {
104    fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
105        if let Message::Data(range) = self { range.eq(other) } else { false }
106    }
107}
108
109impl From<RobustProviderError> for Message {
110    fn from(error: RobustProviderError) -> Self {
111        Message::Error(error.into())
112    }
113}
114
115impl From<RpcError<TransportErrorKind>> for Message {
116    fn from(error: RpcError<TransportErrorKind>) -> Self {
117        Message::Error(error.into())
118    }
119}
120
121impl From<ScannerError> for Message {
122    fn from(error: ScannerError) -> Self {
123        Message::Error(error)
124    }
125}
126
/// Builder for the block-range scanning service.
///
/// Configure with [`BlockRangeScanner::max_block_range`], then call
/// [`BlockRangeScanner::connect`] to bind a provider.
#[derive(Clone)]
pub struct BlockRangeScanner {
    /// Maximum number of blocks emitted per range (batch size cap).
    pub max_block_range: u64,
}
131
132impl Default for BlockRangeScanner {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138impl BlockRangeScanner {
139    #[must_use]
140    pub fn new() -> Self {
141        Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
142    }
143
144    #[must_use]
145    pub fn max_block_range(mut self, max_block_range: u64) -> Self {
146        self.max_block_range = max_block_range;
147        self
148    }
149
150    /// Connects to an existing provider
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the provider connection fails.
155    pub async fn connect<N: Network>(
156        self,
157        provider: impl IntoRobustProvider<N>,
158    ) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
159        let provider = provider.into_robust_provider().await?;
160        Ok(ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range })
161    }
162}
163
/// A [`BlockRangeScanner`] bound to a concrete [`RobustProvider`].
pub struct ConnectedBlockRangeScanner<N: Network> {
    // Provider used for all RPC calls and block subscriptions.
    provider: RobustProvider<N>,
    // Maximum number of blocks per emitted range.
    max_block_range: u64,
}
168
169impl<N: Network> ConnectedBlockRangeScanner<N> {
170    /// Returns the `RobustProvider`
171    #[must_use]
172    pub fn provider(&self) -> &RobustProvider<N> {
173        &self.provider
174    }
175
176    /// Starts the subscription service and returns a client for sending commands.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the subscription service fails to start.
181    pub fn run(&self) -> Result<BlockRangeScannerClient, ScannerError> {
182        let (service, cmd_tx) = Service::new(self.provider.clone(), self.max_block_range);
183        tokio::spawn(async move {
184            service.run().await;
185        });
186        Ok(BlockRangeScannerClient::new(cmd_tx))
187    }
188}
189
/// Commands accepted by the scanner [`Service`].
///
/// Every variant carries the `sender` used to deliver [`Message`]s to the
/// subscriber, plus a oneshot `response` channel acknowledging whether the
/// command was accepted.
#[derive(Debug)]
pub enum Command {
    /// Stream confirmed block ranges from the current chain tip onwards.
    StreamLive {
        sender: mpsc::Sender<Message>,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream a fixed historical range in batches.
    StreamHistorical {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream historically from `start_height`, then switch to live mode.
    StreamFrom {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream a range in reverse order (highest block first).
    Rewind {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
}
216
/// Internal actor that owns the provider and executes [`Command`]s.
struct Service<N: Network> {
    // Provider for RPC calls and block subscriptions.
    provider: RobustProvider<N>,
    // Maximum number of blocks per emitted range.
    max_block_range: u64,
    // Number of commands whose handler returned an error (see `run`).
    error_count: u64,
    // Receiving end of the command channel created in `Service::new`.
    command_receiver: mpsc::Receiver<Command>,
    // When true, the `run` loop exits before receiving further commands.
    shutdown: bool,
}
224
225impl<N: Network> Service<N> {
226    pub fn new(provider: RobustProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
227        let (cmd_tx, cmd_rx) = mpsc::channel(100);
228
229        let service = Self {
230            provider,
231            max_block_range,
232            error_count: 0,
233            command_receiver: cmd_rx,
234            shutdown: false,
235        };
236
237        (service, cmd_tx)
238    }
239
240    pub async fn run(mut self) {
241        info!("Starting subscription service");
242
243        while !self.shutdown {
244            tokio::select! {
245                cmd = self.command_receiver.recv() => {
246                    if let Some(command) = cmd {
247                        if let Err(e) = self.handle_command(command).await {
248                            error!("Command handling error: {}", e);
249                            self.error_count += 1;
250                        }
251                    } else {
252                        warn!("Command channel closed, shutting down");
253                        break;
254                    }
255                }
256            }
257        }
258
259        info!("Subscription service stopped");
260    }
261
    /// Dispatches a single [`Command`] to its handler and reports the
    /// handler's result back through the command's oneshot `response` channel.
    ///
    /// NOTE(review): this function currently always returns `Ok(())` — the
    /// per-command result travels via `response` instead — so the error branch
    /// in `run` is effectively unreachable from here.
    async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
        match command {
            Command::StreamLive { sender, block_confirmations, response } => {
                info!("Starting live stream");
                let result = self.handle_live(block_confirmations, sender).await;
                // The requester may have dropped its receiver; ignore failure.
                let _ = response.send(result);
            }
            Command::StreamHistorical { sender, start_height, end_height, response } => {
                info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
                let result = self.handle_historical(start_height, end_height, sender).await;
                let _ = response.send(result);
            }
            Command::StreamFrom { sender, start_height, block_confirmations, response } => {
                info!(start_height = ?start_height, "Starting streaming from");
                let result = self.handle_sync(start_height, block_confirmations, sender).await;
                let _ = response.send(result);
            }
            Command::Rewind { sender, start_height, end_height, response } => {
                info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
                let result = self.handle_rewind(start_height, end_height, sender).await;
                let _ = response.send(result);
            }
        }
        Ok(())
    }
287
288    async fn handle_live(
289        &mut self,
290        block_confirmations: u64,
291        sender: mpsc::Sender<Message>,
292    ) -> Result<(), ScannerError> {
293        let max_block_range = self.max_block_range;
294        let latest = self.provider.get_block_number().await?;
295
296        // the next block returned by the underlying subscription will always be `latest + 1`,
297        // because `latest` was already mined and subscription by definition only streams after new
298        // blocks have been mined
299        let range_start = (latest + 1).saturating_sub(block_confirmations);
300
301        let subscription = self.provider.subscribe_blocks().await?;
302
303        info!("WebSocket connected for live blocks");
304
305        tokio::spawn(async move {
306            Self::stream_live_blocks(
307                range_start,
308                subscription,
309                sender,
310                block_confirmations,
311                max_block_range,
312            )
313            .await;
314        });
315
316        Ok(())
317    }
318
319    async fn handle_historical(
320        &mut self,
321        start_height: BlockNumberOrTag,
322        end_height: BlockNumberOrTag,
323        sender: mpsc::Sender<Message>,
324    ) -> Result<(), ScannerError> {
325        let max_block_range = self.max_block_range;
326
327        let (start_block, end_block) = tokio::try_join!(
328            self.provider.get_block_by_number(start_height),
329            self.provider.get_block_by_number(end_height)
330        )?;
331
332        let start_block_num = start_block.header().number();
333        let end_block_num = end_block.header().number();
334
335        let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
336            Ordering::Greater => (end_block_num, start_block_num),
337            _ => (start_block_num, end_block_num),
338        };
339
340        info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
341
342        tokio::spawn(async move {
343            Self::stream_historical_blocks(
344                start_block_num,
345                end_block_num,
346                max_block_range,
347                &sender,
348            )
349            .await;
350        });
351
352        Ok(())
353    }
354
    /// Streams historically from `start_height` up to the confirmed tip, then
    /// transitions to live streaming without gaps or duplicates.
    ///
    /// Live blocks are buffered in a side channel while the historical sync
    /// runs; once history reaches the cutoff, buffered blocks above the cutoff
    /// are replayed to the subscriber.
    ///
    /// # Errors
    ///
    /// Returns an error if resolving the start/latest block or subscribing to
    /// new blocks fails.
    async fn handle_sync(
        &mut self,
        start_height: BlockNumberOrTag,
        block_confirmations: u64,
        sender: mpsc::Sender<Message>,
    ) -> Result<(), ScannerError> {
        let provider = self.provider.clone();
        let max_block_range = self.max_block_range;

        // Numeric heights are used as-is; tags are resolved via the provider.
        let get_start_block = async || -> Result<BlockNumber, ScannerError> {
            let block = match start_height {
                BlockNumberOrTag::Number(num) => num,
                block_tag => provider.get_block_by_number(block_tag).await?.header().number(),
            };
            Ok(block)
        };

        let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
            let block =
                provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number();
            Ok(block)
        };

        // Step 1:
        // Fetches the starting block and end block for historical sync in parallel
        let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?;

        // Highest block considered final under the confirmation policy.
        let confirmed_tip = latest_block.saturating_sub(block_confirmations);

        let subscription = self.provider.subscribe_blocks().await?;
        info!("Buffering live blocks");

        // If start is beyond confirmed tip, skip historical and go straight to live
        if start_block > confirmed_tip {
            info!(
                start_block = start_block,
                confirmed_tip = confirmed_tip,
                "Start block is beyond confirmed tip, starting live stream"
            );

            tokio::spawn(async move {
                Self::stream_live_blocks(
                    start_block,
                    subscription,
                    sender,
                    block_confirmations,
                    max_block_range,
                )
                .await;
            });

            return Ok(());
        }

        info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data");

        // Step 2: Setup the live streaming buffer
        // This channel will accumulate while historical sync is running
        let (live_block_buffer_sender, live_block_buffer_receiver) =
            mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);

        // The cutoff is the last block we have synced historically
        // Any block > cutoff will come from the live stream
        let cutoff = confirmed_tip;

        // This task runs independently, accumulating new blocks while historical data is syncing
        tokio::spawn(async move {
            Self::stream_live_blocks(
                cutoff + 1,
                subscription,
                live_block_buffer_sender,
                block_confirmations,
                max_block_range,
            )
            .await;
        });

        tokio::spawn(async move {
            // Step 4: Perform historical synchronization
            // This processes blocks from start_block to end_block (cutoff)
            // If this fails, we need to abort the live streaming task
            Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender)
                .await;

            info!("Chain tip reached, switching to live");

            if !sender.try_stream(ScannerStatus::SwitchingToLive).await {
                return;
            }

            info!("Successfully transitioned from historical to live data");

            // Step 5:
            // Spawn the buffer processor task
            // This will:
            // 1. Process all buffered blocks, filtering out any ≤ cutoff
            // 2. Forward blocks > cutoff to the user
            // 3. Continue forwarding until the buffer is exhausted (waits for new blocks from live
            //    stream)
            Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
        });

        Ok(())
    }
459
460    async fn handle_rewind(
461        &mut self,
462        start_height: BlockNumberOrTag,
463        end_height: BlockNumberOrTag,
464        sender: mpsc::Sender<Message>,
465    ) -> Result<(), ScannerError> {
466        let max_block_range = self.max_block_range;
467        let provider = self.provider.clone();
468
469        let (start_block, end_block) = try_join!(
470            self.provider.get_block_by_number(start_height),
471            self.provider.get_block_by_number(end_height),
472        )?;
473
474        // normalize block range
475        let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
476            Ordering::Greater => (start_block, end_block),
477            _ => (end_block, start_block),
478        };
479
480        tokio::spawn(async move {
481            Self::stream_rewind(from, to, max_block_range, &sender, &provider).await;
482        });
483
484        Ok(())
485    }
486
487    /// Streams blocks in reverse order from `from` to `to`.
488    ///
489    /// The `from` block is assumed to be greater than or equal to the `to` block.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the stream fails
494    async fn stream_rewind(
495        from: N::BlockResponse,
496        to: N::BlockResponse,
497        max_block_range: u64,
498        sender: &mpsc::Sender<Message>,
499        provider: &RobustProvider<N>,
500    ) {
501        let mut batch_count = 0;
502
503        // for checking whether reorg occurred
504        let mut tip_hash = from.header().hash();
505
506        let from = from.header().number();
507        let to = to.header().number();
508
509        // we're iterating in reverse
510        let mut batch_from = from;
511
512        while batch_from >= to {
513            let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
514
515            // stream the range regularly, i.e. from smaller block number to greater
516            if !sender.try_stream(batch_to..=batch_from).await {
517                break;
518            }
519
520            batch_count += 1;
521            if batch_count % 10 == 0 {
522                debug!(batch_count = batch_count, "Processed rewind batches");
523            }
524
525            // check early if end of stream achieved to avoid subtraction overflow when `to
526            // == 0`
527            if batch_to == to {
528                break;
529            }
530
531            let reorged = match reorg_detected(provider, tip_hash).await {
532                Ok(detected) => {
533                    info!(block_number = %from, hash = %tip_hash, "Reorg detected");
534                    detected
535                }
536                Err(e) => {
537                    error!(error = %e, "Terminal RPC call error, shutting down");
538                    _ = sender.try_stream(e);
539                    return;
540                }
541            };
542
543            if reorged {
544                info!(block_number = %from, hash = %tip_hash, "Reorg detected");
545
546                if !sender.try_stream(ScannerStatus::ReorgDetected).await {
547                    break;
548                }
549
550                // restart rewind
551                batch_from = from;
552                // store the updated end block hash
553                tip_hash = match provider.get_block_by_number(from.into()).await {
554                    Ok(block) => block.header().hash(),
555                    Err(RobustProviderError::BlockNotFound(_)) => {
556                        panic!("Block with number '{from}' should exist post-reorg");
557                    }
558                    Err(e) => {
559                        error!(error = %e, "Terminal RPC call error, shutting down");
560                        _ = sender.try_stream(e);
561                        return;
562                    }
563                };
564            } else {
565                // `batch_to` is always greater than `to`, so `batch_to - 1` is always a valid
566                // unsigned integer
567                batch_from = batch_to - 1;
568            }
569        }
570
571        info!(batch_count = batch_count, "Rewind completed");
572    }
573
574    async fn stream_historical_blocks(
575        start: BlockNumber,
576        end: BlockNumber,
577        max_block_range: u64,
578        sender: &mpsc::Sender<Message>,
579    ) {
580        let mut batch_count = 0;
581
582        let mut next_start_block = start;
583
584        // must be <= to include the edge case when start == end (i.e. return the single block
585        // range)
586        while next_start_block <= end {
587            let batch_end_block_number =
588                next_start_block.saturating_add(max_block_range - 1).min(end);
589
590            if !sender.try_stream(next_start_block..=batch_end_block_number).await {
591                break;
592            }
593
594            batch_count += 1;
595            if batch_count % 10 == 0 {
596                debug!(batch_count = batch_count, "Processed historical batches");
597            }
598
599            if batch_end_block_number == end {
600                break;
601            }
602
603            // Next block number always exists as we checked end block previously
604            let next_start_block_number = batch_end_block_number.saturating_add(1);
605
606            next_start_block = next_start_block_number;
607        }
608
609        info!(batch_count = batch_count, "Historical sync completed");
610    }
611
    /// Converts a header subscription into confirmed block ranges.
    ///
    /// Maintains `range_start` as the next unconfirmed block to emit; each
    /// incoming header advances the confirmed watermark by
    /// `block_confirmations`, and ranges are capped at `max_block_range`
    /// blocks per message. A header numbered below `range_start` is treated
    /// as evidence of a reorg.
    async fn stream_live_blocks(
        mut range_start: BlockNumber,
        subscription: Subscription<N::HeaderResponse>,
        sender: mpsc::Sender<Message>,
        block_confirmations: u64,
        max_block_range: u64,
    ) {
        // ensure we start streaming only after the expected_next_block cutoff
        let cutoff = range_start;
        let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);

        while let Some(incoming_block) = stream.next().await {
            let incoming_block_num = incoming_block.number();
            info!(block_number = incoming_block_num, "Received block header");

            if incoming_block_num < range_start {
                warn!("Reorg detected: sending forked range");
                if !sender.try_stream(ScannerStatus::ReorgDetected).await {
                    return;
                }

                // Calculate the confirmed block position for the incoming block
                let incoming_confirmed = incoming_block_num.saturating_sub(block_confirmations);

                // updated expected block to updated confirmed
                range_start = incoming_confirmed;
            }

            let confirmed = incoming_block_num.saturating_sub(block_confirmations);
            if confirmed >= range_start {
                // NOTE: Edge case when difference between range end and range start >= max
                // reads
                // NOTE(review): `max_block_range - 1` underflows if configured as 0 —
                // confirm the builder forbids that value.
                let range_end = confirmed.min(range_start.saturating_add(max_block_range - 1));

                info!(range_start = range_start, range_end = range_end, "Sending live block range");

                if !sender.try_stream(range_start..=range_end).await {
                    return;
                }

                // Overflow can not realistically happen
                range_start = range_end + 1;
            }
        }
    }
657
658    async fn process_live_block_buffer(
659        mut buffer_rx: mpsc::Receiver<Message>,
660        sender: mpsc::Sender<Message>,
661        cutoff: BlockNumber,
662    ) {
663        let mut processed = 0;
664        let mut discarded = 0;
665
666        // Process all buffered messages
667        while let Some(data) = buffer_rx.recv().await {
668            match data {
669                Message::Data(range) => {
670                    let (start, end) = (*range.start(), *range.end());
671                    if start >= cutoff {
672                        if !sender.try_stream(range).await {
673                            break;
674                        }
675                        processed += end - start;
676                    } else if end >= cutoff {
677                        discarded += cutoff - start;
678
679                        let start = cutoff;
680                        if !sender.try_stream(start..=end).await {
681                            break;
682                        }
683                        processed += end - start;
684                    } else {
685                        discarded += end - start;
686                    }
687                }
688                other => {
689                    // Could be error or status
690                    if !sender.try_stream(other).await {
691                        break;
692                    }
693                }
694            }
695        }
696
697        info!(processed = processed, discarded = discarded, "Processed buffered messages");
698    }
699}
700
701async fn reorg_detected<N: Network>(
702    provider: &RobustProvider<N>,
703    hash_to_check: B256,
704) -> Result<bool, ScannerError> {
705    match provider.get_block_by_hash(hash_to_check).await {
706        Ok(_) => Ok(false),
707        Err(RobustProviderError::BlockNotFound(_)) => Ok(true),
708        Err(e) => Err(e.into()),
709    }
710}
711
/// Handle for issuing streaming commands to a running scanner [`Service`].
pub struct BlockRangeScannerClient {
    // Sending half of the service's command channel.
    command_sender: mpsc::Sender<Command>,
}
715
716impl BlockRangeScannerClient {
717    /// Creates a new subscription client.
718    ///
719    /// # Arguments
720    ///
721    /// * `command_sender` - The sender for sending commands to the subscription service.
722    #[must_use]
723    pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
724        Self { command_sender }
725    }
726
727    /// Streams live blocks starting from the latest block.
728    ///
729    /// # Arguments
730    ///
731    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
732    ///
733    /// # Errors
734    ///
735    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
736    pub async fn stream_live(
737        &self,
738        block_confirmations: u64,
739    ) -> Result<ReceiverStream<Message>, ScannerError> {
740        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
741        let (response_tx, response_rx) = oneshot::channel();
742
743        let command = Command::StreamLive {
744            sender: blocks_sender,
745            block_confirmations,
746            response: response_tx,
747        };
748
749        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
750
751        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
752
753        Ok(ReceiverStream::new(blocks_receiver))
754    }
755
756    /// Streams a batch of historical blocks from `start_height` to `end_height`.
757    ///
758    /// # Arguments
759    ///
760    /// * `start_height` - The starting block number or tag.
761    /// * `end_height` - The ending block number or tag.
762    ///
763    /// # Errors
764    ///
765    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
766    pub async fn stream_historical(
767        &self,
768        start_height: impl Into<BlockNumberOrTag>,
769        end_height: impl Into<BlockNumberOrTag>,
770    ) -> Result<ReceiverStream<Message>, ScannerError> {
771        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
772        let (response_tx, response_rx) = oneshot::channel();
773
774        let command = Command::StreamHistorical {
775            sender: blocks_sender,
776            start_height: start_height.into(),
777            end_height: end_height.into(),
778            response: response_tx,
779        };
780
781        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
782
783        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
784
785        Ok(ReceiverStream::new(blocks_receiver))
786    }
787
788    /// Streams blocks starting from `start_height` and transitions to live mode.
789    ///
790    /// # Arguments
791    ///
792    /// * `start_height` - The starting block number or tag.
793    /// * `block_confirmations` - Number of confirmations to apply once in live mode.
794    ///
795    /// # Errors
796    ///
797    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
798    pub async fn stream_from(
799        &self,
800        start_height: impl Into<BlockNumberOrTag>,
801        block_confirmations: u64,
802    ) -> Result<ReceiverStream<Message>, ScannerError> {
803        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
804        let (response_tx, response_rx) = oneshot::channel();
805
806        let command = Command::StreamFrom {
807            sender: blocks_sender,
808            start_height: start_height.into(),
809            block_confirmations,
810            response: response_tx,
811        };
812
813        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
814
815        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
816
817        Ok(ReceiverStream::new(blocks_receiver))
818    }
819
820    /// Streams blocks in reverse order from `start_height` to `end_height`.
821    ///
822    /// # Arguments
823    ///
824    /// * `start_height` - The starting block number or tag (defaults to Latest if None).
825    /// * `end_height` - The ending block number or tag (defaults to Earliest if None).
826    ///
827    /// # Errors
828    ///
829    /// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
830    pub async fn rewind(
831        &self,
832        start_height: impl Into<BlockNumberOrTag>,
833        end_height: impl Into<BlockNumberOrTag>,
834    ) -> Result<ReceiverStream<Message>, ScannerError> {
835        let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
836        let (response_tx, response_rx) = oneshot::channel();
837
838        let command = Command::Rewind {
839            sender: blocks_sender,
840            start_height: start_height.into(),
841            end_height: end_height.into(),
842            response: response_tx,
843        };
844
845        self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
846
847        response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
848
849        Ok(ReceiverStream::new(blocks_receiver))
850    }
851}
852
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{assert_closed, assert_next};
    use alloy::{eips::BlockId, network::Ethereum};
    use tokio::sync::mpsc;

    // Builder defaults must track the public constants.
    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let scanner = BlockRangeScanner::new();

        assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
    }

    #[test]
    fn builder_methods_update_configuration() {
        let max_block_range = 42;

        let scanner = BlockRangeScanner::new().max_block_range(max_block_range);

        assert_eq!(scanner.max_block_range, max_block_range);
    }

    // Ranges entirely above the cutoff pass through untouched.
    #[tokio::test]
    async fn buffered_messages_after_cutoff_are_all_passed() {
        let cutoff = 50;
        let (buffer_tx, buffer_rx) = mpsc::channel(8);
        buffer_tx.send(Message::Data(51..=55)).await.unwrap();
        buffer_tx.send(Message::Data(56..=60)).await.unwrap();
        buffer_tx.send(Message::Data(61..=70)).await.unwrap();
        drop(buffer_tx);

        let (out_tx, out_rx) = mpsc::channel(8);
        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;

        let mut stream = ReceiverStream::new(out_rx);

        assert_next!(stream, 51..=55);
        assert_next!(stream, 56..=60);
        assert_next!(stream, 61..=70);
        assert_closed!(stream);
    }

    // Ranges entirely below the cutoff are silently dropped.
    #[tokio::test]
    async fn ranges_entirely_before_cutoff_are_discarded() {
        let cutoff = 100;

        let (buffer_tx, buffer_rx) = mpsc::channel(8);
        buffer_tx.send(Message::Data(40..=50)).await.unwrap();
        buffer_tx.send(Message::Data(51..=60)).await.unwrap();
        buffer_tx.send(Message::Data(61..=70)).await.unwrap();
        drop(buffer_tx);

        let (out_tx, out_rx) = mpsc::channel(8);
        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;

        let mut stream = ReceiverStream::new(out_rx);

        assert_closed!(stream);
    }

    // Ranges straddling the cutoff are trimmed to begin at the cutoff.
    #[tokio::test]
    async fn ranges_overlapping_cutoff_are_trimmed() {
        let cutoff = 75;

        let (buffer_tx, buffer_rx) = mpsc::channel(8);
        buffer_tx.send(Message::Data(60..=70)).await.unwrap();
        buffer_tx.send(Message::Data(71..=80)).await.unwrap();
        buffer_tx.send(Message::Data(81..=86)).await.unwrap();
        drop(buffer_tx);

        let (out_tx, out_rx) = mpsc::channel(8);
        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;

        let mut stream = ReceiverStream::new(out_rx);

        assert_next!(stream, 75..=80);
        assert_next!(stream, 81..=86);
        assert_closed!(stream);
    }

    // A range starting exactly at the cutoff is forwarded, not discarded.
    #[tokio::test]
    async fn edge_case_range_exactly_at_cutoff() {
        let cutoff = 100;

        let (buffer_tx, buffer_rx) = mpsc::channel(8);
        buffer_tx.send(Message::Data(98..=98)).await.unwrap(); // Just before: discard
        buffer_tx.send(Message::Data(99..=100)).await.unwrap(); // Includes cutoff: trim to 100..=100
        buffer_tx.send(Message::Data(100..=100)).await.unwrap(); // Exactly at: forward
        buffer_tx.send(Message::Data(100..=101)).await.unwrap(); // Starts at cutoff: forward
        buffer_tx.send(Message::Data(102..=102)).await.unwrap(); // After cutoff: forward
        drop(buffer_tx);

        let (out_tx, out_rx) = mpsc::channel(8);
        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;

        let mut stream = ReceiverStream::new(out_rx);

        assert_next!(stream, 100..=100);
        assert_next!(stream, 100..=100);
        assert_next!(stream, 100..=101);
        assert_next!(stream, 102..=102);
        assert_closed!(stream);
    }

    // Errors pushed through `try_stream` arrive as `Message::Error` variants.
    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (tx, mut rx) = mpsc::channel::<Message>(1);

        _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await;

        assert!(matches!(
            rx.recv().await,
            Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number(
                BlockNumberOrTag::Number(4)
            ))))
        ));
    }
}
971}