use alloy::{eips::BlockNumberOrTag, network::Network};
use crate::{
ScannerError,
event_scanner::{
EventScanner, StartProof,
block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler},
builder::{EventScannerBuilder, SyncFromLatestEvents},
},
types::TryStream,
};
use robust_provider::IntoRobustProvider;
impl EventScannerBuilder<SyncFromLatestEvents> {
    /// Sets the number of block confirmations stored in the scanner configuration.
    ///
    /// Calling this more than once keeps the value from the last call. Zero is not rejected by
    /// [`connect`](Self::connect).
    #[must_use]
    pub fn block_confirmations(mut self, confirmations: u64) -> Self {
        self.config.block_confirmations = confirmations;
        self
    }

    /// Sets the maximum number of concurrent fetches stored in the scanner configuration.
    ///
    /// Calling this more than once keeps the value from the last call. A value of zero is only
    /// detected later, when [`connect`](Self::connect) validates the configuration.
    #[must_use]
    pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
        self.config.max_concurrent_fetches = max_concurrent_fetches;
        self
    }

    /// Validates the configuration and builds an [`EventScanner`] on top of `provider`.
    ///
    /// # Errors
    ///
    /// * [`ScannerError::InvalidEventCount`] if the configured event count is zero.
    /// * [`ScannerError::InvalidMaxConcurrentFetches`] if the configured maximum number of
    ///   concurrent fetches is zero.
    /// * Any error returned by the underlying `build` step.
    ///
    /// The event count is checked first, so it takes precedence when both values are zero.
    pub async fn connect<N: Network>(
        self,
        provider: impl IntoRobustProvider<N>,
    ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError> {
        match (self.config.count, self.config.max_concurrent_fetches) {
            (0, _) => Err(ScannerError::InvalidEventCount),
            (_, 0) => Err(ScannerError::InvalidMaxConcurrentFetches),
            _ => self.build(provider).await,
        }
    }
}
impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
    /// Starts the scanner in `SyncFromLatestEvents` mode.
    ///
    /// Before returning, this reads the current block number from the provider and opens a
    /// rewind stream running from that block back to [`BlockNumberOrTag::Earliest`]. The
    /// remaining work runs on a spawned Tokio task, in two phases:
    ///
    /// 1. The rewind stream is handed to a [`LatestEventsHandler`] built with the configured
    ///    event count.
    /// 2. A forward stream starting at the block right after the recorded one is opened with
    ///    the configured block confirmations and handed to a [`StreamHandler`].
    ///
    /// If the forward stream cannot be opened, the error is forwarded to every listener on a
    /// best-effort basis (send failures are ignored) and the task ends.
    ///
    /// # Errors
    ///
    /// Returns a [`ScannerError`] if fetching the latest block number fails or if the rewind
    /// stream cannot be set up. Failures that happen inside the spawned task are not returned
    /// from this function.
    // NOTE(review): no panic is visible in this body; the lint exemption is presumably kept for
    // code expanded from the logging macros - confirm before removing it.
    #[allow(clippy::missing_panics_doc)]
    pub async fn start(self) -> Result<StartProof, ScannerError> {
        info!(
            event_count = self.config.count,
            block_confirmations = self.config.block_confirmations,
            listener_count = self.listeners.len(),
            "Starting EventScanner in SyncFromLatestEvents mode"
        );

        // Plain configuration values are copied out up front so both handlers and the
        // background task can share them.
        let event_count = self.config.count;
        let confirmations = self.config.block_confirmations;
        let max_concurrent_fetches = self.config.max_concurrent_fetches;
        let channel_capacity = self.buffer_capacity();

        let provider = self.block_range_scanner.provider().clone();
        let head = provider.get_block_number().await?;

        let rewind_stream = self
            .block_range_scanner
            .stream_rewind(head, BlockNumberOrTag::Earliest)
            .await?;

        let latest_events_handler = LatestEventsHandler::new(
            provider.clone(),
            self.listeners.clone(),
            max_concurrent_fetches,
            event_count,
            channel_capacity,
        );
        let live_handler = StreamHandler::new(
            provider,
            self.listeners.clone(),
            max_concurrent_fetches,
            channel_capacity,
        );

        // Kept separately so the task can notify listeners if phase 2 cannot be set up.
        let listeners = self.listeners.clone();

        tokio::spawn(async move {
            debug!(
                latest_block = head,
                count = event_count,
                "Phase 1: Collecting latest events via rewind"
            );
            latest_events_handler.handle(rewind_stream).await;

            // Phase 2 resumes at the first block after the one phase 1 started from.
            let next_block = head + 1;
            debug!(
                start_block = next_block,
                "Phase 2: Catching up and transitioning to live mode"
            );

            // Bind the result first so nothing from the setup call outlives this statement.
            let sync_setup = self.block_range_scanner.stream_from(next_block, confirmations).await;
            match sync_setup {
                Ok(sync_stream) => {
                    live_handler.handle(sync_stream).await;
                    debug!("SyncFromLatestEvents stream ended");
                }
                Err(e) => {
                    error!("Failed to setup sync stream after collecting latest events");
                    for listener in listeners {
                        // Best effort: a listener that cannot receive the error is skipped.
                        _ = listener.sender.try_stream(e.clone()).await;
                    }
                }
            }
        });

        Ok(StartProof::new())
    }
}
#[cfg(test)]
mod tests {
    use alloy::{
        network::Ethereum,
        node_bindings::Anvil,
        providers::{ProviderBuilder, RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };

    use crate::{
        block_range_scanner::{
            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
    };

    use super::*;

    /// Builds a provider on top of a mocked RPC client whose [`Asserter`] has no queued
    /// responses. The validation tests below expect `connect` to fail with a configuration
    /// error when given this provider.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    /// Every setter stores its argument in the matching configuration field.
    #[test]
    fn builder_pattern() {
        let builder = EventScannerBuilder::sync()
            .from_latest(1)
            .block_confirmations(2)
            .max_block_range(50)
            .max_concurrent_fetches(10)
            .buffer_capacity(33);

        assert_eq!(builder.config.count, 1);
        assert_eq!(builder.config.block_confirmations, 2);
        assert_eq!(builder.block_range_scanner.max_block_range, 50);
        assert_eq!(builder.config.max_concurrent_fetches, 10);
        assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
    }

    /// Fields that are not set explicitly keep their documented defaults.
    #[test]
    fn builder_with_default_values() {
        let builder = EventScannerBuilder::sync().from_latest(1);

        assert_eq!(builder.config.count, 1);
        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    /// Calling a setter repeatedly keeps only the value from the final call.
    #[test]
    fn builder_last_call_wins() {
        let builder = EventScannerBuilder::sync()
            .from_latest(1)
            .max_block_range(25)
            .max_block_range(55)
            .max_block_range(105)
            .block_confirmations(2)
            .block_confirmations(3)
            .max_concurrent_fetches(10)
            .max_concurrent_fetches(20)
            .buffer_capacity(20)
            .buffer_capacity(40);

        assert_eq!(builder.config.count, 1);
        assert_eq!(builder.block_range_scanner.max_block_range, 105);
        assert_eq!(builder.config.block_confirmations, 3);
        assert_eq!(builder.config.max_concurrent_fetches, 20);
        assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
    }

    /// Zero block confirmations is a valid configuration and survives `connect`.
    #[tokio::test]
    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
        // The test already returns a `Result`, so a spawn failure is reported as an error
        // instead of a panic.
        let anvil = Anvil::new().try_spawn()?;
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let scanner = EventScannerBuilder::sync()
            .from_latest(1)
            .block_confirmations(0)
            .connect(provider)
            .await?;

        assert_eq!(scanner.config.block_confirmations, 0);
        Ok(())
    }

    /// `connect` rejects a configuration with zero concurrent fetches.
    #[tokio::test]
    async fn returns_error_with_zero_max_concurrent_fetches() {
        let result = EventScannerBuilder::sync()
            .from_latest(1)
            .max_concurrent_fetches(0)
            .connect(mocked_provider())
            .await;

        assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
    }

    /// `connect` rejects a request for zero latest events.
    #[tokio::test]
    async fn test_sync_from_latest_returns_error_with_zero_count() {
        let result = EventScannerBuilder::sync().from_latest(0).connect(mocked_provider()).await;

        assert!(
            matches!(result, Err(ScannerError::InvalidEventCount)),
            "Expected InvalidEventCount error"
        );
    }

    /// `connect` rejects a maximum block range of zero.
    #[tokio::test]
    async fn test_sync_from_latest_returns_error_with_zero_max_block_range() {
        let result = EventScannerBuilder::sync()
            .from_latest(10)
            .max_block_range(0)
            .connect(mocked_provider())
            .await;

        assert!(
            matches!(result, Err(ScannerError::InvalidMaxBlockRange)),
            "Expected InvalidMaxBlockRange error"
        );
    }

    /// `connect` rejects a buffer capacity of zero.
    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let result = EventScannerBuilder::sync()
            .from_latest(10)
            .buffer_capacity(0)
            .connect(mocked_provider())
            .await;

        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
    }
}