Skip to main content

event_scanner/event_scanner/modes/sync/
from_latest.rs

1//! Collects recent events, then transitions to live streaming.
2//!
3//! Collects a specified number of the most recent events, then automatically continues
4//! with live streaming. See [`EventScannerBuilder::sync().from_latest()`][from_latest]
5//! for usage details.
6//!
7//! [from_latest]: crate::EventScannerBuilder::from_latest
8
9use alloy::{eips::BlockNumberOrTag, network::Network};
10
11use crate::{
12    ScannerError,
13    event_scanner::{
14        EventScanner, StartProof,
15        block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler},
16        builder::{EventScannerBuilder, SyncFromLatestEvents},
17    },
18    types::TryStream,
19};
20
21use robust_provider::IntoRobustProvider;
22
23impl EventScannerBuilder<SyncFromLatestEvents> {
24    /// Sets the number of confirmations required before a block is considered stable enough to
25    /// scan in the live phase.
26    ///
27    /// This affects the post-sync live streaming phase; higher values reduce reorg risk at the
28    /// cost of increased event delivery latency.
29    #[must_use]
30    pub fn block_confirmations(mut self, confirmations: u64) -> Self {
31        self.config.block_confirmations = confirmations;
32        self
33    }
34
35    /// Sets the maximum number of block-range fetches to process concurrently when
36    /// fetching the latest events before switching to live streaming.
37    ///
38    /// Increasing this value can improve catch-up throughput by issuing multiple
39    /// RPC requests concurrently, at the cost of additional load on the provider.
40    ///
41    /// **Note**: This limit applies **per listener**. With N listeners and a limit of M,
42    /// up to N × M concurrent RPC requests may be in-flight simultaneously.
43    ///
44    /// Must be greater than 0.
45    ///
46    /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default].
47    ///
48    /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES
49    #[must_use]
50    pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
51        self.config.max_concurrent_fetches = max_concurrent_fetches;
52        self
53    }
54
55    /// Connects to an existing provider.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if:
60    /// * The provider connection fails
61    /// * The event count is zero
62    /// * The max block range is zero
63    pub async fn connect<N: Network>(
64        self,
65        provider: impl IntoRobustProvider<N>,
66    ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError> {
67        if self.config.count == 0 {
68            return Err(ScannerError::InvalidEventCount);
69        }
70        if self.config.max_concurrent_fetches == 0 {
71            return Err(ScannerError::InvalidMaxConcurrentFetches);
72        }
73        self.build(provider).await
74    }
75}
76
77impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
78    /// Starts the scanner in [`SyncFromLatestEvents`] mode.
79    ///
80    /// See [`EventScanner`] for general startup notes.
81    ///
82    /// # Errors
83    ///
84    /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
85    /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
86    #[allow(clippy::missing_panics_doc)]
87    pub async fn start(self) -> Result<StartProof, ScannerError> {
88        info!(
89            event_count = self.config.count,
90            block_confirmations = self.config.block_confirmations,
91            listener_count = self.listeners.len(),
92            "Starting EventScanner in SyncFromLatestEvents mode"
93        );
94
95        let count = self.config.count;
96        let provider = self.block_range_scanner.provider().clone();
97        let listeners = self.listeners.clone();
98        let broadcast_channel_capacity = self.buffer_capacity();
99
100        // Fetch the latest block number.
101        // This is used to determine the starting point for the rewind stream and the live
102        // stream. We do this before starting the streams to avoid a race condition
103        // where the latest block changes while we're setting up the streams.
104        let latest_block = provider.get_block_number().await?;
105
106        // Setup rewind and live streams to run in parallel.
107        let rewind_stream = self
108            .block_range_scanner
109            .stream_rewind(latest_block, BlockNumberOrTag::Earliest)
110            .await?;
111
112        let collection_handler = LatestEventsHandler::new(
113            self.block_range_scanner.provider().clone(),
114            listeners.clone(),
115            self.config.max_concurrent_fetches,
116            count,
117            broadcast_channel_capacity,
118        );
119        let stream_handler = StreamHandler::new(
120            self.block_range_scanner.provider().clone(),
121            listeners.clone(),
122            self.config.max_concurrent_fetches,
123            broadcast_channel_capacity,
124        );
125
126        // Start streaming...
127        tokio::spawn(async move {
128            debug!(
129                latest_block = latest_block,
130                count = count,
131                "Phase 1: Collecting latest events via rewind"
132            );
133
134            // Since both rewind and live log consumers are ultimately streaming to the same
135            // channel, we must ensure that all latest events are streamed before
136            // consuming the live stream, otherwise the log consumers may send events out
137            // of order.
138            collection_handler.handle(rewind_stream).await;
139
140            debug!(
141                start_block = latest_block + 1,
142                "Phase 2: Catching up and transitioning to live mode"
143            );
144
145            // We actually rely on the sync mode for the live stream, as more blocks could have been
146            // minted while the scanner was collecting the latest `count` events.
147            // Note: Sync mode will notify the client when it switches to live streaming.
148            let sync_stream = match self
149                .block_range_scanner
150                .stream_from(latest_block + 1, self.config.block_confirmations)
151                .await
152            {
153                Ok(stream) => stream,
154                Err(e) => {
155                    error!("Failed to setup sync stream after collecting latest events");
156                    // notify all active listeners about the error before dropping the stream
157                    for listener in listeners {
158                        _ = listener.sender.try_stream(e.clone()).await;
159                    }
160                    return;
161                }
162            };
163
164            // Start the live (sync) stream.
165            stream_handler.handle(sync_stream).await;
166
167            debug!("SyncFromLatestEvents stream ended");
168        });
169
170        Ok(StartProof::new())
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use alloy::{
177        network::Ethereum,
178        node_bindings::Anvil,
179        providers::{ProviderBuilder, RootProvider, mock::Asserter},
180        rpc::client::RpcClient,
181    };
182
183    use crate::{
184        block_range_scanner::{
185            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
186        },
187        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
188    };
189
190    use super::*;
191
192    #[test]
193    fn builder_pattern() {
194        let builder = EventScannerBuilder::sync()
195            .from_latest(1)
196            .block_confirmations(2)
197            .max_block_range(50)
198            .max_concurrent_fetches(10)
199            .buffer_capacity(33);
200
201        assert_eq!(builder.config.count, 1);
202        assert_eq!(builder.config.block_confirmations, 2);
203        assert_eq!(builder.block_range_scanner.max_block_range, 50);
204        assert_eq!(builder.config.max_concurrent_fetches, 10);
205        assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
206    }
207
208    #[test]
209    fn builder_with_default_values() {
210        let builder = EventScannerBuilder::sync().from_latest(1);
211
212        assert_eq!(builder.config.count, 1);
213        assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
214        assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
215        assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
216        assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
217    }
218
219    #[test]
220    fn builder_last_call_wins() {
221        let builder = EventScannerBuilder::sync()
222            .from_latest(1)
223            .max_block_range(25)
224            .max_block_range(55)
225            .max_block_range(105)
226            .block_confirmations(2)
227            .block_confirmations(3)
228            .max_concurrent_fetches(10)
229            .max_concurrent_fetches(20)
230            .buffer_capacity(20)
231            .buffer_capacity(40);
232
233        assert_eq!(builder.config.count, 1);
234        assert_eq!(builder.block_range_scanner.max_block_range, 105);
235        assert_eq!(builder.config.block_confirmations, 3);
236        assert_eq!(builder.config.max_concurrent_fetches, 20);
237        assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
238    }
239
240    #[tokio::test]
241    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
242        let anvil = Anvil::new().try_spawn().unwrap();
243        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
244
245        let scanner = EventScannerBuilder::sync()
246            .from_latest(1)
247            .block_confirmations(0)
248            .connect(provider)
249            .await?;
250
251        assert_eq!(scanner.config.block_confirmations, 0);
252
253        Ok(())
254    }
255
256    #[tokio::test]
257    async fn returns_error_with_zero_max_concurrent_fetches() {
258        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
259        let result = EventScannerBuilder::sync()
260            .from_latest(1)
261            .max_concurrent_fetches(0)
262            .connect(provider)
263            .await;
264
265        assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
266    }
267
268    #[tokio::test]
269    async fn test_sync_from_latest_returns_error_with_zero_count() {
270        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
271        let result = EventScannerBuilder::sync().from_latest(0).connect(provider).await;
272
273        match result {
274            Err(ScannerError::InvalidEventCount) => {}
275            _ => panic!("Expected InvalidEventCount error"),
276        }
277    }
278
279    #[tokio::test]
280    async fn test_sync_from_latest_returns_error_with_zero_max_block_range() {
281        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
282        let result =
283            EventScannerBuilder::sync().from_latest(10).max_block_range(0).connect(provider).await;
284
285        match result {
286            Err(ScannerError::InvalidMaxBlockRange) => {}
287            _ => panic!("Expected InvalidMaxBlockRange error"),
288        }
289    }
290
291    #[tokio::test]
292    async fn returns_error_with_zero_buffer_capacity() {
293        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
294        let result =
295            EventScannerBuilder::sync().from_latest(10).buffer_capacity(0).connect(provider).await;
296
297        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
298    }
299}