//! PoolSync Core Implementation
//!
//! This module contains the core functionality for synchronizing pools across different
//! blockchain networks and protocols. It includes the main `PoolSync` struct and its
//! associated methods for configuring and executing the synchronization process.
//!
use alloy::network::Network;
use alloy::providers::Provider;
use alloy::rpc::types::Filter;
use alloy::transports::Transport;
use futures::future::try_join_all;
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use crate::builder::PoolSyncBuilder;
use crate::cache::{read_cache_file, write_cache_file, PoolCache};
use crate::chain::Chain;
use crate::errors::*;
use crate::pools::*;
/// The number of blocks to query in one `get_logs` call (range chunk size)
const STEP_SIZE: u64 = 10_000;
/// The maximum number of retries for a failed `get_logs` query before giving up
const MAX_RETRIES: u32 = 5;
/// The main struct for pool synchronization
///
/// Holds the per-protocol fetchers to run, the chain to sync against, and the
/// RPC rate limit used to bound concurrent `get_logs` requests.
pub struct PoolSync {
/// Map of pool types to their fetcher implementations
pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
/// The chain to sync on
pub chain: Chain,
/// Maximum number of in-flight RPC requests (used as semaphore permit count)
pub rate_limit: usize,
}
impl PoolSync {
/// Construct a new builder to configure sync parameters
pub fn builder() -> PoolSyncBuilder {
PoolSyncBuilder::default()
}
/// Synchronizes all added pools for the specified chain
///
/// This method performs the following steps:
/// 1. Creates a cache folder if it doesn't exist
/// 2. Reads the cache for each pool type
/// 3. Synchronizes each cache forward from its last synced block to the chain head
/// 4. Updates and writes back the cache
/// 5. Combines all synchronized pools into a single vector
///
/// # Arguments
///
/// * `provider` - An Arc-wrapped provider for interacting with the blockchain
///
/// # Returns
///
/// A Result containing a vector of all synchronized pools or a PoolSyncError
///
/// # Errors
///
/// Returns a `PoolSyncError` if the cache directory cannot be created, the
/// chain head cannot be fetched, or any block-range task fails after retries.
pub async fn sync_pools<P, T, N>(&self, provider: Arc<P>) -> Result<Vec<Pool>, PoolSyncError>
where
    P: Provider<T, N> + 'static,
    T: Transport + Clone + 'static,
    N: Network,
{
    // Create the cache folder if it does not exist. A failure here would
    // make the final `write_cache_file` calls useless, so surface it
    // instead of silently ignoring the result.
    let path = Path::new("cache");
    if !path.exists() {
        fs::create_dir_all(path).map_err(|e| PoolSyncError::ProviderError(e.to_string()))?;
    }

    // Read the persisted cache for every pool type we were configured with.
    let mut pool_caches: Vec<PoolCache> = self
        .fetchers
        .keys()
        .map(|pool_type| read_cache_file(pool_type, self.chain))
        .collect();

    // Fetch the chain head once so every cache syncs to the same snapshot
    // (previously this was re-fetched, and unwrap()ed, per cache).
    let end_block = provider
        .get_block_number()
        .await
        .map_err(|e| PoolSyncError::ProviderError(e.to_string()))?;

    for cache in &mut pool_caches {
        let start_block = cache.last_synced_block;
        // NOTE(review): the range starts at `last_synced_block` itself, so the
        // boundary block is queried again on the next sync; duplicates appear
        // to be tolerated downstream — confirm before changing to +1.
        let block_difference = end_block.saturating_sub(start_block);

        // Chunk the range into STEP_SIZE-block tasks; a short range becomes
        // a single task covering the whole difference.
        let (total_steps, step_size) = if block_difference < STEP_SIZE {
            (1, block_difference)
        } else {
            (
                (block_difference as f64 / STEP_SIZE as f64).ceil() as u64,
                STEP_SIZE,
            )
        };

        let progress_bar = self.create_progress_bar(total_steps);
        // Bound the number of concurrent get_logs calls to the rate limit.
        let rate_limiter = Arc::new(Semaphore::new(self.rate_limit));

        if block_difference > 0 {
            let mut handles = vec![];
            for from_block in (start_block..=end_block).step_by(step_size as usize) {
                let to_block = (from_block + step_size - 1).min(end_block);
                let handle = self.spawn_block_range_task(
                    provider.clone(),
                    rate_limiter.clone(),
                    self.fetchers.clone(),
                    from_block,
                    to_block,
                    progress_bar.clone(),
                    self.chain,
                );
                handles.push(handle);
            }
            // First `?` surfaces task join failures, second surfaces the
            // task's own PoolSyncError.
            for handle in handles {
                let pools = handle
                    .await
                    .map_err(|e| PoolSyncError::ProviderError(e.to_string()))??;
                cache.pools.extend(pools);
            }
        }
        cache.last_synced_block = end_block;
    }

    // Write all of the caches back to file.
    for pool_cache in &pool_caches {
        write_cache_file(pool_cache, self.chain);
    }

    // Drain every per-type cache into one combined vector.
    let mut all_pools: Vec<Pool> = Vec::new();
    for pool_cache in &mut pool_caches {
        all_pools.append(&mut pool_cache.pools);
    }
    Ok(all_pools)
}
/// Spawns an asynchronous task that scans one block range for new pools.
///
/// The task delegates to [`Self::process_block_range`], which is rate limited
/// through `semaphore`, and bumps `progress_bar` by one step once the range
/// has been processed — whether it succeeded or failed — so the bar always
/// reaches its total.
///
/// # Arguments
///
/// * `provider` - The blockchain provider
/// * `semaphore` - Semaphore bounding concurrent RPC calls
/// * `fetchers` - The pool fetchers to run over the range
/// * `from_block` - First block of the range (inclusive)
/// * `to_block` - Last block of the range (inclusive)
/// * `progress_bar` - Progress bar updated when the range completes
/// * `chain` - The blockchain being synced
///
/// # Returns
///
/// A `JoinHandle` resolving to the pools discovered in the range.
fn spawn_block_range_task<P, T, N>(
    &self,
    provider: Arc<P>,
    semaphore: Arc<Semaphore>,
    fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    from_block: u64,
    to_block: u64,
    progress_bar: ProgressBar,
    chain: Chain,
) -> tokio::task::JoinHandle<Result<Vec<Pool>, PoolSyncError>>
where
    P: Provider<T, N> + 'static,
    T: Transport + Clone + 'static,
    N: Network,
{
    tokio::spawn(async move {
        let outcome = Self::process_block_range(
            provider, semaphore, fetchers, from_block, to_block, MAX_RETRIES, chain,
        )
        .await;
        progress_bar.inc(1);
        outcome
    })
}
/// Processes a range of blocks to find and decode pool creation events
///
/// Builds one `get_logs` filter per fetcher, queries them all concurrently,
/// and decodes the resulting logs into `Pool` objects. Failed queries are
/// retried with exponential backoff up to `max_retries` times.
///
/// # Arguments
///
/// * `provider` - The blockchain provider
/// * `semaphore` - A semaphore for rate limiting
/// * `fetchers` - The pool fetchers
/// * `from_block` - The starting block number
/// * `to_block` - The ending block number
/// * `max_retries` - The maximum number of retries for failed queries
/// * `chain` - The blockchain being synced
///
/// # Returns
///
/// A Result containing a vector of found pools or a PoolSyncError
async fn process_block_range<P, T, N>(
    provider: Arc<P>,
    semaphore: Arc<Semaphore>,
    fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    from_block: u64,
    to_block: u64,
    max_retries: u32,
    chain: Chain,
) -> Result<Vec<Pool>, PoolSyncError>
where
    P: Provider<T, N>,
    T: Transport + Clone + 'static,
    N: Network,
{
    // The filters depend only on the fetchers and the block range, so build
    // them once instead of rebuilding on every retry iteration.
    let filters: Vec<Filter> = fetchers
        .values()
        .map(|fetcher| {
            Filter::new()
                .address(fetcher.factory_address(chain))
                .event(fetcher.pair_created_signature())
                .from_block(from_block)
                .to_block(to_block)
        })
        .collect();

    let mut retries = 0;
    loop {
        // Hold a rate-limit permit only while the RPC calls are in flight.
        let permit = semaphore
            .acquire()
            .await
            .map_err(|e| PoolSyncError::ProviderError(e.to_string()))?;

        let log_futures = filters.iter().map(|filter| provider.get_logs(filter));
        let query_result = try_join_all(log_futures).await;

        // Release the permit before decoding or backing off so other tasks
        // can use the rate-limit capacity during our sleep.
        drop(permit);

        match query_result {
            Ok(all_logs) => {
                let mut pools = Vec::new();
                // `filters` was built from `fetchers.values()` above; zipping
                // the results back against the same map's `values()` keeps the
                // log batches aligned with their fetchers.
                for (logs, fetcher) in all_logs.into_iter().zip(fetchers.values()) {
                    for log in logs {
                        if let Some(pool) = fetcher.from_log(&log.inner).await {
                            pools.push(pool);
                        }
                    }
                }
                return Ok(pools);
            }
            Err(e) => {
                if retries >= max_retries {
                    return Err(PoolSyncError::ProviderError(e.to_string()));
                }
                retries += 1;
                // Exponential backoff: 2s, 4s, 8s, ... per retry attempt.
                let delay = 2u64.pow(retries) * 1000;
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
        }
    }
}
/// Builds the progress bar used for visual feedback during synchronization.
///
/// # Arguments
///
/// * `total_steps` - The total number of block-range tasks to complete
///
/// # Returns
///
/// A configured ProgressBar instance
fn create_progress_bar(&self, total_steps: u64) -> ProgressBar {
    // The template is a compile-time constant, so parsing cannot fail at
    // runtime with this exact string.
    let style = ProgressStyle::default_bar()
        .template("[{elapsed_precise}] All pools: tasks completed {bar:40.cyan/blue} {pos}/{len} {msg}")
        .unwrap()
        .progress_chars("##-");
    let bar = ProgressBar::new(total_steps);
    bar.set_style(style);
    bar
}
}