event_scanner/block_range_scanner/common.rs
use std::{ops::RangeInclusive, sync::Arc};
2
3use robust_provider::{Error as SubscriptionError, RobustProvider, RobustSubscription};
4use tokio::sync::mpsc;
5use tokio_stream::StreamExt;
6
7use crate::{
8 ScannerError, ScannerMessage,
9 block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
10 types::{ChannelState, IntoScannerResult, Notification, ScannerResult, TryStream},
11};
12use alloy::{
13 consensus::BlockHeader,
14 network::{BlockResponse, Network},
15 primitives::BlockNumber,
16};
17
/// Default maximum number of blocks covered by a single streamed range.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default number of confirmations subtracted from the tip before a block is streamed.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default capacity of the stream buffer channel.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

/// Result produced by the block-range scanner: a [`Message`] or a scanner error.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Scanner message whose data payload is an inclusive range of block numbers.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
32
33impl From<RangeInclusive<BlockNumber>> for Message {
34 fn from(range: RangeInclusive<BlockNumber>) -> Self {
35 Message::Data(range)
36 }
37}
38
39impl PartialEq<RangeInclusive<BlockNumber>> for Message {
40 fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
41 if let Message::Data(range) = self { range.eq(other) } else { false }
42 }
43}
44
45impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
46 fn into_scanner_message_result(self) -> BlockScannerResult {
47 Ok(Message::Data(self))
48 }
49}
50
/// Streams confirmed block ranges live from a subscription, starting at `stream_start`.
///
/// Pipeline: skip subscription items until the confirmed height reaches
/// `stream_start`, optionally notify the consumer that the scanner is
/// switching to live mode, back-fill the range from `stream_start` up to the
/// first confirmed tip, then keep streaming batches as new heads arrive.
/// Returns (ending the stream) when the receiver is closed or an
/// unrecoverable subscription/RPC error was forwarded to `sender`.
#[allow(clippy::too_many_arguments)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(subscription, sender, provider, reorg_handler))
)]
pub(crate) async fn stream_live_blocks<N: Network, R: ReorgHandler<N>>(
    stream_start: BlockNumber,
    subscription: RobustSubscription<N>,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    block_confirmations: u64,
    max_block_range: u64,
    reorg_handler: &mut R,
    notify_after_first_block: bool,
) {
    // Drop heads whose confirmed height is still below `stream_start`.
    let mut stream =
        skip_to_first_relevant_block::<N>(subscription, stream_start, block_confirmations);

    // If the subscription ends or fails before a relevant block arrives,
    // the error has already been forwarded to `sender`; nothing more to do.
    let Some(first_block) = get_first_block::<N, _>(&mut stream, sender).await else {
        return;
    };

    debug!(
        first_block = first_block.number(),
        stream_start = stream_start,
        "Received first relevant block, starting live streaming"
    );

    // Tell the consumer we are transitioning to live mode; bail if it hung up.
    if notify_after_first_block &&
        sender.try_stream(Notification::SwitchingToLive).await.is_closed()
    {
        return;
    }

    // Back-fill `stream_start..=confirmed(first_block)` and record the batch cursor.
    let Some(mut state) = initialize_live_streaming_state(
        first_block,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await
    else {
        return;
    };

    // Main live loop: one batch per incoming head, with reorg checks in between.
    stream_blocks_continuously(
        &mut stream,
        &mut state,
        stream_start,
        block_confirmations,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;
}
119
120async fn get_first_block<
121 N: Network,
122 S: tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> + Unpin,
123>(
124 stream: &mut S,
125 sender: &mpsc::Sender<BlockScannerResult>,
126) -> Option<N::HeaderResponse> {
127 while let Some(first_block) = stream.next().await {
128 match first_block {
129 Ok(block) => return Some(block),
130 Err(e) => {
131 match e {
132 SubscriptionError::Lagged(_) => {
133 }
136 SubscriptionError::Timeout => {
137 _ = sender.try_stream(ScannerError::Timeout).await;
138 break;
139 }
140 SubscriptionError::RpcError(rpc_err) => {
141 _ = sender.try_stream(ScannerError::RpcError(Arc::new(rpc_err))).await;
142 break;
143 }
144 SubscriptionError::Closed => {
145 _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
146 break;
147 }
148 SubscriptionError::BlockNotFound => {
149 _ = sender.try_stream(ScannerError::BlockNotFound).await;
150 break;
151 }
152 }
153 }
154 }
155 }
156
157 None
158}
159
160fn skip_to_first_relevant_block<N: Network>(
162 subscription: RobustSubscription<N>,
163 stream_start: BlockNumber,
164 block_confirmations: u64,
165) -> impl tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> {
166 subscription.into_stream().skip_while(move |header| match header {
167 Ok(header) => header.number().saturating_sub(block_confirmations) < stream_start,
168 Err(SubscriptionError::Lagged(_)) => true,
169 Err(_) => false,
170 })
171}
172
173async fn initialize_live_streaming_state<N: Network, R: ReorgHandler<N>>(
176 first_block: N::HeaderResponse,
177 stream_start: BlockNumber,
178 block_confirmations: u64,
179 max_block_range: u64,
180 sender: &mpsc::Sender<BlockScannerResult>,
181 provider: &RobustProvider<N>,
182 reorg_handler: &mut R,
183) -> Option<LiveStreamingState<N>> {
184 let confirmed = first_block.number().saturating_sub(block_confirmations);
185
186 let min_common_ancestor = stream_start.saturating_sub(1);
188
189 let previous_batch_end = stream_range_with_reorg_handling(
191 min_common_ancestor,
192 stream_start,
193 confirmed,
194 max_block_range,
195 sender,
196 provider,
197 reorg_handler,
198 )
199 .await?;
200
201 Some(LiveStreamingState {
202 batch_start: stream_start,
203 previous_batch_end: Some(previous_batch_end),
204 })
205}
206
207#[allow(clippy::too_many_arguments)]
209async fn stream_blocks_continuously<
210 N: Network,
211 S: tokio_stream::Stream<Item = Result<N::HeaderResponse, SubscriptionError>> + Unpin,
212 R: ReorgHandler<N>,
213>(
214 stream: &mut S,
215 state: &mut LiveStreamingState<N>,
216 stream_start: BlockNumber,
217 block_confirmations: u64,
218 max_block_range: u64,
219 sender: &mpsc::Sender<BlockScannerResult>,
220 provider: &RobustProvider<N>,
221 reorg_handler: &mut R,
222) {
223 while let Some(incoming_block) = stream.next().await {
224 let incoming_block = match incoming_block {
225 Ok(block) => block,
226 Err(e) => {
227 match e {
228 SubscriptionError::Lagged(_) => {
229 continue;
232 }
233 SubscriptionError::Timeout => {
234 _ = sender.try_stream(ScannerError::Timeout).await;
235 return;
236 }
237 SubscriptionError::RpcError(rpc_err) => {
238 _ = sender.try_stream(ScannerError::RpcError(Arc::new(rpc_err))).await;
239 return;
240 }
241 SubscriptionError::Closed => {
242 _ = sender.try_stream(ScannerError::SubscriptionClosed).await;
243 return;
244 }
245 SubscriptionError::BlockNotFound => {
246 _ = sender.try_stream(ScannerError::BlockNotFound).await;
247 break;
248 }
249 }
250 }
251 };
252
253 let incoming_block = incoming_block.number();
254 trace!(received = incoming_block, "Received item from block subscription");
255
256 let Some(previous_batch_end) = state.previous_batch_end.as_ref() else {
257 continue;
259 };
260
261 let common_ancestor = match reorg_handler.check(previous_batch_end).await {
262 Ok(reorg_opt) => reorg_opt,
263 Err(e) => {
264 error!("Failed to perform reorg check");
265 _ = sender.try_stream(e).await;
266 return;
267 }
268 };
269
270 if let Some(common_ancestor) = common_ancestor {
271 if handle_reorg_detected(common_ancestor, stream_start, state, sender).await.is_closed()
272 {
273 return; }
275 } else {
276 state.batch_start = previous_batch_end.header().number() + 1;
278 }
279
280 let batch_end_num = incoming_block.saturating_sub(block_confirmations);
282 if stream_next_batch(
283 batch_end_num,
284 state,
285 stream_start,
286 max_block_range,
287 sender,
288 provider,
289 reorg_handler,
290 )
291 .await
292 .is_closed()
293 {
294 return; }
296 }
297}
298
299async fn handle_reorg_detected<N: Network>(
302 common_ancestor: N::BlockResponse,
303 stream_start: BlockNumber,
304 state: &mut LiveStreamingState<N>,
305 sender: &mpsc::Sender<BlockScannerResult>,
306) -> ChannelState {
307 let ancestor_num = common_ancestor.header().number();
308
309 info!(
310 common_ancestor = ancestor_num,
311 stream_start = stream_start,
312 "Reorg detected during live streaming"
313 );
314
315 let channel_state =
316 sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await;
317
318 if channel_state.is_closed() {
319 return ChannelState::Closed;
320 }
321
322 if ancestor_num < stream_start {
324 debug!(
326 common_ancestor = ancestor_num,
327 stream_start = stream_start,
328 "Reorg predates stream start, restarting from stream_start"
329 );
330 state.batch_start = stream_start;
331 state.previous_batch_end = None;
332 } else {
333 debug!(
335 common_ancestor = ancestor_num,
336 resume_from = ancestor_num + 1,
337 "Resuming from after common ancestor"
338 );
339 state.batch_start = ancestor_num + 1;
340 state.previous_batch_end = Some(common_ancestor);
341 }
342
343 ChannelState::Open
344}
345
346async fn stream_next_batch<N: Network, R: ReorgHandler<N>>(
349 batch_end_num: BlockNumber,
350 state: &mut LiveStreamingState<N>,
351 stream_start: BlockNumber,
352 max_block_range: u64,
353 sender: &mpsc::Sender<BlockScannerResult>,
354 provider: &RobustProvider<N>,
355 reorg_handler: &mut R,
356) -> ChannelState {
357 if batch_end_num < state.batch_start {
358 return ChannelState::Open;
360 }
361
362 let min_common_ancestor = stream_start.saturating_sub(1);
364
365 state.previous_batch_end = stream_range_with_reorg_handling(
366 min_common_ancestor,
367 state.batch_start,
368 batch_end_num,
369 max_block_range,
370 sender,
371 provider,
372 reorg_handler,
373 )
374 .await;
375
376 if state.previous_batch_end.is_none() {
377 return ChannelState::Closed;
379 }
380
381 state.batch_start = batch_end_num + 1;
383
384 ChannelState::Open
385}
386
/// Mutable bookkeeping for the live-streaming loop.
struct LiveStreamingState<N: Network> {
    // First block of the next batch to be streamed.
    batch_start: BlockNumber,
    // Last block of the previously streamed batch; used as the anchor for
    // reorg checks. `None` after a reorg that predates the stream start.
    previous_batch_end: Option<N::BlockResponse>,
}
394
/// Streams the range `next_start_block..=end` in batches of at most
/// `max_block_range` blocks, re-checking for reorgs after every batch.
///
/// Order per batch is deliberate: fetch the batch-end block first (so a reorg
/// check has a concrete block to anchor on), then send the range, then run
/// the reorg check and, if needed, reset the iterator to re-stream from just
/// after the common ancestor.
///
/// Returns the last successfully streamed batch-end block, or `None` when an
/// error was forwarded to `sender` or the receiver closed. Note: `None` is
/// also the return when the range was empty and the loop never ran.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_range_with_reorg_handling<N: Network, R: ReorgHandler<N>>(
    min_common_ancestor: BlockNumber,
    next_start_block: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<BlockScannerResult>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut R,
) -> Option<N::BlockResponse> {
    let mut last_batch_end: Option<N::BlockResponse> = None;
    // `while let` (not `for`): the iterator is mutated via `reset_to` mid-loop.
    let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);

    while let Some(batch) = iter.next() {
        let batch_end_num = *batch.end();
        let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
            Ok(block) => block,
            Err(e) => {
                error!(
                    batch_start = batch.start(),
                    batch_end = batch_end_num,
                    "Failed to get ending block of the current batch"
                );
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if sender.try_stream(batch).await.is_closed() {
            return None;
        }

        // Check whether the just-streamed batch end is still canonical.
        let reorged_opt = match reorg_handler.check(&batch_end).await {
            Ok(opt) => opt,
            Err(e) => {
                error!("Failed to perform reorg check");
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        if let Some(common_ancestor) = reorged_opt {
            let common_ancestor = common_ancestor.header().number();
            info!(
                common_ancestor = common_ancestor,
                "Reorg detected during historical streaming, resetting range iterator"
            );
            if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed()
            {
                return None;
            }
            // NOTE(review): clamping the restart point to `min_common_ancestor`
            // (i.e. `stream_start - 1`) rather than `min_common_ancestor + 1`
            // looks off by one — when the ancestor is far below the floor this
            // re-streams from one block *before* the stream start. Confirm
            // intent against `RangeIterator::reset_to` semantics.
            let reset_to = (common_ancestor + 1).max(min_common_ancestor);
            debug!(reset_to = reset_to, "Resetting range iterator after reorg");
            iter.reset_to(reset_to);
        }

        last_batch_end = Some(batch_end);
    }

    last_batch_end
}