use std::sync::Arc;
use nautilus_blockchain::{
config::BlockchainDataClientConfig,
data::core::BlockchainDataClientCore,
exchanges::{find_dex_type_case_insensitive, get_supported_dexes_for_chain},
rpc::providers::check_infura_rpc_provider,
};
use nautilus_core::string::mask_api_key;
use nautilus_infrastructure::sql::pg::get_postgres_connect_options;
use nautilus_model::defi::chain::Chain;
use crate::opt::DatabaseConfig;
pub async fn run_sync_dex(
chain: String,
dex: String,
rpc_url: Option<String>,
database: DatabaseConfig,
reset: bool,
multicall_calls_per_rpc_request: Option<u32>,
) -> anyhow::Result<()> {
let chain = Chain::from_chain_name(&chain)
.ok_or_else(|| anyhow::anyhow!("Invalid chain name: {chain}"))?;
let dex_type = find_dex_type_case_insensitive(&dex, chain).ok_or_else(|| {
let supported_dexes = get_supported_dexes_for_chain(chain.name);
if supported_dexes.is_empty() {
anyhow::anyhow!("Invalid DEX name '{}' (case-insensitive). Chain '{}' is not supported for pool syncing.",dex, chain.name)
} else {
anyhow::anyhow!("Invalid DEX name '{}' (case-insensitive). Supported DEXes for chain '{}': {}",dex,chain.name,supported_dexes.join(", "))
}
})?;
let postgres_connect_options = get_postgres_connect_options(
database.host,
database.port,
database.username,
database.password,
database.database,
);
let rpc_http_url = rpc_url
.or_else(|| check_infura_rpc_provider(&chain.name))
.or_else(|| std::env::var("RPC_HTTP_URL").ok())
.unwrap_or_else(|| {
panic!(
"No RPC URL provided for {name}. Set --rpc-url, INFURA_API_KEY, or RPC_HTTP_URL",
name = chain.name
)
});
let masked_url = if let Some(idx) = rpc_http_url.rfind('/') {
let (base, key) = rpc_http_url.split_at(idx + 1);
if key.is_empty() {
rpc_http_url.clone()
} else {
let masked_key = mask_api_key(key);
format!("{base}{masked_key}")
}
} else {
mask_api_key(&rpc_http_url)
};
log::info!("Using RPC HTTP URL: '{masked_url}'");
let config = BlockchainDataClientConfig::new(
Arc::new(chain.to_owned()),
vec![dex_type],
rpc_http_url,
None,
multicall_calls_per_rpc_request,
None,
true,
None,
None,
Some(postgres_connect_options),
);
let cancellation_token = tokio_util::sync::CancellationToken::new();
let mut data_client = BlockchainDataClientCore::new(config, None, None, cancellation_token);
data_client.initialize_cache_database().await;
data_client.cache.initialize_chain().await;
data_client
.register_dex_exchange(dex_type)
.await
.map_err(|e| anyhow::anyhow!("Failed to register DEX exchange: {e}"))?;
data_client
.sync_exchange_pools(&dex_type, 0, None, reset)
.await
.map_err(|e| anyhow::anyhow!("Failed to sync pools: {e}"))?;
Ok(())
}
pub async fn run_sync_blocks(
chain: String,
from_block: Option<u64>,
to_block: Option<u64>,
database: DatabaseConfig,
) -> anyhow::Result<()> {
let chain = Chain::from_chain_name(&chain)
.ok_or_else(|| anyhow::anyhow!("Invalid chain name: {chain}"))?;
let chain = Arc::new(chain.to_owned());
let from_block = from_block.unwrap_or(0);
let postgres_connect_options = get_postgres_connect_options(
database.host,
database.port,
database.username,
database.password,
database.database,
);
let config = BlockchainDataClientConfig::new(
chain.clone(),
vec![],
String::new(), None,
None,
None,
true,
None,
None,
Some(postgres_connect_options),
);
let cancellation_token = tokio_util::sync::CancellationToken::new();
let mut data_client = BlockchainDataClientCore::new(config, None, None, cancellation_token);
data_client.initialize_cache_database().await;
data_client.cache.initialize_chain().await;
data_client
.sync_blocks_checked(from_block, to_block)
.await
.map_err(|e| anyhow::anyhow!("Failed to sync blocks: {e}"))?;
Ok(())
}