pool_sync/pool_sync.rs

//! PoolSync Core Implementation
//!
//! This module contains the core functionality for synchronizing pools across different
//! blockchain networks and protocols. It includes the main `PoolSync` struct and its
//! associated methods for configuring and executing the synchronization process.
//!
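//! Synchronization reads the `ARCHIVE` and `FULL` HTTP RPC endpoint URLs from the
//! environment (loaded via `dotenv`) and persists per-pool-type sync progress in the
//! `cache/` directory.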
use alloy::providers::Provider;
use alloy::providers::ProviderBuilder;
use std::collections::HashMap;
use std::sync::Arc;

use crate::builder::PoolSyncBuilder;
use crate::cache::{read_cache_file, write_cache_file, PoolCache};
use crate::chain::Chain;
use crate::errors::*;
use crate::pools::*;
use crate::rpc::Rpc;

/// The main struct for pool synchronization
pub struct PoolSync {
    /// Map of pool types to their fetcher implementations
    pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    /// The chain to sync on
    pub chain: Chain,
    /// The rate limit of the rpc
    pub rate_limit: u64,
}

impl PoolSync {
    /// Construct a new builder to configure sync parameters
    pub fn builder() -> PoolSyncBuilder {
        PoolSyncBuilder::default()
    }

    /// Synchronizes all added pools for the specified chain.
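    ///
    /// A minimal usage sketch, assuming `pool_sync` is an already configured
    /// `PoolSync` instance (construction via `PoolSync::builder()` is omitted here):
    ///
    /// ```rust,ignore
    /// let (pools, last_synced_block) = pool_sync.sync_pools().await?;
    /// println!("synced {} pools up to block {}", pools.len(), last_synced_block);
    /// ```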
    pub async fn sync_pools(&self) -> Result<(Vec<Pool>, u64), PoolSyncError> {
        // load environment variables from .env if present
        dotenv::dotenv().ok();

        // set up the archive node provider
        let archive = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(std::env::var("ARCHIVE").unwrap().parse().unwrap()),
        );

        // set up the full node provider
        let full = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(std::env::var("FULL").unwrap().parse().unwrap()),
        );

        // create the cache directory
        std::fs::create_dir_all("cache").unwrap();

        // read in the caches, one per configured pool type
        let mut pool_caches: Vec<PoolCache> = self
            .fetchers
            .keys()
            .map(|pool_type| read_cache_file(pool_type, self.chain).unwrap())
            .collect();

        let mut fully_synced = false;
        let mut last_synced_block = 0;

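        // Keep syncing until a full pass finds every cache already caught up to the
        // current chain head, so blocks mined while a pass was running are included.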
        while !fully_synced {
            fully_synced = true;
            let end_block = full.get_block_number().await.unwrap();

            for cache in &mut pool_caches {
                let start_block = cache.last_synced_block + 1;
                if start_block <= end_block {
                    fully_synced = false;

                    let fetcher = self.fetchers[&cache.pool_type].clone();

                    // fetch all of the pool addresses
                    let pool_addrs = Rpc::fetch_pool_addrs(
                        start_block,
                        end_block,
                        archive.clone(),
                        fetcher.clone(),
                        self.chain,
                        self.rate_limit,
                    )
                    .await
                    .expect(
                        "Failed to fetch pool addresses. Exiting due to having inconclusive state",
                    );

                    // populate all of the pool data
                    let mut new_pools = Rpc::populate_pools(
                        pool_addrs,
                        full.clone(),
                        cache.pool_type,
                        fetcher.clone(),
                        self.rate_limit,
                        self.chain,
                    )
                    .await
                    .expect("Failed to sync pool data. Exiting due to having inconclusive state");

                    // catch up liquidity for the existing pools in the cache
                    Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut cache.pools,
                        archive.clone(),
                        cache.pool_type,
                        self.rate_limit,
                        cache.is_initial_sync,
                    )
                    .await
                    .expect("Failed to populate liquidity information. Exiting due to having inconclusive state");

                    // populate liquidity for the newly discovered pools
                    if !new_pools.is_empty() {
                        Rpc::populate_liquidity(
                            start_block,
                            end_block,
                            &mut new_pools,
                            archive.clone(),
                            cache.pool_type,
                            self.rate_limit,
                            true,
                        )
                        .await
                        .expect("Failed to populate liquidity information. Exiting due to having inconclusive state");
                    }

                    // merge old and new
                    cache.pools.extend(new_pools);

                    // update info for cache
                    cache.last_synced_block = end_block;
                    last_synced_block = end_block;
                    cache.is_initial_sync = false;
                }
            }
        }

        // write all of the cache files
        pool_caches
            .iter()
            .for_each(|cache| write_cache_file(cache, self.chain).unwrap());

        // return all the pools along with the last synced block
        Ok((
            pool_caches
                .into_iter()
                .flat_map(|cache| cache.pools)
                .collect(),
            last_synced_block,
        ))
    }
}