use alloy::providers::Provider;
use alloy::providers::ProviderBuilder;
use std::collections::HashMap;
use std::sync::Arc;
use crate::builder::PoolSyncBuilder;
use crate::cache::{read_cache_file, write_cache_file, PoolCache};
use crate::chain::Chain;
use crate::errors::*;
use crate::pools::*;
use crate::rpc::Rpc;
/// Configuration and state needed to synchronise on-chain liquidity pools.
///
/// Instances are normally obtained through [`PoolSync::builder`].
pub struct PoolSync {
    /// One shared fetcher per pool type. `sync_pools` loads one cache per key
    /// of this map and hands the matching fetcher to the `Rpc` helpers.
    pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    /// Chain being synchronised; passed to the cache read/write helpers and
    /// to the `Rpc` calls.
    pub chain: Chain,
    /// Rate-limit setting forwarded unchanged to every `Rpc` call.
    // NOTE(review): the unit (requests per second? milliseconds?) is not
    // visible from this file — confirm against `Rpc`.
    pub rate_limit: u64,
}
impl PoolSync {
    /// Returns a default-constructed [`PoolSyncBuilder`].
    pub fn builder() -> PoolSyncBuilder {
        PoolSyncBuilder::default()
    }

    /// Brings the cached pools of every configured pool type up to the
    /// current head block and returns them.
    ///
    /// Steps performed:
    /// 1. Builds two HTTP providers from the `ARCHIVE` and `FULL` environment
    ///    variables (a `.env` file is loaded first, best-effort).
    /// 2. Ensures the `cache` directory exists and reads one cache per key of
    ///    `self.fetchers`.
    /// 3. Repeatedly asks the `FULL` provider for the head block number and,
    ///    for every cache that is behind it, fetches new pool addresses,
    ///    populates the new pools, and populates liquidity for both the
    ///    already-cached and the new pools. The loop ends after the first pass
    ///    in which no cache was behind the head block.
    /// 4. Writes every cache back and returns all pools together with the
    ///    block number up to which *all* caches are synced.
    ///
    /// # Returns
    ///
    /// `(pools, last_synced_block)` where `last_synced_block` is the smallest
    /// `last_synced_block` among the caches (`0` when no fetchers are
    /// configured). When nothing needed syncing this reports the cached
    /// position rather than `0`.
    ///
    /// # Errors
    ///
    /// No code path in this function currently produces an `Err`; every
    /// failure panics (see below).
    ///
    /// # Panics
    ///
    /// Panics if `ARCHIVE` or `FULL` is unset or not a parsable URL, if the
    /// `cache` directory cannot be created, if a cache file cannot be read or
    /// written, or if any RPC step fails.
    pub async fn sync_pools(&self) -> Result<(Vec<Pool>, u64), PoolSyncError> {
        // Best-effort: the outcome of loading `.env` is deliberately ignored.
        dotenv::dotenv().ok();

        let archive = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(
                    std::env::var("ARCHIVE")
                        .expect("ARCHIVE environment variable must be set")
                        .parse()
                        .expect("ARCHIVE environment variable must be a valid URL"),
                ),
        );
        let full = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(
                    std::env::var("FULL")
                        .expect("FULL environment variable must be set")
                        .parse()
                        .expect("FULL environment variable must be a valid URL"),
                ),
        );

        std::fs::create_dir_all("cache").expect("Failed to create the cache directory");

        // One cache per configured pool type.
        let mut pool_caches: Vec<PoolCache> = self
            .fetchers
            .keys()
            .map(|pool_type| {
                read_cache_file(pool_type, self.chain).expect("Failed to read pool cache file")
            })
            .collect();

        // Syncing takes time, so the head may have advanced meanwhile; keep
        // going until a full pass finds every cache at (or past) the head.
        let mut fully_synced = false;
        while !fully_synced {
            fully_synced = true;
            let end_block = full
                .get_block_number()
                .await
                .expect("Failed to fetch the current block number");

            for cache in &mut pool_caches {
                let start_block = cache.last_synced_block + 1;
                if start_block > end_block {
                    // This cache is already up to date for this pass.
                    continue;
                }
                fully_synced = false;

                let fetcher = self.fetchers[&cache.pool_type].clone();

                // Discover pools in the new block range.
                let pool_addrs = Rpc::fetch_pool_addrs(
                    start_block,
                    end_block,
                    archive.clone(),
                    fetcher.clone(),
                    self.chain,
                    self.rate_limit,
                )
                .await
                .expect(
                    "Failed to fetch pool addresses. Exiting due to having inconclusive state",
                );

                let mut new_pools = Rpc::populate_pools(
                    pool_addrs,
                    full.clone(),
                    cache.pool_type,
                    fetcher.clone(),
                    self.rate_limit,
                    self.chain,
                )
                .await
                .expect("Failed to sync pool data, Exiting due to having inconclusive state");

                // Refresh liquidity of the pools that were already cached.
                Rpc::populate_liquidity(
                    start_block,
                    end_block,
                    &mut cache.pools,
                    archive.clone(),
                    cache.pool_type,
                    self.rate_limit,
                    cache.is_initial_sync,
                )
                .await
                .expect("Failed to populate liquidity information, Exiting due to having inconclusive state");

                // Newly discovered pools are always populated with the
                // initial-sync flag set.
                if !new_pools.is_empty() {
                    Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut new_pools,
                        archive.clone(),
                        cache.pool_type,
                        self.rate_limit,
                        true,
                    )
                    .await
                    .expect("Failed to populate liquidity information, Exiting due to having inconclusive state");
                }

                cache.pools.extend(new_pools);
                cache.last_synced_block = end_block;
                cache.is_initial_sync = false;
            }
        }

        for cache in &pool_caches {
            write_cache_file(cache, self.chain).expect("Failed to write pool cache file");
        }

        // Every cache advanced in the last syncing pass sits exactly at that
        // pass's head block and all others are at or beyond it, so the minimum
        // is the block up to which everything is synced. Unlike a counter that
        // starts at 0, this is also correct when no cache needed any work.
        let last_synced_block = pool_caches
            .iter()
            .map(|cache| cache.last_synced_block)
            .min()
            .unwrap_or(0);

        Ok((
            pool_caches
                .into_iter()
                .flat_map(|cache| cache.pools)
                .collect(),
            last_synced_block,
        ))
    }
}