use alloy_primitives::BlockNumber;
use alloy_provider::Provider;
use alloy_rpc_types::{BlockNumberOrTag, Filter, Header, Log};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use tracing::{debug, info};
use crate::errors::{EventProcessingError, RpcError};
/// Wrapper around an RPC provider `P` that exposes block and log
/// subscriptions as boxed streams.
///
/// The struct itself places no bounds on `P`; the `Provider` bound lives on
/// the `impl` block that needs it.
// NOTE(review): `dead_code` is presumably allowed because the scanner is not
// yet used from every build target — confirm, and drop the attribute once it is.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RealtimeEventScanner<P> {
    /// Provider through which every request and subscription is issued.
    provider: P,
}
// NOTE(review): `dead_code` is presumably allowed because not every method is
// called yet — confirm, and drop the attribute once they are.
#[allow(dead_code)]
impl<P> RealtimeEventScanner<P>
where
    P: Provider,
{
    /// Creates a scanner that issues all requests through `provider`.
    pub fn new(provider: P) -> Self {
        Self { provider }
    }

    /// Subscribes to new block headers and returns them as a boxed stream.
    ///
    /// # Errors
    ///
    /// Returns [`EventProcessingError::Rpc`] built from
    /// `RpcError::subscription_failed("blocks", ..)` when the provider fails
    /// to establish the subscription.
    pub async fn subscribe_blocks(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Header> + Send + '_>>, EventProcessingError> {
        info!("Subscribing to new blocks");
        let subscription =
            self.provider.subscribe_blocks().await.map_err(|e| {
                EventProcessingError::Rpc(RpcError::subscription_failed("blocks", e))
            })?;
        let stream = subscription.into_stream();
        debug!("Block subscription established");
        Ok(Box::pin(stream))
    }

    /// Subscribes to logs matching `filter` and returns them as a boxed stream.
    ///
    /// # Errors
    ///
    /// Returns [`EventProcessingError::Rpc`] built from
    /// `RpcError::subscription_failed("logs", ..)` when the provider fails to
    /// establish the subscription.
    pub async fn subscribe_logs(
        &self,
        filter: Filter,
    ) -> Result<Pin<Box<dyn Stream<Item = Log> + Send + '_>>, EventProcessingError> {
        info!(
            address = ?filter.address,
            topics = ?filter.topics,
            "Subscribing to logs"
        );
        let subscription = self
            .provider
            .subscribe_logs(&filter)
            .await
            .map_err(|e| EventProcessingError::Rpc(RpcError::subscription_failed("logs", e)))?;
        let stream = subscription.into_stream();
        debug!("Log subscription established");
        Ok(Box::pin(stream))
    }

    /// Returns a stream that first yields historical logs from `from_block`
    /// up to the latest block, then continues with live logs.
    ///
    /// `filter_template` supplies the address/topic criteria; its block range
    /// is overwritten for both the historical query and the live subscription.
    ///
    /// The live subscription is opened *before* the historical query runs so
    /// that logs from blocks mined while the query is in flight are not lost.
    /// Live logs that the historical query already returned (same block hash
    /// and log index) are dropped from the live part of the stream. Logs
    /// flagged `removed` are always passed through.
    ///
    /// # Errors
    ///
    /// Returns [`EventProcessingError::Rpc`] built from
    /// `RpcError::subscription_failed("logs", ..)` if the subscription cannot
    /// be established, or from
    /// `RpcError::get_logs_failed("historical catchup", ..)` if the historical
    /// query fails.
    pub async fn subscribe_logs_with_catchup(
        &self,
        filter_template: Filter,
        from_block: BlockNumber,
    ) -> Result<Pin<Box<dyn Stream<Item = Log> + Send + '_>>, EventProcessingError> {
        info!(
            from_block = from_block,
            address = ?filter_template.address,
            "Subscribing to logs with historical catchup"
        );

        let historical_filter = filter_template
            .clone()
            .from_block(from_block)
            .to_block(BlockNumberOrTag::Latest);
        let live_filter = filter_template.from_block(BlockNumberOrTag::Latest);

        // Subscribe first: anything mined from here on is captured by the
        // subscription, so there is no gap between "latest" as seen by the
        // historical query and the start of live delivery.
        // NOTE(review): logs arriving while the historical query is in flight
        // rely on the subscription's internal buffering — confirm its capacity
        // is adequate for long catchups.
        let subscription = self
            .provider
            .subscribe_logs(&live_filter)
            .await
            .map_err(|e| EventProcessingError::Rpc(RpcError::subscription_failed("logs", e)))?;
        let live_stream = subscription.into_stream();

        let historical_logs = self
            .provider
            .get_logs(&historical_filter)
            .await
            .map_err(|e| {
                EventProcessingError::Rpc(RpcError::get_logs_failed("historical catchup", e))
            })?;
        info!(
            historical_count = historical_logs.len(),
            "Fetched historical logs, switching to live subscription"
        );

        // Because the subscription predates the historical query, the two can
        // overlap. Remember which (block hash, log index) pairs the historical
        // pass delivers so the live pass can skip them. Keying on the block
        // hash (not the number) keeps logs re-emitted on a different fork.
        let already_delivered: std::collections::HashSet<_> = historical_logs
            .iter()
            .filter_map(|log| log.block_hash.zip(log.log_index))
            .collect();

        let deduped_live = live_stream.filter(move |log| {
            // Removal notifications must reach the consumer even when the
            // original log was part of the historical batch.
            let is_duplicate = !log.removed
                && log
                    .block_hash
                    .zip(log.log_index)
                    .is_some_and(|key| already_delivered.contains(&key));
            std::future::ready(!is_duplicate)
        });

        let combined = futures::stream::iter(historical_logs).chain(deduped_live);
        debug!("Log subscription with catchup established");
        Ok(Box::pin(combined))
    }

    /// Borrows the underlying provider.
    pub fn provider(&self) -> &P {
        &self.provider
    }

    /// Consumes the scanner and returns the underlying provider.
    pub fn into_provider(self) -> P {
        self.provider
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check only: the nested function is never called, so the
    /// test passes as long as `RealtimeEventScanner<P>` type-checks for a
    /// generic `P: Provider`.
    #[test]
    fn test_scanner_construction_is_generic() {
        fn _accepts_any_provider<P>(_scanner: RealtimeEventScanner<P>)
        where
            P: Provider,
        {
        }
    }
}