event_scanner/block_range_scanner/common.rs

1use std::ops::RangeInclusive;
2
3use robust_provider::{RobustProvider, RobustSubscription, subscription};
4use tokio::sync::mpsc;
5use tokio_stream::StreamExt;
6
7use crate::{
8    ScannerError, ScannerMessage,
9    block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
10    types::{IntoScannerResult, Notification, ScannerResult, TryStream},
11};
12use alloy::{
13    consensus::BlockHeader,
14    eips::BlockNumberOrTag,
15    network::{BlockResponse, Network},
16    primitives::BlockNumber,
17};
18
/// Default maximum number of blocks per streamed range.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
///
/// `0` means blocks are streamed as soon as they are observed, without waiting for
/// any confirmation blocks on top of them.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default per-stream buffer size used by scanners.
// NOTE(review): presumably used as the mpsc channel capacity; 50k buffered ranges
// allow a large backlog before backpressure kicks in — confirm this is intentional.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

/// The result type yielded by block-range streams.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Convenience alias for a streamed block-range message.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
33
34impl From<RangeInclusive<BlockNumber>> for Message {
35    fn from(range: RangeInclusive<BlockNumber>) -> Self {
36        Message::Data(range)
37    }
38}
39
40impl PartialEq<RangeInclusive<BlockNumber>> for Message {
41    fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
42        if let Message::Data(range) = self { range.eq(other) } else { false }
43    }
44}
45
46impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
47    fn into_scanner_message_result(self) -> BlockScannerResult {
48        Ok(Message::Data(self))
49    }
50}
51
/// Streams live, confirmed block ranges from a subscription to `sender`.
///
/// Runs in three phases:
/// 1. Skip subscription items until the first block whose confirmed height
///    (`number - block_confirmations`, saturating) reaches `stream_start`.
/// 2. Catch up: stream confirmed ranges from `stream_start` up to that tip.
/// 3. Loop: for each new header, reorg-check the previous batch and stream
///    the next confirmed range.
///
/// Returns when the subscription ends, the output channel closes, or after an
/// unrecoverable error has been forwarded through `sender`.
#[allow(clippy::too_many_arguments)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(subscription, sender, provider, reorg_handler))
)]
pub(crate) async fn stream_live_blocks<N: Network>(
    stream_start: BlockNumber,
    subscription: RobustSubscription<N>,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    block_confirmations: u64,
    max_block_range: u64,
    reorg_handler: &mut ReorgHandler<N>,
    notify_after_first_block: bool,
) {
    // Phase 1: Wait for first relevant block
    let mut stream =
        skip_to_first_relevant_block::<N>(subscription, stream_start, block_confirmations);

    let Some(first_block) = get_first_block::<N, _>(&mut stream, sender).await else {
        // error occurred and streamed
        return;
    };

    debug!(
        first_block = first_block.number(),
        stream_start = stream_start,
        "Received first relevant block, starting live streaming"
    );

    // This check is necessary when running `sync` modes. It makes sense to stream this notification
    // only once the first relevant block is received from the subscription, and not before that;
    // otherwise callers might perform certain operations expecting the relevant blocks to start
    // coming, when in fact they are not.
    if notify_after_first_block && !sender.try_stream(Notification::SwitchingToLive).await {
        return;
    }

    // Phase 2: Initialize streaming state with first block
    let Some(mut state) = initialize_live_streaming_state(
        first_block,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await
    else {
        // Channel closed while catching up.
        return;
    };

    // Phase 3: Continuously stream blocks with reorg handling
    stream_blocks_continuously(
        &mut stream,
        &mut state,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;
}
118
119async fn get_first_block<
120    N: Network,
121    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
122>(
123    stream: &mut S,
124    sender: &mpsc::Sender<BlockScannerResult>,
125) -> Option<N::HeaderResponse> {
126    while let Some(first_block) = stream.next().await {
127        match first_block {
128            Ok(block) => return Some(block),
129            Err(e) => {
130                match e {
131                    subscription::Error::Lagged(_) => {
132                        // scanner already accounts for skipped block numbers
133                        // next block will be the actual incoming block
134                    }
135                    subscription::Error::Timeout => {
136                        _ = sender.try_stream(ScannerError::Timeout).await;
137                        break;
138                    }
139                    subscription::Error::RpcError(rpc_err) => {
140                        _ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
141                        break;
142                    }
143                    subscription::Error::Closed => {
144                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
145                        break;
146                    }
147                }
148            }
149        }
150    }
151
152    None
153}
154
155/// Skips blocks until we reach the first block that's relevant for streaming
156fn skip_to_first_relevant_block<N: Network>(
157    subscription: RobustSubscription<N>,
158    stream_start: BlockNumber,
159    block_confirmations: u64,
160) -> impl tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> {
161    subscription.into_stream().skip_while(move |header| match header {
162        Ok(header) => header.number().saturating_sub(block_confirmations) < stream_start,
163        Err(subscription::Error::Lagged(_)) => true,
164        Err(_) => false,
165    })
166}
167
168/// Initializes the streaming state after receiving the first block.
169/// Returns None if the channel is closed.
170async fn initialize_live_streaming_state<N: Network>(
171    first_block: N::HeaderResponse,
172    stream_start: BlockNumber,
173    block_confirmations: u64,
174    max_block_range: u64,
175    sender: &mpsc::Sender<BlockScannerResult>,
176    provider: &RobustProvider<N>,
177    reorg_handler: &mut ReorgHandler<N>,
178) -> Option<LiveStreamingState<N>> {
179    let confirmed = first_block.number().saturating_sub(block_confirmations);
180
181    // The minimum common ancestor is the block before the stream start
182    let min_common_ancestor = stream_start.saturating_sub(1);
183
184    // Catch up on any confirmed blocks between stream_start and the confirmed tip
185    let previous_batch_end = stream_range_with_reorg_handling(
186        min_common_ancestor,
187        stream_start,
188        confirmed,
189        max_block_range,
190        sender,
191        provider,
192        reorg_handler,
193    )
194    .await?;
195
196    Some(LiveStreamingState {
197        batch_start: stream_start,
198        previous_batch_end: Some(previous_batch_end),
199    })
200}
201
/// Continuously streams blocks, handling reorgs as they occur.
///
/// For every header received from `stream`: reorg-check the previous batch
/// end, adjust the streaming state accordingly, then stream the newly
/// confirmed range up to `incoming - block_confirmations`. Returns when the
/// subscription ends, the channel closes, or an unrecoverable error was
/// forwarded to `sender`.
#[allow(clippy::too_many_arguments)]
async fn stream_blocks_continuously<
    N: Network,
    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
>(
    stream: &mut S,
    state: &mut LiveStreamingState<N>,
    stream_start: BlockNumber,
    block_confirmations: u64,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) {
    while let Some(incoming_block) = stream.next().await {
        let incoming_block = match incoming_block {
            Ok(block) => block,
            Err(e) => {
                match e {
                    subscription::Error::Lagged(_) => {
                        // scanner already accounts for skipped block numbers,
                        // next block will be the actual incoming block
                        continue;
                    }
                    subscription::Error::Timeout => {
                        _ = sender.try_stream(ScannerError::Timeout).await;
                        return;
                    }
                    subscription::Error::RpcError(rpc_err) => {
                        _ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
                        return;
                    }
                    subscription::Error::Closed => {
                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
                        return;
                    }
                }
            }
        };

        // Only the block number is needed from here on.
        let incoming_block = incoming_block.number();
        trace!(received = incoming_block, "Received item from block subscription");

        let Some(previous_batch_end) = state.previous_batch_end.as_ref() else {
            // previously detected reorg wasn't fully handled
            continue;
        };

        // Check whether the canonical chain still contains the last streamed block.
        let common_ancestor = match reorg_handler.check(previous_batch_end).await {
            Ok(reorg_opt) => reorg_opt,
            Err(e) => {
                error!("Failed to perform reorg check");
                _ = sender.try_stream(e).await;
                return;
            }
        };

        if let Some(common_ancestor) = common_ancestor {
            if !handle_reorg_detected(common_ancestor, stream_start, state, sender).await {
                return; // Channel closed
            }
        } else {
            // No reorg: advance batch_start to after the previous batch
            state.batch_start = previous_batch_end.header().number() + 1;
        }

        // Stream the next batch of confirmed blocks
        let batch_end_num = incoming_block.saturating_sub(block_confirmations);
        if !stream_next_batch(
            batch_end_num,
            state,
            stream_start,
            max_block_range,
            sender,
            provider,
            reorg_handler,
        )
        .await
        {
            return; // Channel closed
        }
    }
}
286
287/// Handles a detected reorg by notifying and adjusting the streaming state
288/// Returns false if the channel is closed
289async fn handle_reorg_detected<N: Network>(
290    common_ancestor: N::BlockResponse,
291    stream_start: BlockNumber,
292    state: &mut LiveStreamingState<N>,
293    sender: &mpsc::Sender<BlockScannerResult>,
294) -> bool {
295    let ancestor_num = common_ancestor.header().number();
296
297    info!(
298        common_ancestor = ancestor_num,
299        stream_start = stream_start,
300        "Reorg detected during live streaming"
301    );
302
303    if !sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await {
304        return false;
305    }
306
307    // Reset streaming position based on common ancestor
308    if ancestor_num < stream_start {
309        // Reorg went before our starting point - restart from stream_start
310        debug!(
311            common_ancestor = ancestor_num,
312            stream_start = stream_start,
313            "Reorg predates stream start, restarting from stream_start"
314        );
315        state.batch_start = stream_start;
316        state.previous_batch_end = None;
317    } else {
318        // Resume from after the common ancestor
319        debug!(
320            common_ancestor = ancestor_num,
321            resume_from = ancestor_num + 1,
322            "Resuming from after common ancestor"
323        );
324        state.batch_start = ancestor_num + 1;
325        state.previous_batch_end = Some(common_ancestor);
326    }
327
328    true
329}
330
331/// Streams the next batch of blocks up to `batch_end_num`.
332/// Returns false if the channel is closed
333async fn stream_next_batch<N: Network>(
334    batch_end_num: BlockNumber,
335    state: &mut LiveStreamingState<N>,
336    stream_start: BlockNumber,
337    max_block_range: u64,
338    sender: &mpsc::Sender<BlockScannerResult>,
339    provider: &RobustProvider<N>,
340    reorg_handler: &mut ReorgHandler<N>,
341) -> bool {
342    if batch_end_num < state.batch_start {
343        // No new confirmed blocks to stream yet
344        return true;
345    }
346
347    // The minimum common ancestor is the block before the stream start
348    let min_common_ancestor = stream_start.saturating_sub(1);
349
350    state.previous_batch_end = stream_range_with_reorg_handling(
351        min_common_ancestor,
352        state.batch_start,
353        batch_end_num,
354        max_block_range,
355        sender,
356        provider,
357        reorg_handler,
358    )
359    .await;
360
361    if state.previous_batch_end.is_none() {
362        // Channel closed
363        return false;
364    }
365
366    // SAFETY: Overflow cannot realistically happen
367    state.batch_start = batch_end_num + 1;
368
369    true
370}
371
/// Tracks the current state of live streaming
struct LiveStreamingState<N: Network> {
    /// The starting block number for the next batch to stream
    batch_start: BlockNumber,
    /// The last block from the previous batch (used for reorg detection).
    /// `None` after a reorg that predates the stream start: there is no
    /// trusted anchor until the range is re-streamed.
    previous_batch_end: Option<N::BlockResponse>,
}
379
/// Streams the historical block range `[start, end]` to `sender`.
///
/// Blocks at or below the current finalized height are streamed without reorg
/// checks; the non-finalized tail (if any) goes through
/// [`stream_range_with_reorg_handling`]. Returns `Some(())` on success and
/// `None` when the channel closed or an error was forwarded to `sender`.
#[must_use]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_historical_range<N: Network>(
    start: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<()> {
    // NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized
    // depth) just use zero.
    // Since we use the finalized block number only to determine whether to run reorg checks
    // or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0`
    // even on errors. Here `0` is used because it effectively just enables reorg checks.
    // If there was actually a provider problem, any subsequent provider call will catch and
    // properly log it and return the error to the caller.
    let finalized_block_num =
        provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0);

    // no reorg check for finalized blocks
    let finalized_batch_end = finalized_block_num.min(end);
    // Batch count is computed only for the trace log below.
    let finalized_range_count =
        RangeIterator::forward(start, finalized_batch_end, max_block_range).count();
    trace!(
        start = start,
        finalized_batch_end = finalized_batch_end,
        batch_count = finalized_range_count,
        "Streaming finalized blocks (no reorg check)"
    );

    for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
        trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range");
        if !sender.try_stream(range).await {
            return None; // channel closed
        }
    }

    // If start > finalized_batch_end, the loop above was empty and we should
    // continue from start. Otherwise, continue from after finalized_batch_end.
    let batch_start = start.max(finalized_batch_end + 1);

    // covers case when `end <= finalized`
    if batch_start > end {
        return Some(()); // we're done
    }

    // we have non-finalized block numbers to stream, a reorg can occur

    // Possible minimal common ancestors when a reorg occurs:
    // * start > finalized -> the common ancestor we care about is the block before `start`, that's
    //   where the stream should restart -> this is why we used `start - 1`
    // * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart
    //   on `start + 1`
    // * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only
    //   re-stream non-finalized blocks
    let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num);

    stream_range_with_reorg_handling(
        min_common_ancestor,
        batch_start,
        end,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await?;

    Some(())
}
454
455/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
456#[cfg_attr(
457    feature = "tracing",
458    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
459)]
460pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
461    min_common_ancestor: BlockNumber,
462    next_start_block: BlockNumber,
463    end: BlockNumber,
464    max_block_range: u64,
465    sender: &mpsc::Sender<BlockScannerResult>,
466    provider: &RobustProvider<N>,
467    reorg_handler: &mut ReorgHandler<N>,
468) -> Option<N::BlockResponse> {
469    let mut last_batch_end: Option<N::BlockResponse> = None;
470    let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);
471
472    while let Some(batch) = iter.next() {
473        let batch_end_num = *batch.end();
474        let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
475            Ok(block) => block,
476            Err(e) => {
477                error!(
478                    batch_start = batch.start(),
479                    batch_end = batch_end_num,
480                    "Failed to get ending block of the current batch"
481                );
482                _ = sender.try_stream(e).await;
483                return None;
484            }
485        };
486
487        if !sender.try_stream(batch).await {
488            return None; // channel closed
489        }
490
491        let reorged_opt = match reorg_handler.check(&batch_end).await {
492            Ok(opt) => opt,
493            Err(e) => {
494                error!("Failed to perform reorg check");
495                _ = sender.try_stream(e).await;
496                return None;
497            }
498        };
499
500        if let Some(common_ancestor) = reorged_opt {
501            let common_ancestor = common_ancestor.header().number();
502            info!(
503                common_ancestor = common_ancestor,
504                "Reorg detected during historical streaming, resetting range iterator"
505            );
506            if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
507                return None;
508            }
509            let reset_to = (common_ancestor + 1).max(min_common_ancestor);
510            debug!(reset_to = reset_to, "Resetting range iterator after reorg");
511            iter.reset_to(reset_to);
512        }
513
514        last_batch_end = Some(batch_end);
515    }
516
517    last_batch_end
518}