event_scanner/event_scanner/scanner/sync/
from_latest.rs1use alloy::{eips::BlockNumberOrTag, network::Network};
2
3use tracing::{error, info};
4
5use crate::{
6 EventScannerBuilder, ScannerError,
7 event_scanner::{
8 EventScanner,
9 scanner::{
10 SyncFromLatestEvents,
11 common::{ConsumerMode, handle_stream},
12 },
13 },
14 robust_provider::IntoRobustProvider,
15 types::TryStream,
16};
17
18impl EventScannerBuilder<SyncFromLatestEvents> {
19 #[must_use]
20 pub fn block_confirmations(mut self, confirmations: u64) -> Self {
21 self.config.block_confirmations = confirmations;
22 self
23 }
24
25 #[must_use]
37 pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
38 self.config.max_concurrent_fetches = max_concurrent_fetches;
39 self
40 }
41
42 pub async fn connect<N: Network>(
51 self,
52 provider: impl IntoRobustProvider<N>,
53 ) -> Result<EventScanner<SyncFromLatestEvents, N>, ScannerError> {
54 if self.config.count == 0 {
55 return Err(ScannerError::InvalidEventCount);
56 }
57 if self.config.max_concurrent_fetches == 0 {
58 return Err(ScannerError::InvalidMaxConcurrentFetches);
59 }
60 self.build(provider).await
61 }
62}
63
64impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
65 #[allow(clippy::missing_panics_doc)]
79 pub async fn start(self) -> Result<(), ScannerError> {
80 let count = self.config.count;
81 let provider = self.block_range_scanner.provider().clone();
82 let listeners = self.listeners.clone();
83 let max_concurrent_fetches = self.config.max_concurrent_fetches;
84
85 info!(count = count, "Starting scanner, mode: fetch latest events and switch to live");
86
87 let client = self.block_range_scanner.run()?;
88
89 let latest_block = provider.get_block_number().await?;
94
95 let rewind_stream = client.rewind(latest_block, BlockNumberOrTag::Earliest).await?;
97
98 tokio::spawn(async move {
100 handle_stream(
105 rewind_stream,
106 &provider,
107 &listeners,
108 ConsumerMode::CollectLatest { count },
109 max_concurrent_fetches,
110 )
111 .await;
112
113 let sync_stream =
117 match client.stream_from(latest_block + 1, self.config.block_confirmations).await {
118 Ok(stream) => stream,
119 Err(e) => {
120 error!(error = %e, "Error during sync mode setup");
121 for listener in listeners {
122 _ = listener.sender.try_stream(e.clone()).await;
123 }
124 return;
125 }
126 };
127
128 handle_stream(
130 sync_stream,
131 &provider,
132 &listeners,
133 ConsumerMode::Stream,
134 max_concurrent_fetches,
135 )
136 .await;
137 });
138
139 Ok(())
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use alloy::{
146 network::Ethereum,
147 providers::{ProviderBuilder, RootProvider, mock::Asserter},
148 rpc::client::RpcClient,
149 };
150 use alloy_node_bindings::Anvil;
151
152 use crate::{
153 block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE},
154 event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES,
155 };
156
157 use super::*;
158
159 #[test]
160 fn builder_pattern() {
161 let builder = EventScannerBuilder::sync()
162 .from_latest(1)
163 .block_confirmations(2)
164 .max_block_range(50)
165 .max_concurrent_fetches(10);
166
167 assert_eq!(builder.config.count, 1);
168 assert_eq!(builder.config.block_confirmations, 2);
169 assert_eq!(builder.block_range_scanner.max_block_range, 50);
170 assert_eq!(builder.config.max_concurrent_fetches, 10);
171 }
172
173 #[test]
174 fn builder_with_default_values() {
175 let builder = EventScannerBuilder::sync().from_latest(1);
176
177 assert_eq!(builder.config.count, 1);
178 assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
179 assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
180 assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
181 }
182
183 #[test]
184 fn builder_last_call_wins() {
185 let builder = EventScannerBuilder::sync()
186 .from_latest(1)
187 .max_block_range(25)
188 .max_block_range(55)
189 .max_block_range(105)
190 .block_confirmations(2)
191 .block_confirmations(3)
192 .max_concurrent_fetches(10)
193 .max_concurrent_fetches(20);
194
195 assert_eq!(builder.config.count, 1);
196 assert_eq!(builder.block_range_scanner.max_block_range, 105);
197 assert_eq!(builder.config.block_confirmations, 3);
198 assert_eq!(builder.config.max_concurrent_fetches, 20);
199 }
200
201 #[tokio::test]
202 async fn accepts_zero_confirmations() -> anyhow::Result<()> {
203 let anvil = Anvil::new().try_spawn().unwrap();
204 let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
205
206 let scanner = EventScannerBuilder::sync()
207 .from_latest(1)
208 .block_confirmations(0)
209 .connect(provider)
210 .await?;
211
212 assert_eq!(scanner.config.block_confirmations, 0);
213
214 Ok(())
215 }
216
217 #[tokio::test]
218 async fn returns_error_with_zero_max_concurrent_fetches() {
219 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
220 let result = EventScannerBuilder::sync()
221 .from_latest(1)
222 .max_concurrent_fetches(0)
223 .connect(provider)
224 .await;
225
226 assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
227 }
228
229 #[tokio::test]
230 async fn test_sync_from_latest_returns_error_with_zero_count() {
231 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
232 let result = EventScannerBuilder::sync().from_latest(0).connect(provider).await;
233
234 match result {
235 Err(ScannerError::InvalidEventCount) => {}
236 _ => panic!("Expected InvalidEventCount error"),
237 }
238 }
239
240 #[tokio::test]
241 async fn test_sync_from_latest_returns_error_with_zero_max_block_range() {
242 let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
243 let result =
244 EventScannerBuilder::sync().from_latest(10).max_block_range(0).connect(provider).await;
245
246 match result {
247 Err(ScannerError::InvalidMaxBlockRange) => {}
248 _ => panic!("Expected InvalidMaxBlockRange error"),
249 }
250 }
251}