1use std::{ops::RangeInclusive, sync::Arc};
69
70use tokio::sync::{mpsc, oneshot};
71use tokio_stream::{StreamExt, wrappers::ReceiverStream};
72
73use crate::types::{ScannerMessage, ScannerStatus};
74use alloy::{
75 consensus::BlockHeader,
76 eips::BlockNumberOrTag,
77 network::{BlockResponse, Network, primitives::HeaderResponse},
78 primitives::{BlockHash, BlockNumber},
79 providers::{Provider, RootProvider},
80 pubsub::Subscription,
81 rpc::client::ClientBuilder,
82 transports::{
83 RpcError, TransportErrorKind, TransportResult,
84 http::reqwest::{self, Url},
85 ws::WsConnect,
86 },
87};
88use thiserror::Error;
89use tracing::{debug, error, info, warn};
90
91pub const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
92pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
94pub const MAX_BUFFERED_MESSAGES: usize = 50000;
97
98pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
99
100pub type BlockRangeMessage = ScannerMessage<RangeInclusive<BlockNumber>, BlockRangeScannerError>;
105
106#[derive(Error, Debug, Clone)]
107pub enum BlockRangeScannerError {
108 #[error("HTTP request failed: {0}")]
109 HttpError(Arc<reqwest::Error>),
110
111 #[error("Serialization error: {0}")]
114 SerializationError(Arc<serde_json::Error>),
115
116 #[error("RPC error: {0}")]
117 RpcError(Arc<RpcError<TransportErrorKind>>),
118
119 #[error("Channel send error")]
120 ChannelError,
121
122 #[error("Service is shutting down")]
123 ServiceShutdown,
124
125 #[error("Only one subscriber allowed at a time")]
126 MultipleSubscribers,
127
128 #[error("No subscriber set for streaming")]
129 NoSubscriber,
130
131 #[error("Historical sync failed: {0}")]
132 HistoricalSyncError(String),
133
134 #[error("WebSocket connection failed after {0} attempts")]
135 WebSocketConnectionFailed(usize),
136
137 #[error("Block not found, block number: {0}")]
138 BlockNotFound(BlockNumberOrTag),
139}
140
141impl From<reqwest::Error> for BlockRangeScannerError {
142 fn from(error: reqwest::Error) -> Self {
143 BlockRangeScannerError::HttpError(Arc::new(error))
144 }
145}
146
147impl From<serde_json::Error> for BlockRangeScannerError {
148 fn from(error: serde_json::Error) -> Self {
149 BlockRangeScannerError::SerializationError(Arc::new(error))
150 }
151}
152
153impl From<RpcError<TransportErrorKind>> for BlockRangeScannerError {
154 fn from(error: RpcError<TransportErrorKind>) -> Self {
155 BlockRangeScannerError::RpcError(Arc::new(error))
156 }
157}
158
159#[derive(Debug)]
160pub enum Command {
161 StreamLive {
162 sender: mpsc::Sender<BlockRangeMessage>,
163 response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
164 },
165 StreamHistorical {
166 sender: mpsc::Sender<BlockRangeMessage>,
167 start_height: BlockNumberOrTag,
168 end_height: BlockNumberOrTag,
169 response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
170 },
171 StreamFrom {
172 sender: mpsc::Sender<BlockRangeMessage>,
173 start_height: BlockNumberOrTag,
174 response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
175 },
176 Unsubscribe {
177 response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
178 },
179 Shutdown {
180 response: oneshot::Sender<Result<(), BlockRangeScannerError>>,
181 },
182}
183
184#[derive(Default, Debug, Clone)]
185pub struct BlockHashAndNumber {
186 pub hash: BlockHash,
187 pub number: BlockNumber,
188}
189
190impl BlockHashAndNumber {
191 fn from_header<N: Network>(header: &N::HeaderResponse) -> Self {
192 Self { hash: header.hash(), number: header.number() }
193 }
194}
195
196#[derive(Clone)]
197struct Config {
198 blocks_read_per_epoch: usize,
199 reorg_rewind_depth: u64,
200 #[allow(
201 dead_code,
202 reason = "Will be used in reorg mechanism: https://github.com/OpenZeppelin/Event-Scanner/issues/5"
203 )]
204 block_confirmations: u64,
205}
206
207pub struct BlockRangeScanner {
208 blocks_read_per_epoch: usize,
209 reorg_rewind_depth: u64,
210 block_confirmations: u64,
211}
212
213impl Default for BlockRangeScanner {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl BlockRangeScanner {
220 #[must_use]
221 pub fn new() -> Self {
222 Self {
223 blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
224 reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH,
225 block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
226 }
227 }
228
229 #[must_use]
230 pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
231 self.blocks_read_per_epoch = blocks_read_per_epoch;
232 self
233 }
234
235 #[must_use]
236 pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
237 self.reorg_rewind_depth = reorg_rewind_depth;
238 self
239 }
240
241 #[must_use]
242 pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
243 self.block_confirmations = block_confirmations;
244 self
245 }
246
247 pub async fn connect_ws<N: Network>(
253 self,
254 ws_url: Url,
255 ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
256 let provider =
257 RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
258 self.connect_provider(provider)
259 }
260
261 pub async fn connect_ipc<N: Network>(
267 self,
268 ipc_path: String,
269 ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
270 let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
271 self.connect_provider(provider)
272 }
273
274 pub fn connect_provider<N: Network>(
280 self,
281 provider: RootProvider<N>,
282 ) -> TransportResult<ConnectedBlockRangeScanner<N>> {
283 Ok(ConnectedBlockRangeScanner {
284 provider,
285 config: Config {
286 blocks_read_per_epoch: self.blocks_read_per_epoch,
287 reorg_rewind_depth: self.reorg_rewind_depth,
288 block_confirmations: self.block_confirmations,
289 },
290 })
291 }
292}
293
294pub struct ConnectedBlockRangeScanner<N: Network> {
295 provider: RootProvider<N>,
296 config: Config,
297}
298
299impl<N: Network> ConnectedBlockRangeScanner<N> {
300 #[must_use]
302 pub fn provider(&self) -> &RootProvider<N> {
303 &self.provider
304 }
305
306 pub fn run(&self) -> Result<BlockRangeScannerClient, BlockRangeScannerError> {
312 let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone());
313 tokio::spawn(async move {
314 service.run().await;
315 });
316 Ok(BlockRangeScannerClient::new(cmd_tx))
317 }
318}
319
320struct Service<N: Network> {
321 config: Config,
322 provider: RootProvider<N>,
323 subscriber: Option<mpsc::Sender<BlockRangeMessage>>,
324 next_start_block: BlockHashAndNumber,
325 websocket_connected: bool,
326 processed_count: u64,
327 error_count: u64,
328 command_receiver: mpsc::Receiver<Command>,
329 shutdown: bool,
330}
331
332impl<N: Network> Service<N> {
333 pub fn new(config: Config, provider: RootProvider<N>) -> (Self, mpsc::Sender<Command>) {
334 let (cmd_tx, cmd_rx) = mpsc::channel(100);
335
336 let service = Self {
337 config,
338 provider,
339 subscriber: None,
340 next_start_block: BlockHashAndNumber::default(),
341 websocket_connected: false,
342 processed_count: 0,
343 error_count: 0,
344 command_receiver: cmd_rx,
345 shutdown: false,
346 };
347
348 (service, cmd_tx)
349 }
350
351 pub async fn run(mut self) {
352 info!("Starting subscription service");
353
354 while !self.shutdown {
355 tokio::select! {
356 cmd = self.command_receiver.recv() => {
357 if let Some(command) = cmd {
358 if let Err(e) = self.handle_command(command).await {
359 error!("Command handling error: {}", e);
360 self.error_count += 1;
361 }
362 } else {
363 info!("Command channel closed, shutting down");
364 break;
365 }
366 }
367 }
368 }
369
370 info!("Subscription service stopped");
371 }
372
373 async fn handle_command(&mut self, command: Command) -> Result<(), BlockRangeScannerError> {
374 match command {
375 Command::StreamLive { sender, response } => {
376 self.ensure_no_subscriber()?;
377 info!("Starting live stream");
378 self.subscriber = Some(sender);
379 let result = self.handle_live().await;
380 let _ = response.send(result);
381 }
382 Command::StreamHistorical { sender, start_height, end_height, response } => {
383 self.ensure_no_subscriber()?;
384 info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
385 self.subscriber = Some(sender);
386 let result = self.handle_historical(start_height, end_height).await;
387 let _ = response.send(result);
388 }
389 Command::StreamFrom { sender, start_height, response } => {
390 self.ensure_no_subscriber()?;
391 self.subscriber = Some(sender);
392 info!(start_height = ?start_height, "Starting streaming from");
393 let result = self.handle_sync(start_height).await;
394 let _ = response.send(result);
395 }
396 Command::Unsubscribe { response } => {
397 self.handle_unsubscribe();
398 let _ = response.send(Ok(()));
399 }
400 Command::Shutdown { response } => {
401 self.shutdown = true;
402 self.handle_unsubscribe();
403 let _ = response.send(Ok(()));
404 }
405 }
406 Ok(())
407 }
408
409 async fn handle_live(&mut self) -> Result<(), BlockRangeScannerError> {
410 let Some(sender) = self.subscriber.clone() else {
411 return Err(BlockRangeScannerError::ServiceShutdown);
412 };
413
414 let block_confirmations = self.config.block_confirmations;
415 let provider = self.provider.clone();
416 let latest = self.provider.get_block_number().await?;
417
418 let range_start = (latest + 1).saturating_sub(block_confirmations);
422
423 tokio::spawn(async move {
424 Self::stream_live_blocks(range_start, provider, sender, block_confirmations).await;
425 });
426
427 Ok(())
428 }
429
430 async fn handle_historical(
431 &mut self,
432 start_height: BlockNumberOrTag,
433 end_height: BlockNumberOrTag,
434 ) -> Result<(), BlockRangeScannerError> {
435 let start_block = self.provider.get_block_by_number(start_height).await?.ok_or(
436 BlockRangeScannerError::HistoricalSyncError(format!(
437 "Start block {start_height:?} not found"
438 )),
439 )?;
440 let end_block = self.provider.get_block_by_number(end_height).await?.ok_or(
441 BlockRangeScannerError::HistoricalSyncError(format!(
442 "End block {end_height:?} not found"
443 )),
444 )?;
445
446 if end_block.header().number() < start_block.header().number() {
447 return Err(BlockRangeScannerError::HistoricalSyncError(format!(
448 "End block {end_height:?} is lower than start block {start_height:?}"
449 )));
450 }
451
452 info!(
453 start_block = start_block.header().number(),
454 end_block = end_block.header().number(),
455 "Syncing historical data"
456 );
457
458 self.sync_historical_data(start_block, end_block).await?;
459
460 _ = self.subscriber.take();
461
462 info!("Successfully synced historical data, closing the stream");
463
464 Ok(())
465 }
466
467 async fn handle_sync(
468 &mut self,
469 start_height: BlockNumberOrTag,
470 ) -> Result<(), BlockRangeScannerError> {
471 let start_block = self.provider.get_block_by_number(start_height).await?.ok_or(
474 BlockRangeScannerError::HistoricalSyncError(format!(
475 "Start block {start_height:?} not found"
476 )),
477 )?;
478
479 let latest_block =
480 self.provider.get_block_by_number(BlockNumberOrTag::Latest).await?.ok_or(
481 BlockRangeScannerError::HistoricalSyncError("Latest block not found".to_string()),
482 )?;
483
484 let block_confirmations = self.config.block_confirmations;
485 let confirmed_tip_num = latest_block.header().number().saturating_sub(block_confirmations);
486
487 if start_block.header().number() > confirmed_tip_num {
489 info!(
490 start_block = start_block.header().number(),
491 confirmed_tip = confirmed_tip_num,
492 "Start block is beyond confirmed tip, starting live stream"
493 );
494
495 let Some(sender) = self.subscriber.clone() else {
496 return Err(BlockRangeScannerError::ServiceShutdown);
497 };
498
499 let provider = self.provider.clone();
500 let expected_next = start_block.header().number();
501 tokio::spawn(async move {
502 Self::stream_live_blocks(expected_next, provider, sender, block_confirmations)
503 .await;
504 });
505
506 return Ok(());
507 }
508
509 let end_block = self.provider.get_block_by_number(confirmed_tip_num.into()).await?.ok_or(
510 BlockRangeScannerError::HistoricalSyncError(format!(
511 "Confirmed tip block {confirmed_tip_num} not found"
512 )),
513 )?;
514
515 info!(
516 start_block = start_block.header().number(),
517 end_block = end_block.header().number(),
518 "Syncing historical data"
519 );
520
521 let (live_block_buffer_sender, live_block_buffer_receiver) =
524 mpsc::channel::<BlockRangeMessage>(MAX_BUFFERED_MESSAGES);
525
526 let provider = self.provider.clone();
527
528 let cutoff = end_block.header().number();
531
532 let live_subscription_task = tokio::spawn(async move {
534 Self::stream_live_blocks(
535 cutoff + 1,
536 provider,
537 live_block_buffer_sender,
538 block_confirmations,
539 )
540 .await;
541 });
542
543 if let Err(e) = self.sync_historical_data(start_block, end_block).await {
547 warn!("aborting live_subscription_task");
548 live_subscription_task.abort();
549 return Err(BlockRangeScannerError::HistoricalSyncError(e.to_string()));
550 }
551
552 self.send_to_subscriber(ScannerMessage::Status(ScannerStatus::ChainTipReached)).await;
553
554 let Some(sender) = self.subscriber.clone() else {
555 return Err(BlockRangeScannerError::ServiceShutdown);
556 };
557 tokio::spawn(async move {
565 Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
566 });
567
568 info!("Successfully transitioned from historical to live data");
569 Ok(())
570 }
571
572 async fn sync_historical_data(
573 &mut self,
574 start: N::BlockResponse,
575 end: N::BlockResponse,
576 ) -> Result<(), BlockRangeScannerError> {
577 let mut batch_count = 0;
578
579 self.next_start_block = BlockHashAndNumber::from_header::<N>(start.header());
580
581 while self.next_start_block.number <= end.header().number() {
584 self.ensure_current_not_reorged().await?;
585
586 let batch_end_block_number = self
587 .next_start_block
588 .number
589 .saturating_add(self.config.blocks_read_per_epoch as u64 - 1)
590 .min(end.header().number());
591
592 self.send_to_subscriber(BlockRangeMessage::Data(
593 self.next_start_block.number..=batch_end_block_number,
594 ))
595 .await;
596
597 batch_count += 1;
598 if batch_count % 10 == 0 {
599 debug!(batch_count = batch_count, "Processed historical batches");
600 }
601
602 if batch_end_block_number == end.header().number() {
603 break;
604 }
605
606 let next_start_block_number = (batch_end_block_number + 1).into();
607 let next_start_block =
608 match self.provider.get_block_by_number(next_start_block_number).await {
609 Ok(block) => {
610 block.expect("block number is less than 'end', so it should exist")
611 }
612 Err(e) => {
613 error!(error = %e, "Failed to get block by number");
614 let e: BlockRangeScannerError = e.into();
615 self.send_to_subscriber(BlockRangeMessage::Error(e.clone())).await;
616 return Err(e);
617 }
618 };
619 self.next_start_block = BlockHashAndNumber::from_header::<N>(next_start_block.header());
620 }
621
622 info!(batch_count = batch_count, "Historical sync completed");
623
624 Ok(())
625 }
626
627 async fn stream_live_blocks<P: Provider<N>>(
628 mut range_start: BlockNumber,
629 provider: P,
630 sender: mpsc::Sender<BlockRangeMessage>,
631 block_confirmations: u64,
632 ) {
633 match Self::get_block_subscription(&provider).await {
634 Ok(ws_stream) => {
635 info!("WebSocket connected for live blocks");
636
637 let cutoff = range_start;
639 let mut stream =
640 ws_stream.into_stream().skip_while(|header| header.number() < cutoff);
641
642 while let Some(incoming_block) = stream.next().await {
643 let incoming_block_num = incoming_block.number();
644 info!(block_number = incoming_block_num, "Received block header");
645
646 if incoming_block_num < range_start {
647 warn!("Reorg detected: sending forked range");
648 if sender
649 .send(BlockRangeMessage::Status(ScannerStatus::ReorgDetected))
650 .await
651 .is_err()
652 {
653 warn!("Downstream channel closed, stopping live blocks task");
654 return;
655 }
656
657 let incoming_confirmed =
659 incoming_block_num.saturating_sub(block_confirmations);
660
661 range_start = incoming_confirmed;
663 }
664
665 let confirmed = incoming_block_num.saturating_sub(block_confirmations);
666 if confirmed >= range_start {
667 if sender
668 .send(BlockRangeMessage::Data(range_start..=confirmed))
669 .await
670 .is_err()
671 {
672 warn!("Downstream channel closed, stopping live blocks task");
673 return;
674 }
675
676 range_start = confirmed + 1;
678 }
679 }
680 }
681 Err(e) => {
682 if sender.send(BlockRangeMessage::Error(e)).await.is_err() {
683 warn!("Downstream channel closed, stopping live blocks task");
684 }
685 }
686 }
687 }
688
689 async fn process_live_block_buffer(
690 mut buffer_rx: mpsc::Receiver<BlockRangeMessage>,
691 sender: mpsc::Sender<BlockRangeMessage>,
692 cutoff: BlockNumber,
693 ) {
694 let mut processed = 0;
695 let mut discarded = 0;
696
697 while let Some(data) = buffer_rx.recv().await {
699 match data {
700 BlockRangeMessage::Data(range) => {
701 let (start, end) = (*range.start(), *range.end());
702 if start >= cutoff {
703 if sender.send(BlockRangeMessage::Data(range)).await.is_err() {
704 warn!("Subscriber channel closed, cleaning up");
705 return;
706 }
707 processed += end - start;
708 } else if end > cutoff {
709 discarded += cutoff - start;
710
711 let start = cutoff;
712 if sender.send(BlockRangeMessage::Data(start..=end)).await.is_err() {
713 warn!("Subscriber channel closed, cleaning up");
714 return;
715 }
716 processed += end - start;
717 } else {
718 discarded += end - start;
719 }
720 }
721 _ => {
722 if sender.send(data).await.is_err() {
724 warn!("Subscriber channel closed, cleaning up");
725 return;
726 }
727 }
728 }
729 }
730
731 info!(processed = processed, discarded = discarded, "Processed buffered messages");
732 }
733
734 async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockRangeScannerError> {
735 let current_block = self.provider.get_block_by_hash(self.next_start_block.hash).await?;
736 if current_block.is_some() {
737 return Ok(());
738 }
739
740 self.rewind_on_reorg_detected().await
741 }
742
743 async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockRangeScannerError> {
744 let mut new_current_height =
745 self.next_start_block.number.saturating_sub(self.config.reorg_rewind_depth);
746
747 let head = self.provider.get_block_number().await?;
748 if head < new_current_height {
749 new_current_height = head;
750 }
751
752 let current = self
753 .provider
754 .get_block_by_number(new_current_height.into())
755 .await?
756 .map(|block| BlockHashAndNumber::from_header::<N>(block.header()))
757 .ok_or(BlockRangeScannerError::HistoricalSyncError(format!(
758 "Block {new_current_height} not found during rewind",
759 )))?;
760
761 info!(
762 old_current = self.next_start_block.number,
763 new_current = current.number,
764 "Rewind on reorg detected"
765 );
766
767 self.next_start_block = current;
768
769 Ok(())
770 }
771
772 async fn get_block_subscription(
773 provider: &impl Provider<N>,
774 ) -> Result<Subscription<N::HeaderResponse>, BlockRangeScannerError> {
775 let ws_stream = provider
776 .subscribe_blocks()
777 .await
778 .map_err(|_| BlockRangeScannerError::WebSocketConnectionFailed(1))?;
779
780 Ok(ws_stream)
781 }
782
783 async fn send_to_subscriber(&mut self, message: BlockRangeMessage) {
784 if let Some(ref sender) = self.subscriber {
785 if let Err(err) = sender.send(message).await {
786 warn!(error = %err, "Downstream channel closed, failed sending the message to subscriber");
787 self.subscriber = None;
788 self.websocket_connected = false;
789 } else {
790 self.processed_count += 1;
791 }
792 }
793 }
794
795 fn handle_unsubscribe(&mut self) {
796 if self.subscriber.take().is_some() {
797 info!("Unsubscribing current subscriber");
798 self.websocket_connected = false;
799 }
800 }
801
802 fn ensure_no_subscriber(&self) -> Result<(), BlockRangeScannerError> {
803 if self.subscriber.is_some() {
804 return Err(BlockRangeScannerError::MultipleSubscribers);
805 }
806 Ok(())
807 }
808}
809
810pub struct BlockRangeScannerClient {
811 command_sender: mpsc::Sender<Command>,
812}
813
814impl BlockRangeScannerClient {
815 #[must_use]
821 pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
822 Self { command_sender }
823 }
824
825 pub async fn stream_live(
831 &self,
832 ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
833 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
834 let (response_tx, response_rx) = oneshot::channel();
835
836 let command = Command::StreamLive { sender: blocks_sender, response: response_tx };
837
838 self.command_sender
839 .send(command)
840 .await
841 .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
842
843 response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
844
845 Ok(ReceiverStream::new(blocks_receiver))
846 }
847
848 pub async fn stream_historical<N: Into<BlockNumberOrTag>>(
859 &self,
860 start_height: N,
861 end_height: N,
862 ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
863 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
864 let (response_tx, response_rx) = oneshot::channel();
865
866 let command = Command::StreamHistorical {
867 sender: blocks_sender,
868 start_height: start_height.into(),
869 end_height: end_height.into(),
870 response: response_tx,
871 };
872
873 self.command_sender
874 .send(command)
875 .await
876 .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
877
878 response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
879
880 Ok(ReceiverStream::new(blocks_receiver))
881 }
882
883 pub async fn stream_from(
893 &self,
894 start_height: impl Into<BlockNumberOrTag>,
895 ) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
896 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
897 let (response_tx, response_rx) = oneshot::channel();
898
899 let command = Command::StreamFrom {
900 sender: blocks_sender,
901 start_height: start_height.into(),
902 response: response_tx,
903 };
904
905 self.command_sender
906 .send(command)
907 .await
908 .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
909
910 response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)??;
911
912 Ok(ReceiverStream::new(blocks_receiver))
913 }
914
915 pub async fn unsubscribe(&self) -> Result<(), BlockRangeScannerError> {
921 let (response_tx, response_rx) = oneshot::channel();
922
923 let command = Command::Unsubscribe { response: response_tx };
924
925 self.command_sender
926 .send(command)
927 .await
928 .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
929
930 response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)?
931 }
932
933 pub async fn shutdown(&self) -> Result<(), BlockRangeScannerError> {
939 let (response_tx, response_rx) = oneshot::channel();
940
941 let command = Command::Shutdown { response: response_tx };
942
943 self.command_sender
944 .send(command)
945 .await
946 .map_err(|_| BlockRangeScannerError::ServiceShutdown)?;
947
948 response_rx.await.map_err(|_| BlockRangeScannerError::ServiceShutdown)?
949 }
950}
951
952#[cfg(test)]
953mod tests {
954 use std::time::Duration;
955
956 use alloy::{
957 network::Ethereum,
958 primitives::{B256, keccak256},
959 providers::{ProviderBuilder, ext::AnvilApi},
960 rpc::{
961 client::RpcClient,
962 types::{Block as RpcBlock, Header, Transaction, anvil::ReorgOptions},
963 },
964 transports::mock::Asserter,
965 };
966 use alloy_node_bindings::Anvil;
967 use serde_json::{Value, json};
968 use tokio::{
969 sync::mpsc::{self, Receiver},
970 time::timeout,
971 };
972 use tokio_stream::StreamExt;
973
974 use super::*;
975
976 trait RangeReceiver {
978 async fn next_range(&mut self) -> Option<BlockRangeMessage>;
979 }
980
981 impl RangeReceiver for ReceiverStream<BlockRangeMessage> {
982 async fn next_range(&mut self) -> Option<BlockRangeMessage> {
983 self.next().await
984 }
985 }
986
987 impl RangeReceiver for Receiver<BlockRangeMessage> {
988 async fn next_range(&mut self) -> Option<BlockRangeMessage> {
989 self.recv().await
990 }
991 }
992
993 macro_rules! assert_next_range {
994 ($recv:expr, None) => {
995 let next = $recv.next_range().await;
996 assert!(next.is_none());
997 };
998 ($recv:expr, $range:expr) => {
999 let next = $recv.next_range().await;
1000 if let Some(BlockRangeMessage::Data(range)) = next {
1001 assert_eq!($range, range);
1002 } else {
1003 panic!("expected block range, got: {next:?}");
1004 }
1005 };
1006 }
1007
1008 fn test_config() -> Config {
1009 Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
1010 }
1011
1012 fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
1013 RootProvider::new(RpcClient::mocked(asserter))
1014 }
1015
1016 fn mock_block(number: u64, hash: B256) -> RpcBlock<Transaction, Header> {
1017 let mut block: RpcBlock<Transaction, Header> = RpcBlock::default();
1018 block.header.hash = hash;
1019 block.header.number = number;
1020 block
1021 }
1022
1023 #[test]
1024 fn block_range_scanner_defaults_match_constants() {
1025 let scanner = BlockRangeScanner::new();
1026
1027 assert_eq!(scanner.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH);
1028 assert_eq!(scanner.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH);
1029 assert_eq!(scanner.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
1030 }
1031
1032 #[test]
1033 fn builder_methods_update_configuration() {
1034 let blocks_read_per_epoch = 42;
1035 let reorg_rewind_depth = 12;
1036 let block_confirmations = 7;
1037
1038 let scanner = BlockRangeScanner::new()
1039 .with_blocks_read_per_epoch(blocks_read_per_epoch)
1040 .with_reorg_rewind_depth(reorg_rewind_depth)
1041 .with_block_confirmations(block_confirmations);
1042
1043 assert_eq!(scanner.blocks_read_per_epoch, blocks_read_per_epoch);
1044 assert_eq!(scanner.block_confirmations, block_confirmations);
1045 }
1046
1047 #[tokio::test]
1048 async fn send_to_subscriber_increments_processed_count() -> anyhow::Result<()> {
1049 let asserter = Asserter::new();
1050 let provider = mocked_provider(asserter);
1051 let (mut service, _cmd) = Service::new(test_config(), provider);
1052
1053 let (tx, mut rx) = mpsc::channel(1);
1054 service.subscriber = Some(tx);
1055
1056 let expected_range = 10..=11;
1057 service.send_to_subscriber(BlockRangeMessage::Data(expected_range.clone())).await;
1058
1059 assert_eq!(service.processed_count, 1);
1060 assert!(service.subscriber.is_some());
1061
1062 let BlockRangeMessage::Data(received) = rx.recv().await.expect("range received") else {
1063 panic!("expected BlockRange message")
1064 };
1065 assert_eq!(received, expected_range);
1066
1067 Ok(())
1068 }
1069
1070 #[tokio::test]
1071 async fn send_to_subscriber_removes_closed_channel() -> anyhow::Result<()> {
1072 let asserter = Asserter::new();
1073 let provider = mocked_provider(asserter);
1074 let (mut service, _cmd) = Service::new(test_config(), provider);
1075
1076 let (tx, rx) = mpsc::channel(1);
1077 service.websocket_connected = true;
1078 service.subscriber = Some(tx);
1079 drop(rx);
1081
1082 service.send_to_subscriber(BlockRangeMessage::Data(15..=15)).await;
1083
1084 assert!(service.subscriber.is_none());
1085 assert!(!service.websocket_connected);
1086 assert_eq!(service.processed_count, 0);
1087
1088 Ok(())
1089 }
1090
1091 #[test]
1092 fn handle_unsubscribe_clears_subscriber() {
1093 let asserter = Asserter::new();
1094 let provider = mocked_provider(asserter);
1095 let (mut service, _cmd) = Service::new(test_config(), provider);
1096
1097 let (tx, _rx) = mpsc::channel(1);
1098 service.websocket_connected = true;
1099 service.subscriber = Some(tx);
1100
1101 service.handle_unsubscribe();
1102
1103 assert!(service.subscriber.is_none());
1104 assert!(!service.websocket_connected);
1105 }
1106
1107 #[tokio::test]
1108 async fn live_mode_processes_all_blocks() -> anyhow::Result<()> {
1109 let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?;
1110
1111 let client = BlockRangeScanner::new()
1112 .with_block_confirmations(1)
1113 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1114 .await?
1115 .run()?;
1116
1117 let expected_blocks = 10;
1118
1119 let mut receiver = client.stream_live().await?.take(expected_blocks);
1120
1121 let mut block_range_start = 0;
1122
1123 while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1124 info!("Received block range: [{range:?}]");
1125 if block_range_start == 0 {
1126 block_range_start = *range.start();
1127 }
1128
1129 assert_eq!(block_range_start, *range.start());
1130 assert!(range.end() >= range.start());
1131 block_range_start = *range.end() + 1;
1132 }
1133
1134 Ok(())
1135 }
1136
1137 #[tokio::test]
1138 async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> {
1139 let anvil = Anvil::new().try_spawn()?;
1140
1141 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1142 provider.anvil_mine(Option::Some(20), Option::None).await?;
1143
1144 let block_confirmations = 5;
1145
1146 let client = BlockRangeScanner::new()
1147 .with_block_confirmations(block_confirmations)
1148 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1149 .await?
1150 .run()?;
1151
1152 let expected_blocks = 10;
1153 let mut receiver =
1154 client.stream_from(BlockNumberOrTag::Latest).await?.take(expected_blocks);
1155
1156 let latest_head = provider.get_block_number().await?;
1157 provider.anvil_mine(Option::Some(20), Option::None).await?;
1158
1159 let mut expected_range_start = latest_head;
1160
1161 while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1162 assert_eq!(expected_range_start, *range.start());
1163 assert_eq!(range.end(), range.start());
1164 expected_range_start += 1;
1165 }
1166
1167 assert_eq!(expected_range_start, latest_head + expected_blocks as u64);
1170
1171 Ok(())
1172 }
1173
1174 #[tokio::test]
1175 async fn live_mode_respects_block_confirmations() -> anyhow::Result<()> {
1176 let anvil = Anvil::new().try_spawn()?;
1177
1178 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1179 provider.anvil_mine(Option::Some(20), Option::None).await?;
1180
1181 let block_confirmations = 5;
1182
1183 let client = BlockRangeScanner::new()
1184 .with_block_confirmations(block_confirmations)
1185 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1186 .await?
1187 .run()?;
1188
1189 let expected_blocks = 10;
1190
1191 let mut receiver = client.stream_live().await?.take(expected_blocks);
1192 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1193 let latest_head = provider.get_block_number().await?;
1194 provider.anvil_mine(Option::Some(expected_blocks as u64), Option::None).await?;
1195
1196 let mut expected_range_start = latest_head.saturating_sub(block_confirmations) + 1;
1197
1198 while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1199 assert_eq!(expected_range_start, *range.start());
1200 assert_eq!(range.end(), range.start());
1201 expected_range_start += 1;
1202 }
1203
1204 assert_eq!(
1207 expected_range_start,
1208 latest_head + expected_blocks as u64 + 1 - block_confirmations
1209 );
1210
1211 Ok(())
1212 }
1213
1214 #[tokio::test]
1215 async fn live_mode_respects_block_confirmations_on_new_chain() -> anyhow::Result<()> {
1216 let anvil = Anvil::new().try_spawn()?;
1217
1218 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1219
1220 let block_confirmations = 5;
1221
1222 let client = BlockRangeScanner::new()
1223 .with_block_confirmations(block_confirmations)
1224 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1225 .await?
1226 .run()?;
1227
1228 let mut receiver = client.stream_live().await?;
1229
1230 provider.anvil_mine(Option::Some(6), Option::None).await?;
1231
1232 let next = receiver.next().await;
1233 if let Some(BlockRangeMessage::Data(range)) = next {
1234 assert_eq!(0, *range.start());
1235 assert_eq!(0, *range.end());
1236 } else {
1237 panic!("expected range, got: {next:?}");
1238 }
1239
1240 let next = receiver.next().await;
1241 if let Some(BlockRangeMessage::Data(range)) = next {
1242 assert_eq!(1, *range.start());
1243 assert_eq!(1, *range.end());
1244 } else {
1245 panic!("expected range, got: {next:?}");
1246 }
1247
1248 assert!(
1250 timeout(Duration::from_secs(1), async move { receiver.next().await }).await.is_err()
1251 );
1252
1253 Ok(())
1254 }
1255
1256 #[tokio::test]
1257 #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1258 async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Result<()> {
1259 let anvil = Anvil::new().try_spawn()?;
1260
1261 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1262
1263 let block_confirmations = 5;
1264
1265 let client = BlockRangeScanner::new()
1266 .with_block_confirmations(block_confirmations)
1267 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1268 .await?
1269 .run()?;
1270
1271 let mut receiver = client.stream_live().await?;
1272
1273 provider.anvil_mine(Option::Some(10), Option::None).await?;
1274
1275 provider
1276 .anvil_reorg(ReorgOptions { depth: block_confirmations - 1, tx_block_pairs: vec![] })
1277 .await?;
1278
1279 provider.anvil_mine(Option::Some(20), Option::None).await?;
1280
1281 let mut block_range_start = 0;
1282
1283 let end_loop = 20;
1284 let mut i = 0;
1285 while let Some(BlockRangeMessage::Data(range)) = receiver.next().await {
1286 if block_range_start == 0 {
1287 block_range_start = *range.start();
1288 }
1289
1290 assert_eq!(block_range_start, *range.start());
1291 assert!(range.end() >= range.start());
1292 block_range_start = *range.end() + 1;
1293 i += 1;
1294 if i == end_loop {
1295 break;
1296 }
1297 }
1298 Ok(())
1299 }
1300
1301 #[tokio::test]
1302 #[ignore = "Flaky test, see: https://github.com/OpenZeppelin/Event-Scanner/issues/109"]
1303 async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<()> {
1304 let anvil = Anvil::new().block_time(1).try_spawn()?;
1305
1306 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1307
1308 let block_confirmations = 3;
1309
1310 let client = BlockRangeScanner::new()
1311 .with_block_confirmations(block_confirmations)
1312 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1313 .await?
1314 .run()?;
1315
1316 let mut receiver = client.stream_live().await?;
1317
1318 provider.anvil_mine(Option::Some(10), Option::None).await?;
1319
1320 provider
1321 .anvil_reorg(ReorgOptions { depth: block_confirmations + 5, tx_block_pairs: vec![] })
1322 .await?;
1323
1324 provider.anvil_mine(Option::Some(30), Option::None).await?;
1325 receiver.close();
1326
1327 let mut block_range_start = 0;
1328
1329 let mut block_num = vec![];
1330 let mut reorg_detected = false;
1331 while let Some(msg) = receiver.next().await {
1332 match msg {
1333 BlockRangeMessage::Data(range) => {
1334 if block_range_start == 0 {
1335 block_range_start = *range.start();
1336 }
1337 block_num.push(range);
1338 if block_num.len() == 15 {
1339 break;
1340 }
1341 }
1342 BlockRangeMessage::Status(ScannerStatus::ReorgDetected) => {
1343 reorg_detected = true;
1344 }
1345 _ => {
1346 break;
1347 }
1348 }
1349 }
1350 assert!(reorg_detected, "Reorg should have been detected");
1351
1352 let mut found_reorg_pattern = false;
1357 for window in block_num.windows(2) {
1358 if window[1].start() < window[0].end() {
1359 found_reorg_pattern = true;
1360 break;
1361 }
1362 }
1363 assert!(found_reorg_pattern,);
1364
1365 Ok(())
1366 }
1367
1368 #[tokio::test]
1369 async fn rewinds_on_detected_reorg() -> anyhow::Result<()> {
1370 let asserter = Asserter::new();
1371 let provider = mocked_provider(asserter.clone());
1372
1373 let mut config = test_config();
1374 config.reorg_rewind_depth = 6;
1375 let (mut service, _cmd) = Service::new(config.clone(), provider);
1376
1377 let original_height = 10;
1378 let original_hash = keccak256(b"original block");
1379 let original_block = mock_block(original_height, original_hash);
1380 service.next_start_block =
1381 BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
1382
1383 let expected_rewind_height = original_height - config.reorg_rewind_depth;
1384 let expected_rewind_hash = keccak256(b"rewound block");
1385 let rewound_block = mock_block(expected_rewind_height, expected_rewind_hash);
1386
1387 asserter.push_success(&Value::Null);
1390 asserter.push_success(&json!(format!("0x{:x}", original_height + 2)));
1392 asserter.push_success(&rewound_block);
1394
1395 service.ensure_current_not_reorged().await?;
1396
1397 let current = service.next_start_block;
1398 assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth");
1399 assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height");
1400
1401 Ok(())
1402 }
1403
1404 #[tokio::test]
1405 async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1406 let anvil = Anvil::new().try_spawn()?;
1407
1408 let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1409
1410 provider.anvil_mine(Option::Some(100), Option::None).await?;
1411
1412 let client = BlockRangeScanner::new()
1413 .with_blocks_read_per_epoch(5)
1414 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1415 .await?
1416 .run()?;
1417
1418 let mut stream = client.stream_historical(0, 19).await?;
1420 assert_next_range!(stream, 0..=4);
1421 assert_next_range!(stream, 5..=9);
1422 assert_next_range!(stream, 10..=14);
1423 assert_next_range!(stream, 15..=19);
1424 assert_next_range!(stream, None);
1425
1426 let mut stream = client.stream_historical(93, 99).await?;
1428 assert_next_range!(stream, 93..=97);
1429 assert_next_range!(stream, 98..=99);
1430 assert_next_range!(stream, None);
1431
1432 let mut stream = client.stream_historical(3, 5).await?;
1434 assert_next_range!(stream, 3..=5);
1435 assert_next_range!(stream, None);
1436
1437 let mut stream = client.stream_historical(3, 3).await?;
1439 assert_next_range!(stream, 3..=3);
1440 assert_next_range!(stream, None);
1441
1442 let client = BlockRangeScanner::new()
1444 .with_blocks_read_per_epoch(200)
1445 .connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1446 .await?
1447 .run()?;
1448
1449 let mut stream = client.stream_historical(0, 20).await?;
1450 assert_next_range!(stream, 0..=20);
1451 assert_next_range!(stream, None);
1452
1453 let mut stream = client.stream_historical(0, 99).await?;
1454 assert_next_range!(stream, 0..=99);
1455 assert_next_range!(stream, None);
1456
1457 Ok(())
1458 }
1459
1460 #[tokio::test]
1461 async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> {
1462 let cutoff = 50;
1463 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1464 buffer_tx.send(BlockRangeMessage::Data(51..=55)).await.unwrap();
1465 buffer_tx.send(BlockRangeMessage::Data(56..=60)).await.unwrap();
1466 buffer_tx.send(BlockRangeMessage::Data(61..=70)).await.unwrap();
1467 drop(buffer_tx);
1468
1469 let (out_tx, mut out_rx) = mpsc::channel(8);
1470 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1471
1472 let mut forwarded = Vec::new();
1473 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1474 forwarded.push(range);
1475 }
1476
1477 assert_eq!(forwarded, vec![51..=55, 56..=60, 61..=70]);
1479 Ok(())
1480 }
1481
1482 #[tokio::test]
1483 async fn ranges_entirely_before_cutoff_are_discarded() -> anyhow::Result<()> {
1484 let cutoff = 100;
1485
1486 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1487 buffer_tx.send(BlockRangeMessage::Data(40..=50)).await.unwrap();
1488 buffer_tx.send(BlockRangeMessage::Data(51..=60)).await.unwrap();
1489 buffer_tx.send(BlockRangeMessage::Data(61..=70)).await.unwrap();
1490 drop(buffer_tx);
1491
1492 let (out_tx, mut out_rx) = mpsc::channel(8);
1493 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1494
1495 let mut forwarded = Vec::new();
1496 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1497 forwarded.push(range);
1498 }
1499
1500 assert_eq!(forwarded, vec![]);
1502 Ok(())
1503 }
1504
1505 #[tokio::test]
1506 async fn ranges_overlapping_cutoff_are_trimmed() -> anyhow::Result<()> {
1507 let cutoff = 75;
1508
1509 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1510 buffer_tx.send(BlockRangeMessage::Data(70..=80)).await.unwrap();
1511 buffer_tx.send(BlockRangeMessage::Data(60..=80)).await.unwrap();
1512 buffer_tx.send(BlockRangeMessage::Data(74..=76)).await.unwrap();
1513 drop(buffer_tx);
1514
1515 let (out_tx, mut out_rx) = mpsc::channel(8);
1516 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1517
1518 let mut forwarded = Vec::new();
1519 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1520 forwarded.push(range);
1521 }
1522
1523 assert_eq!(forwarded, vec![75..=80, 75..=80, 75..=76]);
1525 Ok(())
1526 }
1527
1528 #[tokio::test]
1529 async fn mixed_ranges_are_handled_correctly() -> anyhow::Result<()> {
1530 let cutoff = 50;
1531
1532 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1533 buffer_tx.send(BlockRangeMessage::Data(30..=45)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(46..=55)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(56..=65)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(40..=49)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(49..=51)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(51..=100)).await.unwrap(); drop(buffer_tx);
1540
1541 let (out_tx, mut out_rx) = mpsc::channel(8);
1542 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1543
1544 let mut forwarded = Vec::new();
1545 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1546 forwarded.push(range);
1547 }
1548
1549 assert_eq!(forwarded, vec![50..=55, 56..=65, 50..=51, 51..=100]);
1550 Ok(())
1551 }
1552
1553 #[tokio::test]
1554 async fn edge_case_range_exactly_at_cutoff() -> anyhow::Result<()> {
1555 let cutoff = 100;
1556
1557 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1558 buffer_tx.send(BlockRangeMessage::Data(99..=99)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(100..=100)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(99..=100)).await.unwrap(); buffer_tx.send(BlockRangeMessage::Data(100..=101)).await.unwrap(); drop(buffer_tx);
1563
1564 let (out_tx, mut out_rx) = mpsc::channel(8);
1565 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1566
1567 let mut forwarded = Vec::new();
1568 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1569 forwarded.push(range);
1570 }
1571
1572 assert_eq!(forwarded, vec![100..=100, 100..=101]);
1574 Ok(())
1575 }
1576
1577 #[tokio::test]
1578 async fn cutoff_at_zero_handles_all_ranges() -> anyhow::Result<()> {
1579 let cutoff = 0;
1580
1581 let (buffer_tx, buffer_rx) = mpsc::channel(8);
1582 buffer_tx.send(BlockRangeMessage::Data(0..=5)).await.unwrap();
1583 buffer_tx.send(BlockRangeMessage::Data(6..=10)).await.unwrap();
1584 buffer_tx.send(BlockRangeMessage::Data(11..=25)).await.unwrap();
1585 drop(buffer_tx);
1586
1587 let (out_tx, mut out_rx) = mpsc::channel(8);
1588 Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;
1589
1590 let mut forwarded = Vec::new();
1591 while let Some(BlockRangeMessage::Data(range)) = out_rx.recv().await {
1592 forwarded.push(range);
1593 }
1594
1595 assert_eq!(forwarded, vec![0..=5, 6..=10, 11..=25]);
1597 Ok(())
1598 }
1599
1600 #[tokio::test]
1601 async fn forwards_errors_to_subscribers() -> anyhow::Result<()> {
1602 let asserter = Asserter::new();
1603 let provider = mocked_provider(asserter);
1604 let (mut service, _cmd) = Service::new(test_config(), provider);
1605
1606 let (tx, mut rx) = mpsc::channel(1);
1607 service.subscriber = Some(tx);
1608
1609 service
1610 .send_to_subscriber(BlockRangeMessage::Error(
1611 BlockRangeScannerError::WebSocketConnectionFailed(4),
1612 ))
1613 .await;
1614
1615 match rx.recv().await.expect("subscriber should stay open") {
1616 BlockRangeMessage::Error(BlockRangeScannerError::WebSocketConnectionFailed(
1617 attempts,
1618 )) => {
1619 assert_eq!(attempts, 4);
1620 }
1621 other => panic!("unexpected message: {other:?}"),
1622 }
1623
1624 Ok(())
1625 }
1626}