1use alloy::providers::Provider;
8use alloy::providers::ProviderBuilder;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use crate::builder::PoolSyncBuilder;
13use crate::cache::{read_cache_file, write_cache_file, PoolCache};
14use crate::chain::Chain;
15use crate::errors::*;
16use crate::pools::*;
17use crate::rpc::Rpc;
18
19pub struct PoolSync {
21 pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
23 pub chain: Chain,
25 pub rate_limit: u64,
27}
28
29impl PoolSync {
30 pub fn builder() -> PoolSyncBuilder {
32 PoolSyncBuilder::default()
33 }
34
35 pub async fn sync_pools(&self) -> Result<(Vec<Pool>, u64), PoolSyncError> {
37 dotenv::dotenv().ok();
39
40 let archive = Arc::new(
42 ProviderBuilder::new()
43 .network::<alloy::network::AnyNetwork>()
44 .on_http(std::env::var("ARCHIVE").unwrap().parse().unwrap()),
45 );
46
47 let full = Arc::new(
49 ProviderBuilder::new()
50 .network::<alloy::network::AnyNetwork>()
51 .on_http(std::env::var("FULL").unwrap().parse().unwrap()),
52 );
53
54 std::fs::create_dir_all("cache").unwrap();
56
57 let mut pool_caches: Vec<PoolCache> = self
59 .fetchers
60 .keys()
61 .map(|pool_type| read_cache_file(pool_type, self.chain).unwrap())
62 .collect();
63
64 let mut fully_synced = false;
65 let mut last_synced_block = 0;
66
67 while !fully_synced {
68 fully_synced = true;
69 let end_block = full.get_block_number().await.unwrap();
70
71 for cache in &mut pool_caches {
72 let start_block = cache.last_synced_block + 1;
73 if start_block <= end_block {
74 fully_synced = false;
75
76 let fetcher = self.fetchers[&cache.pool_type].clone();
77
78 let pool_addrs = Rpc::fetch_pool_addrs(
80 start_block,
81 end_block,
82 archive.clone(),
83 fetcher.clone(),
84 self.chain,
85 self.rate_limit,
86 )
87 .await
88 .expect(
89 "Failed to fetch pool addresses. Exiting due to having inconclusive state",
90 );
91
92 let mut new_pools = Rpc::populate_pools(
94 pool_addrs,
95 full.clone(),
96 cache.pool_type,
97 fetcher.clone(),
98 self.rate_limit,
99 self.chain,
100 )
101 .await
102 .expect("Failed to sync pool data, Exiting due to haveing inconclusive state");
103
104
105 Rpc::populate_liquidity(
107 start_block,
108 end_block,
109 &mut cache.pools,
110 archive.clone(),
111 cache.pool_type,
112 self.rate_limit,
113 cache.is_initial_sync,
114 )
115 .await
116 .expect("Failed to populate liquidity information, Exiting due to having inconclusive state");
117
118 if !new_pools.is_empty() {
120 Rpc::populate_liquidity(
121 start_block,
122 end_block,
123 &mut new_pools,
124 archive.clone(),
125 cache.pool_type,
126 self.rate_limit,
127 true,
128 )
129 .await
130 .expect("Failed to populate liquidity information, Exiting due to having inconclusive state");
131 }
132
133
134 cache.pools.extend(new_pools);
136
137
138 cache.last_synced_block = end_block;
140 last_synced_block = end_block;
141 cache.is_initial_sync = false;
142 }
143 }
144 }
145
146 pool_caches
148 .iter()
149 .for_each(|cache| write_cache_file(cache, self.chain).unwrap());
150
151 Ok((
153 pool_caches
154 .into_iter()
155 .flat_map(|cache| cache.pools)
156 .collect(),
157 last_synced_block,
158 ))
159 }
160}