event_scanner/
event_scanner.rs

1use std::{ops::RangeInclusive, sync::Arc};
2
3use crate::{
4    block_range_scanner::{
5        BlockRangeMessage, BlockRangeScanner, BlockRangeScannerError, ConnectedBlockRangeScanner,
6        MAX_BUFFERED_MESSAGES,
7    },
8    event_filter::EventFilter,
9    event_listener::EventListener,
10    types::ScannerMessage,
11};
12use alloy::{
13    eips::BlockNumberOrTag,
14    network::Network,
15    providers::{Provider, RootProvider},
16    rpc::types::{Filter, Log},
17    transports::{RpcError, TransportErrorKind, http::reqwest::Url},
18};
19use thiserror::Error;
20use tokio::sync::{
21    broadcast::{self, Sender, error::RecvError},
22    mpsc,
23};
24use tokio_stream::{StreamExt, wrappers::ReceiverStream};
25use tracing::{error, info};
26
27pub struct EventScanner {
28    block_range_scanner: BlockRangeScanner,
29}
30
31pub type EventScannerMessage = ScannerMessage<Vec<Log>, EventScannerError>;
32
33#[derive(Error, Debug, Clone)]
34pub enum EventScannerError {
35    #[error("Block range scanner error: {0}")]
36    BlockRangeScanner(#[from] BlockRangeScannerError),
37    #[error("Provider error: {0}")]
38    Provider(Arc<RpcError<TransportErrorKind>>),
39}
40
41impl From<RpcError<TransportErrorKind>> for EventScannerError {
42    fn from(e: RpcError<TransportErrorKind>) -> Self {
43        EventScannerError::Provider(Arc::new(e))
44    }
45}
46
47impl Default for EventScanner {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl EventScanner {
54    #[must_use]
55    /// Creates a new builder with default block scanner and callback strategy.
56    pub fn new() -> Self {
57        Self { block_range_scanner: BlockRangeScanner::new() }
58    }
59
60    /// Configures how many blocks are read per epoch during a historical sync.
61    #[must_use]
62    pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
63        self.block_range_scanner =
64            self.block_range_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
65        self
66    }
67
68    /// Sets the depth to rewind when a reorg is detected.
69    #[must_use]
70    pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
71        self.block_range_scanner =
72            self.block_range_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
73        self
74    }
75
76    /// Configures how many confirmations are required before processing a block (used for reorgs).
77    #[must_use]
78    pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
79        self.block_range_scanner =
80            self.block_range_scanner.with_block_confirmations(block_confirmations);
81        self
82    }
83
84    /// Connects to the provider via WebSocket
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the connection fails
89    pub async fn connect_ws<N: Network>(self, ws_url: Url) -> Result<Client<N>, EventScannerError> {
90        let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?;
91        let event_scanner =
92            ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
93        Ok(Client { event_scanner })
94    }
95
96    /// Connects to the provider via IPC
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the connection fails
101    pub async fn connect_ipc<N: Network>(
102        self,
103        ipc_path: impl Into<String>,
104    ) -> Result<Client<N>, EventScannerError> {
105        let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?;
106        let event_scanner =
107            ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
108        Ok(Client { event_scanner })
109    }
110
111    /// Connects to an existing provider
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the connection fails
116    pub fn connect_provider<N: Network>(
117        self,
118        provider: RootProvider<N>,
119    ) -> Result<Client<N>, EventScannerError> {
120        let block_range_scanner = self.block_range_scanner.connect_provider(provider)?;
121        let event_scanner =
122            ConnectedEventScanner { block_range_scanner, event_listeners: Vec::default() };
123        Ok(Client { event_scanner })
124    }
125}
126
127pub struct ConnectedEventScanner<N: Network> {
128    block_range_scanner: ConnectedBlockRangeScanner<N>,
129    event_listeners: Vec<EventListener>,
130}
131
132impl<N: Network> ConnectedEventScanner<N> {
133    /// Starts the scanner
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if the scanner fails to start
138    pub async fn start(
139        &self,
140        start_height: BlockNumberOrTag,
141        end_height: Option<BlockNumberOrTag>,
142    ) -> Result<(), EventScannerError> {
143        let client = self.block_range_scanner.run()?;
144        let mut stream = if let Some(end_height) = end_height {
145            client.stream_historical(start_height, end_height).await?
146        } else if matches!(start_height, BlockNumberOrTag::Latest) {
147            client.stream_live().await?
148        } else {
149            client.stream_from(start_height).await?
150        };
151
152        let (range_tx, _) = broadcast::channel::<BlockRangeMessage>(1024);
153
154        self.spawn_log_consumers(&range_tx);
155
156        while let Some(message) = stream.next().await {
157            if let Err(err) = range_tx.send(message) {
158                error!(error = %err, "failed sending message to broadcast channel");
159            }
160        }
161
162        Ok(())
163    }
164
165    fn spawn_log_consumers(&self, range_tx: &Sender<BlockRangeMessage>) {
166        for listener in &self.event_listeners {
167            let provider = self.block_range_scanner.provider().clone();
168            let filter = listener.filter.clone();
169            let log_filter = Filter::from(&filter);
170            let sender = listener.sender.clone();
171            let mut sub = range_tx.subscribe();
172
173            tokio::spawn(async move {
174                loop {
175                    match sub.recv().await {
176                        Ok(BlockRangeMessage::Data(range)) => {
177                            Self::process_range(range, &filter, &log_filter, &provider, &sender)
178                                .await;
179                        }
180                        Ok(BlockRangeMessage::Error(e)) => {
181                            if let Err(err) = sender.send(ScannerMessage::Error(e.into())).await {
182                                error!(error = %err, "Downstream channel closed, skipping error propagation and stopping streaming.");
183                                break;
184                            }
185                        }
186                        Ok(BlockRangeMessage::Status(status)) => {
187                            if let Err(err) = sender.send(ScannerMessage::Status(status)).await {
188                                error!(error = %err, "Downstream channel closed, skipping sending info to receiver stream and stopping streaming.");
189                                break;
190                            }
191                        }
192                        Err(RecvError::Closed) => {
193                            error!("No block ranges to receive, stopping streaming.");
194                            break;
195                        }
196                        Err(RecvError::Lagged(_)) => {}
197                    }
198                }
199            });
200        }
201    }
202
203    async fn process_range(
204        range: RangeInclusive<u64>,
205        event_filter: &EventFilter,
206        log_filter: &Filter,
207        provider: &RootProvider<N>,
208        sender: &mpsc::Sender<EventScannerMessage>,
209    ) {
210        let (from_block, to_block) = (*range.start(), *range.end());
211
212        let log_filter = log_filter.clone().from_block(from_block).to_block(to_block);
213
214        match provider.get_logs(&log_filter).await {
215            Ok(logs) => {
216                if logs.is_empty() {
217                    return;
218                }
219
220                info!(
221                    filter = %event_filter,
222                    log_count = logs.len(),
223                    from_block,
224                    to_block,
225                    "found logs for event in block range"
226                );
227
228                if let Err(e) = sender.send(EventScannerMessage::Data(logs)).await {
229                    error!(filter = %event_filter, error = %e, "failed to enqueue log for processing");
230                }
231            }
232            Err(e) => {
233                error!(
234                    filter = %event_filter,
235                    error = %e,
236                    from_block,
237                    to_block,
238                    "failed to get logs for block range"
239                );
240
241                if let Err(send_err) = sender.send(EventScannerMessage::Error(e.into())).await {
242                    error!(filter = %event_filter, error = %send_err, "failed to enqueue error for processing");
243                }
244            }
245        }
246    }
247
248    fn add_event_listener(&mut self, event_listener: EventListener) {
249        self.event_listeners.push(event_listener);
250    }
251}
252
253pub struct Client<N: Network> {
254    event_scanner: ConnectedEventScanner<N>,
255}
256
257impl<N: Network> Client<N> {
258    pub fn create_event_stream(
259        &mut self,
260        event_filter: EventFilter,
261    ) -> ReceiverStream<EventScannerMessage> {
262        let (sender, receiver) = mpsc::channel::<EventScannerMessage>(MAX_BUFFERED_MESSAGES);
263
264        self.event_scanner.add_event_listener(EventListener { filter: event_filter, sender });
265
266        ReceiverStream::new(receiver)
267    }
268
269    /// Starts the scanner
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the scanner fails to start
274    pub async fn start_scanner(
275        self,
276        start_height: BlockNumberOrTag,
277        end_height: Option<BlockNumberOrTag>,
278    ) -> Result<(), EventScannerError> {
279        self.event_scanner.start(start_height, end_height).await
280    }
281}