1use alloy::network::Network;
2use alloy::primitives::Address;
3use alloy::providers::Provider;
4use alloy::rpc::types::{Filter, Log};
5use alloy::sol_types::SolEvent;
6use alloy::transports::Transport;
7use anyhow::anyhow;
8use anyhow::Result;
9use futures::StreamExt;
10use indicatif::ProgressBar;
11use log::info;
12use rand::Rng;
13use std::collections::{BTreeMap, HashMap};
14use std::sync::Arc;
15use tokio::sync::{Mutex, Semaphore};
16use tokio::time::{interval, Duration};
17
18use crate::events::*;
19use crate::pools::pool_builder;
20use crate::pools::pool_structures::balancer_v2_structure::process_balance_data;
21use crate::pools::pool_structures::v2_structure::process_sync_data;
22use crate::pools::pool_structures::v3_structure::process_tick_data;
23use crate::pools::PoolFetcher;
24use crate::util::create_progress_bar;
25use crate::{Chain, Pool, PoolInfo, PoolType};
26
/// Upper bound on the retry counter used by the retry loops in this file;
/// once the counter reaches this value the next failure is final.
const MAX_RETRIES: u32 = 5;
/// Starting delay, in milliseconds, of the exponential backoff used when
/// retrying `get_logs` requests (the delay doubles after every failure).
const INITIAL_BACKOFF: u64 = 1000;

/// Describes which events to fetch for a pool family and how to fetch them.
#[derive(Debug)]
struct EventConfig {
    /// Event signatures passed to the log filter.
    events: &'static [&'static str],
    /// Number of blocks requested per `get_logs` window.
    step_size: u64,
    /// Human-readable label shown in the progress bar.
    description: &'static str,
    /// When `true`, `populate_liquidity` returns without fetching anything if
    /// it is invoked with `is_initial_sync == true`.
    requires_initial_sync: bool,
}
39
/// Stateless namespace for the RPC helpers below: every operation is an
/// associated function, so no instance is ever constructed here.
pub struct Rpc;
41impl Rpc {
42 pub async fn fetch_pool_addrs<P, T, N>(
44 start_block: u64,
45 end_block: u64,
46 provider: Arc<P>,
47 fetcher: Arc<dyn PoolFetcher>,
48 chain: Chain,
49 rate_limit: u64,
50 ) -> Result<Vec<Address>>
51 where
52 P: Provider<T, N> + 'static,
53 T: Transport + Clone + 'static,
54 N: Network,
55 {
56 let filter = Filter::new()
58 .address(fetcher.factory_address(chain))
59 .event(fetcher.pair_created_signature());
60
61 let step_size: u64 = 10000;
62 let num_tasks = end_block / step_size;
63 let pb_info = format!(
64 "{} Address Sync. Block range {}-{}",
65 fetcher.pool_type(),
66 start_block,
67 end_block
68 );
69 let progress_bar = Arc::new(create_progress_bar(num_tasks, pb_info));
70
71 let logs = Rpc::fetch_event_logs(
73 start_block,
74 end_block,
75 10000,
76 provider,
77 rate_limit,
78 progress_bar,
79 filter,
80 )
81 .await?;
82
83 let addresses: Vec<Address> = logs
85 .iter()
86 .map(|log| fetcher.log_to_address(&log.inner))
87 .collect();
88 anyhow::Ok(addresses)
89 }
90
91 pub async fn populate_pools<P, T, N>(
92 pool_addrs: Vec<Address>,
93 provider: Arc<P>,
94 pool: PoolType,
95 fetcher: Arc<dyn PoolFetcher>,
96 rate_limit: u64,
97 chain: Chain
98 ) -> Result<Vec<Pool>>
99 where
100 P: Provider<T, N> + 'static,
101 T: Transport + Clone + 'static,
102 N: Network,
103 {
104 let batch_size = if pool.is_balancer() { 10 } else { 50 };
106
107 let total_tasks = (pool_addrs.len() + batch_size - 1) / batch_size; let progress_bar = create_progress_bar(total_tasks as u64, format!("{} data sync", pool));
110 let semaphore = Arc::new(Semaphore::new(rate_limit as usize));
111 let interval = Arc::new(tokio::sync::Mutex::new(interval(Duration::from_secs_f64(
112 1.0 / rate_limit as f64,
113 ))));
114
115 let addr_chunks: Vec<Vec<Address>> = pool_addrs
117 .chunks(batch_size)
118 .map(|chunk| chunk.to_vec())
119 .collect();
120
121 let mut stream = futures::stream::iter(addr_chunks.into_iter().map(|chunk| {
122 let provider = provider.clone();
123 let sem = semaphore.clone();
124 let pb = progress_bar.clone();
125 let fetcher = fetcher.clone();
126 let interval = interval.clone();
127 let data = fetcher.get_pool_repr();
128
129 async move {
130 let _permit = sem.acquire().await.unwrap();
131 interval.lock().await.tick().await;
132 let mut retry_count = 0;
133 let mut backoff = 1000; loop {
135 match pool_builder::build_pools(
137 &provider,
138 chunk.clone(),
139 pool,
140 data.clone(),
141 chain
142 )
143 .await
144 {
145 Ok(populated_pools) if !populated_pools.is_empty() => {
146 pb.inc(1);
147 drop(provider);
148 return anyhow::Ok::<Vec<Pool>>(populated_pools);
149 }
150 Err(e) => {
151 if retry_count >= MAX_RETRIES {
152 info!("Failed to populate pools data: {}", e);
153 drop(provider);
154 return Ok(Vec::new());
155 }
156 let jitter = rand::thread_rng().gen_range(0..=100);
157 let sleep_duration = Duration::from_millis(backoff + jitter);
158 tokio::time::sleep(sleep_duration).await;
159 retry_count += 1;
160 backoff *= 2; }
162 _ => continue,
163 }
164 }
165 }
166 }))
167 .buffer_unordered(rate_limit as usize);
168
169 let mut all_pools = Vec::new();
170
171 while let Some(pool_res) = stream.next().await {
172 match pool_res {
173 Ok(pool) => all_pools.extend(pool),
174 Err(e) => return Err(e),
175 }
176 }
177
178 Ok(all_pools)
179 }
180
181 pub async fn populate_liquidity<P, T, N>(
182 start_block: u64,
183 end_block: u64,
184 pools: &mut [Pool],
185 provider: Arc<P>,
186 pool_type: PoolType,
187 rate_limit: u64,
188 is_initial_sync: bool,
189 ) -> anyhow::Result<()>
190 where
191 P: Provider<T, N> + Sync + 'static,
192 T: Transport + Sync + Clone,
193 N: Network,
194 {
195 if pools.is_empty() {
196 return anyhow::Ok(());
197 }
198
199 let address_to_index: HashMap<Address, usize> = pools
200 .iter()
201 .enumerate()
202 .map(|(i, pool)| (pool.address(), i))
203 .collect();
204
205 let batch_size = 1_000_000;
206 let mut current_block = start_block;
207
208 let config = Rpc::get_event_config(pool_type, is_initial_sync);
210 if is_initial_sync && config.requires_initial_sync {
211 return anyhow::Ok(());
212 }
213
214 let num_tasks = (end_block - start_block) / config.step_size;
216 let pb_info = format!(
217 "{} {}. Block range {}-{}",
218 pool_type, config.description, current_block, end_block
219 );
220 let progress_bar = Arc::new(create_progress_bar(num_tasks, pb_info));
221
222 while current_block <= end_block {
224 let batch_end = (current_block + batch_size).min(end_block);
225
226 let logs = Rpc::fetch_logs_for_config(
227 &config,
228 current_block,
229 batch_end,
230 provider.clone(),
231 progress_bar.clone(),
232 rate_limit,
233 )
234 .await?;
235
236 let processing_pb_info = format!(
238 "Processing logs batch for blocks {}-{}",
239 start_block, end_block
240 );
241 let processing_progress_bar =
242 create_progress_bar(logs.len().try_into().unwrap(), processing_pb_info);
243
244 let mut ordered_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
246 for log in logs {
247 if let Some(block_number) = log.block_number {
248 ordered_logs.entry(block_number).or_default().push(log);
249 }
250 }
251
252 for (_, log_group) in ordered_logs {
254 for log in log_group {
255 let address = log.address();
256 if let Some(&index) = address_to_index.get(&address) {
257 if let Some(pool) = pools.get_mut(index) {
258 if pool_type.is_v3() {
259 process_tick_data(
260 pool.get_v3_mut().unwrap(),
261 log,
262 pool_type,
263 is_initial_sync,
264 );
265 } else if pool_type.is_balancer() {
266 process_balance_data(pool.get_balancer_mut().unwrap(), log);
267 } else {
268 process_sync_data(pool.get_v2_mut().unwrap(), log, pool_type);
269 }
270 }
271 }
272 processing_progress_bar.inc(1);
273 }
274 }
275
276 processing_progress_bar.finish_and_clear();
277 current_block = batch_end + 1;
278 }
279 anyhow::Ok(())
280 }
281
282 pub async fn fetch_event_logs<T, N, P>(
283 start_block: u64,
284 end_block: u64,
285 step_size: u64,
286 provider: Arc<P>,
287 rate_limit: u64,
288 progress_bar: Arc<ProgressBar>,
289 filter: Filter,
290 ) -> anyhow::Result<Vec<Log>>
291 where
292 T: Transport + Clone,
293 N: Network,
294 P: Provider<T, N> + 'static,
295 {
296 let block_range = Rpc::get_block_range(step_size, start_block, end_block);
298
299 let semaphore = Arc::new(Semaphore::new(rate_limit as usize));
301 let interval = Arc::new(Mutex::new(interval(Duration::from_secs_f64(
302 1.0 / rate_limit as f64,
303 ))));
304
305 let mut stream =
307 futures::stream::iter(block_range.into_iter().map(|(from_block, to_block)| {
308 let provider = provider.clone();
309 let sem = semaphore.clone();
310 let pb = progress_bar.clone();
311 let interval = interval.clone();
312 let filter = filter.clone();
313
314 async move {
315 let _permit = sem.acquire().await.unwrap();
316 interval.lock().await.tick().await;
317
318 let filter = filter.from_block(from_block).to_block(to_block);
319 let logs = Rpc::get_logs_with_retry(provider, &filter).await;
320 if logs.is_ok() {
321 pb.inc(1);
322 }
323 logs
324 }
325 }))
326 .buffer_unordered(rate_limit as usize); let mut all_logs = Vec::new();
329
330 while let Some(result) = stream.next().await {
332 match result {
333 Ok(logs) => all_logs.extend(logs),
334 Err(e) => return Err(e),
335 }
336 }
337
338 Ok(all_logs)
339 }
340
341 async fn fetch_logs_for_config<P, T, N>(
345 config: &EventConfig,
346 start_block: u64,
347 end_block: u64,
348 provider: Arc<P>,
349 progress_bar: Arc<ProgressBar>,
350 rate_limit: u64,
351 ) -> Result<Vec<Log>>
352 where
353 P: Provider<T, N> + 'static,
354 T: Transport + Clone + 'static,
355 N: Network,
356 {
357 let filter = Filter::new().events(config.events.iter().copied());
358 Rpc::fetch_event_logs(
359 start_block,
360 end_block,
361 config.step_size,
362 provider,
363 rate_limit,
364 progress_bar,
365 filter,
366 )
367 .await
368 }
369
370 async fn get_logs_with_retry<P, T, N>(
372 provider: Arc<P>,
373 filter: &Filter,
374 ) -> anyhow::Result<Vec<Log>>
375 where
376 P: Provider<T, N> + 'static,
377 T: Transport + Clone + 'static,
378 N: Network,
379 {
380 let mut retry_count = 0;
381 let mut backoff = INITIAL_BACKOFF;
382
383 loop {
384 match provider.get_logs(filter).await {
385 Ok(logs) => {
386 return anyhow::Ok(logs);
387 }
388 Err(e) => {
389 if retry_count >= MAX_RETRIES {
390 return Err(anyhow!(e));
391 }
392 let jitter = rand::thread_rng().gen_range(0..=100);
393 let sleep_duration = Duration::from_millis(backoff + jitter);
394 tokio::time::sleep(sleep_duration).await;
395 retry_count += 1;
396 backoff *= 2;
397 }
398 }
399 }
400 }
401
402 fn get_event_config(pool_type: PoolType, is_initial_sync: bool) -> EventConfig {
403 match pool_type {
404 pt if pt.is_v3() => {
405 if is_initial_sync {
406 EventConfig {
407 events: &[DataEvents::Mint::SIGNATURE, DataEvents::Burn::SIGNATURE],
408 step_size: 1500,
409 description: "Tick sync",
410 requires_initial_sync: false, }
412 } else {
413 EventConfig {
414 events: &[
415 DataEvents::Mint::SIGNATURE,
416 DataEvents::Burn::SIGNATURE,
417 DataEvents::Swap::SIGNATURE,
418 ],
419 step_size: 50,
420 description: "Full sync",
421 requires_initial_sync: true, }
423 }
424 }
425 pt if pt.is_balancer() => EventConfig {
426 events: &[BalancerV2Event::Swap::SIGNATURE],
427 step_size: 5000,
428 description: "Swap Sync",
429 requires_initial_sync: true,
430 },
431 _ => EventConfig {
432 events: &[AerodromeSync::Sync::SIGNATURE, DataEvents::Sync::SIGNATURE],
433 step_size: 250,
434 description: "Reserve Sync",
435 requires_initial_sync: true,
436 },
437 }
438 }
439
440 pub fn get_block_range(step_size: u64, start_block: u64, end_block: u64) -> Vec<(u64, u64)> {
442 if start_block == end_block {
443 return vec![(start_block, end_block)];
444 }
445
446 let block_difference = end_block.saturating_sub(start_block);
447 let (_, step_size) = if block_difference < step_size {
448 (1, block_difference)
449 } else {
450 (
451 ((block_difference as f64) / (step_size as f64)).ceil() as u64,
452 step_size,
453 )
454 };
455 let block_ranges: Vec<(u64, u64)> = (start_block..=end_block)
456 .step_by(step_size as usize)
457 .map(|from_block| {
458 let to_block = (from_block + step_size - 1).min(end_block);
459 (from_block, to_block)
460 })
461 .collect();
462 block_ranges
463 }
464}