event_scanner/event_scanner/scanner/sync/
from_latest.rs

1use alloy::{eips::BlockNumberOrTag, network::Network};
2
3use tracing::{error, info};
4
5use crate::{
6    EventScannerBuilder, ScannerError,
7    event_scanner::{
8        EventScanner,
9        scanner::{
10            SyncFromLatestEvents,
11            common::{ConsumerMode, handle_stream},
12        },
13    },
14    robust_provider::IntoRobustProvider,
15    types::TryStream,
16};
17
18impl EventScannerBuilder<SyncFromLatestEvents> {
19    #[must_use]
20    pub fn block_confirmations(mut self, confirmations: u64) -> Self {
21        self.config.block_confirmations = confirmations;
22        self
23    }
24
25    /// Sets the maximum number of block-range fetches to process concurrently when
26    /// fetching the latest events before switching to live streaming.
27    ///
28    /// Increasing this value can improve catch-up throughput by issuing multiple
29    /// RPC requests concurrently, at the cost of additional load on the provider.
30    ///
31    /// Must be greater than 0.
32    ///
33    /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default].
34    ///
35    /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES
36    #[must_use]
37    pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
38        self.config.max_concurrent_fetches = max_concurrent_fetches;
39        self
40    }
41
42    /// Connects to an existing provider.
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if:
47    /// * The provider connection fails
48    /// * The event count is zero
49    /// * The max block range is zero
50    pub async fn connect<N: Network>(
51        self,
52        provider: impl IntoRobustProvider<N>,
53    ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError> {
54        if self.config.count == 0 {
55            return Err(ScannerError::InvalidEventCount);
56        }
57        if self.config.max_concurrent_fetches == 0 {
58            return Err(ScannerError::InvalidMaxConcurrentFetches);
59        }
60        self.build(provider).await
61    }
62}
63
64impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
65    /// Starts the scanner.
66    ///
67    /// # Important notes
68    ///
69    /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling
70    ///   this function.
71    /// * The method returns immediately; events are delivered asynchronously.
72    ///
73    /// # Errors
74    ///
75    /// Can error out if the service fails to start.
76    ///
77    /// [subscribe]: EventScanner::subscribe
78    #[allow(clippy::missing_panics_doc)]
79    pub async fn start(self) -> Result<(), ScannerError> {
80        let count = self.config.count;
81        let provider = self.block_range_scanner.provider().clone();
82        let listeners = self.listeners.clone();
83        let max_concurrent_fetches = self.config.max_concurrent_fetches;
84
85        info!(count = count, "Starting scanner, mode: fetch latest events and switch to live");
86
87        let client = self.block_range_scanner.run()?;
88
89        // Fetch the latest block number.
90        // This is used to determine the starting point for the rewind stream and the live
91        // stream. We do this before starting the streams to avoid a race condition
92        // where the latest block changes while we're setting up the streams.
93        let latest_block = provider.get_block_number().await?;
94
95        // Setup rewind and live streams to run in parallel.
96        let rewind_stream = client.rewind(latest_block, BlockNumberOrTag::Earliest).await?;
97
98        // Start streaming...
99        tokio::spawn(async move {
100            // Since both rewind and live log consumers are ultimately streaming to the same
101            // channel, we must ensure that all latest events are streamed before
102            // consuming the live stream, otherwise the log consumers may send events out
103            // of order.
104            handle_stream(
105                rewind_stream,
106                &provider,
107                &listeners,
108                ConsumerMode::CollectLatest { count },
109                max_concurrent_fetches,
110            )
111            .await;
112
113            // We actually rely on the sync mode for the live stream, as more blocks could have been
114            // minted while the scanner was collecting the latest `count` events.
115            // Note: Sync mode will notify the client when it switches to live streaming.
116            let sync_stream =
117                match client.stream_from(latest_block + 1, self.config.block_confirmations).await {
118                    Ok(stream) => stream,
119                    Err(e) => {
120                        error!(error = %e, "Error during sync mode setup");
121                        for listener in listeners {
122                            _ = listener.sender.try_stream(e.clone()).await;
123                        }
124                        return;
125                    }
126                };
127
128            // Start the live (sync) stream.
129            handle_stream(
130                sync_stream,
131                &provider,
132                &listeners,
133                ConsumerMode::Stream,
134                max_concurrent_fetches,
135            )
136            .await;
137        });
138
139        Ok(())
140    }
141}
142
#[cfg(test)]
mod tests {
    use alloy::{
        network::Ethereum,
        providers::{ProviderBuilder, RootProvider, mock::Asserter},
        rpc::client::RpcClient,
    };
    use alloy_node_bindings::Anvil;

    use crate::{
        block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE},
        event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES,
    };

    use super::*;

    /// Creates a provider on top of a mocked RPC client with no queued responses.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()))
    }

    /// Every setter stores its argument in the builder.
    #[test]
    fn builder_pattern() {
        let configured = EventScannerBuilder::sync()
            .from_latest(1)
            .block_confirmations(2)
            .max_block_range(50)
            .max_concurrent_fetches(10);

        assert_eq!(configured.config.count, 1);
        assert_eq!(configured.config.block_confirmations, 2);
        assert_eq!(configured.block_range_scanner.max_block_range, 50);
        assert_eq!(configured.config.max_concurrent_fetches, 10);
    }

    /// Options that are never set keep their crate-level defaults.
    #[test]
    fn builder_with_default_values() {
        let untouched = EventScannerBuilder::sync().from_latest(1);

        assert_eq!(untouched.config.count, 1);
        assert_eq!(untouched.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(untouched.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(untouched.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
    }

    /// Calling a setter repeatedly keeps only the last value.
    #[test]
    fn builder_last_call_wins() {
        let overridden = EventScannerBuilder::sync()
            .from_latest(1)
            .max_block_range(25)
            .max_block_range(55)
            .max_block_range(105)
            .block_confirmations(2)
            .block_confirmations(3)
            .max_concurrent_fetches(10)
            .max_concurrent_fetches(20);

        assert_eq!(overridden.config.count, 1);
        assert_eq!(overridden.block_range_scanner.max_block_range, 105);
        assert_eq!(overridden.config.block_confirmations, 3);
        assert_eq!(overridden.config.max_concurrent_fetches, 20);
    }

    /// Zero block confirmations is a valid configuration.
    #[tokio::test]
    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let builder = EventScannerBuilder::sync().from_latest(1).block_confirmations(0);
        let scanner = builder.connect(provider).await?;

        assert_eq!(scanner.config.block_confirmations, 0);

        Ok(())
    }

    /// `connect` rejects a zero `max_concurrent_fetches`.
    #[tokio::test]
    async fn returns_error_with_zero_max_concurrent_fetches() {
        let outcome = EventScannerBuilder::sync()
            .from_latest(1)
            .max_concurrent_fetches(0)
            .connect(mocked_provider())
            .await;

        assert!(matches!(outcome, Err(ScannerError::InvalidMaxConcurrentFetches)));
    }

    /// `connect` rejects a zero event count.
    #[tokio::test]
    async fn test_sync_from_latest_returns_error_with_zero_count() {
        let outcome = EventScannerBuilder::sync().from_latest(0).connect(mocked_provider()).await;

        assert!(
            matches!(outcome, Err(ScannerError::InvalidEventCount)),
            "Expected InvalidEventCount error"
        );
    }

    /// `connect` rejects a zero max block range.
    #[tokio::test]
    async fn test_sync_from_latest_returns_error_with_zero_max_block_range() {
        let outcome = EventScannerBuilder::sync()
            .from_latest(10)
            .max_block_range(0)
            .connect(mocked_provider())
            .await;

        assert!(
            matches!(outcome, Err(ScannerError::InvalidMaxBlockRange)),
            "Expected InvalidMaxBlockRange error"
        );
    }
}