use robust_provider::RobustProvider;
use std::{cmp::Ordering, fmt::Debug};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::{
ScannerError,
block_range_scanner::{
RingBufferCapacity,
common::{self, BlockScannerResult},
historical_range_handler::HistoricalRangeHandler,
reorg_handler::DefaultReorgHandler,
rewind_handler::RewindHandler,
sync_handler::SyncHandler,
},
};
use alloy::{
consensus::BlockHeader,
eips::BlockId,
network::{BlockResponse, Network},
};
/// Entry point for producing block streams over a network `N`.
///
/// The scanner only holds configuration; each `stream_*` method creates its own
/// bounded channel and returns the receiving half wrapped in a `ReceiverStream`.
#[derive(Debug)]
pub struct BlockRangeScanner<N: Network> {
/// Provider used for every RPC call the scanner makes; it is cloned into the
/// handlers that do the streaming.
provider: RobustProvider<N>,
/// Forwarded unchanged to every stream handler.
/// NOTE(review): presumably the largest number of blocks covered by one emitted
/// range — confirm against the handlers.
max_block_range: u64,
/// Capacity handed to the reorg-aware handlers (`DefaultReorgHandler`,
/// `SyncHandler`, `RewindHandler`).
past_blocks_storage_capacity: RingBufferCapacity,
/// Capacity of the bounded `mpsc` channel created for each stream.
buffer_capacity: usize,
}
impl<N: Network> BlockRangeScanner<N> {
    /// Builds a scanner directly from its four settings.
    ///
    /// The arguments are stored as-is; nothing is validated here. A
    /// `buffer_capacity` of zero is accepted by this constructor and is rejected
    /// later, when a stream is requested (see the `# Errors` sections below).
    #[must_use]
    pub fn new(
        provider: RobustProvider<N>,
        max_block_range: u64,
        past_blocks_storage_capacity: RingBufferCapacity,
        buffer_capacity: usize,
    ) -> Self {
        Self { provider, max_block_range, past_blocks_storage_capacity, buffer_capacity }
    }

    /// Returns the provider this scanner issues its RPC calls through.
    #[must_use]
    pub fn provider(&self) -> &RobustProvider<N> {
        &self.provider
    }

    /// Returns the capacity used for the channel behind every stream.
    #[must_use]
    pub fn buffer_capacity(&self) -> usize {
        self.buffer_capacity
    }

    /// Creates the bounded channel that backs a single stream.
    ///
    /// `tokio::sync::mpsc::channel` panics when asked for a capacity of zero, and
    /// [`BlockRangeScanner::new`] does not validate its arguments, so the check is
    /// made here and reported as an error instead of a panic.
    fn open_channel(
        &self,
    ) -> Result<(mpsc::Sender<BlockScannerResult>, mpsc::Receiver<BlockScannerResult>), ScannerError>
    {
        if self.buffer_capacity == 0 {
            return Err(ScannerError::InvalidBufferCapacity);
        }
        Ok(mpsc::channel(self.buffer_capacity))
    }

    /// Streams blocks as they arrive on the provider's block subscription.
    ///
    /// The first block of interest is `latest + 1 - block_confirmations`, clamped
    /// at zero, where `latest` is the block number reported by the provider when
    /// this method is called. The streaming itself runs in a spawned task that
    /// drives `common::stream_live_blocks` with a fresh `DefaultReorgHandler`.
    ///
    /// # Errors
    ///
    /// Returns `ScannerError::InvalidBufferCapacity` if the scanner was built with
    /// a zero buffer capacity, and propagates any failure from fetching the latest
    /// block number or from opening the block subscription.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    pub async fn stream_live(
        &self,
        block_confirmations: u64,
    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
        let (blocks_sender, blocks_receiver) = self.open_channel()?;
        let max_block_range = self.max_block_range;
        let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
        let latest = self.provider.get_block_number().await?;
        let provider = self.provider.clone();
        // Saturating on both sides: no overflow at `u64::MAX`, no underflow when
        // there are more confirmations than blocks.
        let start_block = latest.saturating_add(1).saturating_sub(block_confirmations);
        debug!(
            latest_block = latest,
            start_block = start_block,
            block_confirmations = block_confirmations,
            max_block_range = max_block_range,
            "Starting live block stream"
        );
        let subscription = self.provider.subscribe_blocks().await?;
        tokio::spawn(async move {
            let mut reorg_handler =
                DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
            common::stream_live_blocks(
                start_block,
                subscription,
                &blocks_sender,
                &provider,
                block_confirmations,
                max_block_range,
                &mut reorg_handler,
                // NOTE(review): the meaning of this flag is defined by
                // `common::stream_live_blocks`, which is not visible here.
                false,
            )
            .await;
            debug!("Live block stream ended");
        });
        Ok(ReceiverStream::new(blocks_receiver))
    }

    /// Streams the blocks between two block identifiers.
    ///
    /// Both identifiers are resolved concurrently. The two resulting block numbers
    /// are then ordered so that the smaller one is always the start of the range,
    /// which means the arguments may be given in either order. The range is handed
    /// to a `HistoricalRangeHandler`, whose `run` is called without being awaited.
    ///
    /// # Errors
    ///
    /// Returns `ScannerError::InvalidBufferCapacity` if the scanner was built with
    /// a zero buffer capacity, and propagates any failure from resolving either
    /// block identifier.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    pub async fn stream_historical(
        &self,
        start_id: impl Into<BlockId>,
        end_id: impl Into<BlockId>,
    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
        let (blocks_sender, blocks_receiver) = self.open_channel()?;
        let max_block_range = self.max_block_range;
        let provider = self.provider.clone();
        let (start_block, end_block) = tokio::try_join!(
            self.provider.get_block(start_id.into()),
            self.provider.get_block(end_id.into())
        )?;
        let start_block_num = start_block.header().number();
        let end_block_num = end_block.header().number();
        // Normalise to an ascending range regardless of argument order.
        let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
            Ordering::Greater => (end_block_num, start_block_num),
            Ordering::Less | Ordering::Equal => (start_block_num, end_block_num),
        };
        debug!(
            from_block = start_block_num,
            to_block = end_block_num,
            total_blocks = end_block_num.saturating_sub(start_block_num).saturating_add(1),
            max_block_range = max_block_range,
            "Starting historical block stream"
        );
        let handler = HistoricalRangeHandler::new(
            provider,
            max_block_range,
            start_block_num,
            end_block_num,
            blocks_sender,
        );
        handler.run();
        Ok(ReceiverStream::new(blocks_receiver))
    }

    /// Streams blocks starting at `start_id`, delegating to a `SyncHandler`.
    ///
    /// The handler receives the provider, the maximum block range, the start
    /// identifier, the confirmation count, the past-blocks storage capacity and
    /// the sending half of the stream's channel.
    ///
    /// # Errors
    ///
    /// Returns `ScannerError::InvalidBufferCapacity` if the scanner was built with
    /// a zero buffer capacity, and propagates any error returned by the handler's
    /// `run`.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    pub async fn stream_from(
        &self,
        start_id: impl Into<BlockId>,
        block_confirmations: u64,
    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
        let (blocks_sender, blocks_receiver) = self.open_channel()?;
        let start_id = start_id.into();
        debug!(
            start_block = ?start_id,
            block_confirmations = block_confirmations,
            max_block_range = self.max_block_range,
            "Starting sync block stream"
        );
        let sync_handler = SyncHandler::new(
            self.provider.clone(),
            self.max_block_range,
            start_id,
            block_confirmations,
            self.past_blocks_storage_capacity,
            blocks_sender,
        );
        sync_handler.run().await?;
        Ok(ReceiverStream::new(blocks_receiver))
    }

    /// Streams the range described by `start_id` and `end_id` through a
    /// `RewindHandler`.
    ///
    /// Unlike [`BlockRangeScanner::stream_historical`], the identifiers are passed
    /// to the handler unresolved; any resolution or ordering is up to the handler.
    ///
    /// # Errors
    ///
    /// Returns `ScannerError::InvalidBufferCapacity` if the scanner was built with
    /// a zero buffer capacity, and propagates any error returned by the handler's
    /// `run`.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    pub async fn stream_rewind(
        &self,
        start_id: impl Into<BlockId>,
        end_id: impl Into<BlockId>,
    ) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
        let (blocks_sender, blocks_receiver) = self.open_channel()?;
        let start_id = start_id.into();
        let end_id = end_id.into();
        debug!(
            from_block = ?start_id,
            to_block = ?end_id,
            max_block_range = self.max_block_range,
            "Starting rewind block stream"
        );
        let rewind_handler = RewindHandler::new(
            self.provider.clone(),
            self.max_block_range,
            start_id,
            end_id,
            self.past_blocks_storage_capacity,
            blocks_sender,
        );
        rewind_handler.run().await?;
        Ok(ReceiverStream::new(blocks_receiver))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        block_range_scanner::{
            BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        types::TryStream,
    };
    use alloy::{
        network::Ethereum,
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use tokio::sync::mpsc;

    /// Builds an Ethereum root provider on top of a mocked RPC client with a
    /// fresh asserter.
    fn mocked_provider() -> RootProvider<Ethereum> {
        let client = RpcClient::mocked(Asserter::new());
        RootProvider::<Ethereum>::new(client)
    }

    /// A freshly created builder carries the crate-level default settings.
    #[test]
    fn block_range_scanner_defaults_match_constants() {
        let builder = BlockRangeScannerBuilder::new();

        assert_eq!(builder.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    /// The builder setters overwrite the corresponding settings.
    #[test]
    fn builder_methods_update_configuration() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(42).buffer_capacity(33);

        assert_eq!(builder.max_block_range, 42);
        assert_eq!(builder.buffer_capacity, 33);
    }

    /// An error pushed through `try_stream` arrives on the receiving side.
    #[tokio::test]
    async fn try_send_forwards_errors_to_subscribers() {
        let (sender, mut receiver) = mpsc::channel::<BlockScannerResult>(1);

        let _ = sender.try_stream(ScannerError::BlockNotFound).await;

        let received = receiver.recv().await;
        assert!(matches!(received, Some(Err(ScannerError::BlockNotFound))));
    }

    /// Connecting with a zero buffer capacity is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let builder = BlockRangeScannerBuilder::new().buffer_capacity(0);

        let outcome = builder.connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidBufferCapacity)));
    }

    /// Connecting with a zero maximum block range is rejected.
    #[tokio::test]
    async fn returns_error_with_zero_max_block_range() {
        let builder = BlockRangeScannerBuilder::new().max_block_range(0);

        let outcome = builder.connect(mocked_provider()).await;

        assert!(matches!(outcome, Err(ScannerError::InvalidMaxBlockRange)));
    }
}