1use std::{ops::RangeInclusive, sync::Arc};
2
3use crate::{
4 block_range_scanner::{
5 BlockRangeMessage, BlockRangeScanner, BlockRangeScannerError, ConnectedBlockRangeScanner,
6 MAX_BUFFERED_MESSAGES,
7 },
8 event_filter::EventFilter,
9 event_listener::EventListener,
10 types::ScannerMessage,
11};
12use alloy::{
13 eips::BlockNumberOrTag,
14 network::Network,
15 providers::{Provider, RootProvider},
16 rpc::types::{Filter, Log},
17 transports::{RpcError, TransportErrorKind, http::reqwest::Url},
18};
19use thiserror::Error;
20use tokio::sync::{
21 broadcast::{self, Sender, error::RecvError},
22 mpsc,
23};
24use tokio_stream::{StreamExt, wrappers::ReceiverStream};
25use tracing::{error, info};
26
27pub struct EventScanner {
28 block_range_scanner: BlockRangeScanner,
29}
30
31pub type EventScannerMessage = ScannerMessage<Vec<Log>, EventScannerError>;
32
33#[derive(Error, Debug, Clone)]
34pub enum EventScannerError {
35 #[error("Block range scanner error: {0}")]
36 BlockRangeScanner(#[from] BlockRangeScannerError),
37 #[error("Provider error: {0}")]
38 Provider(Arc<RpcError<TransportErrorKind>>),
39}
40
41impl From<RpcError<TransportErrorKind>> for EventScannerError {
42 fn from(e: RpcError<TransportErrorKind>) -> Self {
43 EventScannerError::Provider(Arc::new(e))
44 }
45}
46
47impl Default for EventScanner {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl EventScanner {
54 #[must_use]
55 pub fn new() -> Self {
57 Self { block_range_scanner: BlockRangeScanner::new() }
58 }
59
60 #[must_use]
62 pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
63 self.block_range_scanner =
64 self.block_range_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
65 self
66 }
67
68 #[must_use]
70 pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
71 self.block_range_scanner =
72 self.block_range_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
73 self
74 }
75
76 #[must_use]
78 pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
79 self.block_range_scanner =
80 self.block_range_scanner.with_block_confirmations(block_confirmations);
81 self
82 }
83
84 pub async fn connect_ws<N: Network>(self, ws_url: Url) -> Result<Client<N>, EventScannerError> {
90 let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?;
91 let event_scanner =
92 ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
93 Ok(Client { event_scanner })
94 }
95
96 pub async fn connect_ipc<N: Network>(
102 self,
103 ipc_path: impl Into<String>,
104 ) -> Result<Client<N>, EventScannerError> {
105 let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?;
106 let event_scanner =
107 ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
108 Ok(Client { event_scanner })
109 }
110
111 pub fn connect_provider<N: Network>(
117 self,
118 provider: RootProvider<N>,
119 ) -> Result<Client<N>, EventScannerError> {
120 let block_range_scanner = self.block_range_scanner.connect_provider(provider)?;
121 let event_scanner =
122 ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
123 Ok(Client { event_scanner })
124 }
125}
126
127pub struct ConnectedEventScanner<N: Network> {
128 block_range_scanner: ConnectedBlockRangeScanner<N>,
129 event_listeners: Vec<EventListener>,
130}
131
132impl<N: Network> ConnectedEventScanner<N> {
133 pub async fn start(
139 &self,
140 start_height: BlockNumberOrTag,
141 end_height: Option<BlockNumberOrTag>,
142 ) -> Result<(), EventScannerError> {
143 let client = self.block_range_scanner.run()?;
144 let mut stream = if let Some(end_height) = end_height {
145 client.stream_historical(start_height, end_height).await?
146 } else if matches!(start_height, BlockNumberOrTag::Latest) {
147 client.stream_live().await?
148 } else {
149 client.stream_from(start_height).await?
150 };
151
152 let (range_tx, _) = broadcast::channel::<BlockRangeMessage>(1024);
153
154 self.spawn_log_consumers(&range_tx);
155
156 while let Some(message) = stream.next().await {
157 if let Err(err) = range_tx.send(message) {
158 error!(error = %err, "failed sending message to broadcast channel");
159 }
160 }
161
162 Ok(())
163 }
164
165 fn spawn_log_consumers(&self, range_tx: &Sender<BlockRangeMessage>) {
166 for listener in &self.event_listeners {
167 let provider = self.block_range_scanner.provider().clone();
168 let filter = listener.filter.clone();
169 let log_filter = Filter::from(&filter);
170 let sender = listener.sender.clone();
171 let mut sub = range_tx.subscribe();
172
173 tokio::spawn(async move {
174 loop {
175 match sub.recv().await {
176 Ok(BlockRangeMessage::Data(range)) => {
177 Self::process_range(range, &filter, &log_filter, &provider, &sender)
178 .await;
179 }
180 Ok(BlockRangeMessage::Error(e)) => {
181 if let Err(err) = sender.send(ScannerMessage::Error(e.into())).await {
182 error!(error = %err, "Downstream channel closed, skipping error propagation and stopping streaming.");
183 break;
184 }
185 }
186 Ok(BlockRangeMessage::Status(status)) => {
187 if let Err(err) = sender.send(ScannerMessage::Status(status)).await {
188 error!(error = %err, "Downstream channel closed, skipping sending info to receiver stream and stopping streaming.");
189 break;
190 }
191 }
192 Err(RecvError::Closed) => {
193 error!("No block ranges to receive, stopping streaming.");
194 break;
195 }
196 Err(RecvError::Lagged(_)) => {}
197 }
198 }
199 });
200 }
201 }
202
203 async fn process_range(
204 range: RangeInclusive<u64>,
205 event_filter: &EventFilter,
206 log_filter: &Filter,
207 provider: &RootProvider<N>,
208 sender: &mpsc::Sender<EventScannerMessage>,
209 ) {
210 let (from_block, to_block) = (*range.start(), *range.end());
211
212 let log_filter = log_filter.clone().from_block(from_block).to_block(to_block);
213
214 match provider.get_logs(&log_filter).await {
215 Ok(logs) => {
216 if logs.is_empty() {
217 return;
218 }
219
220 info!(
221 filter = %event_filter,
222 log_count = logs.len(),
223 from_block,
224 to_block,
225 "found logs for event in block range"
226 );
227
228 if let Err(e) = sender.send(EventScannerMessage::Data(logs)).await {
229 error!(filter = %event_filter, error = %e, "failed to enqueue log for processing");
230 }
231 }
232 Err(e) => {
233 error!(
234 filter = %event_filter,
235 error = %e,
236 from_block,
237 to_block,
238 "failed to get logs for block range"
239 );
240
241 if let Err(send_err) = sender.send(EventScannerMessage::Error(e.into())).await {
242 error!(filter = %event_filter, error = %send_err, "failed to enqueue error for processing");
243 }
244 }
245 }
246 }
247
248 fn add_event_listener(&mut self, event_listener: EventListener) {
249 self.event_listeners.push(event_listener);
250 }
251}
252
253pub struct Client<N: Network> {
254 event_scanner: ConnectedEventScanner<N>,
255}
256
257impl<N: Network> Client<N> {
258 pub fn create_event_stream(
259 &mut self,
260 event_filter: EventFilter,
261 ) -> ReceiverStream<EventScannerMessage> {
262 let (sender, receiver) = mpsc::channel::<EventScannerMessage>(MAX_BUFFERED_MESSAGES);
263
264 self.event_scanner.add_event_listener(EventListener { filter: event_filter, sender });
265
266 ReceiverStream::new(receiver)
267 }
268
269 pub async fn start_scanner(
275 self,
276 start_height: BlockNumberOrTag,
277 end_height: Option<BlockNumberOrTag>,
278 ) -> Result<(), EventScannerError> {
279 self.event_scanner.start(start_height, end_height).await
280 }
281}