event_scanner/event_scanner/modes/sync/
from_latest.rs1use alloy::{eips::BlockNumberOrTag, network::Network};
10
11use crate::{
12 ScannerError,
13 event_scanner::{
14 EventScanner, StartProof,
15 block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler},
16 builder::{EventScannerBuilder, SyncFromLatestEvents},
17 },
18 types::TryStream,
19};
20
21use robust_provider::IntoRobustProvider;
22
23impl EventScannerBuilder<SyncFromLatestEvents> {
24 #[must_use]
30 pub fn block_confirmations(mut self, confirmations: u64) -> Self {
31 self.config.block_confirmations = confirmations;
32 self
33 }
34
35 #[must_use]
50 pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
51 self.config.max_concurrent_fetches = max_concurrent_fetches;
52 self
53 }
54
55 pub async fn connect<N: Network>(
64 self,
65 provider: impl IntoRobustProvider<N>,
66 ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError> {
67 if self.config.count == 0 {
68 return Err(ScannerError::InvalidEventCount);
69 }
70 if self.config.max_concurrent_fetches == 0 {
71 return Err(ScannerError::InvalidMaxConcurrentFetches);
72 }
73 self.build(provider).await
74 }
75}
76
77impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
78 #[allow(clippy::missing_panics_doc)]
87 pub async fn start(self) -> Result<StartProof, ScannerError> {
88 info!(
89 event_count = self.config.count,
90 block_confirmations = self.config.block_confirmations,
91 listener_count = self.listeners.len(),
92 "Starting EventScanner in SyncFromLatestEvents mode"
93 );
94
95 let count = self.config.count;
96 let provider = self.block_range_scanner.provider().clone();
97 let listeners = self.listeners.clone();
98 let broadcast_channel_capacity = self.buffer_capacity();
99
100 let latest_block = provider.get_block_number().await?;
105
106 let rewind_stream = self
108 .block_range_scanner
109 .stream_rewind(latest_block, BlockNumberOrTag::Earliest)
110 .await?;
111
112 let collection_handler = LatestEventsHandler::new(
113 self.block_range_scanner.provider().clone(),
114 listeners.clone(),
115 self.config.max_concurrent_fetches,
116 count,
117 broadcast_channel_capacity,
118 );
119 let stream_handler = StreamHandler::new(
120 self.block_range_scanner.provider().clone(),
121 listeners.clone(),
122 self.config.max_concurrent_fetches,
123 broadcast_channel_capacity,
124 );
125
126 tokio::spawn(async move {
128 debug!(
129 latest_block = latest_block,
130 count = count,
131 "Phase 1: Collecting latest events via rewind"
132 );
133
134 collection_handler.handle(rewind_stream).await;
139
140 debug!(
141 start_block = latest_block + 1,
142 "Phase 2: Catching up and transitioning to live mode"
143 );
144
145 let sync_stream = match self
149 .block_range_scanner
150 .stream_from(latest_block + 1, self.config.block_confirmations)
151 .await
152 {
153 Ok(stream) => stream,
154 Err(e) => {
155 error!("Failed to setup sync stream after collecting latest events");
156 for listener in listeners {
158 _ = listener.sender.try_stream(e.clone()).await;
159 }
160 return;
161 }
162 };
163
164 stream_handler.handle(sync_stream).await;
166
167 debug!("SyncFromLatestEvents stream ended");
168 });
169
170 Ok(StartProof::new())
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use alloy::{
177 network::Ethereum,
178 node_bindings::Anvil,
179 providers::{ProviderBuilder, RootProvider, mock::Asserter},
180 rpc::client::RpcClient,
181 };
182
183 use crate::{
184 block_range_scanner::{
185 DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
186 },
187 event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
188 };
189
190 use super::*;
191
192 #[test]
193 fn builder_pattern() {
194 let builder = EventScannerBuilder::sync()
195 .from_latest(1)
196 .block_confirmations(2)
197 .max_block_range(50)
198 .max_concurrent_fetches(10)
199 .buffer_capacity(33);
200
201 assert_eq!(builder.config.count, 1);
202 assert_eq!(builder.config.block_confirmations, 2);
203 assert_eq!(builder.block_range_scanner.max_block_range, 50);
204 assert_eq!(builder.config.max_concurrent_fetches, 10);
205 assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
206 }
207
208 #[test]
209 fn builder_with_default_values() {
210 let builder = EventScannerBuilder::sync().from_latest(1);
211
212 assert_eq!(builder.config.count, 1);
213 assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
214 assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
215 assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
216 assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
217 }
218
219 #[test]
220 fn builder_last_call_wins() {
221 let builder = EventScannerBuilder::sync()
222 .from_latest(1)
223 .max_block_range(25)
224 .max_block_range(55)
225 .max_block_range(105)
226 .block_confirmations(2)
227 .block_confirmations(3)
228 .max_concurrent_fetches(10)
229 .max_concurrent_fetches(20)
230 .buffer_capacity(20)
231 .buffer_capacity(40);
232
233 assert_eq!(builder.config.count, 1);
234 assert_eq!(builder.block_range_scanner.max_block_range, 105);
235 assert_eq!(builder.config.block_confirmations, 3);
236 assert_eq!(builder.config.max_concurrent_fetches, 20);
237 assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
238 }
239
240 #[tokio::test]
241 async fn accepts_zero_confirmations() -> anyhow::Result<()> {
242 let anvil = Anvil::new().try_spawn().unwrap();
243 let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
244
245 let scanner = EventScannerBuilder::sync()
246 .from_latest(1)
247 .block_confirmations(0)
248 .connect(provider)
249 .await?;
250
251 assert_eq!(scanner.config.block_confirmations, 0);
252
253 Ok(())
254 }
255
256 #[tokio::test]
257 async fn returns_error_with_zero_max_concurrent_fetches() {
258 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
259 let result = EventScannerBuilder::sync()
260 .from_latest(1)
261 .max_concurrent_fetches(0)
262 .connect(provider)
263 .await;
264
265 assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
266 }
267
268 #[tokio::test]
269 async fn test_sync_from_latest_returns_error_with_zero_count() {
270 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
271 let result = EventScannerBuilder::sync().from_latest(0).connect(provider).await;
272
273 match result {
274 Err(ScannerError::InvalidEventCount) => {}
275 _ => panic!("Expected InvalidEventCount error"),
276 }
277 }
278
279 #[tokio::test]
280 async fn test_sync_from_latest_returns_error_with_zero_max_block_range() {
281 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
282 let result =
283 EventScannerBuilder::sync().from_latest(10).max_block_range(0).connect(provider).await;
284
285 match result {
286 Err(ScannerError::InvalidMaxBlockRange) => {}
287 _ => panic!("Expected InvalidMaxBlockRange error"),
288 }
289 }
290
291 #[tokio::test]
292 async fn returns_error_with_zero_buffer_capacity() {
293 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
294 let result =
295 EventScannerBuilder::sync().from_latest(10).buffer_capacity(0).connect(provider).await;
296
297 assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
298 }
299}