//! PoolSync Core Implementation
//!
//! This module contains the core functionality for synchronizing pools across different
//! blockchain networks and protocols. It includes the main `PoolSync` struct and its
//! associated methods for configuring and executing the synchronization process.
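//!
//! # Example
//!
//! A minimal usage sketch. The builder method names (`add_pool`, `chain`,
//! `rate_limit`) are assumptions about the `PoolSyncBuilder` API and may
//! differ; the `ARCHIVE` and `FULL` env vars must point at RPC endpoints.
//!
//! ```rust,ignore
//! use pool_sync::{Chain, PoolSync, PoolType};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let pool_sync = PoolSync::builder()
//!         .add_pool(PoolType::UniswapV2) // assumed builder method
//!         .chain(Chain::Ethereum)        // assumed builder method
//!         .rate_limit(100)               // assumed builder method
//!         .build()?;
//!     let (pools, last_synced_block) = pool_sync.sync_pools().await?;
//!     println!("synced {} pools up to block {}", pools.len(), last_synced_block);
//!     Ok(())
//! }
//! ```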
//!
use alloy::providers::{Provider, ProviderBuilder};
use std::collections::HashMap;
use std::sync::Arc;

use crate::builder::PoolSyncBuilder;
use crate::cache::{read_cache_file, write_cache_file, PoolCache};
use crate::chain::Chain;
use crate::errors::*;
use crate::pools::*;
use crate::rpc::Rpc;

/// The main struct for pool synchronization
pub struct PoolSync {
    /// Map of pool types to their fetcher implementations
    pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    /// The chain to sync on
    pub chain: Chain,
    /// The rate limit of the RPC
    pub rate_limit: u64,
}

impl PoolSync {
    /// Construct a new builder to configure sync parameters
    pub fn builder() -> PoolSyncBuilder {
        PoolSyncBuilder::default()
    }

    /// Synchronizes all added pools for the specified chain
    pub async fn sync_pools(&self) -> Result<(Vec<Pool>, u64), PoolSyncError> {
        // load environment variables from a .env file if present
        dotenv::dotenv().ok();

        // set up the archive node provider (ARCHIVE env var)
        let archive = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(
                    std::env::var("ARCHIVE")
                        .expect("ARCHIVE env var not set")
                        .parse()
                        .expect("ARCHIVE is not a valid URL"),
                ),
        );

        // set up the full node provider (FULL env var)
        let full = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(
                    std::env::var("FULL")
                        .expect("FULL env var not set")
                        .parse()
                        .expect("FULL is not a valid URL"),
                ),
        );
        // ensure the cache directory exists
        std::fs::create_dir_all("cache").expect("failed to create cache directory");

        // load (or initialize) a cache for every configured pool type
        let mut pool_caches: Vec<PoolCache> = self
            .fetchers
            .keys()
            .map(|pool_type| {
                read_cache_file(pool_type, self.chain).expect("failed to read cache file")
            })
            .collect();

        let mut fully_synced = false;
        let mut last_synced_block = 0;

        // keep syncing until every cache has caught up to the chain tip
        while !fully_synced {
            fully_synced = true;
            let end_block = full
                .get_block_number()
                .await
                .expect("failed to fetch the current block number");
            for cache in &mut pool_caches {
                let start_block = cache.last_synced_block + 1;
                if start_block <= end_block {
                    fully_synced = false;
                    let fetcher = self.fetchers[&cache.pool_type].clone();
                    // fetch every new pool address in [start_block, end_block]
                    let pool_addrs = Rpc::fetch_pool_addrs(
                        start_block,
                        end_block,
                        archive.clone(),
                        fetcher.clone(),
                        self.chain,
                        self.rate_limit,
                    )
                    .await
                    .expect("failed to fetch pool addresses");

                    // populate the data for the newly discovered pools
                    let new_pools = Rpc::populate_pools(
                        pool_addrs,
                        full.clone(),
                        cache.pool_type,
                        self.rate_limit,
                    )
                    .await;
                    // Sync the previously cached pools up to the current tip.
                    //
                    // v2: populate_pools has already fetched reserves up to end_block for
                    // the new pools, so we must not fetch them again; we only process the
                    // new logs for the old pools, since those logs modify their state.
                    //
                    // v3: likewise, cache.pools are synced up to start_block - 1, so every
                    // log in [start_block, end_block] is new and can modify v3 pool state;
                    // fetch the state updates for them.
                    /*
                    let _ = Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut cache.pools,
                        archive.clone(),
                        cache.pool_type
                    ).await;
                    */

                    // Populate state for the new pools. For v2 we already have the state
                    // from populate_pools. For v3 we already have an up-to-date tick,
                    // sqrt price, and liquidity from populate_pools, so we can skip the
                    // swap logs, but we still need to fill in the tick data.
                    cache.pools.extend(new_pools);

                    // perform the initial tick sync for the new pools
                    Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut cache.pools,
                        archive.clone(),
                        cache.pool_type,
                    )
                    .await
                    .expect("failed to populate liquidity");

                    cache.last_synced_block = end_block;
                    last_synced_block = end_block;
                }
            }
        }

        // persist every cache to disk
        pool_caches.iter().for_each(|cache| {
            write_cache_file(cache, self.chain).expect("failed to write cache file")
        });

        // return all pools along with the last synced block
        let pools = pool_caches
            .into_iter()
            .flat_map(|cache| cache.pools)
            .collect();
        Ok((pools, last_synced_block))
    }
}