use alloy::network::{Ethereum, Network};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::{
block_range_scanner::BlockRangeScanner,
event_scanner::{
EventScannerResult, Unspecified, filter::EventFilter, listener::EventListener,
},
};
/// Event scanner state: a mode-specific configuration, the underlying block
/// range scanner, and the listeners registered so far.
///
/// `Mode` is the type of the stored configuration and defaults to
/// [`Unspecified`]; `N` is the network type and defaults to [`Ethereum`].
#[derive(Debug)]
pub struct EventScanner<Mode = Unspecified, N: Network = Ethereum> {
/// Configuration value for the selected scanning mode.
pub(crate) config: Mode,
/// Block range scanner this event scanner is built on; it also supplies
/// the buffer capacity used for subscription channels.
pub(crate) block_range_scanner: BlockRangeScanner<N>,
/// Registered listeners; each `subscribe` call appends one entry.
pub(crate) listeners: Vec<EventListener>,
}
impl<Mode, N: Network> EventScanner<Mode, N> {
    /// Creates a scanner from a mode configuration and a block range scanner.
    ///
    /// The returned scanner starts with no registered listeners.
    ///
    /// # Arguments
    ///
    /// * `config` - configuration value for the selected mode.
    /// * `block_range_scanner` - block range scanner the event scanner wraps.
    // `#[must_use]` matches the other value-returning functions in this file:
    // building a scanner and discarding it is never intentional.
    #[must_use]
    pub fn new(config: Mode, block_range_scanner: BlockRangeScanner<N>) -> Self {
        Self { config, block_range_scanner, listeners: Vec::new() }
    }
}
/// Handle to a single event subscription.
///
/// Wraps the receiving half of the channel on which scanner results for one
/// registered filter are delivered.
pub struct EventSubscription {
    /// Stream over the receiver side of the listener's channel.
    inner: ReceiverStream<EventScannerResult>,
}

// Implemented by hand and kept opaque so that the public type is always
// `Debug` without placing a `Debug` requirement on `EventScannerResult`.
impl std::fmt::Debug for EventSubscription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventSubscription").finish_non_exhaustive()
    }
}
impl EventSubscription {
pub(crate) fn new(inner: ReceiverStream<EventScannerResult>) -> Self {
Self { inner }
}
#[must_use]
pub fn stream(self, _proof: &StartProof) -> ReceiverStream<EventScannerResult> {
self.inner
}
}
impl<Mode, N: Network> EventScanner<Mode, N> {
    /// Returns the buffer capacity reported by the underlying block range scanner.
    #[must_use]
    pub fn buffer_capacity(&self) -> usize {
        let Self { block_range_scanner, .. } = self;
        block_range_scanner.buffer_capacity()
    }

    /// Registers a listener for `filter` and returns the matching subscription.
    ///
    /// A bounded channel sized by the block range scanner's buffer capacity is
    /// created; the sending half is stored with the filter in the listener
    /// list and the receiving half is wrapped in the returned subscription.
    ///
    /// # Panics
    ///
    /// `tokio::sync::mpsc::channel` panics when asked for a capacity of `0`,
    /// so this panics if the block range scanner reports a zero buffer capacity.
    #[must_use]
    pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription {
        let capacity = self.block_range_scanner.buffer_capacity();
        let (sender, receiver) = mpsc::channel::<EventScannerResult>(capacity);

        let listener = EventListener { filter, sender };
        self.listeners.push(listener);

        let results = ReceiverStream::new(receiver);
        EventSubscription::new(results)
    }
}
/// Token type required by `EventSubscription::stream`.
///
/// The only field is private and the constructor is crate-visible, so code
/// outside this crate cannot create a value on its own; it can only clone one
/// it has been given.
#[derive(Debug, Clone)]
pub struct StartProof {
    /// Zero-sized private field that blocks construction via a struct literal
    /// from outside this module.
    _private: (),
}

impl StartProof {
    /// Creates a new proof token; callable only from within this crate.
    #[must_use]
    pub(crate) fn new() -> Self {
        StartProof { _private: () }
    }
}
#[cfg(test)]
mod tests {
    use alloy::{
        providers::{RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };

    use crate::{BlockRangeScannerBuilder, Historic};

    use super::*;

    /// Builds a provider on top of a mocked RPC client with a fresh asserter.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    /// Every `subscribe` call must append exactly one listener.
    #[tokio::test]
    async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> {
        let range_scanner = BlockRangeScannerBuilder::new().connect(mocked_provider()).await?;
        let mut event_scanner = EventScanner::new(Historic::default(), range_scanner);
        assert!(event_scanner.listeners.is_empty());

        let _first = event_scanner.subscribe(EventFilter::new());
        assert_eq!(event_scanner.listeners.len(), 1);

        let _second = event_scanner.subscribe(EventFilter::new());
        let _third = event_scanner.subscribe(EventFilter::new());
        assert_eq!(event_scanner.listeners.len(), 3);

        Ok(())
    }

    /// The listener channel must be sized by the block range scanner's buffer
    /// capacity, both for the builder default and for an explicit value.
    #[tokio::test]
    async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> {
        let provider = mocked_provider();

        // Capacity taken from the builder's default configuration.
        let default_brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?;
        let mut event_scanner = EventScanner::new(Historic::default(), default_brs);
        let _ = event_scanner.subscribe(EventFilter::new());
        let expected_capacity = event_scanner.block_range_scanner.buffer_capacity();
        let listener_sender = &event_scanner.listeners[0].sender;
        assert_eq!(listener_sender.capacity(), expected_capacity);

        // Capacity explicitly overridden on the builder.
        let custom_capacity = 1000;
        let custom_brs = BlockRangeScannerBuilder::new()
            .buffer_capacity(custom_capacity)
            .connect(provider)
            .await?;
        let mut event_scanner = EventScanner::new(Historic::default(), custom_brs);
        let _ = event_scanner.subscribe(EventFilter::new());
        let listener_sender = &event_scanner.listeners[0].sender;
        assert_eq!(listener_sender.capacity(), custom_capacity);

        Ok(())
    }
}