// event_scanner/block_range_scanner/common.rs
1use std::{ops::RangeInclusive, sync::Arc};
2
3use robust_provider::{Error as SubscriptionError, RobustProvider, RobustSubscription};
4use tokio::sync::mpsc;
5use tokio_stream::StreamExt;
6
7use crate::{
8    ScannerError, ScannerMessage,
9    block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
10    types::{ChannelState, IntoScannerResult, Notification, ScannerResult, TryStream},
11};
12use alloy::{
13    consensus::BlockHeader,
14    network::{BlockResponse, Network},
15    primitives::BlockNumber,
16};
17
/// Default maximum number of blocks per streamed range.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
///
/// A value of `0` means a block is treated as confirmed as soon as it is observed.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default per-stream buffer size used by scanners.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

/// The result type yielded by block-range streams.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Convenience alias for a streamed block-range message.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
32
33impl From<RangeInclusive<BlockNumber>> for Message {
34    fn from(range: RangeInclusive<BlockNumber>) -> Self {
35        Message::Data(range)
36    }
37}
38
39impl PartialEq<RangeInclusive<BlockNumber>> for Message {
40    fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
41        if let Message::Data(range) = self { range.eq(other) } else { false }
42    }
43}
44
45impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
46    fn into_scanner_message_result(self) -> BlockScannerResult {
47        Ok(Message::Data(self))
48    }
49}
50
/// Streams live block ranges from `subscription` into `sender`, starting at `stream_start`.
///
/// Runs in three phases:
/// 1. skip subscription items until the first header whose confirmed height
///    (`number - block_confirmations`, saturating) reaches `stream_start`;
/// 2. catch up on already-confirmed blocks between `stream_start` and the
///    confirmed tip implied by that first header;
/// 3. stream subsequent confirmed ranges continuously, consulting
///    `reorg_handler` between batches.
///
/// Errors and notifications are forwarded through `sender`; the function
/// returns when the subscription ends, a fatal error is forwarded, or the
/// receiving side of the channel closes.
#[allow(clippy::too_many_arguments)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(subscription, sender, provider, reorg_handler))
)]
pub(crate) async fn stream_live_blocks<N: Network, R: ReorgHandler<N>>(
    stream_start: BlockNumber,
    subscription: RobustSubscription<N>,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    block_confirmations: u64,
    max_block_range: u64,
    reorg_handler: &mut R,
    notify_after_first_block: bool,
) {
    // Phase 1: Wait for first relevant block
    let mut stream =
        skip_to_first_relevant_block::<N>(subscription, stream_start, block_confirmations);

    let Some(first_block) = get_first_block::<N, _>(&mut stream, sender).await else {
        // error occurred and streamed
        return;
    };

    debug!(
        first_block = first_block.number(),
        stream_start = stream_start,
        "Received first relevant block, starting live streaming"
    );

    // This check is necessary when running `sync` modes. It makes sense to stream this notification
    // only once the first relevant block is received from the subscription, and not before that;
    // otherwise callers might perform certain operations expecting the relevant blocks to start
    // coming, when in fact they are not.
    if notify_after_first_block &&
        sender.try_stream(Notification::SwitchingToLive).await.is_closed()
    {
        return;
    }

    // Phase 2: Initialize streaming state with first block
    let Some(mut state) = initialize_live_streaming_state(
        first_block,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await
    else {
        return;
    };

    // Phase 3: Continuously stream blocks with reorg handling
    stream_blocks_continuously(
        &mut stream,
        &mut state,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;
}
119
120async fn get_first_block<
121    N: Network,
122    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> + Unpin,
123>(
124    stream: &mut S,
125    sender: &mpsc::Sender<BlockScannerResult>,
126) -> Option<N::HeaderResponse> {
127    while let Some(first_block) = stream.next().await {
128        match first_block {
129            Ok(block) => return Some(block),
130            Err(e) => {
131                match e {
132                    SubscriptionError::Lagged(_) => {
133                        // scanner already accounts for skipped block numbers
134                        // next block will be the actual incoming block
135                    }
136                    SubscriptionError::Timeout => {
137                        _ = sender.try_stream(ScannerError::Timeout).await;
138                        break;
139                    }
140                    SubscriptionError::RpcError(rpc_err) => {
141                        _ = sender.try_stream(ScannerError::RpcError(Arc::new(rpc_err))).await;
142                        break;
143                    }
144                    SubscriptionError::Closed => {
145                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
146                        break;
147                    }
148                    SubscriptionError::BlockNotFound => {
149                        _ = sender.try_stream(ScannerError::BlockNotFound).await;
150                        break;
151                    }
152                }
153            }
154        }
155    }
156
157    None
158}
159
160/// Skips blocks until we reach the first block that's relevant for streaming
161fn skip_to_first_relevant_block<N: Network>(
162    subscription: RobustSubscription<N>,
163    stream_start: BlockNumber,
164    block_confirmations: u64,
165) -> impl tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> {
166    subscription.into_stream().skip_while(move |header| match header {
167        Ok(header) => header.number().saturating_sub(block_confirmations) < stream_start,
168        Err(SubscriptionError::Lagged(_)) => true,
169        Err(_) => false,
170    })
171}
172
173/// Initializes the streaming state after receiving the first block.
174/// Returns None if the channel is closed.
175async fn initialize_live_streaming_state<N: Network, R: ReorgHandler<N>>(
176    first_block: N::HeaderResponse,
177    stream_start: BlockNumber,
178    block_confirmations: u64,
179    max_block_range: u64,
180    sender: &mpsc::Sender<BlockScannerResult>,
181    provider: &RobustProvider<N>,
182    reorg_handler: &mut R,
183) -> Option<LiveStreamingState<N>> {
184    let confirmed = first_block.number().saturating_sub(block_confirmations);
185
186    // The minimum common ancestor is the block before the stream start
187    let min_common_ancestor = stream_start.saturating_sub(1);
188
189    // Catch up on any confirmed blocks between stream_start and the confirmed tip
190    let previous_batch_end = stream_range_with_reorg_handling(
191        min_common_ancestor,
192        stream_start,
193        confirmed,
194        max_block_range,
195        sender,
196        provider,
197        reorg_handler,
198    )
199    .await?;
200
201    Some(LiveStreamingState {
202        batch_start: stream_start,
203        previous_batch_end: Some(previous_batch_end),
204    })
205}
206
207/// Continuously streams blocks, handling reorgs as they occur
208#[allow(clippy::too_many_arguments)]
209async fn stream_blocks_continuously<
210    N: Network,
211    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> + Unpin,
212    R: ReorgHandler<N>,
213>(
214    stream: &mut S,
215    state: &mut LiveStreamingState<N>,
216    stream_start: BlockNumber,
217    block_confirmations: u64,
218    max_block_range: u64,
219    sender: &mpsc::Sender<BlockScannerResult>,
220    provider: &RobustProvider<N>,
221    reorg_handler: &mut R,
222) {
223    while let Some(incoming_block) = stream.next().await {
224        let incoming_block = match incoming_block {
225            Ok(block) => block,
226            Err(e) => {
227                match e {
228                    SubscriptionError::Lagged(_) => {
229                        // scanner already accounts for skipped block numbers,
230                        // next block will be the actual incoming block
231                        continue;
232                    }
233                    SubscriptionError::Timeout => {
234                        _ = sender.try_stream(ScannerError::Timeout).await;
235                        return;
236                    }
237                    SubscriptionError::RpcError(rpc_err) => {
238                        _ = sender.try_stream(ScannerError::RpcError(Arc::new(rpc_err))).await;
239                        return;
240                    }
241                    SubscriptionError::Closed => {
242                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
243                        return;
244                    }
245                    SubscriptionError::BlockNotFound => {
246                        _ = sender.try_stream(ScannerError::BlockNotFound).await;
247                        break;
248                    }
249                }
250            }
251        };
252
253        let incoming_block = incoming_block.number();
254        trace!(received = incoming_block, "Received item from block subscription");
255
256        let Some(previous_batch_end) = state.previous_batch_end.as_ref() else {
257            // previously detected reorg wasn't fully handled
258            continue;
259        };
260
261        let common_ancestor = match reorg_handler.check(previous_batch_end).await {
262            Ok(reorg_opt) => reorg_opt,
263            Err(e) => {
264                error!("Failed to perform reorg check");
265                _ = sender.try_stream(e).await;
266                return;
267            }
268        };
269
270        if let Some(common_ancestor) = common_ancestor {
271            if handle_reorg_detected(common_ancestor, stream_start, state, sender).await.is_closed()
272            {
273                return; // Channel closed
274            }
275        } else {
276            // No reorg: advance batch_start to after the previous batch
277            state.batch_start = previous_batch_end.header().number() + 1;
278        }
279
280        // Stream the next batch of confirmed blocks
281        let batch_end_num = incoming_block.saturating_sub(block_confirmations);
282        if stream_next_batch(
283            batch_end_num,
284            state,
285            stream_start,
286            max_block_range,
287            sender,
288            provider,
289            reorg_handler,
290        )
291        .await
292        .is_closed()
293        {
294            return; // Channel closed
295        }
296    }
297}
298
299/// Handles a detected reorg by notifying and adjusting the streaming state.
300/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
301async fn handle_reorg_detected<N: Network>(
302    common_ancestor: N::BlockResponse,
303    stream_start: BlockNumber,
304    state: &mut LiveStreamingState<N>,
305    sender: &mpsc::Sender<BlockScannerResult>,
306) -> ChannelState {
307    let ancestor_num = common_ancestor.header().number();
308
309    info!(
310        common_ancestor = ancestor_num,
311        stream_start = stream_start,
312        "Reorg detected during live streaming"
313    );
314
315    let channel_state =
316        sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await;
317
318    if channel_state.is_closed() {
319        return ChannelState::Closed;
320    }
321
322    // Reset streaming position based on common ancestor
323    if ancestor_num < stream_start {
324        // Reorg went before our starting point - restart from stream_start
325        debug!(
326            common_ancestor = ancestor_num,
327            stream_start = stream_start,
328            "Reorg predates stream start, restarting from stream_start"
329        );
330        state.batch_start = stream_start;
331        state.previous_batch_end = None;
332    } else {
333        // Resume from after the common ancestor
334        debug!(
335            common_ancestor = ancestor_num,
336            resume_from = ancestor_num + 1,
337            "Resuming from after common ancestor"
338        );
339        state.batch_start = ancestor_num + 1;
340        state.previous_batch_end = Some(common_ancestor);
341    }
342
343    ChannelState::Open
344}
345
346/// Streams the next batch of blocks up to `batch_end_num`.
347/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
348async fn stream_next_batch<N: Network, R: ReorgHandler<N>>(
349    batch_end_num: BlockNumber,
350    state: &mut LiveStreamingState<N>,
351    stream_start: BlockNumber,
352    max_block_range: u64,
353    sender: &mpsc::Sender<BlockScannerResult>,
354    provider: &RobustProvider<N>,
355    reorg_handler: &mut R,
356) -> ChannelState {
357    if batch_end_num < state.batch_start {
358        // No new confirmed blocks to stream yet
359        return ChannelState::Open;
360    }
361
362    // The minimum common ancestor is the block before the stream start
363    let min_common_ancestor = stream_start.saturating_sub(1);
364
365    state.previous_batch_end = stream_range_with_reorg_handling(
366        min_common_ancestor,
367        state.batch_start,
368        batch_end_num,
369        max_block_range,
370        sender,
371        provider,
372        reorg_handler,
373    )
374    .await;
375
376    if state.previous_batch_end.is_none() {
377        // Channel closed
378        return ChannelState::Closed;
379    }
380
381    // SAFETY: Overflow cannot realistically happen
382    state.batch_start = batch_end_num + 1;
383
384    ChannelState::Open
385}
386
/// Tracks the current state of live streaming.
struct LiveStreamingState<N: Network> {
    /// The starting block number for the next batch to stream
    batch_start: BlockNumber,
    /// The last block from the previous batch (used for reorg detection).
    /// `None` after a reorg that predates the stream start, i.e. when no
    /// previously streamed block remains canonical to check against.
    previous_batch_end: Option<N::BlockResponse>,
}
394
/// Streams `next_start_block..=end` to `sender` in chunks of at most
/// `max_block_range` blocks, re-checking for reorgs after every chunk.
///
/// After sending each chunk it fetches the chunk's last block from `provider`
/// and asks `reorg_handler` whether that block is still canonical; on a
/// detected reorg a `Notification::ReorgDetected` is emitted and the range
/// iterator is rewound.
///
/// Returns the last successfully fetched chunk-end block (the caller's next
/// reorg anchor), or `None` when the receiving channel closed or an error was
/// forwarded to `sender`.
///
/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_range_with_reorg_handling<N: Network, R: ReorgHandler<N>>(
    min_common_ancestor: BlockNumber,
    next_start_block: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut R,
) -> Option<N::BlockResponse> {
    let mut last_batch_end: Option<N::BlockResponse> = None;
    let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);

    while let Some(batch) = iter.next() {
        let batch_end_num = *batch.end();
        // Fetch the chunk's last block so the reorg check below (and the
        // caller, via the return value) has a concrete block to validate.
        let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
            Ok(block) => block,
            Err(e) => {
                error!(
                    batch_start = batch.start(),
                    batch_end = batch_end_num,
                    "Failed to get ending block of the current batch"
                );
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if sender.try_stream(batch).await.is_closed() {
            return None; // channel closed
        }

        let reorged_opt = match reorg_handler.check(&batch_end).await {
            Ok(opt) => opt,
            Err(e) => {
                error!("Failed to perform reorg check");
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if let Some(common_ancestor) = reorged_opt {
            let common_ancestor = common_ancestor.header().number();
            info!(
                common_ancestor = common_ancestor,
                "Reorg detected during historical streaming, resetting range iterator"
            );
            if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed()
            {
                return None;
            }
            // NOTE(review): this clamps to `min_common_ancestor` (stream_start - 1),
            // so a reorg deeper than that restarts one block *before* the stream
            // start — confirm whether `.max(min_common_ancestor + 1)` was intended.
            let reset_to = (common_ancestor + 1).max(min_common_ancestor);
            debug!(reset_to = reset_to, "Resetting range iterator after reorg");
            iter.reset_to(reset_to);
        }

        last_batch_end = Some(batch_end);
    }

    last_batch_end
}