event_scanner/event_scanner/modes/sync/
from_block.rs1use alloy::{eips::BlockId, network::Network};
10
11use crate::{
12 ScannerError,
13 event_scanner::{
14 EventScanner, StartProof,
15 block_range_handler::{BlockRangeHandler, StreamHandler},
16 builder::{EventScannerBuilder, SyncFromBlock},
17 },
18};
19use robust_provider::IntoRobustProvider;
20
21impl EventScannerBuilder<SyncFromBlock> {
22 #[must_use]
28 pub fn block_confirmations(mut self, confirmations: u64) -> Self {
29 self.config.block_confirmations = confirmations;
30 self
31 }
32
33 #[must_use]
48 pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self {
49 self.config.max_concurrent_fetches = max_concurrent_fetches;
50 self
51 }
52
53 pub async fn connect<N: Network>(
61 self,
62 provider: impl IntoRobustProvider<N>,
63 ) -> Result<EventScanner<SyncFromBlock, N>, ScannerError> {
64 if self.config.max_concurrent_fetches == 0 {
65 return Err(ScannerError::InvalidMaxConcurrentFetches);
66 }
67
68 let scanner = self.build(provider).await?;
69
70 let provider = scanner.block_range_scanner.provider();
71
72 if let BlockId::Hash(from_hash) = scanner.config.from_block {
73 provider.get_block_by_hash(from_hash.into()).await?;
74 }
75
76 Ok(scanner)
77 }
78}
79
80impl<N: Network> EventScanner<SyncFromBlock, N> {
81 pub async fn start(self) -> Result<StartProof, ScannerError> {
91 info!(
92 from_block = ?self.config.from_block,
93 block_confirmations = self.config.block_confirmations,
94 listener_count = self.listeners.len(),
95 "Starting EventScanner in SyncFromBlock mode"
96 );
97
98 let stream = self
99 .block_range_scanner
100 .stream_from(self.config.from_block, self.config.block_confirmations)
101 .await?;
102
103 let buffer_capacity = self.buffer_capacity();
104
105 let handler = StreamHandler::new(
106 self.block_range_scanner.provider().clone(),
107 self.listeners,
108 self.config.max_concurrent_fetches,
109 buffer_capacity,
110 );
111
112 tokio::spawn(async move {
113 handler.handle(stream).await;
114 });
115
116 Ok(StartProof::new())
117 }
118}
119
#[cfg(test)]
mod tests {
    use alloy::{
        eips::BlockNumberOrTag,
        network::Ethereum,
        node_bindings::Anvil,
        primitives::keccak256,
        providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi, mock::Asserter},
        rpc::client::RpcClient,
    };

    use crate::{
        block_range_scanner::{
            DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
        },
        event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES,
    };

    use super::*;

    /// Provider over a mocked RPC client, shared by the tests that expect `connect` to fail
    /// with a configuration error.
    fn mocked_provider() -> RootProvider<Ethereum> {
        RootProvider::new(RpcClient::mocked(Asserter::new()))
    }

    // Every builder setter stores its argument in the matching configuration field.
    #[test]
    fn sync_scanner_builder_pattern() {
        let builder = EventScannerBuilder::sync()
            .from_block(50)
            .max_block_range(25)
            .block_confirmations(5)
            .max_concurrent_fetches(10)
            .buffer_capacity(33);

        let config = &builder.config;
        assert_eq!(config.from_block, BlockNumberOrTag::Number(50).into());
        assert_eq!(config.block_confirmations, 5);
        assert_eq!(config.max_concurrent_fetches, 10);

        let range_scanner = &builder.block_range_scanner;
        assert_eq!(range_scanner.max_block_range, 25);
        assert_eq!(range_scanner.buffer_capacity, 33);
    }

    // Fields that are never set explicitly keep the crate-wide defaults.
    #[test]
    fn sync_scanner_builder_default_values() {
        let builder = EventScannerBuilder::sync().from_block(BlockNumberOrTag::Earliest);

        let config = &builder.config;
        assert_eq!(config.from_block, BlockNumberOrTag::Earliest.into());
        assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
        assert_eq!(config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);

        let range_scanner = &builder.block_range_scanner;
        assert_eq!(range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
        assert_eq!(range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
    }

    // Zero confirmations is a valid setting and survives `connect`.
    #[tokio::test]
    async fn accepts_zero_confirmations() -> anyhow::Result<()> {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let builder = EventScannerBuilder::sync()
            .from_block(BlockNumberOrTag::Earliest)
            .block_confirmations(0);
        let scanner = builder.connect(provider).await?;

        assert_eq!(scanner.config.block_confirmations, 0);

        Ok(())
    }

    // Calling a setter repeatedly keeps only the value from the final call.
    #[test]
    fn sync_scanner_builder_last_call_wins() {
        let builder = EventScannerBuilder::sync()
            .from_block(2)
            .max_block_range(25)
            .max_block_range(55)
            .max_block_range(105)
            .block_confirmations(5)
            .block_confirmations(7)
            .max_concurrent_fetches(10)
            .max_concurrent_fetches(20)
            .buffer_capacity(20)
            .buffer_capacity(40);

        let config = &builder.config;
        assert_eq!(config.from_block, BlockNumberOrTag::Number(2).into());
        assert_eq!(config.block_confirmations, 7);
        assert_eq!(config.max_concurrent_fetches, 20);

        let range_scanner = &builder.block_range_scanner;
        assert_eq!(range_scanner.max_block_range, 105);
        assert_eq!(range_scanner.buffer_capacity, 40);
    }

    #[tokio::test]
    async fn returns_error_with_zero_max_concurrent_fetches() {
        let builder = EventScannerBuilder::sync().from_block(0).max_concurrent_fetches(0);

        let result = builder.connect(mocked_provider()).await;

        assert!(matches!(result, Err(ScannerError::InvalidMaxConcurrentFetches)));
    }

    #[tokio::test]
    async fn test_sync_from_block_returns_error_with_zero_max_block_range() {
        let builder = EventScannerBuilder::sync().from_block(100).max_block_range(0);

        let result = builder.connect(mocked_provider()).await;

        assert!(
            matches!(result, Err(ScannerError::InvalidMaxBlockRange)),
            "Expected InvalidMaxBlockRange error"
        );
    }

    #[tokio::test]
    async fn returns_error_with_zero_buffer_capacity() {
        let builder = EventScannerBuilder::sync().from_block(100).buffer_capacity(0);

        let result = builder.connect(mocked_provider()).await;

        assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
    }

    // A hash that belongs to a mined block passes the lookup performed by `connect`.
    #[tokio::test]
    async fn test_sync_from_block_scanner_with_valid_from_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        provider.anvil_mine(Some(5), None).await.unwrap();

        let block_5 = provider.get_block_by_number(5.into()).await.unwrap().unwrap();
        let block_5_hash = block_5.header.hash;

        let builder = EventScannerBuilder::sync().from_block(block_5_hash);
        let result = builder.connect(provider.clone()).await;

        assert!(result.is_ok());
    }

    // A hash that matches no block on the node makes `connect` fail with `BlockNotFound`.
    #[tokio::test]
    async fn test_sync_from_block_scanner_with_invalid_from_hash() {
        let anvil = Anvil::new().try_spawn().unwrap();
        let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

        let unknown_hash = keccak256("Invalid Hash");
        let builder = EventScannerBuilder::sync().from_block(unknown_hash);
        let result = builder.connect(provider).await;

        assert!(matches!(result, Err(ScannerError::BlockNotFound)));
    }
}