1use std::{cmp::Ordering, ops::RangeInclusive};
63use tokio::{
64 sync::{mpsc, oneshot},
65 try_join,
66};
67use tokio_stream::{StreamExt, wrappers::ReceiverStream};
68
69use crate::{
70 ScannerMessage,
71 error::ScannerError,
72 robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider},
73 types::{ScannerStatus, TryStream},
74};
75use alloy::{
76 consensus::BlockHeader,
77 eips::BlockNumberOrTag,
78 network::{BlockResponse, Network, primitives::HeaderResponse},
79 primitives::{B256, BlockNumber},
80 pubsub::Subscription,
81 transports::{RpcError, TransportErrorKind},
82};
83use tracing::{debug, error, info, warn};
84
/// Default value of `max_block_range`, as set by [`BlockRangeScanner::new`].
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
/// Default number of block confirmations.
// NOTE(review): not referenced in the code visible here — presumably consumed by callers
// that pass `block_confirmations` to the streaming methods; confirm.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Capacity of the bounded channels carrying [`Message`]s: the per-stream output channels
/// created by [`BlockRangeScannerClient`] and the live-block buffer filled while a
/// historical sync is still running.
pub const MAX_BUFFERED_MESSAGES: usize = 50000;

/// Default reorg rewind depth.
// NOTE(review): not referenced in the code visible here — presumably the number of blocks
// to step back when a reorg is handled elsewhere; confirm.
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;

/// Message type produced by the block range scanner: the data payload is an inclusive
/// range of block numbers and the error payload is a [`ScannerError`].
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>, ScannerError>;
96
97impl From<RangeInclusive<BlockNumber>> for Message {
98 fn from(logs: RangeInclusive<BlockNumber>) -> Self {
99 Message::Data(logs)
100 }
101}
102
103impl PartialEq<RangeInclusive<BlockNumber>> for Message {
104 fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
105 if let Message::Data(range) = self { range.eq(other) } else { false }
106 }
107}
108
109impl From<RobustProviderError> for Message {
110 fn from(error: RobustProviderError) -> Self {
111 Message::Error(error.into())
112 }
113}
114
115impl From<RpcError<TransportErrorKind>> for Message {
116 fn from(error: RpcError<TransportErrorKind>) -> Self {
117 Message::Error(error.into())
118 }
119}
120
121impl From<ScannerError> for Message {
122 fn from(error: ScannerError) -> Self {
123 Message::Error(error)
124 }
125}
126
/// Configuration for a block range scanner that is not yet attached to a provider.
///
/// Use [`BlockRangeScanner::connect`] to obtain a [`ConnectedBlockRangeScanner`].
#[derive(Clone)]
pub struct BlockRangeScanner {
    /// Maximum block range passed on to the scanner service, which uses it to cap the
    /// size of each streamed range. Defaults to [`DEFAULT_MAX_BLOCK_RANGE`].
    pub max_block_range: u64,
}
131
132impl Default for BlockRangeScanner {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138impl BlockRangeScanner {
139 #[must_use]
140 pub fn new() -> Self {
141 Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
142 }
143
144 #[must_use]
145 pub fn max_block_range(mut self, max_block_range: u64) -> Self {
146 self.max_block_range = max_block_range;
147 self
148 }
149
150 pub async fn connect<N: Network>(
156 self,
157 provider: impl IntoRobustProvider<N>,
158 ) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
159 let provider = provider.into_robust_provider().await?;
160 Ok(ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range })
161 }
162}
163
/// A block range scanner bound to a provider, produced by [`BlockRangeScanner::connect`].
pub struct ConnectedBlockRangeScanner<N: Network> {
    /// Provider used for block lookups and block subscriptions.
    provider: RobustProvider<N>,
    /// Maximum block range handed to the background service started by `run`.
    max_block_range: u64,
}
168
169impl<N: Network> ConnectedBlockRangeScanner<N> {
170 #[must_use]
172 pub fn provider(&self) -> &RobustProvider<N> {
173 &self.provider
174 }
175
176 pub fn run(&self) -> Result<BlockRangeScannerClient, ScannerError> {
182 let (service, cmd_tx) = Service::new(self.provider.clone(), self.max_block_range);
183 tokio::spawn(async move {
184 service.run().await;
185 });
186 Ok(BlockRangeScannerClient::new(cmd_tx))
187 }
188}
189
/// Commands accepted by the background scanner service.
///
/// Every variant carries a `sender` that the started stream writes [`Message`]s to, and a
/// `response` channel on which the service reports whether the stream was set up
/// successfully.
#[derive(Debug)]
pub enum Command {
    /// Stream ranges for newly arriving blocks only.
    StreamLive {
        sender: mpsc::Sender<Message>,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream ranges between two heights; the service orders the two heights itself.
    StreamHistorical {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream historical ranges starting at `start_height`, then continue with live blocks.
    StreamFrom {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        block_confirmations: u64,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
    /// Stream ranges between two heights walking from the higher block down to the lower.
    Rewind {
        sender: mpsc::Sender<Message>,
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
        response: oneshot::Sender<Result<(), ScannerError>>,
    },
}
216
/// Background service that receives [`Command`]s and spawns the matching streaming tasks.
struct Service<N: Network> {
    /// Provider used for block lookups and subscriptions.
    provider: RobustProvider<N>,
    /// Maximum block range passed to every streaming task.
    max_block_range: u64,
    /// Number of commands whose handling returned an error.
    error_count: u64,
    /// Receiving end of the command channel created in `new`.
    command_receiver: mpsc::Receiver<Command>,
    /// Loop-exit flag checked by `run`; nothing in the visible code sets it to `true`.
    shutdown: bool,
}
224
225impl<N: Network> Service<N> {
226 pub fn new(provider: RobustProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
227 let (cmd_tx, cmd_rx) = mpsc::channel(100);
228
229 let service = Self {
230 provider,
231 max_block_range,
232 error_count: 0,
233 command_receiver: cmd_rx,
234 shutdown: false,
235 };
236
237 (service, cmd_tx)
238 }
239
240 pub async fn run(mut self) {
241 info!("Starting subscription service");
242
243 while !self.shutdown {
244 tokio::select! {
245 cmd = self.command_receiver.recv() => {
246 if let Some(command) = cmd {
247 if let Err(e) = self.handle_command(command).await {
248 error!("Command handling error: {}", e);
249 self.error_count += 1;
250 }
251 } else {
252 warn!("Command channel closed, shutting down");
253 break;
254 }
255 }
256 }
257 }
258
259 info!("Subscription service stopped");
260 }
261
262 async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
263 match command {
264 Command::StreamLive { sender, block_confirmations, response } => {
265 info!("Starting live stream");
266 let result = self.handle_live(block_confirmations, sender).await;
267 let _ = response.send(result);
268 }
269 Command::StreamHistorical { sender, start_height, end_height, response } => {
270 info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
271 let result = self.handle_historical(start_height, end_height, sender).await;
272 let _ = response.send(result);
273 }
274 Command::StreamFrom { sender, start_height, block_confirmations, response } => {
275 info!(start_height = ?start_height, "Starting streaming from");
276 let result = self.handle_sync(start_height, block_confirmations, sender).await;
277 let _ = response.send(result);
278 }
279 Command::Rewind { sender, start_height, end_height, response } => {
280 info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
281 let result = self.handle_rewind(start_height, end_height, sender).await;
282 let _ = response.send(result);
283 }
284 }
285 Ok(())
286 }
287
288 async fn handle_live(
289 &mut self,
290 block_confirmations: u64,
291 sender: mpsc::Sender<Message>,
292 ) -> Result<(), ScannerError> {
293 let max_block_range = self.max_block_range;
294 let latest = self.provider.get_block_number().await?;
295
296 let range_start = (latest + 1).saturating_sub(block_confirmations);
300
301 let subscription = self.provider.subscribe_blocks().await?;
302
303 info!("WebSocket connected for live blocks");
304
305 tokio::spawn(async move {
306 Self::stream_live_blocks(
307 range_start,
308 subscription,
309 sender,
310 block_confirmations,
311 max_block_range,
312 )
313 .await;
314 });
315
316 Ok(())
317 }
318
319 async fn handle_historical(
320 &mut self,
321 start_height: BlockNumberOrTag,
322 end_height: BlockNumberOrTag,
323 sender: mpsc::Sender<Message>,
324 ) -> Result<(), ScannerError> {
325 let max_block_range = self.max_block_range;
326
327 let (start_block, end_block) = tokio::try_join!(
328 self.provider.get_block_by_number(start_height),
329 self.provider.get_block_by_number(end_height)
330 )?;
331
332 let start_block_num = start_block.header().number();
333 let end_block_num = end_block.header().number();
334
335 let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
336 Ordering::Greater => (end_block_num, start_block_num),
337 _ => (start_block_num, end_block_num),
338 };
339
340 info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
341
342 tokio::spawn(async move {
343 Self::stream_historical_blocks(
344 start_block_num,
345 end_block_num,
346 max_block_range,
347 &sender,
348 )
349 .await;
350 });
351
352 Ok(())
353 }
354
    /// Streams historical ranges from `start_height` up to the confirmed tip and then
    /// switches over to live blocks.
    ///
    /// If the start block lies beyond the confirmed tip, the historical phase is skipped
    /// and a live stream starting at the start block is spawned instead.
    ///
    /// # Errors
    ///
    /// Returns an error if resolving the start or latest block fails, or if subscribing to
    /// new blocks fails.
    async fn handle_sync(
        &mut self,
        start_height: BlockNumberOrTag,
        block_confirmations: u64,
        sender: mpsc::Sender<Message>,
    ) -> Result<(), ScannerError> {
        let provider = self.provider.clone();
        let max_block_range = self.max_block_range;

        // An explicit block number is used as-is; any other tag is resolved via the provider.
        let get_start_block = async || -> Result<BlockNumber, ScannerError> {
            let block = match start_height {
                BlockNumberOrTag::Number(num) => num,
                block_tag => provider.get_block_by_number(block_tag).await?.header().number(),
            };
            Ok(block)
        };

        let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
            let block =
                provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number();
            Ok(block)
        };

        // Resolve both numbers concurrently; the first failure aborts the command.
        let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?;

        // Highest block that already has the requested number of confirmations.
        let confirmed_tip = latest_block.saturating_sub(block_confirmations);

        // Subscribe before any historical work starts, so the subscription is already
        // open while the historical ranges are being streamed.
        let subscription = self.provider.subscribe_blocks().await?;
        info!("Buffering live blocks");

        if start_block > confirmed_tip {
            info!(
                start_block = start_block,
                confirmed_tip = confirmed_tip,
                "Start block is beyond confirmed tip, starting live stream"
            );

            tokio::spawn(async move {
                Self::stream_live_blocks(
                    start_block,
                    subscription,
                    sender,
                    block_confirmations,
                    max_block_range,
                )
                .await;
            });

            return Ok(());
        }

        info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data");

        // Live ranges are parked in this buffer until the historical phase has finished.
        let (live_block_buffer_sender, live_block_buffer_receiver) =
            mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);

        // Last block covered by the historical phase.
        let cutoff = confirmed_tip;

        // Task 1: write live ranges, starting right after the cutoff, into the buffer.
        tokio::spawn(async move {
            Self::stream_live_blocks(
                cutoff + 1,
                subscription,
                live_block_buffer_sender,
                block_confirmations,
                max_block_range,
            )
            .await;
        });

        // Task 2: stream the historical ranges, announce the switch, then forward the buffer.
        tokio::spawn(async move {
            Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender)
                .await;

            info!("Chain tip reached, switching to live");

            // Stop here if the message could not be delivered.
            if !sender.try_stream(ScannerStatus::SwitchingToLive).await {
                return;
            }

            info!("Successfully transitioned from historical to live data");

            Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
        });

        Ok(())
    }
459
460 async fn handle_rewind(
461 &mut self,
462 start_height: BlockNumberOrTag,
463 end_height: BlockNumberOrTag,
464 sender: mpsc::Sender<Message>,
465 ) -> Result<(), ScannerError> {
466 let max_block_range = self.max_block_range;
467 let provider = self.provider.clone();
468
469 let (start_block, end_block) = try_join!(
470 self.provider.get_block_by_number(start_height),
471 self.provider.get_block_by_number(end_height),
472 )?;
473
474 let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
476 Ordering::Greater => (start_block, end_block),
477 _ => (end_block, start_block),
478 };
479
480 tokio::spawn(async move {
481 Self::stream_rewind(from, to, max_block_range, &sender, &provider).await;
482 });
483
484 Ok(())
485 }
486
487 async fn stream_rewind(
495 from: N::BlockResponse,
496 to: N::BlockResponse,
497 max_block_range: u64,
498 sender: &mpsc::Sender<Message>,
499 provider: &RobustProvider<N>,
500 ) {
501 let mut batch_count = 0;
502
503 let mut tip_hash = from.header().hash();
505
506 let from = from.header().number();
507 let to = to.header().number();
508
509 let mut batch_from = from;
511
512 while batch_from >= to {
513 let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
514
515 if !sender.try_stream(batch_to..=batch_from).await {
517 break;
518 }
519
520 batch_count += 1;
521 if batch_count % 10 == 0 {
522 debug!(batch_count = batch_count, "Processed rewind batches");
523 }
524
525 if batch_to == to {
528 break;
529 }
530
531 let reorged = match reorg_detected(provider, tip_hash).await {
532 Ok(detected) => {
533 info!(block_number = %from, hash = %tip_hash, "Reorg detected");
534 detected
535 }
536 Err(e) => {
537 error!(error = %e, "Terminal RPC call error, shutting down");
538 _ = sender.try_stream(e);
539 return;
540 }
541 };
542
543 if reorged {
544 info!(block_number = %from, hash = %tip_hash, "Reorg detected");
545
546 if !sender.try_stream(ScannerStatus::ReorgDetected).await {
547 break;
548 }
549
550 batch_from = from;
552 tip_hash = match provider.get_block_by_number(from.into()).await {
554 Ok(block) => block.header().hash(),
555 Err(RobustProviderError::BlockNotFound(_)) => {
556 panic!("Block with number '{from}' should exist post-reorg");
557 }
558 Err(e) => {
559 error!(error = %e, "Terminal RPC call error, shutting down");
560 _ = sender.try_stream(e);
561 return;
562 }
563 };
564 } else {
565 batch_from = batch_to - 1;
568 }
569 }
570
571 info!(batch_count = batch_count, "Rewind completed");
572 }
573
574 async fn stream_historical_blocks(
575 start: BlockNumber,
576 end: BlockNumber,
577 max_block_range: u64,
578 sender: &mpsc::Sender<Message>,
579 ) {
580 let mut batch_count = 0;
581
582 let mut next_start_block = start;
583
584 while next_start_block <= end {
587 let batch_end_block_number =
588 next_start_block.saturating_add(max_block_range - 1).min(end);
589
590 if !sender.try_stream(next_start_block..=batch_end_block_number).await {
591 break;
592 }
593
594 batch_count += 1;
595 if batch_count % 10 == 0 {
596 debug!(batch_count = batch_count, "Processed historical batches");
597 }
598
599 if batch_end_block_number == end {
600 break;
601 }
602
603 let next_start_block_number = batch_end_block_number.saturating_add(1);
605
606 next_start_block = next_start_block_number;
607 }
608
609 info!(batch_count = batch_count, "Historical sync completed");
610 }
611
612 async fn stream_live_blocks(
613 mut range_start: BlockNumber,
614 subscription: Subscription<N::HeaderResponse>,
615 sender: mpsc::Sender<Message>,
616 block_confirmations: u64,
617 max_block_range: u64,
618 ) {
619 let cutoff = range_start;
621 let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
622
623 while let Some(incoming_block) = stream.next().await {
624 let incoming_block_num = incoming_block.number();
625 info!(block_number = incoming_block_num, "Received block header");
626
627 if incoming_block_num < range_start {
628 warn!("Reorg detected: sending forked range");
629 if !sender.try_stream(ScannerStatus::ReorgDetected).await {
630 return;
631 }
632
633 let incoming_confirmed = incoming_block_num.saturating_sub(block_confirmations);
635
636 range_start = incoming_confirmed;
638 }
639
640 let confirmed = incoming_block_num.saturating_sub(block_confirmations);
641 if confirmed >= range_start {
642 let range_end = confirmed.min(range_start.saturating_add(max_block_range - 1));
645
646 info!(range_start = range_start, range_end = range_end, "Sending live block range");
647
648 if !sender.try_stream(range_start..=range_end).await {
649 return;
650 }
651
652 range_start = range_end + 1;
654 }
655 }
656 }
657
658 async fn process_live_block_buffer(
659 mut buffer_rx: mpsc::Receiver<Message>,
660 sender: mpsc::Sender<Message>,
661 cutoff: BlockNumber,
662 ) {
663 let mut processed = 0;
664 let mut discarded = 0;
665
666 while let Some(data) = buffer_rx.recv().await {
668 match data {
669 Message::Data(range) => {
670 let (start, end) = (*range.start(), *range.end());
671 if start >= cutoff {
672 if !sender.try_stream(range).await {
673 break;
674 }
675 processed += end - start;
676 } else if end >= cutoff {
677 discarded += cutoff - start;
678
679 let start = cutoff;
680 if !sender.try_stream(start..=end).await {
681 break;
682 }
683 processed += end - start;
684 } else {
685 discarded += end - start;
686 }
687 }
688 other => {
689 if !sender.try_stream(other).await {
691 break;
692 }
693 }
694 }
695 }
696
697 info!(processed = processed, discarded = discarded, "Processed buffered messages");
698 }
699}
700
701async fn reorg_detected<N: Network>(
702 provider: &RobustProvider<N>,
703 hash_to_check: B256,
704) -> Result<bool, ScannerError> {
705 match provider.get_block_by_hash(hash_to_check).await {
706 Ok(_) => Ok(false),
707 Err(RobustProviderError::BlockNotFound(_)) => Ok(true),
708 Err(e) => Err(e.into()),
709 }
710}
711
/// Handle for requesting block range streams from the background scanner service.
pub struct BlockRangeScannerClient {
    /// Sending half of the service's command channel.
    command_sender: mpsc::Sender<Command>,
}
715
716impl BlockRangeScannerClient {
717 #[must_use]
723 pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
724 Self { command_sender }
725 }
726
727 pub async fn stream_live(
737 &self,
738 block_confirmations: u64,
739 ) -> Result<ReceiverStream<Message>, ScannerError> {
740 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
741 let (response_tx, response_rx) = oneshot::channel();
742
743 let command = Command::StreamLive {
744 sender: blocks_sender,
745 block_confirmations,
746 response: response_tx,
747 };
748
749 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
750
751 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
752
753 Ok(ReceiverStream::new(blocks_receiver))
754 }
755
756 pub async fn stream_historical(
767 &self,
768 start_height: impl Into<BlockNumberOrTag>,
769 end_height: impl Into<BlockNumberOrTag>,
770 ) -> Result<ReceiverStream<Message>, ScannerError> {
771 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
772 let (response_tx, response_rx) = oneshot::channel();
773
774 let command = Command::StreamHistorical {
775 sender: blocks_sender,
776 start_height: start_height.into(),
777 end_height: end_height.into(),
778 response: response_tx,
779 };
780
781 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
782
783 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
784
785 Ok(ReceiverStream::new(blocks_receiver))
786 }
787
788 pub async fn stream_from(
799 &self,
800 start_height: impl Into<BlockNumberOrTag>,
801 block_confirmations: u64,
802 ) -> Result<ReceiverStream<Message>, ScannerError> {
803 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
804 let (response_tx, response_rx) = oneshot::channel();
805
806 let command = Command::StreamFrom {
807 sender: blocks_sender,
808 start_height: start_height.into(),
809 block_confirmations,
810 response: response_tx,
811 };
812
813 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
814
815 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
816
817 Ok(ReceiverStream::new(blocks_receiver))
818 }
819
820 pub async fn rewind(
831 &self,
832 start_height: impl Into<BlockNumberOrTag>,
833 end_height: impl Into<BlockNumberOrTag>,
834 ) -> Result<ReceiverStream<Message>, ScannerError> {
835 let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
836 let (response_tx, response_rx) = oneshot::channel();
837
838 let command = Command::Rewind {
839 sender: blocks_sender,
840 start_height: start_height.into(),
841 end_height: end_height.into(),
842 response: response_tx,
843 };
844
845 self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?;
846
847 response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??;
848
849 Ok(ReceiverStream::new(blocks_receiver))
850 }
851}
852
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{assert_closed, assert_next};
    use alloy::{eips::BlockId, network::Ethereum};
    use tokio::sync::mpsc;

    /// Buffers `ranges` as data messages, closes the buffer, runs
    /// `process_live_block_buffer` with `cutoff`, and returns the forwarded output.
    async fn forward_buffered(
        cutoff: BlockNumber,
        ranges: impl IntoIterator<Item = RangeInclusive<BlockNumber>>,
    ) -> ReceiverStream<Message> {
        let (buffer_tx, buffer_rx) = mpsc::channel(8);
        for range in ranges {
            buffer_tx.send(Message::Data(range)).await.unwrap();
        }
        drop(buffer_tx);

        let (out_tx, out_rx) = mpsc::channel(8);
        Service::<Ethereum>::process_live_block_buffer(buffer_rx, out_tx, cutoff).await;

        ReceiverStream::new(out_rx)
    }

    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let scanner = BlockRangeScanner::new();

        assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
    }

    #[test]
    fn builder_methods_update_configuration() {
        let max_block_range = 42;

        let scanner = BlockRangeScanner::new().max_block_range(max_block_range);

        assert_eq!(scanner.max_block_range, max_block_range);
    }

    #[tokio::test]
    async fn buffered_messages_after_cutoff_are_all_passed() {
        let mut stream = forward_buffered(50, [51..=55, 56..=60, 61..=70]).await;

        assert_next!(stream, 51..=55);
        assert_next!(stream, 56..=60);
        assert_next!(stream, 61..=70);
        assert_closed!(stream);
    }

    #[tokio::test]
    async fn ranges_entirely_before_cutoff_are_discarded() {
        let mut stream = forward_buffered(100, [40..=50, 51..=60, 61..=70]).await;

        assert_closed!(stream);
    }

    #[tokio::test]
    async fn ranges_overlapping_cutoff_are_trimmed() {
        let mut stream = forward_buffered(75, [60..=70, 71..=80, 81..=86]).await;

        assert_next!(stream, 75..=80);
        assert_next!(stream, 81..=86);
        assert_closed!(stream);
    }

    #[tokio::test]
    async fn edge_case_range_exactly_at_cutoff() {
        let buffered = [
            98..=98,   // entirely below the cutoff: discarded
            99..=100,  // straddles the cutoff: trimmed to 100..=100
            100..=100, // starts at the cutoff: forwarded
            100..=101, // starts at the cutoff: forwarded
            102..=102, // above the cutoff: forwarded
        ];
        let mut stream = forward_buffered(100, buffered).await;

        assert_next!(stream, 100..=100);
        assert_next!(stream, 100..=100);
        assert_next!(stream, 100..=101);
        assert_next!(stream, 102..=102);
        assert_closed!(stream);
    }

    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (tx, mut rx) = mpsc::channel::<Message>(1);

        _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await;

        let received = rx.recv().await;

        assert!(matches!(
            received,
            Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number(
                BlockNumberOrTag::Number(4)
            ))))
        ));
    }
}