use std::ops::RangeInclusive;

use robust_provider::{RobustProvider, RobustSubscription, subscription};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

use crate::{
    ScannerError, ScannerMessage,
    block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
    types::{IntoScannerResult, Notification, ScannerResult, TryStream},
};
use alloy::{
    consensus::BlockHeader,
    eips::BlockNumberOrTag,
    network::{BlockResponse, Network},
    primitives::BlockNumber,
};

/// Default maximum number of blocks fetched in a single batch request.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default number of confirmations a block must have before it is streamed.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default capacity of the channel that buffers outgoing scanner messages.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50_000;

/// Result type produced by the block range scanner.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Message carrying an inclusive range of scanned block numbers.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;

impl From<RangeInclusive<BlockNumber>> for Message {
    fn from(range: RangeInclusive<BlockNumber>) -> Self {
        Message::Data(range)
    }
}

impl PartialEq<RangeInclusive<BlockNumber>> for Message {
    fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
        if let Message::Data(range) = self { range.eq(other) } else { false }
    }
}

impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
    fn into_scanner_message_result(self) -> BlockScannerResult {
        Ok(Message::Data(self))
    }
}

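// A minimal sanity sketch of the conversions above (hypothetical test module,
// not part of the original file); `assert!` is used instead of `assert_eq!`
// so no `Debug` bound is assumed on `ScannerMessage`.
#[cfg(test)]
mod message_conversion_tests {
    use super::*;

    #[test]
    fn range_converts_to_data_message() {
        // `From` wraps the range in `Message::Data`, and the `PartialEq` impl
        // compares a `Message` directly against a `RangeInclusive<BlockNumber>`.
        let msg = Message::from(10..=20);
        assert!(msg == (10..=20));
    }
}
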
/// Streams confirmed block ranges to `sender`, starting at `stream_start`:
/// waits for the first relevant header, catches up to its confirmed height,
/// then follows the chain head while handling reorgs.
#[allow(clippy::too_many_arguments)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(subscription, sender, provider, reorg_handler))
)]
pub(crate) async fn stream_live_blocks<N: Network>(
    stream_start: BlockNumber,
    subscription: RobustSubscription<N>,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    block_confirmations: u64,
    max_block_range: u64,
    reorg_handler: &mut ReorgHandler<N>,
    notify_after_first_block: bool,
) {
    let mut stream =
        skip_to_first_relevant_block::<N>(subscription, stream_start, block_confirmations);

    let Some(first_block) = get_first_block::<N, _>(&mut stream, sender).await else {
        return;
    };

    debug!(
        first_block = first_block.number(),
        stream_start = stream_start,
        "Received first relevant block, starting live streaming"
    );

    // Let the consumer know the scanner is switching from catch-up to live
    // mode; stop if the receiver has been dropped.
    if notify_after_first_block && !sender.try_stream(Notification::SwitchingToLive).await {
        return;
    }

    let Some(mut state) = initialize_live_streaming_state(
        first_block,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await
    else {
        return;
    };

    stream_blocks_continuously(
        &mut stream,
        &mut state,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;
}

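/// Waits for the first header to arrive on `stream`. Lagged notifications are
/// ignored; a timeout, RPC error, or closed subscription is reported to
/// `sender` and ends the wait with `None`.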
async fn get_first_block<
    N: Network,
    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
>(
    stream: &mut S,
    sender: &mpsc::Sender<BlockScannerResult>,
) -> Option<N::HeaderResponse> {
    while let Some(first_block) = stream.next().await {
        match first_block {
            Ok(block) => return Some(block),
            Err(e) => {
                match e {
                    subscription::Error::Lagged(_) => {
                        // Lagging only means some headers were skipped; keep
                        // waiting for the next one.
                    }
                    subscription::Error::Timeout => {
                        _ = sender.try_stream(ScannerError::Timeout).await;
                        break;
                    }
                    subscription::Error::RpcError(rpc_err) => {
                        _ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
                        break;
                    }
                    subscription::Error::Closed => {
                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
                        break;
                    }
                }
            }
        }
    }

    None
}

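/// Skips subscription items until the first header whose confirmed height
/// (`number - block_confirmations`) has reached `stream_start`.
///
/// Worked example (illustrative numbers): with `stream_start = 100` and
/// `block_confirmations = 5`, header `104` is skipped (`104 - 5 = 99 < 100`)
/// while header `105` is the first to pass (`105 - 5 = 100`). Lagged errors
/// are skipped; any other error ends the skip phase and flows downstream.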
fn skip_to_first_relevant_block<N: Network>(
    subscription: RobustSubscription<N>,
    stream_start: BlockNumber,
    block_confirmations: u64,
) -> impl tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> {
    subscription.into_stream().skip_while(move |header| match header {
        Ok(header) => header.number().saturating_sub(block_confirmations) < stream_start,
        Err(subscription::Error::Lagged(_)) => true,
        Err(_) => false,
    })
}

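/// Performs the initial catch-up: streams every block from `stream_start` up
/// to the confirmed height of the first live block, then records where the
/// next live batch should begin.
///
/// Worked example (illustrative numbers): if the first live header is block
/// `107` and `block_confirmations = 5`, the confirmed height is `102`, so with
/// `stream_start = 100` the range `100..=102` is streamed before live
/// batching starts.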
async fn initialize_live_streaming_state<N: Network>(
    first_block: N::HeaderResponse,
    stream_start: BlockNumber,
    block_confirmations: u64,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<LiveStreamingState<N>> {
    let confirmed = first_block.number().saturating_sub(block_confirmations);

    // Floor for reorg rewinds: the block just before `stream_start`.
    let min_common_ancestor = stream_start.saturating_sub(1);

    let previous_batch_end = stream_range_with_reorg_handling(
        min_common_ancestor,
        stream_start,
        confirmed,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await?;

    Some(LiveStreamingState {
        batch_start: stream_start,
        previous_batch_end: Some(previous_batch_end),
    })
}

/// Core live-streaming loop: for every incoming header, reorg-checks the end
/// of the previous batch, repositions the cursor if needed, and streams the
/// next confirmed batch.
#[allow(clippy::too_many_arguments)]
async fn stream_blocks_continuously<
    N: Network,
    S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
>(
    stream: &mut S,
    state: &mut LiveStreamingState<N>,
    stream_start: BlockNumber,
    block_confirmations: u64,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) {
    while let Some(incoming_block) = stream.next().await {
        let incoming_block = match incoming_block {
            Ok(block) => block,
            Err(e) => {
                match e {
                    subscription::Error::Lagged(_) => {
                        // Skipped headers are recovered by the batch range
                        // below, so a lagging subscription is not fatal.
                        continue;
                    }
                    subscription::Error::Timeout => {
                        _ = sender.try_stream(ScannerError::Timeout).await;
                        return;
                    }
                    subscription::Error::RpcError(rpc_err) => {
                        _ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
                        return;
                    }
                    subscription::Error::Closed => {
                        _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
                        return;
                    }
                }
            }
        };

        let incoming_block = incoming_block.number();
        trace!(received = incoming_block, "Received item from block subscription");

        // Without a previous batch end there is nothing to reorg-check, so
        // proceed straight to streaming the next batch. (Skipping the rest of
        // the iteration here would stall the stream forever after a reorg
        // that predates `stream_start`.)
        if let Some(previous_batch_end) = state.previous_batch_end.as_ref() {
            let common_ancestor = match reorg_handler.check(previous_batch_end).await {
                Ok(reorg_opt) => reorg_opt,
                Err(e) => {
                    error!("Failed to perform reorg check");
                    _ = sender.try_stream(e).await;
                    return;
                }
            };

            if let Some(common_ancestor) = common_ancestor {
                if !handle_reorg_detected(common_ancestor, stream_start, state, sender).await {
                    return;
                }
            } else {
                state.batch_start = previous_batch_end.header().number() + 1;
            }
        }

        let batch_end_num = incoming_block.saturating_sub(block_confirmations);
        if !stream_next_batch(
            batch_end_num,
            state,
            stream_start,
            max_block_range,
            sender,
            provider,
            reorg_handler,
        )
        .await
        {
            return;
        }
    }
}

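/// Reports a detected reorg and repositions the live-streaming state. Returns
/// `false` if the consumer has gone away.
///
/// Worked example (illustrative numbers) with `stream_start = 100`: a common
/// ancestor at block `95` predates the stream, so streaming restarts from
/// `100` with no previous batch end; a common ancestor at block `150` resumes
/// streaming from `151`, anchored on the ancestor block.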
async fn handle_reorg_detected<N: Network>(
    common_ancestor: N::BlockResponse,
    stream_start: BlockNumber,
    state: &mut LiveStreamingState<N>,
    sender: &mpsc::Sender<BlockScannerResult>,
) -> bool {
    let ancestor_num = common_ancestor.header().number();

    info!(
        common_ancestor = ancestor_num,
        stream_start = stream_start,
        "Reorg detected during live streaming"
    );

    if !sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await {
        return false;
    }

    if ancestor_num < stream_start {
        debug!(
            common_ancestor = ancestor_num,
            stream_start = stream_start,
            "Reorg predates stream start, restarting from stream_start"
        );
        state.batch_start = stream_start;
        state.previous_batch_end = None;
    } else {
        debug!(
            common_ancestor = ancestor_num,
            resume_from = ancestor_num + 1,
            "Resuming from after common ancestor"
        );
        state.batch_start = ancestor_num + 1;
        state.previous_batch_end = Some(common_ancestor);
    }

    true
}

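/// Streams the batch ending at `batch_end_num` (a no-op if it precedes
/// `state.batch_start`); returns `false` when the live loop should stop.
///
/// Worked example (illustrative numbers): with `state.batch_start = 103` and
/// `batch_end_num = 110`, blocks `103..=110` are streamed and `batch_start`
/// advances to `111`; with `batch_end_num = 102`, nothing is streamed and the
/// state is left untouched.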
async fn stream_next_batch<N: Network>(
    batch_end_num: BlockNumber,
    state: &mut LiveStreamingState<N>,
    stream_start: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> bool {
    // Nothing new has been confirmed yet; wait for the next incoming block.
    if batch_end_num < state.batch_start {
        return true;
    }

    let min_common_ancestor = stream_start.saturating_sub(1);

    state.previous_batch_end = stream_range_with_reorg_handling(
        min_common_ancestor,
        state.batch_start,
        batch_end_num,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;

    if state.previous_batch_end.is_none() {
        return false;
    }

    state.batch_start = batch_end_num + 1;

    true
}

/// Mutable cursor for the live-streaming loop.
struct LiveStreamingState<N: Network> {
    /// First block of the next batch to stream.
    batch_start: BlockNumber,
    /// Last block of the previously streamed batch, used as the reorg-check
    /// anchor; `None` right after a reorg that predates the stream start.
    previous_batch_end: Option<N::BlockResponse>,
}

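/// Streams the historical range `start..=end`: finalized blocks are streamed
/// without reorg checks, and the remaining unfinalized tail goes through
/// `stream_range_with_reorg_handling`.
///
/// Worked example (illustrative numbers, assuming `RangeIterator::forward`
/// yields inclusive chunks of at most `max_block_range` blocks): with
/// `start = 0`, `end = 1000`, a finalized tip at `800`, and
/// `max_block_range = 100`, blocks `0..=800` are streamed chunk by chunk with
/// no reorg checks, then `801..=1000` is streamed with reorg handling.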
#[must_use]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_historical_range<N: Network>(
    start: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<()> {
    // Fetch the finalized tip; on failure fall back to 0 so that the whole
    // range is treated as unfinalized and reorg-checked.
    let finalized_block_num =
        provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0);

    let finalized_batch_end = finalized_block_num.min(end);
    let finalized_range_count =
        RangeIterator::forward(start, finalized_batch_end, max_block_range).count();
    trace!(
        start = start,
        finalized_batch_end = finalized_batch_end,
        batch_count = finalized_range_count,
        "Streaming finalized blocks (no reorg check)"
    );

    for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
        trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range");
        if !sender.try_stream(range).await {
            return None;
        }
    }

    // Continue from the first unfinalized block (or from `start` if nothing
    // in the range is finalized).
    let batch_start = start.max(finalized_batch_end + 1);

    if batch_start > end {
        return Some(());
    }

    // A reorg cannot rewind past the block just before `start`, nor past the
    // finalized tip, so the larger of the two bounds the rewind.
    let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num);

    stream_range_with_reorg_handling(
        min_common_ancestor,
        batch_start,
        end,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await?;

    Some(())
}

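/// Streams `next_start_block..=end` in batches, reorg-checking the last block
/// of every batch and rewinding the iterator when a reorg is found.
///
/// Worked example (illustrative numbers): while streaming `101..=500`, if the
/// batch ending at `300` turns out to be reorged with a common ancestor at
/// `250`, the iterator is reset to `251` (never below `min_common_ancestor`)
/// and the blocks from there on are re-streamed from the new chain.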
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
    min_common_ancestor: BlockNumber,
    next_start_block: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<N::BlockResponse> {
    let mut last_batch_end: Option<N::BlockResponse> = None;
    let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);

    while let Some(batch) = iter.next() {
        let batch_end_num = *batch.end();
        let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
            Ok(block) => block,
            Err(e) => {
                error!(
                    batch_start = batch.start(),
                    batch_end = batch_end_num,
                    "Failed to get ending block of the current batch"
                );
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if !sender.try_stream(batch).await {
            return None;
        }

        let reorged_opt = match reorg_handler.check(&batch_end).await {
            Ok(opt) => opt,
            Err(e) => {
                error!("Failed to perform reorg check");
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if let Some(common_ancestor) = reorged_opt {
            let common_ancestor = common_ancestor.header().number();
            info!(
                common_ancestor = common_ancestor,
                "Reorg detected during historical streaming, resetting range iterator"
            );
            if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
                return None;
            }
            // Restart just after the common ancestor, but never below the
            // configured rewind floor.
            let reset_to = (common_ancestor + 1).max(min_common_ancestor);
            debug!(reset_to = reset_to, "Resetting range iterator after reorg");
            iter.reset_to(reset_to);
        }

        last_batch_end = Some(batch_end);
    }

    last_batch_end
}