use alloy_chains::NamedChain;
use alloy_erc20::LazyToken;
use alloy_primitives::{Address, BlockNumber, B256, U256};
use alloy_provider::Provider;
use alloy_rpc_types::Filter;
use futures::future::join_all;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use tracing::{error, info, warn};
use crate::config::SemioscanConfig;
use crate::errors::PriceCalculationError;
use crate::events::scanner::EventScanner;
use crate::price::cache::PriceCache;
use crate::price::{PriceSource, PriceSourceError, SwapData};
use crate::{NormalizedAmount, TokenAmount, TokenDecimals, TokenPrice, TransactionCount, UsdValue};
struct SwapAmounts {
token_amount: NormalizedAmount,
usdc_amount: UsdValue,
}
#[derive(Debug, Clone, Serialize)]
pub struct TokenPriceResult {
pub token_address: Address,
pub total_token_amount: NormalizedAmount,
pub total_usdc_amount: UsdValue,
pub transaction_count: TransactionCount,
}
impl Default for TokenPriceResult {
fn default() -> Self {
Self {
token_address: Address::ZERO,
total_token_amount: NormalizedAmount::ZERO,
total_usdc_amount: UsdValue::ZERO,
transaction_count: TransactionCount::ZERO,
}
}
}
impl TokenPriceResult {
pub fn new(token_address: Address) -> Self {
Self {
token_address,
total_token_amount: NormalizedAmount::ZERO,
total_usdc_amount: UsdValue::ZERO,
transaction_count: TransactionCount::ZERO,
}
}
fn add_swap(&mut self, token_amount: f64, usdc_amount: f64) {
self.total_token_amount += NormalizedAmount::new(token_amount);
self.total_usdc_amount += UsdValue::new(usdc_amount);
self.transaction_count += TransactionCount::new(1);
}
pub fn get_average_price(&self) -> TokenPrice {
if self.total_token_amount.is_zero() {
return TokenPrice::ZERO;
}
TokenPrice::new(self.total_usdc_amount.as_f64() / self.total_token_amount.as_f64())
}
pub fn merge(&mut self, other: &Self) {
self.total_token_amount += other.total_token_amount;
self.total_usdc_amount += other.total_usdc_amount;
self.transaction_count += other.transaction_count;
}
pub fn total_token_amount(&self) -> NormalizedAmount {
self.total_token_amount
}
pub fn total_usdc_amount(&self) -> UsdValue {
self.total_usdc_amount
}
pub fn transaction_count(&self) -> TransactionCount {
self.transaction_count
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RawSwapResult {
pub swap: SwapData,
pub normalized_token_in_amount: NormalizedAmount,
pub normalized_token_out_amount: NormalizedAmount,
pub token_in_decimals: TokenDecimals,
pub token_out_decimals: TokenDecimals,
}
impl RawSwapResult {
pub fn tx_hash(&self) -> Option<B256> {
self.swap.tx_hash
}
pub fn block_number(&self) -> Option<BlockNumber> {
self.swap.block_number
}
pub fn sender(&self) -> Option<Address> {
self.swap.sender
}
}
pub struct PriceCalculator<P> {
provider: P,
price_source: Box<dyn PriceSource>,
usdc_address: Address,
chain: NamedChain,
token_decimals_cache: HashMap<Address, TokenDecimals>,
price_cache: Mutex<PriceCache>,
config: SemioscanConfig,
}
impl<P: Provider + Clone> PriceCalculator<P> {
pub fn new(
provider: P,
chain: NamedChain,
usdc_address: Address,
price_source: Box<dyn PriceSource>,
) -> Self {
Self::with_config(
provider,
chain,
usdc_address,
price_source,
crate::SemioscanConfig::default(),
)
}
pub fn with_config(
provider: P,
chain: NamedChain,
usdc_address: Address,
price_source: Box<dyn PriceSource>,
config: crate::SemioscanConfig,
) -> Self {
Self {
provider,
price_source,
usdc_address,
chain,
token_decimals_cache: HashMap::new(),
price_cache: Default::default(),
config,
}
}
async fn get_token_decimals(
&mut self,
token_address: Address,
) -> Result<TokenDecimals, PriceCalculationError> {
if let Some(&decimals) = self.token_decimals_cache.get(&token_address) {
return Ok(decimals);
}
let token_contract = LazyToken::new(token_address, self.provider.clone());
let decimals_raw = token_contract
.decimals()
.await
.map_err(|e| PriceCalculationError::metadata_fetch_failed(token_address, e))?;
let decimals = TokenDecimals::new(*decimals_raw);
self.token_decimals_cache.insert(token_address, decimals);
Ok(decimals)
}
async fn batch_fetch_token_decimals(&mut self, token_addresses: &[Address]) {
let uncached: Vec<Address> = token_addresses
.iter()
.filter(|addr| !self.token_decimals_cache.contains_key(*addr))
.copied()
.collect();
if uncached.is_empty() {
return;
}
info!(
count = uncached.len(),
"Batch fetching token decimals for uncached tokens"
);
let fetch_futures: Vec<_> = uncached
.iter()
.map(|&addr| {
let provider = self.provider.clone();
async move {
let token_contract = LazyToken::new(addr, provider);
let result = token_contract.decimals().await.copied();
(addr, result)
}
})
.collect();
let results = join_all(fetch_futures).await;
for (addr, result) in results {
match result {
Ok(decimals_raw) => {
let decimals = TokenDecimals::new(decimals_raw);
self.token_decimals_cache.insert(addr, decimals);
}
Err(e) => {
warn!(
token = ?addr,
error = ?e,
"Failed to fetch decimals for token, will retry on demand"
);
}
}
}
}
fn normalize_amount(&self, amount: U256, decimals: TokenDecimals) -> NormalizedAmount {
TokenAmount::new(amount).normalize(decimals)
}
async fn process_gap_for_price(
&mut self,
token_address: Address,
gap_start: BlockNumber,
gap_end: BlockNumber,
) -> Result<TokenPriceResult, PriceCalculationError> {
let mut gap_result = TokenPriceResult::new(token_address);
let event_topics = self.price_source.event_topics();
let scanner = EventScanner::new(&self.provider, self.config.clone());
let filter = Filter::new()
.address(self.price_source.router_address())
.event_signature(event_topics.clone());
let logs = scanner
.scan(self.chain, filter, gap_start, gap_end)
.await
.map_err(|e| {
PriceCalculationError::processing_failed(format!(
"Failed to scan swap events from {gap_start} to {gap_end}: {e}"
))
})?;
info!(
logs_count = logs.len(),
gap_start = gap_start,
gap_end = gap_end,
"Fetched logs for gap"
);
let mut swaps = Vec::new();
let mut token_addresses = HashSet::new();
for log in &logs {
match self.price_source.extract_swap_from_log(log) {
Ok(Some(swap_data)) => {
if !self.price_source.should_include_swap(&swap_data) {
continue;
}
let is_relevant = (swap_data.token_in == token_address
&& swap_data.token_out == self.usdc_address)
|| (swap_data.token_in == self.usdc_address
&& swap_data.token_out == token_address);
if is_relevant {
token_addresses.insert(swap_data.token_in);
token_addresses.insert(swap_data.token_out);
swaps.push(swap_data);
}
}
Ok(None) => {
}
Err(e @ PriceSourceError::DecodeError(_)) => {
error!(error = ?e, "Failed to decode log");
}
Err(
e @ (PriceSourceError::EmptyTokenArrays
| PriceSourceError::ArrayLengthMismatch { .. }
| PriceSourceError::InvalidSwapData { .. }),
) => {
error!(error = ?e, "Invalid swap data in log");
}
}
}
let addresses: Vec<Address> = token_addresses.into_iter().collect();
self.batch_fetch_token_decimals(&addresses).await;
for swap_data in swaps {
match self.process_swap_data(&swap_data, token_address).await {
Ok(Some(amounts)) => {
gap_result
.add_swap(amounts.token_amount.as_f64(), amounts.usdc_amount.as_f64());
}
Ok(None) => {
}
Err(e) => {
error!(error = ?e, "Error processing swap data");
}
}
}
Ok(gap_result)
}
async fn process_swap_data(
&mut self,
swap: &crate::price::SwapData,
token_address: Address,
) -> Result<Option<SwapAmounts>, PriceCalculationError> {
if swap.token_in == token_address && swap.token_out == self.usdc_address {
let token_decimals = self.get_token_decimals(token_address).await?;
let usdc_decimals = self.get_token_decimals(self.usdc_address).await?;
let token_amount = self.normalize_amount(swap.token_in_amount, token_decimals);
let usdc_amount = self.normalize_amount(swap.token_out_amount, usdc_decimals);
return Ok(Some(SwapAmounts {
token_amount,
usdc_amount: UsdValue::new(usdc_amount.as_f64()),
}));
}
if swap.token_in == self.usdc_address && swap.token_out == token_address {
let token_decimals = self.get_token_decimals(token_address).await?;
let usdc_decimals = self.get_token_decimals(self.usdc_address).await?;
let token_amount = self.normalize_amount(swap.token_out_amount, token_decimals);
let usdc_amount = self.normalize_amount(swap.token_in_amount, usdc_decimals);
return Ok(Some(SwapAmounts {
token_amount,
usdc_amount: UsdValue::new(usdc_amount.as_f64()),
}));
}
Ok(None)
}
pub async fn calculate_price_between_blocks(
&mut self,
token_address: Address,
start_block: BlockNumber,
end_block: BlockNumber,
) -> Result<TokenPriceResult, PriceCalculationError> {
info!(
token_address = ?token_address,
start_block = start_block,
end_block = end_block,
"Starting price calculation"
);
let (cached_result, gaps) = {
let cache = self.price_cache.lock().expect(
"Price cache mutex poisoned - indicates a panic occurred while holding the lock",
);
cache.calculate_gaps(token_address, start_block, end_block)
};
if let Some(result) = cached_result.clone() {
if gaps.is_empty() {
info!(
token_address = ?token_address,
"Using complete cached result for block range"
);
return Ok(result);
}
}
let mut price_data = cached_result.unwrap_or_else(|| TokenPriceResult::new(token_address));
for gap in gaps {
info!(
token_address = ?token_address,
gap_start = gap.start,
gap_end = gap.end,
"Processing uncached block range"
);
let gap_result = self
.process_gap_for_price(token_address, gap.start, gap.end)
.await?;
{
let mut cache = self.price_cache.lock()
.expect("Price cache mutex poisoned - indicates a panic occurred while holding the lock");
cache.insert(token_address, gap.start, gap.end, gap_result.clone());
}
price_data.merge(&gap_result);
}
{
let mut cache = self.price_cache.lock().expect(
"Price cache mutex poisoned - indicates a panic occurred while holding the lock",
);
cache.insert(token_address, start_block, end_block, price_data.clone());
}
info!(
token_address = ?token_address,
total_token_amount = price_data.total_token_amount.as_f64(),
total_usdc_amount = price_data.total_usdc_amount.as_f64(),
transaction_count = price_data.transaction_count.as_usize(),
"Finished price calculation"
);
Ok(price_data)
}
pub async fn extract_raw_swaps(
&mut self,
start_block: BlockNumber,
end_block: BlockNumber,
) -> Result<Vec<RawSwapResult>, PriceCalculationError> {
info!(
start_block = start_block,
end_block = end_block,
"Extracting raw swaps"
);
let event_topics = self.price_source.event_topics();
let scanner = EventScanner::new(&self.provider, self.config.clone());
let filter = Filter::new()
.address(self.price_source.router_address())
.event_signature(event_topics.clone());
let logs = scanner
.scan(self.chain, filter, start_block, end_block)
.await
.map_err(|e| {
PriceCalculationError::processing_failed(format!(
"Failed to scan swap events from {start_block} to {end_block}: {e}"
))
})?;
info!(
logs_count = logs.len(),
start_block = start_block,
end_block = end_block,
"Fetched logs for raw swap extraction"
);
let mut swaps = Vec::new();
let mut token_addresses = HashSet::new();
for log in &logs {
match self.price_source.extract_swap_from_log(log) {
Ok(Some(swap_data)) => {
if !self.price_source.should_include_swap(&swap_data) {
continue;
}
token_addresses.insert(swap_data.token_in);
token_addresses.insert(swap_data.token_out);
swaps.push(swap_data);
}
Ok(None) => {
}
Err(e @ PriceSourceError::DecodeError(_)) => {
error!(error = ?e, "Failed to decode log");
}
Err(
e @ (PriceSourceError::EmptyTokenArrays
| PriceSourceError::ArrayLengthMismatch { .. }
| PriceSourceError::InvalidSwapData { .. }),
) => {
error!(error = ?e, "Invalid swap data in log");
}
}
}
let addresses: Vec<Address> = token_addresses.into_iter().collect();
self.batch_fetch_token_decimals(&addresses).await;
let mut results = Vec::with_capacity(swaps.len());
for swap in swaps {
let token_in_decimals = match self.get_token_decimals(swap.token_in).await {
Ok(d) => d,
Err(e) => {
warn!(token = ?swap.token_in, error = ?e, "Failed to get decimals for token_in, skipping swap");
continue;
}
};
let token_out_decimals = match self.get_token_decimals(swap.token_out).await {
Ok(d) => d,
Err(e) => {
warn!(token = ?swap.token_out, error = ?e, "Failed to get decimals for token_out, skipping swap");
continue;
}
};
let normalized_token_in =
self.normalize_amount(swap.token_in_amount, token_in_decimals);
let normalized_token_out =
self.normalize_amount(swap.token_out_amount, token_out_decimals);
results.push(RawSwapResult {
swap,
normalized_token_in_amount: normalized_token_in,
normalized_token_out_amount: normalized_token_out,
token_in_decimals,
token_out_decimals,
});
}
info!(swap_count = results.len(), "Finished raw swap extraction");
Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::address;
#[test]
fn test_add_swap_accumulates_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
assert_eq!(result.total_token_amount().as_f64(), 100.0);
assert_eq!(result.total_usdc_amount().as_f64(), 200.0);
assert_eq!(result.transaction_count().as_usize(), 1);
result.add_swap(50.0, 75.0);
assert_eq!(result.total_token_amount().as_f64(), 150.0);
assert_eq!(result.total_usdc_amount().as_f64(), 275.0);
assert_eq!(result.transaction_count().as_usize(), 2);
}
#[test]
fn test_get_average_price_normal_case() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
result.add_swap(50.0, 150.0);
let avg_price = result.get_average_price();
assert!((avg_price.as_f64() - 2.333333).abs() < 0.0001);
}
#[test]
fn test_get_average_price_zero_volume() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult::new(token);
assert_eq!(result.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_get_average_price_zero_token_amount_after_swaps() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(0.0, 100.0);
assert_eq!(result.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_merge_combines_results() {
let token = address!("1111111111111111111111111111111111111111");
let mut result1 = TokenPriceResult::new(token);
result1.add_swap(100.0, 200.0);
result1.add_swap(50.0, 100.0);
let mut result2 = TokenPriceResult::new(token);
result2.add_swap(25.0, 50.0);
result1.merge(&result2);
assert_eq!(result1.total_token_amount().as_f64(), 175.0); assert_eq!(result1.total_usdc_amount().as_f64(), 350.0); assert_eq!(result1.transaction_count().as_usize(), 3);
}
#[test]
fn test_merge_with_empty_result() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
let empty = TokenPriceResult::new(token);
result.merge(&empty);
assert_eq!(result.total_token_amount().as_f64(), 100.0);
assert_eq!(result.total_usdc_amount().as_f64(), 200.0);
assert_eq!(result.transaction_count().as_usize(), 1);
}
#[test]
fn test_merge_two_empty_results() {
let token = address!("1111111111111111111111111111111111111111");
let mut result1 = TokenPriceResult::new(token);
let result2 = TokenPriceResult::new(token);
result1.merge(&result2);
assert_eq!(result1.total_token_amount().as_f64(), 0.0);
assert_eq!(result1.total_usdc_amount().as_f64(), 0.0);
assert_eq!(result1.transaction_count().as_usize(), 0);
assert_eq!(result1.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_large_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(1_000_000_000.0, 2_000_000_000.0);
result.add_swap(500_000_000.0, 1_000_000_000.0);
assert_eq!(result.total_token_amount().as_f64(), 1_500_000_000.0);
assert_eq!(result.total_usdc_amount().as_f64(), 3_000_000_000.0);
assert_eq!(result.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_fractional_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(0.001, 0.002);
result.add_swap(0.0005, 0.001);
assert!((result.total_token_amount().as_f64() - 0.0015).abs() < 1e-10);
assert!((result.total_usdc_amount().as_f64() - 0.003).abs() < 1e-10);
assert!((result.get_average_price().as_f64() - 2.0).abs() < 1e-6);
}
#[test]
fn test_normalize_amount_standard_decimals() {
let divisor = U256::from(10u64.pow(6));
let normalized = f64::from(U256::from(1_000_000u64)) / f64::from(divisor);
assert_eq!(normalized, 1.0);
let divisor = U256::from(10u128.pow(18));
let normalized = f64::from(U256::from(1_000_000_000_000_000_000u64)) / f64::from(divisor);
assert_eq!(normalized, 1.0);
}
#[test]
fn test_normalize_amount_edge_cases() {
let divisor = U256::from(10u128.pow(18));
let normalized = f64::from(U256::ZERO) / f64::from(divisor);
assert_eq!(normalized, 0.0);
let divisor = U256::from(10u64.pow(0)); let normalized = f64::from(U256::from(42u64)) / f64::from(divisor);
assert_eq!(normalized, 42.0);
let divisor = U256::from(10u64.pow(1));
let normalized = f64::from(U256::from(100u64)) / f64::from(divisor);
assert_eq!(normalized, 10.0);
}
#[test]
fn test_average_price_calculation() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(100.0),
total_usdc_amount: UsdValue::new(200.0),
transaction_count: TransactionCount::new(5),
};
assert_eq!(result.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_average_price_fractional() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(333.33),
total_usdc_amount: UsdValue::new(999.99),
transaction_count: TransactionCount::new(10),
};
let price = result.get_average_price();
assert!(
(price.as_f64() - 3.0).abs() < 0.01,
"Expected ~3.0, got {}",
price.as_f64()
);
}
#[test]
fn test_price_result_multiple_merges() {
let token = address!("1111111111111111111111111111111111111111");
let mut total = TokenPriceResult::new(token);
let r1 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(10.0),
total_usdc_amount: UsdValue::new(20.0),
transaction_count: TransactionCount::new(1),
};
let r2 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(20.0),
total_usdc_amount: UsdValue::new(40.0),
transaction_count: TransactionCount::new(2),
};
let r3 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(30.0),
total_usdc_amount: UsdValue::new(60.0),
transaction_count: TransactionCount::new(3),
};
total.merge(&r1);
total.merge(&r2);
total.merge(&r3);
assert_eq!(total.total_token_amount().as_f64(), 60.0);
assert_eq!(total.total_usdc_amount().as_f64(), 120.0);
assert_eq!(total.transaction_count().as_usize(), 6);
assert_eq!(total.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_price_calculation_high_precision() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(0.000001), total_usdc_amount: UsdValue::new(0.00000123), transaction_count: TransactionCount::new(1),
};
let price = result.get_average_price();
assert!(
(price.as_f64() - 1.23).abs() < 0.001,
"Expected ~1.23, got {}",
price.as_f64()
);
}
}