1use std::{cmp::Ordering, ops::RangeInclusive};
67use tokio::{
68 join,
69 sync::{mpsc, oneshot},
70};
71use tokio_stream::{StreamExt, wrappers::ReceiverStream};
72
73use crate::{
74 ScannerMessage,
75 error::ScannerError,
76 types::{ScannerStatus, TryStream},
77};
78use alloy::{
79 consensus::BlockHeader,
80 eips::BlockNumberOrTag,
81 network::{BlockResponse, Network, primitives::HeaderResponse},
82 primitives::{B256, BlockNumber},
83 providers::{Provider, RootProvider},
84 pubsub::Subscription,
85 rpc::client::ClientBuilder,
86 transports::{
87 RpcError, TransportErrorKind, TransportResult, http::reqwest::Url, ws::WsConnect,
88 },
89};
90use tracing::{debug, error, info, warn};
91
92pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
93pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
95
96pub const MAX_BUFFERED_MESSAGES: usize = 50000;
97
98pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;
101
102pub type Message = ScannerMessage<RangeInclusive<BlockNumber>, ScannerError>;
103
104impl From<RangeInclusive<BlockNumber>> for Message {
105 fn from(logs: RangeInclusive<BlockNumber>) -> Self {
106 Message::Data(logs)
107 }
108}
109
110impl PartialEq<RangeInclusive<BlockNumber>> for Message {
111 fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
112 if let Message::Data(range) = self { range.eq(other) } else { false }
113 }
114}
115
116impl From<RpcError<TransportErrorKind>> for Message {
117 fn from(error: RpcError<TransportErrorKind>) -> Self {
118 Message::Error(error.into())
119 }
120}
121
122impl From<ScannerError> for Message {
123 fn from(error: ScannerError) -> Self {
124 Message::Error(error)
125 }
126}
127
128#[derive(Clone, Copy)]
129pub struct BlockRangeScanner {
130 pub max_block_range: u64,
131}
132
133impl Default for BlockRangeScanner {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl BlockRangeScanner {
140 #[must_use]
141 pub fn new() -> Self {
142 Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
143 }
144
145 #[must_use]
146 pub fn max_block_range(mut self, max_block_range: u64) -> Self {
147 self.max_block_range = max_block_range;
148 self
149 }
150
151 pub async fn connect_ws<N: Network>(
157 self,
158 ws_url: Url,
159 ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
160 let provider =
161 RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
162 Ok(self.connect(provider))
163 }
164
165 pub async fn connect_ipc<N: Network>(
171 self,
172 ipc_path: String,
173 ) -> Result<ConnectedBlockRangeScanner<N>, RpcError<TransportErrorKind>> {
174 let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
175 Ok(self.connect(provider))
176 }
177
178 #[must_use]
184 pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
185 ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
186 }
187}
188
189pub struct ConnectedBlockRangeScanner<N: Network> {
190 provider: RootProvider<N>,
191 max_block_range: u64,
192}
193
194impl<N: Network> ConnectedBlockRangeScanner<N> {
195 #[must_use]
197 pub fn provider(&self) -> &RootProvider<N> {
198 &self.provider
199 }
200
201 pub fn run(&self) -> Result<BlockRangeScannerClient, ScannerError> {
207 let (service, cmd_tx) = Service::new(self.provider.clone(), self.max_block_range);
208 tokio::spawn(async move {
209 service.run().await;
210 });
211 Ok(BlockRangeScannerClient::new(cmd_tx))
212 }
213}
214
215#[derive(Debug)]
216pub enum Command {
217 StreamLive {
218 sender: mpsc::Sender<Message>,
219 block_confirmations: u64,
220 response: oneshot::Sender<Result<(), ScannerError>>,
221 },
222 StreamHistorical {
223 sender: mpsc::Sender<Message>,
224 start_height: BlockNumberOrTag,
225 end_height: BlockNumberOrTag,
226 response: oneshot::Sender<Result<(), ScannerError>>,
227 },
228 StreamFrom {
229 sender: mpsc::Sender<Message>,
230 start_height: BlockNumberOrTag,
231 block_confirmations: u64,
232 response: oneshot::Sender<Result<(), ScannerError>>,
233 },
234 Rewind {
235 sender: mpsc::Sender<Message>,
236 start_height: BlockNumberOrTag,
237 end_height: BlockNumberOrTag,
238 response: oneshot::Sender<Result<(), ScannerError>>,
239 },
240}
241
242struct Service<N: Network> {
243 provider: RootProvider<N>,
244 max_block_range: u64,
245 error_count: u64,
246 command_receiver: mpsc::Receiver<Command>,
247 shutdown: bool,
248}
249
250impl<N: Network> Service<N> {
251 pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
252 let (cmd_tx, cmd_rx) = mpsc::channel(100);
253
254 let service = Self {
255 provider,
256 max_block_range,
257 error_count: 0,
258 command_receiver: cmd_rx,
259 shutdown: false,
260 };
261
262 (service, cmd_tx)
263 }
264
265 pub async fn run(mut self) {
266 info!("Starting subscription service");
267
268 while !self.shutdown {
269 tokio::select! {
270 cmd = self.command_receiver.recv() => {
271 if let Some(command) = cmd {
272 if let Err(e) = self.handle_command(command).await {
273 error!("Command handling error: {}", e);
274 self.error_count += 1;
275 }
276 } else {
277 warn!("Command channel closed, shutting down");
278 break;
279 }
280 }
281 }
282 }
283
284 info!("Subscription service stopped");
285 }
286
287 async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
288 match command {
289 Command::StreamLive { sender, block_confirmations, response } => {
290 info!("Starting live stream");
291 let result = self.handle_live(block_confirmations, sender).await;
292 let _ = response.send(result);
293 }
294 Command::StreamHistorical { sender, start_height, end_height, response } => {
295 info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
296 let result = self.handle_historical(start_height, end_height, sender).await;
297 let _ = response.send(result);
298 }
299 Command::StreamFrom { sender, start_height, block_confirmations, response } => {
300 info!(start_height = ?start_height, "Starting streaming from");
301 let result = self.handle_sync(start_height, block_confirmations, sender).await;
302 let _ = response.send(result);
303 }
304 Command::Rewind { sender, start_height, end_height, response } => {
305 info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
306 let result = self.handle_rewind(start_height, end_height, sender).await;
307 let _ = response.send(result);
308 }
309 }
310 Ok(())
311 }
312
313 async fn handle_live(
314 &mut self,
315 block_confirmations: u64,
316 sender: mpsc::Sender<Message>,
317 ) -> Result<(), ScannerError> {
318 let max_block_range = self.max_block_range;
319 let provider = self.provider.clone();
320 let latest = self.provider.get_block_number().await?;
321
322 let range_start = (latest + 1).saturating_sub(block_confirmations);
326
327 tokio::spawn(async move {
328 Self::stream_live_blocks(
329 range_start,
330 provider,
331 sender,
332 block_confirmations,
333 max_block_range,
334 )
335 .await;
336 });
337
338 Ok(())
339 }
340
341 async fn handle_historical(
342 &mut self,
343 start_height: BlockNumberOrTag,
344 end_height: BlockNumberOrTag,
345 sender: mpsc::Sender<Message>,
346 ) -> Result<(), ScannerError> {
347 let max_block_range = self.max_block_range;
348
349 let (start_block, end_block) = tokio::try_join!(
350 self.provider.get_block_by_number(start_height),
351 self.provider.get_block_by_number(end_height)
352 )?;
353
354 let start_block_num =
355 start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
356 let end_block_num =
357 end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number();
358
359 let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
360 Ordering::Greater => (end_block_num, start_block_num),
361 _ => (start_block_num, end_block_num),
362 };
363
364 info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
365
366 tokio::spawn(async move {
367 Self::stream_historical_blocks(
368 start_block_num,
369 end_block_num,
370 max_block_range,
371 &sender,
372 )
373 .await;
374 });
375
376 Ok(())
377 }
378
379 async fn handle_sync(
380 &mut self,
381 start_height: BlockNumberOrTag,
382 block_confirmations: u64,
383 sender: mpsc::Sender<Message>,
384 ) -> Result<(), ScannerError> {
385 let provider = self.provider.clone();
386 let max_block_range = self.max_block_range;
387
388 let get_start_block = async || -> Result<BlockNumber, ScannerError> {
389 let block = match start_height {
390 BlockNumberOrTag::Number(num) => num,
391 block_tag => provider
392 .get_block_by_number(block_tag)
393 .await?
394 .ok_or_else(|| ScannerError::BlockNotFound(block_tag))?
395 .header()
396 .number(),
397 };
398 Ok(block)
399 };
400
401 let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
402 let block = provider
403 .get_block_by_number(BlockNumberOrTag::Latest)
404 .await?
405 .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
406 .header()
407 .number();
408 Ok(block)
409 };
410
411 let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?;
414
415 let confirmed_tip = latest_block.saturating_sub(block_confirmations);
416
417 if start_block > confirmed_tip {
419 info!(
420 start_block = start_block,
421 confirmed_tip = confirmed_tip,
422 "Start block is beyond confirmed tip, starting live stream"
423 );
424
425 tokio::spawn(async move {
426 Self::stream_live_blocks(
427 start_block,
428 provider,
429 sender,
430 block_confirmations,
431 max_block_range,
432 )
433 .await;
434 });
435
436 return Ok(());
437 }
438
439 info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data");
440
441 let (live_block_buffer_sender, live_block_buffer_receiver) =
444 mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
445
446 let cutoff = confirmed_tip;
449
450 tokio::spawn(async move {
452 Self::stream_live_blocks(
453 cutoff + 1,
454 provider,
455 live_block_buffer_sender,
456 block_confirmations,
457 max_block_range,
458 )
459 .await;
460 });
461
462 tokio::spawn(async move {
463 Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender)
467 .await;
468
469 info!("Chain tip reached, switching to live");
470
471 if !sender.try_stream(ScannerStatus::SwitchingToLive).await {
472 return;
473 }
474
475 info!("Successfully transitioned from historical to live data");
476
477 Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
485 });
486
487 Ok(())
488 }
489
490 async fn handle_rewind(
491 &mut self,
492 start_height: BlockNumberOrTag,
493 end_height: BlockNumberOrTag,
494 sender: mpsc::Sender<Message>,
495 ) -> Result<(), ScannerError> {
496 let max_block_range = self.max_block_range;
497 let provider = self.provider.clone();
498
499 let (start_block, end_block) = join!(
500 self.provider.get_block_by_number(start_height),
501 self.provider.get_block_by_number(end_height),
502 );
503
504 let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?;
505 let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?;
506
507 let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
509 Ordering::Greater => (start_block, end_block),
510 _ => (end_block, start_block),
511 };
512
513 tokio::spawn(async move {
514 Self::stream_rewind(from, to, max_block_range, &sender, &provider).await;
515 });
516
517 Ok(())
518 }
519
520 async fn stream_rewind(
528 from: N::BlockResponse,
529 to: N::BlockResponse,
530 max_block_range: u64,
531 sender: &mpsc::Sender<Message>,
532 provider: &RootProvider<N>,
533 ) {
534 let mut batch_count = 0;
535
536 let mut tip_hash = from.header().hash();
538
539 let from = from.header().number();
540 let to = to.header().number();
541
542 let mut batch_from = from;
544
545 while batch_from >= to {
546 let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
547
548 if !sender.try_stream(batch_to..=batch_from).await {
550 break;
551 }
552
553 batch_count += 1;
554 if batch_count % 10 == 0 {
555 debug!(batch_count = batch_count, "Processed rewind batches");
556 }
557
558 if batch_to == to {
561 break;
562 }
563
564 let reorged = match reorg_detected(provider, tip_hash).await {
565 Ok(detected) => {
566 info!(block_number = %from, hash = %tip_hash, "Reorg detected");
567 detected
568 }
569 Err(e) => {
570 error!(error = %e, "Terminal RPC call error, shutting down");
571 _ = sender.try_stream(e);
572 return;
573 }
574 };
575
576 if reorged {
577 info!(block_number = %from, hash = %tip_hash, "Reorg detected");
578
579 if !sender.try_stream(ScannerStatus::ReorgDetected).await {
580 break;
581 }
582
583 batch_from = from;
585 tip_hash = match provider.get_block_by_number(from.into()).await {
587 Ok(block) => block
588 .unwrap_or_else(|| {
589 panic!("Block with number '{from}' should exist post-reorg")
590 })
591 .header()
592 .hash(),
593 Err(e) => {
594 error!(error = %e, "Terminal RPC call error, shutting down");
595 _ = sender.try_stream(e);
596 return;
597 }
598 };
599 } else {
600 batch_from = batch_to - 1;
603 }
604 }
605
606 info!(batch_count = batch_count, "Rewind completed");
607 }
608
609 async fn stream_historical_blocks(
610 start: BlockNumber,
611 end: BlockNumber,
612 max_block_range: u64,
613 sender: &mpsc::Sender<Message>,
614 ) {
615 let mut batch_count = 0;
616
617 let mut next_start_block = start;
618
619 while next_start_block <= end {
622 let batch_end_block_number =
623 next_start_block.saturating_add(max_block_range - 1).min(end);
624
625 if !sender.try_stream(next_start_block..=batch_end_block_number).await {
626 break;
627 }
628
629 batch_count += 1;
630 if batch_count % 10 == 0 {
631 debug!(batch_count = batch_count, "Processed historical batches");
632 }
633
634 if batch_end_block_number == end {
635 break;
636 }
637
638 let next_start_block_number = batch_end_block_number.saturating_add(1);
640
641 next_start_block = next_start_block_number;
642 }
643
644 info!(batch_count = batch_count, "Historical sync completed");
645 }
646
647 async fn stream_live_blocks<P: Provider<N>>(
648 mut range_start: BlockNumber,
649 provider: P,
650 sender: mpsc::Sender<Message>,
651 block_confirmations: u64,
652 max_block_range: u64,
653 ) {
654 match Self::get_block_subscription(&provider).await {
655 Ok(ws_stream) => {
656 info!("WebSocket connected for live blocks");
657
658 let cutoff = range_start;
660 let mut stream =
661 ws_stream.into_stream().skip_while(|header| header.number() < cutoff);
662
663 while let Some(incoming_block) = stream.next().await {
664 let incoming_block_num = incoming_block.number();
665 info!(block_number = incoming_block_num, "Received block header");
666
667 if incoming_block_num < range_start {
668 warn!("Reorg detected: sending forked range");
669 if !sender.try_stream(ScannerStatus::ReorgDetected).await {
670 return;
671 }
672
673 let incoming_confirmed =
675 incoming_block_num.saturating_sub(block_confirmations);
676
677 range_start = incoming_confirmed;
679 }
680
681 let confirmed = incoming_block_num.saturating_sub(block_confirmations);
682 if confirmed >= range_start {
683 let range_end =
686 confirmed.min(range_start.saturating_add(max_block_range - 1));
687
688 info!(
689 range_start = range_start,
690 range_end = range_end,
691 "Sending live block range"
692 );
693
694 if !sender.try_stream(range_start..=range_end).await {
695 return;
696 }
697
698 range_start = range_end + 1;
700 }
701 }
702 }
703 Err(e) => {
704 _ = sender.try_stream(e).await;
705 }
706 }
707 }
708
709 async fn process_live_block_buffer(
710 mut buffer_rx: mpsc::Receiver<Message>,
711 sender: mpsc::Sender<Message>,
712 cutoff: BlockNumber,
713 ) {
714 let mut processed = 0;
715 let mut discarded = 0;
716
717 while let Some(data) = buffer_rx.recv().await {
719 match data {
720 Message::Data(range) => {
721 let (start, end) = (*range.start(), *range.end());
722 if start >= cutoff {
723 if !sender.try_stream(range).await {
724 break;
725 }
726 processed += end - start;
727 } else if end >= cutoff {
728 discarded += cutoff - start;
729
730 let start = cutoff;
731 if !sender.try_stream(start..=end).await {
732 break;
733 }
734 processed += end - start;
735 } else {
736 discarded += end - start;
737 }
738 }
739 other => {
740 if !sender.try_stream(other).await {
742 break;
743 }
744 }
745 }
746 }
747
748 info!(processed = processed, discarded = discarded, "Processed buffered messages");
749 }
750
751 async fn get_block_subscription(
752 provider: &impl Provider<N>,
753 ) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
754 let ws_stream = provider
755 .subscribe_blocks()
756 .await
757 .map_err(|_| ScannerError::WebSocketConnectionFailed(1))?;
758
759 Ok(ws_stream)
760 }
761}
762
763async fn reorg_detected<N: Network>(
764 provider: &RootProvider<N>,
765 hash_to_check: B256,
766) -> Result<bool, RpcError<TransportErrorKind>> {
767 Ok(provider.get_block_by_hash(hash_to_check).await?.is_none())
768}
769
770pub struct BlockRangeScannerClient {
771 command_sender: mpsc::Sender<Command>,
772}
773
774impl BlockRangeScannerClient {
775 #[must_use]
781 pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
782 Self { command_sender }
783 }
784
785 pub async fn stream_live(
795 &self,
796 block_confirmations: u64,
797 ) -> Result<ReceiverStream<Message>, ScannerError> {
798 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
799 let (response_tx, response_rx) = oneshot::channel();
800
801 let command = Command::StreamLive {
802 sender: blocks_sender,
803 block_confirmations,
804 response: response_tx,
805 };
806
807 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
808
809 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
810
811 Ok(ReceiverStream::new(blocks_receiver))
812 }
813
814 pub async fn stream_historical(
825 &self,
826 start_height: impl Into<BlockNumberOrTag>,
827 end_height: impl Into<BlockNumberOrTag>,
828 ) -> Result<ReceiverStream<Message>, ScannerError> {
829 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
830 let (response_tx, response_rx) = oneshot::channel();
831
832 let command = Command::StreamHistorical {
833 sender: blocks_sender,
834 start_height: start_height.into(),
835 end_height: end_height.into(),
836 response: response_tx,
837 };
838
839 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
840
841 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
842
843 Ok(ReceiverStream::new(blocks_receiver))
844 }
845
846 pub async fn stream_from(
857 &self,
858 start_height: impl Into<BlockNumberOrTag>,
859 block_confirmations: u64,
860 ) -> Result<ReceiverStream<Message>, ScannerError> {
861 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
862 let (response_tx, response_rx) = oneshot::channel();
863
864 let command = Command::StreamFrom {
865 sender: blocks_sender,
866 start_height: start_height.into(),
867 block_confirmations,
868 response: response_tx,
869 };
870
871 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
872
873 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
874
875 Ok(ReceiverStream::new(blocks_receiver))
876 }
877
878 pub async fn rewind(
889 &self,
890 start_height: impl Into<BlockNumberOrTag>,
891 end_height: impl Into<BlockNumberOrTag>,
892 ) -> Result<ReceiverStream<Message>, ScannerError> {
893 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
894 let (response_tx, response_rx) = oneshot::channel();
895
896 let command = Command::Rewind {
897 sender: blocks_sender,
898 start_height: start_height.into(),
899 end_height: end_height.into(),
900 response: response_tx,
901 };
902
903 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
904
905 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
906
907 Ok(ReceiverStream::new(blocks_receiver))
908 }
909}
910
911#[cfg(test)]
912mod tests {
913 use super::*;
914 use crate::{assert_closed, assert_empty, assert_next};
915 use alloy::{
916 network::Ethereum,
917 providers::{ProviderBuilder, ext::AnvilApi},
918 rpc::types::anvil::ReorgOptions,
919 };
920 use alloy_node_bindings::Anvil;
921 use tokio::sync::mpsc;
922 use tokio_stream::StreamExt;
923
924 #[test]
925 fn block_range_scanner_defaults_match_constants() {
926 let scanner = BlockRangeScanner::new();
927
928 assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
929 }
930
931 #[test]
932 fn builder_methods_update_configuration() {
933 let max_block_range = 42;
934
935 let scanner = BlockRangeScanner::new().max_block_range(max_block_range);
936
937 assert_eq!(scanner.max_block_range, max_block_range);
938 }
939
940 #[tokio::test]
941 async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyhow::Result<()> {
942 let anvil = Anvil::new().try_spawn()?;
943 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
944
945 let client = BlockRangeScanner::new()
948 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
949 .await?
950 .run()?;
951
952 let mut stream = client.stream_live(0).await?;
953
954 provider.anvil_mine(Some(5), None).await?;
955
956 assert_next!(stream, 1..=1);
957 assert_next!(stream, 2..=2);
958 assert_next!(stream, 3..=3);
959 assert_next!(stream, 4..=4);
960 assert_next!(stream, 5..=5);
961 let mut stream = assert_empty!(stream);
962
963 provider.anvil_mine(Some(1), None).await?;
964
965 assert_next!(stream, 6..=6);
966 assert_empty!(stream);
967
968 let mut stream = client.stream_live(1).await?;
971
972 provider.anvil_mine(Some(5), None).await?;
973
974 assert_next!(stream, 6..=6);
975 assert_next!(stream, 7..=7);
976 assert_next!(stream, 8..=8);
977 assert_next!(stream, 9..=9);
978 assert_next!(stream, 10..=10);
979 let mut stream = assert_empty!(stream);
980
981 provider.anvil_mine(Some(1), None).await?;
982
983 assert_next!(stream, 11..=11);
984 assert_empty!(stream);
985
986 Ok(())
987 }
988
989 #[tokio::test]
990 async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> {
991 let anvil = Anvil::new().try_spawn()?;
992
993 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
994 provider.anvil_mine(Some(20), None).await?;
995
996 let block_confirmations = 5;
997
998 let client = BlockRangeScanner::new()
999 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1000 .await?
1001 .run()?;
1002
1003 let stream = client.stream_from(BlockNumberOrTag::Latest, block_confirmations).await?;
1004
1005 let stream = assert_empty!(stream);
1006
1007 provider.anvil_mine(Some(4), None).await?;
1008
1009 let mut stream = assert_empty!(stream);
1010
1011 provider.anvil_mine(Some(1), None).await?;
1012
1013 assert_next!(stream, 20..=20);
1014 let mut stream = assert_empty!(stream);
1015
1016 provider.anvil_mine(Some(1), None).await?;
1017
1018 assert_next!(stream, 21..=21);
1019 assert_empty!(stream);
1020
1021 Ok(())
1022 }
1023
1024 #[tokio::test]
1025 #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1026 async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Result<()> {
1027 let anvil = Anvil::new().try_spawn()?;
1028
1029 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1030
1031 let block_confirmations = 5;
1032
1033 let client = BlockRangeScanner::new()
1034 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1035 .await?
1036 .run()?;
1037
1038 let mut receiver = client.stream_live(block_confirmations).await?;
1039
1040 provider.anvil_mine(Some(10), None).await?;
1041
1042 provider
1043 .anvil_reorg(ReorgOptions { depth: block_confirmations - 1, tx_block_pairs: vec![] })
1044 .await?;
1045
1046 provider.anvil_mine(Some(20), None).await?;
1047
1048 let mut block_range_start = 0;
1049
1050 let end_loop = 20;
1051 let mut i = 0;
1052 while let Some(Message::Data(range)) = receiver.next().await {
1053 if block_range_start == 0 {
1054 block_range_start = *range.start();
1055 }
1056
1057 assert_eq!(block_range_start, *range.start());
1058 assert!(range.end() >= range.start());
1059 block_range_start = *range.end() + 1;
1060 i += 1;
1061 if i == end_loop {
1062 break;
1063 }
1064 }
1065 Ok(())
1066 }
1067
1068 #[tokio::test]
1069 #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1070 async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<()> {
1071 let anvil = Anvil::new().block_time(1).try_spawn()?;
1072
1073 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1074
1075 let block_confirmations = 3;
1076
1077 let client = BlockRangeScanner::new()
1078 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1079 .await?
1080 .run()?;
1081
1082 let mut receiver = client.stream_live(block_confirmations).await?;
1083
1084 provider.anvil_mine(Some(10), None).await?;
1085
1086 provider
1087 .anvil_reorg(ReorgOptions { depth: block_confirmations + 5, tx_block_pairs: vec![] })
1088 .await?;
1089
1090 provider.anvil_mine(Some(30), None).await?;
1091 receiver.close();
1092
1093 let mut block_range_start = 0;
1094
1095 let mut block_num = vec![];
1096 let mut reorg_detected = false;
1097 while let Some(msg) = receiver.next().await {
1098 match msg {
1099 Message::Data(range) => {
1100 if block_range_start == 0 {
1101 block_range_start = *range.start();
1102 }
1103 block_num.push(range);
1104 if block_num.len() == 15 {
1105 break;
1106 }
1107 }
1108 Message::Status(ScannerStatus::ReorgDetected) => {
1109 reorg_detected = true;
1110 }
1111 _ => {
1112 break;
1113 }
1114 }
1115 }
1116 assert!(reorg_detected, "Reorg should have been detected");
1117
1118 let mut found_reorg_pattern = false;
1123 for window in block_num.windows(2) {
1124 if window[1].start() < window[0].end() {
1125 found_reorg_pattern = true;
1126 break;
1127 }
1128 }
1129 assert!(found_reorg_pattern,);
1130
1131 Ok(())
1132 }
1133
1134 #[tokio::test]
1135 #[ignore = "too flaky, un-ignore once a full local node is used: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1136 async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Result<()> {
1137 let anvil = Anvil::new().try_spawn()?;
1138 let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1139
1140 provider.anvil_mine(Some(120), None).await?;
1141
1142 let end_num = 110;
1143
1144 let client = BlockRangeScanner::new()
1145 .max_block_range(30)
1146 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1147 .await?
1148 .run()?;
1149
1150 let mut stream = client
1151 .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
1152 .await?;
1153
1154 let depth = 15;
1155 _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
1156 _ = provider.anvil_mine(Some(20), None).await;
1157
1158 assert_next!(stream, 0..=29);
1159 assert_next!(stream, 30..=59);
1160 assert_next!(stream, 60..=89);
1161 assert_next!(stream, 90..=110);
1162 assert_next!(stream, ScannerStatus::ReorgDetected);
1163 assert_next!(stream, 105..=110);
1164 assert_closed!(stream);
1165
1166 Ok(())
1167 }
1168
1169 #[tokio::test]
1170 #[ignore = "too flaky, un-ignore once a full local node is used: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1171 async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Result<()> {
1172 let anvil = Anvil::new().try_spawn()?;
1173 let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1174
1175 provider.anvil_mine(Some(120), None).await?;
1176
1177 let end_num = 120;
1178
1179 let client = BlockRangeScanner::new()
1180 .max_block_range(30)
1181 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1182 .await?
1183 .run()?;
1184
1185 let mut stream = client
1186 .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
1187 .await?;
1188
1189 let pre_reorg_mine = 20;
1190 _ = provider.anvil_mine(Some(pre_reorg_mine), None).await;
1191 let depth = pre_reorg_mine + 1;
1192 _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
1193 _ = provider.anvil_mine(Some(20), None).await;
1194
1195 assert_next!(stream, 0..=29);
1196 assert_next!(stream, 30..=59);
1197 assert_next!(stream, 60..=89);
1198 assert_next!(stream, 90..=120);
1199 assert_next!(stream, ScannerStatus::ReorgDetected);
1200 assert_next!(stream, 120..=120);
1201 assert_closed!(stream);
1202
1203 Ok(())
1204 }
1205
1206 #[tokio::test]
1207 async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1208 let anvil = Anvil::new().try_spawn()?;
1209
1210 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1211
1212 provider.anvil_mine(Some(100), None).await?;
1213
1214 let client = BlockRangeScanner::new()
1215 .max_block_range(5)
1216 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1217 .await?
1218 .run()?;
1219
1220 let mut stream = client.stream_historical(0, 19).await?;
1222 assert_next!(stream, 0..=4);
1223 assert_next!(stream, 5..=9);
1224 assert_next!(stream, 10..=14);
1225 assert_next!(stream, 15..=19);
1226 assert_closed!(stream);
1227
1228 let mut stream = client.stream_historical(93, 99).await?;
1230 assert_next!(stream, 93..=97);
1231 assert_next!(stream, 98..=99);
1232 assert_closed!(stream);
1233
1234 let mut stream = client.stream_historical(3, 5).await?;
1236 assert_next!(stream, 3..=5);
1237 assert_closed!(stream);
1238
1239 let mut stream = client.stream_historical(3, 3).await?;
1241 assert_next!(stream, 3..=3);
1242 assert_closed!(stream);
1243
1244 let client = BlockRangeScanner::new()
1246 .max_block_range(200)
1247 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1248 .await?
1249 .run()?;
1250
1251 let mut stream = client.stream_historical(0, 20).await?;
1252 assert_next!(stream, 0..=20);
1253 assert_closed!(stream);
1254
1255 let mut stream = client.stream_historical(0, 99).await?;
1256 assert_next!(stream, 0..=99);
1257 assert_closed!(stream);
1258
1259 Ok(())
1260 }
1261
1262 #[tokio::test]
1263 async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> {
1264 let anvil = Anvil::new().try_spawn()?;
1265
1266 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1267 provider.anvil_mine(Some(11), None).await?;
1268
1269 let client = BlockRangeScanner::new()
1270 .max_block_range(5)
1271 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1272 .await?
1273 .run()?;
1274
1275 let mut stream = client.stream_historical(10, 0).await?;
1276 assert_next!(stream, 0..=4);
1277 assert_next!(stream, 5..=9);
1278 assert_next!(stream, 10..=10);
1279 assert_closed!(stream);
1280
1281 Ok(())
1282 }
1283
1284 #[tokio::test]
1285 async fn buffered_messages_after_cutoff_are_all_passed() {
1286 let cutoff = 50;
1287 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1288 buffer_tx.send(Message::Data(51..=55)).await.unwrap();
1289 buffer_tx.send(Message::Data(56..=60)).await.unwrap();
1290 buffer_tx.send(Message::Data(61..=70)).await.unwrap();
1291 drop(buffer_tx);
1292
1293 let (out_tx, out_rx) = mpsc::channel(8);
1294 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1295
1296 let mut stream = ReceiverStream::new(out_rx);
1297
1298 assert_next!(stream, 51..=55);
1299 assert_next!(stream, 56..=60);
1300 assert_next!(stream, 61..=70);
1301 assert_closed!(stream);
1302 }
1303
1304 #[tokio::test]
1305 async fn ranges_entirely_before_cutoff_are_discarded() {
1306 let cutoff = 100;
1307
1308 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1309 buffer_tx.send(Message::Data(40..=50)).await.unwrap();
1310 buffer_tx.send(Message::Data(51..=60)).await.unwrap();
1311 buffer_tx.send(Message::Data(61..=70)).await.unwrap();
1312 drop(buffer_tx);
1313
1314 let (out_tx, out_rx) = mpsc::channel(8);
1315 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1316
1317 let mut stream = ReceiverStream::new(out_rx);
1318
1319 assert_closed!(stream);
1320 }
1321
1322 #[tokio::test]
1323 async fn ranges_overlapping_cutoff_are_trimmed() {
1324 let cutoff = 75;
1325
1326 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1327 buffer_tx.send(Message::Data(60..=70)).await.unwrap();
1328 buffer_tx.send(Message::Data(71..=80)).await.unwrap();
1329 buffer_tx.send(Message::Data(81..=86)).await.unwrap();
1330 drop(buffer_tx);
1331
1332 let (out_tx, out_rx) = mpsc::channel(8);
1333 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1334
1335 let mut stream = ReceiverStream::new(out_rx);
1336
1337 assert_next!(stream, 75..=80);
1338 assert_next!(stream, 81..=86);
1339 assert_closed!(stream);
1340 }
1341
1342 #[tokio::test]
1343 async fn edge_case_range_exactly_at_cutoff() {
1344 let cutoff = 100;
1345
1346 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1347 buffer_tx.send(Message::Data(98..=98)).await.unwrap(); buffer_tx.send(Message::Data(99..=100)).await.unwrap(); buffer_tx.send(Message::Data(100..=100)).await.unwrap(); buffer_tx.send(Message::Data(100..=101)).await.unwrap(); buffer_tx.send(Message::Data(102..=102)).await.unwrap(); drop(buffer_tx);
1353
1354 let (out_tx, out_rx) = mpsc::channel(8);
1355 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1356
1357 let mut stream = ReceiverStream::new(out_rx);
1358
1359 assert_next!(stream, 100..=100);
1360 assert_next!(stream, 100..=100);
1361 assert_next!(stream, 100..=101);
1362 assert_next!(stream, 102..=102);
1363 assert_closed!(stream);
1364 }
1365
1366 #[tokio::test]
1367 async fn try_send_forwards_errors_to_subscribers() {
1368 let (tx, mut rx) = mpsc::channel(1);
1369
1370 _ = tx.try_stream(ScannerError::WebSocketConnectionFailed(4)).await;
1371
1372 assert!(matches!(
1373 rx.recv().await,
1374 Some(Message::Error(ScannerError::WebSocketConnectionFailed(4)))
1375 ));
1376 }
1377
1378 #[tokio::test]
1379 async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<()> {
1380 let anvil = Anvil::new().try_spawn()?;
1381
1382 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1383
1384 provider.anvil_mine(Some(150), None).await?;
1385
1386 let client = BlockRangeScanner::new()
1387 .max_block_range(100)
1388 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1389 .await?
1390 .run()?;
1391
1392 let mut stream = client.rewind(100, 150).await?;
1393
1394 assert_next!(stream, 100..=150);
1396 assert_closed!(stream);
1397
1398 Ok(())
1399 }
1400
1401 #[tokio::test]
1402 async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> anyhow::Result<()>
1403 {
1404 let anvil = Anvil::new().try_spawn()?;
1405
1406 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1407
1408 provider.anvil_mine(Some(15), None).await?;
1409
1410 let client = BlockRangeScanner::new()
1411 .max_block_range(5)
1412 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1413 .await?
1414 .run()?;
1415
1416 let mut stream = client.rewind(0, 14).await?;
1417
1418 assert_next!(stream, 10..=14);
1420 assert_next!(stream, 5..=9);
1421 assert_next!(stream, 0..=4);
1422 assert_closed!(stream);
1423
1424 Ok(())
1425 }
1426
1427 #[tokio::test]
1428 async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Result<()> {
1429 let anvil = Anvil::new().try_spawn()?;
1430
1431 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1432
1433 provider.anvil_mine(Some(15), None).await?;
1434
1435 let client = BlockRangeScanner::new()
1436 .max_block_range(4)
1437 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1438 .await?
1439 .run()?;
1440
1441 let mut stream = client.rewind(3, 12).await?;
1442
1443 assert_next!(stream, 9..=12);
1445 assert_next!(stream, 5..=8);
1446 assert_next!(stream, 3..=4);
1447 assert_closed!(stream);
1448
1449 Ok(())
1450 }
1451
1452 #[tokio::test]
1453 async fn rewind_single_block_range() -> anyhow::Result<()> {
1454 let anvil = Anvil::new().try_spawn()?;
1455
1456 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1457
1458 provider.anvil_mine(Some(15), None).await?;
1459
1460 let client = BlockRangeScanner::new()
1461 .max_block_range(5)
1462 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1463 .await?
1464 .run()?;
1465
1466 let mut stream = client.rewind(7, 7).await?;
1467
1468 assert_next!(stream, 7..=7);
1469 assert_closed!(stream);
1470
1471 Ok(())
1472 }
1473
1474 #[tokio::test]
1475 async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Result<()> {
1476 let anvil = Anvil::new().try_spawn()?;
1477
1478 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1479
1480 provider.anvil_mine(Some(15), None).await?;
1481
1482 let client = BlockRangeScanner::new()
1483 .max_block_range(1)
1484 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1485 .await?
1486 .run()?;
1487
1488 let mut stream = client.rewind(5, 8).await?;
1489
1490 assert_next!(stream, 8..=8);
1492 assert_next!(stream, 7..=7);
1493 assert_next!(stream, 6..=6);
1494 assert_next!(stream, 5..=5);
1495 assert_closed!(stream);
1496
1497 Ok(())
1498 }
1499
1500 #[tokio::test]
1501 async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyhow::Result<()> {
1502 let anvil = Anvil::new().try_spawn()?;
1503
1504 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1505 provider.anvil_mine(Some(20), None).await?;
1507
1508 let client = BlockRangeScanner::new()
1509 .max_block_range(7)
1510 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1511 .await?
1512 .run()?;
1513
1514 let mut stream =
1515 client.rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
1516
1517 assert_next!(stream, 14..=20);
1518 assert_next!(stream, 7..=13);
1519 assert_next!(stream, 0..=6);
1520 assert_closed!(stream);
1521
1522 Ok(())
1523 }
1524
1525 #[tokio::test]
1526 async fn command_rewind_handles_start_and_end_in_any_order() -> anyhow::Result<()> {
1527 let anvil = Anvil::new().try_spawn()?;
1528
1529 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1530 provider.anvil_mine(Some(16), None).await?;
1532
1533 let client = BlockRangeScanner::new()
1534 .max_block_range(5)
1535 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1536 .await?
1537 .run()?;
1538
1539 let mut stream = client.rewind(15, 3).await?;
1540
1541 assert_next!(stream, 11..=15);
1542 assert_next!(stream, 6..=10);
1543 assert_next!(stream, 3..=5);
1544 assert_closed!(stream);
1545
1546 let mut stream = client.rewind(3, 15).await?;
1547
1548 assert_next!(stream, 11..=15);
1549 assert_next!(stream, 6..=10);
1550 assert_next!(stream, 3..=5);
1551 assert_closed!(stream);
1552
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> {
1558 let anvil = Anvil::new().try_spawn()?;
1559
1560 let client = BlockRangeScanner::new()
1562 .max_block_range(5)
1563 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1564 .await?
1565 .run()?;
1566
1567 let stream = client.rewind(0, 999).await;
1568
1569 assert!(matches!(stream, Err(ScannerError::BlockNotFound(BlockNumberOrTag::Number(999)))));
1570
1571 Ok(())
1572 }
1573}