pool_sync/
rpc.rs

use alloy::network::Network;
use alloy::primitives::Address;
use alloy::providers::Provider;
use alloy::rpc::types::{Filter, Log};
use alloy::sol_types::SolEvent;
use alloy::transports::Transport;
use anyhow::anyhow;
use anyhow::Result;
use futures::StreamExt;
use indicatif::ProgressBar;
use log::info;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{interval, Duration};

use crate::events::*;
use crate::pools::pool_builder;
use crate::pools::pool_structures::balancer_v2_structure::process_balance_data;
use crate::pools::pool_structures::v2_structure::process_sync_data;
use crate::pools::pool_structures::v3_structure::process_tick_data;
use crate::pools::PoolFetcher;
use crate::util::create_progress_bar;
use crate::{Chain, Pool, PoolInfo, PoolType};

// Retry constants
const MAX_RETRIES: u32 = 5;
const INITIAL_BACKOFF: u64 = 1000; // 1 second
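// With these values a failing request is retried after roughly 1s, 2s, 4s, 8s,
// and 16s (each plus up to 100ms of jitter) before the caller gives up on the
// sixth consecutive failure.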

// Define event configurations
#[derive(Debug)]
struct EventConfig {
    // event signatures to build the log filter from
    events: &'static [&'static str],
    // block-range chunk size for each getLogs request
    step_size: u64,
    // label used on the sync progress bar
    description: &'static str,
    // when true, this sync is skipped during the initial sync pass
    // (see the early return in populate_liquidity)
    requires_initial_sync: bool,
}

pub struct Rpc;
impl Rpc {
    // Fetch all pool addresses for the protocol
    pub async fn fetch_pool_addrs<P, T, N>(
        start_block: u64,
        end_block: u64,
        provider: Arc<P>,
        fetcher: Arc<dyn PoolFetcher>,
        chain: Chain,
        rate_limit: u64,
    ) -> Result<Vec<Address>>
    where
        P: Provider<T, N> + 'static,
        T: Transport + Clone + 'static,
        N: Network,
    {
        // build the filter for the factory's pool creation event
        let filter = Filter::new()
            .address(fetcher.factory_address(chain))
            .event(fetcher.pair_created_signature());

        let step_size: u64 = 10000;
        let num_tasks = (end_block - start_block) / step_size;
        let pb_info = format!(
            "{} Address Sync. Block range {}-{}",
            fetcher.pool_type(),
            start_block,
            end_block
        );
        let progress_bar = Arc::new(create_progress_bar(num_tasks, pb_info));

        // fetch all of the logs
        let logs = Rpc::fetch_event_logs(
            start_block,
            end_block,
            step_size,
            provider,
            rate_limit,
            progress_bar,
            filter,
        )
        .await?;

        // extract the addresses from the logs
        let addresses: Vec<Address> = logs
            .iter()
            .map(|log| fetcher.log_to_address(&log.inner))
            .collect();
        anyhow::Ok(addresses)
    }

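    // Populate full pool data for the given addresses via batched contract
    // calls (10 addresses per call for Balancer, 50 otherwise). Batches run
    // concurrently under a semaphore and a ticking interval for rate limiting,
    // and failed batches are retried with exponential backoff.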
    pub async fn populate_pools<P, T, N>(
        pool_addrs: Vec<Address>,
        provider: Arc<P>,
        pool: PoolType,
        fetcher: Arc<dyn PoolFetcher>,
        rate_limit: u64,
        chain: Chain,
    ) -> Result<Vec<Pool>>
    where
        P: Provider<T, N> + 'static,
        T: Transport + Clone + 'static,
        N: Network,
    {
        // data batch size for contract calls
        let batch_size = if pool.is_balancer() { 10 } else { 50 };

        // progress reporting and rate-limiting setup
        let total_tasks = (pool_addrs.len() + batch_size - 1) / batch_size; // Ceiling division
        let progress_bar = create_progress_bar(total_tasks as u64, format!("{} data sync", pool));
        let semaphore = Arc::new(Semaphore::new(rate_limit as usize));
        let interval = Arc::new(tokio::sync::Mutex::new(interval(Duration::from_secs_f64(
            1.0 / rate_limit as f64,
        ))));

        // break the addresses up into chunks
        let addr_chunks: Vec<Vec<Address>> = pool_addrs
            .chunks(batch_size)
            .map(|chunk| chunk.to_vec())
            .collect();

        let mut stream = futures::stream::iter(addr_chunks.into_iter().map(|chunk| {
            let provider = provider.clone();
            let sem = semaphore.clone();
            let pb = progress_bar.clone();
            let fetcher = fetcher.clone();
            let interval = interval.clone();
            let data = fetcher.get_pool_repr();

            async move {
                let _permit = sem.acquire().await.unwrap();
                interval.lock().await.tick().await;
                let mut retry_count = 0;
                let mut backoff = INITIAL_BACKOFF;
                loop {
                    // try building pools from this set of addresses
                    match pool_builder::build_pools(
                        &provider,
                        chunk.clone(),
                        pool,
                        data.clone(),
                        chain,
                    )
                    .await
                    {
                        Ok(populated_pools) if !populated_pools.is_empty() => {
                            pb.inc(1);
                            drop(provider);
                            return anyhow::Ok::<Vec<Pool>>(populated_pools);
                        }
                        Err(e) => {
                            if retry_count >= MAX_RETRIES {
                                info!("Failed to populate pools data: {}", e);
                                drop(provider);
                                return Ok(Vec::new());
                            }
                            let jitter = rand::thread_rng().gen_range(0..=100);
                            let sleep_duration = Duration::from_millis(backoff + jitter);
                            tokio::time::sleep(sleep_duration).await;
                            retry_count += 1;
                            backoff *= 2; // Exponential backoff
                        }
                        // empty result: retry immediately without counting it as a failure
                        _ => continue,
                    }
                }
            }
        }))
        .buffer_unordered(rate_limit as usize);

        let mut all_pools = Vec::new();

        while let Some(pool_res) = stream.next().await {
            match pool_res {
                Ok(pool) => all_pools.extend(pool),
                Err(e) => return Err(e),
            }
        }

        Ok(all_pools)
    }

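    // Populate liquidity state for the given pools. Fetches the relevant
    // events for the pool type (tick data for v3, swaps for Balancer, reserve
    // syncs for v2-style pools) over the block range in batches and applies
    // the logs to the matching pools in block order.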
    pub async fn populate_liquidity<P, T, N>(
        start_block: u64,
        end_block: u64,
        pools: &mut [Pool],
        provider: Arc<P>,
        pool_type: PoolType,
        rate_limit: u64,
        is_initial_sync: bool,
    ) -> anyhow::Result<()>
    where
        P: Provider<T, N> + Sync + 'static,
        T: Transport + Sync + Clone,
        N: Network,
    {
        if pools.is_empty() {
            return anyhow::Ok(());
        }

        let address_to_index: HashMap<Address, usize> = pools
            .iter()
            .enumerate()
            .map(|(i, pool)| (pool.address(), i))
            .collect();

        let batch_size = 1_000_000;
        let mut current_block = start_block;

        // get the event configuration for this pool type and check whether
        // this pass should run at all
        let config = Rpc::get_event_config(pool_type, is_initial_sync);
        if is_initial_sync && config.requires_initial_sync {
            return anyhow::Ok(());
        }

        // construct the progress bar
        let num_tasks = (end_block - start_block) / config.step_size;
        let pb_info = format!(
            "{} {}. Block range {}-{}",
            pool_type, config.description, current_block, end_block
        );
        let progress_bar = Arc::new(create_progress_bar(num_tasks, pb_info));

        // sync in batches
        while current_block <= end_block {
            let batch_end = (current_block + batch_size).min(end_block);

            let logs = Rpc::fetch_logs_for_config(
                &config,
                current_block,
                batch_end,
                provider.clone(),
                progress_bar.clone(),
                rate_limit,
            )
            .await?;

            // create a progress bar for processing this batch of logs
            let processing_pb_info = format!(
                "Processing logs batch for blocks {}-{}",
                current_block, batch_end
            );
            let processing_progress_bar =
                create_progress_bar(logs.len().try_into().unwrap(), processing_pb_info);

            // Process logs immediately after fetching, grouped by block number
            let mut ordered_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
            for log in logs {
                if let Some(block_number) = log.block_number {
                    ordered_logs.entry(block_number).or_default().push(log);
                }
            }

            // Process logs in order
            for (_, log_group) in ordered_logs {
                for log in log_group {
                    let address = log.address();
                    if let Some(&index) = address_to_index.get(&address) {
                        if let Some(pool) = pools.get_mut(index) {
                            if pool_type.is_v3() {
                                process_tick_data(
                                    pool.get_v3_mut().unwrap(),
                                    log,
                                    pool_type,
                                    is_initial_sync,
                                );
                            } else if pool_type.is_balancer() {
                                process_balance_data(pool.get_balancer_mut().unwrap(), log);
                            } else {
                                process_sync_data(pool.get_v2_mut().unwrap(), log, pool_type);
                            }
                        }
                    }
                    processing_progress_bar.inc(1);
                }
            }

            processing_progress_bar.finish_and_clear();
            current_block = batch_end + 1;
        }
        anyhow::Ok(())
    }

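    // Fetch all logs matching `filter` over the block range. The range is
    // split into `step_size` chunks that are fetched concurrently (up to
    // `rate_limit` requests in flight), paced by a semaphore and a ticking
    // interval, with failed requests retried by get_logs_with_retry.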
    pub async fn fetch_event_logs<T, N, P>(
        start_block: u64,
        end_block: u64,
        step_size: u64,
        provider: Arc<P>,
        rate_limit: u64,
        progress_bar: Arc<ProgressBar>,
        filter: Filter,
    ) -> anyhow::Result<Vec<Log>>
    where
        T: Transport + Clone,
        N: Network,
        P: Provider<T, N> + 'static,
    {
        // generate the chunked block ranges for the sync
        let block_range = Rpc::get_block_range(step_size, start_block, end_block);

        // semaphore and interval for rate limiting
        let semaphore = Arc::new(Semaphore::new(rate_limit as usize));
        let interval = Arc::new(Mutex::new(interval(Duration::from_secs_f64(
            1.0 / rate_limit as f64,
        ))));

        // Create a stream of futures
        let mut stream =
            futures::stream::iter(block_range.into_iter().map(|(from_block, to_block)| {
                let provider = provider.clone();
                let sem = semaphore.clone();
                let pb = progress_bar.clone();
                let interval = interval.clone();
                let filter = filter.clone();

                async move {
                    let _permit = sem.acquire().await.unwrap();
                    interval.lock().await.tick().await;

                    let filter = filter.from_block(from_block).to_block(to_block);
                    let logs = Rpc::get_logs_with_retry(provider, &filter).await;
                    if logs.is_ok() {
                        pb.inc(1);
                    }
                    logs
                }
            }))
            .buffer_unordered(rate_limit as usize); // Process up to rate_limit tasks concurrently

        let mut all_logs = Vec::new();

        // Process results as they complete
        while let Some(result) = stream.next().await {
            match result {
                Ok(logs) => all_logs.extend(logs),
                Err(e) => return Err(e),
            }
        }

        Ok(all_logs)
    }

    // Given a config and a range, fetch all the logs for it
    // This is a top level call which will delegate to individual fetching
    // functions to get the logs and to ensure retries on failure
    async fn fetch_logs_for_config<P, T, N>(
        config: &EventConfig,
        start_block: u64,
        end_block: u64,
        provider: Arc<P>,
        progress_bar: Arc<ProgressBar>,
        rate_limit: u64,
    ) -> Result<Vec<Log>>
    where
        P: Provider<T, N> + 'static,
        T: Transport + Clone + 'static,
        N: Network,
    {
        let filter = Filter::new().events(config.events.iter().copied());
        Rpc::fetch_event_logs(
            start_block,
            end_block,
            config.step_size,
            provider,
            rate_limit,
            progress_bar,
            filter,
        )
        .await
    }

    // Fetch logs with retry functionality
    async fn get_logs_with_retry<P, T, N>(
        provider: Arc<P>,
        filter: &Filter,
    ) -> anyhow::Result<Vec<Log>>
    where
        P: Provider<T, N> + 'static,
        T: Transport + Clone + 'static,
        N: Network,
    {
        let mut retry_count = 0;
        let mut backoff = INITIAL_BACKOFF;

        loop {
            match provider.get_logs(filter).await {
                Ok(logs) => {
                    return anyhow::Ok(logs);
                }
                Err(e) => {
                    if retry_count >= MAX_RETRIES {
                        return Err(anyhow!(e));
                    }
                    let jitter = rand::thread_rng().gen_range(0..=100);
                    let sleep_duration = Duration::from_millis(backoff + jitter);
                    tokio::time::sleep(sleep_duration).await;
                    retry_count += 1;
                    backoff *= 2;
                }
            }
        }
    }

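    // Map a pool type (and whether this is the initial sync) to the event
    // signatures, block step size, and progress-bar label used for its
    // liquidity sync.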
    fn get_event_config(pool_type: PoolType, is_initial_sync: bool) -> EventConfig {
        match pool_type {
            pt if pt.is_v3() => {
                if is_initial_sync {
                    EventConfig {
                        events: &[DataEvents::Mint::SIGNATURE, DataEvents::Burn::SIGNATURE],
                        step_size: 1500,
                        description: "Tick sync",
                        requires_initial_sync: false, // fetched even on the initial sync
                    }
                } else {
                    EventConfig {
                        events: &[
                            DataEvents::Mint::SIGNATURE,
                            DataEvents::Burn::SIGNATURE,
                            DataEvents::Swap::SIGNATURE,
                        ],
                        step_size: 50,
                        description: "Full sync",
                        requires_initial_sync: true, // only fetched on incremental syncs
                    }
                }
            }
            pt if pt.is_balancer() => EventConfig {
                events: &[BalancerV2Event::Swap::SIGNATURE],
                step_size: 5000,
                description: "Swap Sync",
                requires_initial_sync: true,
            },
            _ => EventConfig {
                events: &[AerodromeSync::Sync::SIGNATURE, DataEvents::Sync::SIGNATURE],
                step_size: 250,
                description: "Reserve Sync",
                requires_initial_sync: true,
            },
        }
    }

    // Generate a range of blocks of step size distance
    pub fn get_block_range(step_size: u64, start_block: u64, end_block: u64) -> Vec<(u64, u64)> {
        if start_block == end_block {
            return vec![(start_block, end_block)];
        }

        // clamp the step to the size of the range
        let block_difference = end_block.saturating_sub(start_block);
        let step_size = if block_difference < step_size {
            block_difference
        } else {
            step_size
        };
        let block_ranges: Vec<(u64, u64)> = (start_block..=end_block)
            .step_by(step_size as usize)
            .map(|from_block| {
                let to_block = (from_block + step_size - 1).min(end_block);
                (from_block, to_block)
            })
            .collect();
        block_ranges
    }
}
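
// A minimal sanity-check sketch for the chunking logic above; the expected
// ranges are derived from get_block_range's step and clamping behavior. These
// tests were not part of the original module and the exact cases are
// illustrative assumptions.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn block_range_chunks_and_clamps_to_end_block() {
        // 0..=25 with a step of 10 yields three chunks, the last clamped to 25
        assert_eq!(
            Rpc::get_block_range(10, 0, 25),
            vec![(0, 9), (10, 19), (20, 25)]
        );
    }

    #[test]
    fn block_range_shrinks_step_to_fit_small_ranges() {
        // the 25-block difference is smaller than the requested step of 100,
        // so the step collapses to the difference itself
        assert_eq!(Rpc::get_block_range(100, 5, 30), vec![(5, 29), (30, 30)]);
    }

    #[test]
    fn block_range_handles_a_single_block() {
        assert_eq!(Rpc::get_block_range(10_000, 42, 42), vec![(42, 42)]);
    }
}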