use std::collections::HashMap;
use alloy::{dyn_abi::EventExt, json_abi::Event, rpc::types::Filter};
use alloy_primitives::B256;
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};
use crate::{
rpc::{
DecodedEventRange, RpcError, config::ProviderSettings, stream::backfill_then_watch_logs,
},
storage::codec::decoder::DecodedEventWithHeader,
};
/// Streams decoded contract events for a fixed set of ABI event definitions.
///
/// Built via `EventStreamer::new`; the log stream itself is produced by
/// `EventStreamer::stream`.
pub struct EventStreamer {
/// Provider configuration handed to `backfill_then_watch_logs` when streaming.
providers: ProviderSettings,
/// ABI event definitions keyed by their selector, looked up with each
/// log's `topic0` to find the definition used for decoding.
events: HashMap<B256, Event>,
/// Signature strings of the registered events, applied to the log filter
/// so only these events are requested.
signatures: Vec<String>,
}
impl EventStreamer {
    /// Creates a streamer for the given ABI `events`.
    ///
    /// Each event is cloned into a lookup table keyed by its selector, and its
    /// signature string is recorded so that [`EventStreamer::stream`] can
    /// restrict the log filter to exactly these events.
    pub fn new(providers: ProviderSettings, events: &[Event]) -> Self {
        let signatures = events.iter().map(|e| e.signature()).collect::<Vec<_>>();
        let events_map = events
            .iter()
            .map(|e| (e.selector(), e.clone()))
            .collect::<HashMap<_, _>>();
        Self {
            providers,
            events: events_map,
            signatures,
        }
    }

    /// Returns a stream of decoded event batches.
    ///
    /// The optional `filter` (or a default filter when `None`) is narrowed to
    /// the registered event signatures and passed to
    /// `backfill_then_watch_logs`. Every successful item from that log stream
    /// is re-emitted as `Ok((from_block, to_block, decoded_events))`.
    ///
    /// Logs are skipped silently (not reported as errors) when:
    /// - they have no `topic0`, or `topic0` matches no registered event,
    /// - decoding against the matching event definition fails,
    /// - they carry no block number or log index,
    /// - their log index does not fit in a `u32`.
    ///
    /// # Errors
    ///
    /// The first `Err` produced by the underlying log stream is yielded
    /// unchanged, after which this stream ends.
    pub async fn stream(
        &self,
        filter: Option<&Filter>,
    ) -> impl Stream<Item = Result<DecodedEventRange, RpcError>> {
        stream! {
            // `unwrap_or_default` only builds a default filter when none was supplied.
            let filter = filter.cloned().unwrap_or_default().events(&self.signatures);
            let log_stream = backfill_then_watch_logs(&self.providers, Some(&filter)).await;
            pin_mut!(log_stream);

            while let Some(result) = log_stream.next().await {
                match result {
                    Ok((from_block, to_block, logs)) => {
                        let decoded_events = logs
                            .iter()
                            .filter_map(|log| {
                                let definition = self.events.get(log.topic0()?)?;
                                let event = definition.decode_log(log.data()).ok()?;
                                let log_block = log.block_number?;
                                // Checked narrowing: an `as u32` cast would silently
                                // wrap an oversized index and could make two distinct
                                // logs share the same stored index.
                                let log_index = u32::try_from(log.log_index?).ok()?;
                                let log_address = log.address();
                                Some(DecodedEventWithHeader {
                                    log_block,
                                    log_index,
                                    log_address,
                                    event,
                                })
                            })
                            .collect::<Vec<_>>();
                        yield Ok((from_block, to_block, decoded_events));
                    }
                    Err(e) => {
                        // Surface the error once, then terminate the stream.
                        yield Err(e);
                        return;
                    }
                }
            }
        }
    }
}