use crate::{checkpoint, error::CFFMError};
use super::dex::Dex;
use super::pool::Pool;
use super::throttle::RequestThrottle;
use ethers::{
providers::{JsonRpcClient, Middleware, Provider},
types::{BlockNumber, Filter, ValueOrArray, H160, U64},
};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::{
panic::resume_unwind,
sync::{Arc, Mutex},
};
pub async fn sync_pairs<P: 'static + JsonRpcClient>(
dexes: Vec<Dex>,
provider: Arc<Provider<P>>,
save_checkpoint: bool,
) -> Result<Vec<Pool>, CFFMError<P>> {
sync_pairs_with_throttle(dexes, provider, 0, save_checkpoint).await
}
pub async fn sync_pairs_with_throttle<P: 'static + JsonRpcClient>(
dexes: Vec<Dex>,
provider: Arc<Provider<P>>,
requests_per_second_limit: usize,
save_checkpoint: bool,
) -> Result<Vec<Pool>, CFFMError<P>> {
let request_throttle = Arc::new(Mutex::new(RequestThrottle::new(requests_per_second_limit)));
let current_block = provider.get_block_number().await?;
let mut aggregated_pools: Vec<Pool> = vec![];
let mut handles = vec![];
let multi_progress_bar = MultiProgress::new();
for dex in dexes.clone() {
let async_provider = provider.clone();
let request_throttle = request_throttle.clone();
let progress_bar = multi_progress_bar.add(ProgressBar::new(0));
handles.push(tokio::spawn(async move {
progress_bar.set_style(
ProgressStyle::with_template("{msg} {bar:40.cyan/blue} {pos:>7}/{len:7} Blocks")
.expect("Error when setting progress bar style")
.progress_chars("##-"),
);
let pools = get_all_pools_from_dex(
dex,
async_provider.clone(),
BlockNumber::Number(current_block),
request_throttle.clone(),
progress_bar.clone(),
)
.await?;
progress_bar.reset();
progress_bar.set_style(
ProgressStyle::with_template("{msg} {bar:40.cyan/blue} {pos:>7}/{len:7} Pairs")
.expect("Error when setting progress bar style")
.progress_chars("##-"),
);
let mut pools = get_all_pool_data(
pools,
dex.factory_address(),
async_provider.clone(),
request_throttle.clone(),
progress_bar.clone(),
)
.await?;
progress_bar.reset();
progress_bar.set_style(
ProgressStyle::with_template("{msg} {bar:40.cyan/blue} {pos:>7}/{len:7} Pairs")
.expect("Error when setting progress bar style")
.progress_chars("##-"),
);
progress_bar.set_length(pools.len() as u64);
progress_bar.set_message(format!(
"Syncing reserves for pools from: {}",
dex.factory_address()
));
for pool in pools.iter_mut() {
let request_throttle = request_throttle.clone();
request_throttle
.lock()
.expect("Error when aquiring request throttle mutex lock")
.increment_or_sleep(1);
pool.sync_pool(async_provider.clone()).await?;
}
Ok::<_, CFFMError<P>>(pools)
}));
}
for handle in handles {
match handle.await {
Ok(sync_result) => aggregated_pools.extend(sync_result?),
Err(err) => {
{
if err.is_panic() {
resume_unwind(err.into_panic());
}
}
}
}
}
if save_checkpoint {
let latest_block = provider.get_block_number().await?;
checkpoint::construct_checkpoint(
dexes,
&aggregated_pools,
latest_block.as_u64(),
String::from("pool_sync_checkpoint.json"),
)
}
Ok(aggregated_pools)
}
pub async fn get_all_pools<P: 'static + JsonRpcClient>(
dexes: Vec<Dex>,
provider: Arc<Provider<P>>,
requests_per_second_limit: usize,
) -> Result<Vec<Pool>, CFFMError<P>> {
let request_throttle = Arc::new(Mutex::new(RequestThrottle::new(requests_per_second_limit)));
let current_block = provider.get_block_number().await?;
let mut handles = vec![];
let multi_progress_bar = MultiProgress::new();
for dex in dexes {
let async_provider = provider.clone();
let request_throttle = request_throttle.clone();
let progress_bar = multi_progress_bar.add(ProgressBar::new(0));
handles.push(tokio::spawn(async move {
progress_bar.set_style(
ProgressStyle::with_template("{msg} {bar:40.cyan/blue} {pos:>7}/{len:7} Blocks")
.expect("Error when setting progress bar style")
.progress_chars("##-"),
);
let pools = get_all_pools_from_dex(
dex,
async_provider.clone(),
BlockNumber::Number(current_block),
request_throttle.clone(),
progress_bar.clone(),
)
.await?;
Ok::<_, CFFMError<P>>(pools)
}));
}
let mut aggregated_pools: Vec<Pool> = vec![];
for handle in handles {
match handle.await {
Ok(sync_result) => aggregated_pools.extend(sync_result?),
Err(err) => {
{
if err.is_panic() {
resume_unwind(err.into_panic());
}
}
}
}
}
Ok(aggregated_pools)
}
pub async fn get_all_pools_from_dex<P: 'static + JsonRpcClient>(
dex: Dex,
provider: Arc<Provider<P>>,
current_block: BlockNumber,
request_throttle: Arc<Mutex<RequestThrottle>>,
progress_bar: ProgressBar,
) -> Result<Vec<Pool>, CFFMError<P>> {
let step = 100000;
let from_block = dex
.creation_block()
.as_number()
.expect("Error using converting creation block as number")
.as_u64();
let current_block = current_block
.as_number()
.expect("Error using converting current block as number")
.as_u64();
let mut aggregated_pairs: Vec<Pool> = vec![];
progress_bar.set_length(current_block - from_block);
progress_bar.set_message(format!("Getting all pools from: {}", dex.factory_address()));
let mut handles = vec![];
for from_block in (from_block..=current_block).step_by(step) {
let request_throttle = request_throttle.clone();
let provider = provider.clone();
let progress_bar = progress_bar.clone();
handles.push(tokio::spawn(async move {
let mut pools = vec![];
let to_block = from_block + step as u64;
request_throttle
.lock()
.expect("Error when aquiring request throttle mutex lock")
.increment_or_sleep(1);
let logs = provider
.get_logs(
&Filter::new()
.topic0(ValueOrArray::Value(dex.pool_created_event_signature()))
.address(dex.factory_address())
.from_block(BlockNumber::Number(U64([from_block])))
.to_block(BlockNumber::Number(U64([to_block]))),
)
.await?;
for log in logs {
let pool = dex.new_empty_pool_from_event(log)?;
pools.push(pool);
}
progress_bar.inc(step as u64);
Ok::<Vec<Pool>, CFFMError<P>>(pools)
}));
}
for handle in handles {
match handle.await {
Ok(sync_result) => aggregated_pairs.extend(sync_result?),
Err(err) => {
{
if err.is_panic() {
resume_unwind(err.into_panic());
}
}
}
}
}
Ok(aggregated_pairs)
}
pub async fn get_all_pool_data<P: 'static + JsonRpcClient>(
pools: Vec<Pool>,
dex_factory_address: H160,
provider: Arc<Provider<P>>,
request_throttle: Arc<Mutex<RequestThrottle>>,
progress_bar: ProgressBar,
) -> Result<Vec<Pool>, CFFMError<P>> {
let mut updated_pools: Vec<Pool> = vec![];
progress_bar.set_length(pools.len() as u64);
progress_bar.set_message(format!(
"Syncing pool data for pairs from: {}",
dex_factory_address
));
for mut pool in pools {
let request_throttle = request_throttle.clone();
let provider = provider.clone();
let progress_bar = progress_bar.clone();
request_throttle
.lock()
.expect("Error when aquiring request throttle mutex lock")
.increment_or_sleep(4);
match pool.get_pool_data(provider.clone()).await {
Ok(_) => updated_pools.push(pool),
Err(pair_sync_error) => {
if let CFFMError::ProviderError(provider_error) = pair_sync_error {
return Err(CFFMError::ProviderError(provider_error));
}
}
}
progress_bar.inc(1);
}
Ok(updated_pools)
}