use alloy_chains::NamedChain;
use alloy_primitives::{Address, BlockNumber, B256};
use alloy_provider::Provider;
use serde::Serialize;
use std::sync::Mutex;
use tracing::{error, info, warn};
use crate::config::SemioscanConfig;
use crate::errors::PriceCalculationError;
use crate::price::aggregator::PriceAggregator;
use crate::price::cache::PriceCache;
use crate::price::decimals::{TokenDecimalsCache, TokenMetadataProvider};
use crate::price::extractor::extract_swaps;
use crate::price::normalize::{involves_pair, normalize_against_pair, normalize_swap};
use crate::price::scanner::SwapLogScanner;
use crate::price::{PriceSource, SwapData};
use crate::{NormalizedAmount, TokenDecimals, TokenPrice, TransactionCount, UsdValue};
#[derive(Debug, Clone, Serialize)]
pub struct TokenPriceResult {
pub token_address: Address,
pub total_token_amount: NormalizedAmount,
pub total_usdc_amount: UsdValue,
pub transaction_count: TransactionCount,
}
impl Default for TokenPriceResult {
fn default() -> Self {
Self {
token_address: Address::ZERO,
total_token_amount: NormalizedAmount::ZERO,
total_usdc_amount: UsdValue::ZERO,
transaction_count: TransactionCount::ZERO,
}
}
}
impl TokenPriceResult {
pub fn new(token_address: Address) -> Self {
Self {
token_address,
total_token_amount: NormalizedAmount::ZERO,
total_usdc_amount: UsdValue::ZERO,
transaction_count: TransactionCount::ZERO,
}
}
pub(crate) fn add_swap(&mut self, token_amount: f64, usdc_amount: f64) {
self.total_token_amount += NormalizedAmount::new(token_amount);
self.total_usdc_amount += UsdValue::new(usdc_amount);
self.transaction_count += TransactionCount::new(1);
}
pub fn get_average_price(&self) -> TokenPrice {
if self.total_token_amount.is_zero() {
return TokenPrice::ZERO;
}
TokenPrice::new(self.total_usdc_amount.as_f64() / self.total_token_amount.as_f64())
}
pub fn merge(&mut self, other: &Self) {
self.total_token_amount += other.total_token_amount;
self.total_usdc_amount += other.total_usdc_amount;
self.transaction_count += other.transaction_count;
}
pub fn total_token_amount(&self) -> NormalizedAmount {
self.total_token_amount
}
pub fn total_usdc_amount(&self) -> UsdValue {
self.total_usdc_amount
}
pub fn transaction_count(&self) -> TransactionCount {
self.transaction_count
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RawSwapResult {
pub swap: SwapData,
pub normalized_token_in_amount: NormalizedAmount,
pub normalized_token_out_amount: NormalizedAmount,
pub token_in_decimals: TokenDecimals,
pub token_out_decimals: TokenDecimals,
}
impl RawSwapResult {
pub fn tx_hash(&self) -> Option<B256> {
self.swap.tx_hash
}
pub fn block_number(&self) -> Option<BlockNumber> {
self.swap.block_number
}
pub fn sender(&self) -> Option<Address> {
self.swap.sender
}
}
struct GapScan {
result: TokenPriceResult,
degraded: bool,
}
pub struct PriceCalculator<P> {
provider: P,
price_source: Box<dyn PriceSource>,
usdc_address: Address,
chain: NamedChain,
decimals_cache: TokenDecimalsCache,
price_cache: Mutex<PriceCache>,
config: SemioscanConfig,
}
impl<P: Provider + Clone> PriceCalculator<P> {
pub fn new(
provider: P,
chain: NamedChain,
usdc_address: Address,
price_source: Box<dyn PriceSource>,
) -> Self {
Self::with_config(
provider,
chain,
usdc_address,
price_source,
crate::SemioscanConfig::default(),
)
}
pub fn with_config(
provider: P,
chain: NamedChain,
usdc_address: Address,
price_source: Box<dyn PriceSource>,
config: crate::SemioscanConfig,
) -> Self {
Self {
provider,
price_source,
usdc_address,
chain,
decimals_cache: TokenDecimalsCache::new(),
price_cache: Mutex::new(PriceCache::default()),
config,
}
}
async fn process_gap_for_price(
&mut self,
token_address: Address,
gap_start: BlockNumber,
gap_end: BlockNumber,
) -> Result<GapScan, PriceCalculationError> {
let scanner = SwapLogScanner::new(
&self.provider,
self.chain,
self.price_source.as_ref(),
self.config.clone(),
);
let logs = scanner.scan(gap_start, gap_end).await?;
info!(
logs_count = logs.len(),
gap_start = gap_start,
gap_end = gap_end,
"Fetched logs for gap"
);
let extracted = extract_swaps(self.price_source.as_ref(), &logs);
let relevant: Vec<&SwapData> = extracted
.swaps
.iter()
.filter(|swap| involves_pair(swap, token_address, self.usdc_address))
.collect();
let mut aggregator = PriceAggregator::new(token_address);
if relevant.is_empty() {
return Ok(GapScan {
result: aggregator.finish(),
degraded: false,
});
}
let metadata = TokenMetadataProvider::new(&self.provider);
metadata
.ensure_decimals(
&mut self.decimals_cache,
&[token_address, self.usdc_address],
)
.await;
let mut degraded = false;
for swap in relevant {
let target_decimals = match metadata
.get_or_fetch(&mut self.decimals_cache, token_address)
.await
{
Ok(d) => d,
Err(e) => {
error!(error = ?e, "Error processing swap data");
degraded = true;
continue;
}
};
let usdc_decimals = match metadata
.get_or_fetch(&mut self.decimals_cache, self.usdc_address)
.await
{
Ok(d) => d,
Err(e) => {
error!(error = ?e, "Error processing swap data");
degraded = true;
continue;
}
};
if let Some(amounts) = normalize_against_pair(
swap,
token_address,
self.usdc_address,
target_decimals,
usdc_decimals,
) {
aggregator.add(&amounts);
}
}
Ok(GapScan {
result: aggregator.finish(),
degraded,
})
}
pub async fn calculate_price_between_blocks(
&mut self,
token_address: Address,
start_block: BlockNumber,
end_block: BlockNumber,
) -> Result<TokenPriceResult, PriceCalculationError> {
info!(
token_address = ?token_address,
start_block = start_block,
end_block = end_block,
"Starting price calculation"
);
let (cached_result, gaps) = {
let cache = self.price_cache.lock().expect(
"Price cache mutex poisoned - indicates a panic occurred while holding the lock",
);
cache.calculate_gaps(token_address, start_block, end_block)
};
if let Some(result) = cached_result.clone() {
if gaps.is_empty() {
info!(
token_address = ?token_address,
"Using complete cached result for block range"
);
return Ok(result);
}
}
let mut price_data = cached_result.unwrap_or_else(|| TokenPriceResult::new(token_address));
let mut any_degraded = false;
for gap in gaps {
info!(
token_address = ?token_address,
gap_start = gap.start,
gap_end = gap.end,
"Processing uncached block range"
);
let GapScan {
result: gap_result,
degraded,
} = self
.process_gap_for_price(token_address, gap.start, gap.end)
.await?;
if degraded {
any_degraded = true;
warn!(
token_address = ?token_address,
gap_start = gap.start,
gap_end = gap.end,
"Gap scan degraded by a token-decimals fetch failure; not caching so the range is rescanned on a later call"
);
} else {
let mut cache = self.price_cache.lock()
.expect("Price cache mutex poisoned - indicates a panic occurred while holding the lock");
cache.insert(token_address, gap.start, gap.end, gap_result.clone());
}
price_data.merge(&gap_result);
}
if any_degraded {
warn!(
token_address = ?token_address,
start_block = start_block,
end_block = end_block,
"Range scan degraded by a token-decimals fetch failure; skipping full-range cache write-back so a later call rescans the affected gap"
);
} else {
let mut cache = self.price_cache.lock().expect(
"Price cache mutex poisoned - indicates a panic occurred while holding the lock",
);
cache.insert(token_address, start_block, end_block, price_data.clone());
}
info!(
token_address = ?token_address,
total_token_amount = price_data.total_token_amount.as_f64(),
total_usdc_amount = price_data.total_usdc_amount.as_f64(),
transaction_count = price_data.transaction_count.as_usize(),
"Finished price calculation"
);
Ok(price_data)
}
pub async fn extract_raw_swaps(
&mut self,
start_block: BlockNumber,
end_block: BlockNumber,
) -> Result<Vec<RawSwapResult>, PriceCalculationError> {
info!(
start_block = start_block,
end_block = end_block,
"Extracting raw swaps"
);
let scanner = SwapLogScanner::new(
&self.provider,
self.chain,
self.price_source.as_ref(),
self.config.clone(),
);
let logs = scanner.scan(start_block, end_block).await?;
info!(
logs_count = logs.len(),
start_block = start_block,
end_block = end_block,
"Fetched logs for raw swap extraction"
);
let extracted = extract_swaps(self.price_source.as_ref(), &logs);
let metadata = TokenMetadataProvider::new(&self.provider);
metadata
.ensure_decimals(
&mut self.decimals_cache,
&extracted.unique_token_addresses(),
)
.await;
let mut results = Vec::with_capacity(extracted.swaps.len());
for swap in extracted.swaps {
let token_in_decimals = match metadata
.get_or_fetch(&mut self.decimals_cache, swap.token_in)
.await
{
Ok(d) => d,
Err(e) => {
warn!(token = ?swap.token_in, error = ?e, "Failed to get decimals for token_in, skipping swap");
continue;
}
};
let token_out_decimals = match metadata
.get_or_fetch(&mut self.decimals_cache, swap.token_out)
.await
{
Ok(d) => d,
Err(e) => {
warn!(token = ?swap.token_out, error = ?e, "Failed to get decimals for token_out, skipping swap");
continue;
}
};
let normalized = normalize_swap(&swap, token_in_decimals, token_out_decimals);
results.push(RawSwapResult {
swap,
normalized_token_in_amount: normalized.token_in_amount,
normalized_token_out_amount: normalized.token_out_amount,
token_in_decimals: normalized.token_in_decimals,
token_out_decimals: normalized.token_out_decimals,
});
}
info!(swap_count = results.len(), "Finished raw swap extraction");
Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::address;
#[test]
fn test_add_swap_accumulates_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
assert_eq!(result.total_token_amount().as_f64(), 100.0);
assert_eq!(result.total_usdc_amount().as_f64(), 200.0);
assert_eq!(result.transaction_count().as_usize(), 1);
result.add_swap(50.0, 75.0);
assert_eq!(result.total_token_amount().as_f64(), 150.0);
assert_eq!(result.total_usdc_amount().as_f64(), 275.0);
assert_eq!(result.transaction_count().as_usize(), 2);
}
#[test]
fn test_get_average_price_normal_case() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
result.add_swap(50.0, 150.0);
let avg_price = result.get_average_price();
assert!((avg_price.as_f64() - 2.333333).abs() < 0.0001);
}
#[test]
fn test_get_average_price_zero_volume() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult::new(token);
assert_eq!(result.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_get_average_price_zero_token_amount_after_swaps() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(0.0, 100.0);
assert_eq!(result.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_merge_combines_results() {
let token = address!("1111111111111111111111111111111111111111");
let mut result1 = TokenPriceResult::new(token);
result1.add_swap(100.0, 200.0);
result1.add_swap(50.0, 100.0);
let mut result2 = TokenPriceResult::new(token);
result2.add_swap(25.0, 50.0);
result1.merge(&result2);
assert_eq!(result1.total_token_amount().as_f64(), 175.0);
assert_eq!(result1.total_usdc_amount().as_f64(), 350.0);
assert_eq!(result1.transaction_count().as_usize(), 3);
}
#[test]
fn test_merge_with_empty_result() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(100.0, 200.0);
let empty = TokenPriceResult::new(token);
result.merge(&empty);
assert_eq!(result.total_token_amount().as_f64(), 100.0);
assert_eq!(result.total_usdc_amount().as_f64(), 200.0);
assert_eq!(result.transaction_count().as_usize(), 1);
}
#[test]
fn test_merge_two_empty_results() {
let token = address!("1111111111111111111111111111111111111111");
let mut result1 = TokenPriceResult::new(token);
let result2 = TokenPriceResult::new(token);
result1.merge(&result2);
assert_eq!(result1.total_token_amount().as_f64(), 0.0);
assert_eq!(result1.total_usdc_amount().as_f64(), 0.0);
assert_eq!(result1.transaction_count().as_usize(), 0);
assert_eq!(result1.get_average_price(), TokenPrice::ZERO);
}
#[test]
fn test_large_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(1_000_000_000.0, 2_000_000_000.0);
result.add_swap(500_000_000.0, 1_000_000_000.0);
assert_eq!(result.total_token_amount().as_f64(), 1_500_000_000.0);
assert_eq!(result.total_usdc_amount().as_f64(), 3_000_000_000.0);
assert_eq!(result.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_fractional_amounts() {
let token = address!("1111111111111111111111111111111111111111");
let mut result = TokenPriceResult::new(token);
result.add_swap(0.001, 0.002);
result.add_swap(0.0005, 0.001);
assert!((result.total_token_amount().as_f64() - 0.0015).abs() < 1e-10);
assert!((result.total_usdc_amount().as_f64() - 0.003).abs() < 1e-10);
assert!((result.get_average_price().as_f64() - 2.0).abs() < 1e-6);
}
#[test]
fn test_average_price_calculation() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(100.0),
total_usdc_amount: UsdValue::new(200.0),
transaction_count: TransactionCount::new(5),
};
assert_eq!(result.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_average_price_fractional() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(333.33),
total_usdc_amount: UsdValue::new(999.99),
transaction_count: TransactionCount::new(10),
};
let price = result.get_average_price();
assert!(
(price.as_f64() - 3.0).abs() < 0.01,
"Expected ~3.0, got {price}",
price = price.as_f64()
);
}
#[test]
fn test_price_result_multiple_merges() {
let token = address!("1111111111111111111111111111111111111111");
let mut total = TokenPriceResult::new(token);
let r1 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(10.0),
total_usdc_amount: UsdValue::new(20.0),
transaction_count: TransactionCount::new(1),
};
let r2 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(20.0),
total_usdc_amount: UsdValue::new(40.0),
transaction_count: TransactionCount::new(2),
};
let r3 = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(30.0),
total_usdc_amount: UsdValue::new(60.0),
transaction_count: TransactionCount::new(3),
};
total.merge(&r1);
total.merge(&r2);
total.merge(&r3);
assert_eq!(total.total_token_amount().as_f64(), 60.0);
assert_eq!(total.total_usdc_amount().as_f64(), 120.0);
assert_eq!(total.transaction_count().as_usize(), 6);
assert_eq!(total.get_average_price().as_f64(), 2.0);
}
#[test]
fn test_price_calculation_high_precision() {
let token = address!("1111111111111111111111111111111111111111");
let result = TokenPriceResult {
token_address: token,
total_token_amount: NormalizedAmount::new(0.000001),
total_usdc_amount: UsdValue::new(0.00000123),
transaction_count: TransactionCount::new(1),
};
let price = result.get_average_price();
assert!(
(price.as_f64() - 1.23).abs() < 0.001,
"Expected ~1.23, got {price}",
price = price.as_f64()
);
}
}