use crate::nft::cache::{CacheManager, CacheKey};
use crate::nft::errors::{NftError, NftResult};
use crate::nft::types::*;
use crate::rpc::ConnectionPool;
use async_trait::async_trait;
use dashmap::DashMap;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};
#[derive(Clone)]
pub struct ValuationEngine {
connection_pool: Arc<ConnectionPool>,
cache_manager: Arc<CacheManager>,
config: ValuationEngineConfig,
strategies: Vec<Arc<dyn ValuationStrategy>>,
market_data_provider: Arc<dyn MarketDataProvider>,
rate_limiter: Arc<Semaphore>,
metrics: Arc<ValuationMetrics>,
}
#[derive(Debug, Clone)]
pub struct ValuationEngineConfig {
pub max_concurrent_valuations: usize,
pub request_timeout_ms: u64,
pub max_retries: u32,
pub retry_delay_ms: u64,
pub cache_ttl_seconds: u64,
pub confidence_threshold: f64,
pub min_data_points: u32,
pub max_market_data_age_hours: u32,
pub enable_cross_validation: bool,
pub method_weights: HashMap<ValuationMethod, f64>,
pub risk_adjustment_factor: f64,
pub api_keys: HashMap<String, String>,
}
impl Default for ValuationEngineConfig {
fn default() -> Self {
let mut method_weights = HashMap::new();
method_weights.insert(ValuationMethod::FloorPrice, 0.4);
method_weights.insert(ValuationMethod::RecentSales, 0.4);
method_weights.insert(ValuationMethod::RarityBased, 0.2);
Self {
max_concurrent_valuations: 10,
request_timeout_ms: 15000,
max_retries: 3,
retry_delay_ms: 1000,
cache_ttl_seconds: 300, confidence_threshold: 0.7,
min_data_points: 3,
max_market_data_age_hours: 24,
enable_cross_validation: true,
method_weights,
risk_adjustment_factor: 0.1,
api_keys: HashMap::new(),
}
}
}
#[derive(Debug, Default)]
pub struct ValuationMetrics {
pub total_valuations: Arc<std::sync::atomic::AtomicU64>,
pub successful_valuations: Arc<std::sync::atomic::AtomicU64>,
pub failed_valuations: Arc<std::sync::atomic::AtomicU64>,
pub cache_hits: Arc<std::sync::atomic::AtomicU64>,
pub cache_misses: Arc<std::sync::atomic::AtomicU64>,
pub avg_valuation_time_ms: Arc<std::sync::atomic::AtomicU64>,
pub avg_confidence_score: Arc<std::sync::atomic::AtomicF64>,
pub valuations_by_method: Arc<DashMap<ValuationMethod, u64>>,
pub market_data_requests: Arc<std::sync::atomic::AtomicU64>,
pub cross_validation_failures: Arc<std::sync::atomic::AtomicU64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValuationResult {
pub estimated_value_lamports: u64,
pub confidence: f64,
pub method: ValuationMethod,
pub strategy_results: Vec<StrategyResult>,
pub market_data: MarketData,
pub risk_adjusted_value: u64,
pub value_range: (u64, u64),
pub last_updated: chrono::DateTime<chrono::Utc>,
pub metadata: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StrategyResult {
pub strategy_name: String,
pub value: u64,
pub confidence: f64,
pub weight: f64,
pub data_points: u32,
pub processing_time_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketData {
pub floor_price_lamports: Option<u64>,
pub recent_sales: Vec<SaleData>,
pub listings: Vec<ListingData>,
pub collection_stats: Option<CollectionStats>,
pub market_trends: MarketTrends,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub sources: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaleData {
pub price_lamports: u64,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub marketplace: String,
pub tx_hash: String,
pub buyer: String,
pub seller: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListingData {
pub price_lamports: u64,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub marketplace: String,
pub seller: String,
pub is_auction: bool,
pub auction_end_time: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionStats {
pub total_supply: u64,
pub num_owners: u64,
pub volume_24h_lamports: u64,
pub sales_24h: u32,
pub avg_sale_price_24h: f64,
pub market_cap_lamports: u64,
pub holders_ratio: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketTrends {
pub price_change_7d: f64,
pub price_change_30d: f64,
pub volume_trend: TrendDirection,
pub price_trend: TrendDirection,
pub market_sentiment: Sentiment,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
Increasing,
Decreasing,
Stable,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Sentiment {
Bullish,
Bearish,
Neutral,
}
#[async_trait]
pub trait ValuationStrategy: Send + Sync {
fn name(&self) -> &str;
fn method(&self) -> ValuationMethod;
async fn calculate_value(&self, nft: &NftInfo, market_data: &MarketData) -> NftResult<StrategyResult>;
fn can_handle(&self, nft: &NftInfo) -> bool;
fn required_data_fields(&self) -> Vec<&'static str>;
}
#[async_trait]
pub trait MarketDataProvider: Send + Sync {
async fn get_collection_market_data(&self, collection_id: &str) -> NftResult<MarketData>;
async fn get_nft_market_data(&self, mint_address: &str) -> NftResult<MarketData>;
async fn get_recent_sales(&self, collection_id: &str, limit: u32) -> NftResult<Vec<SaleData>>;
async fn get_listings(&self, collection_id: &str, limit: u32) -> NftResult<Vec<ListingData>>;
async fn get_collection_stats(&self, collection_id: &str) -> NftResult<CollectionStats>;
fn supports_collection(&self, collection_id: &str) -> bool;
}
pub struct FloorPriceValuationStrategy {
market_data_provider: Arc<dyn MarketDataProvider>,
config: ValuationEngineConfig,
}
pub struct RecentSalesValuationStrategy {
market_data_provider: Arc<dyn MarketDataProvider>,
config: ValuationEngineConfig,
}
pub struct RarityValuationStrategy {
config: ValuationEngineConfig,
}
pub struct MlValuationStrategy {
model_config: serde_json::Value,
config: ValuationEngineConfig,
}
pub struct MockMarketDataProvider {
data_cache: Arc<DashMap<String, MarketData>>,
}
impl ValuationEngine {
pub fn new(
connection_pool: Arc<ConnectionPool>,
config: ValuationEngineConfig,
cache_manager: Arc<CacheManager>,
) -> NftResult<Self> {
let rate_limiter = Arc::new(Semaphore::new(config.max_concurrent_valuations));
let metrics = Arc::new(ValuationMetrics::default());
let market_data_provider: Arc<dyn MarketDataProvider> = Arc::new(MockMarketDataProvider::new());
let strategies: Vec<Arc<dyn ValuationStrategy>> = vec![
Arc::new(FloorPriceValuationStrategy::new(
market_data_provider.clone(),
config.clone(),
)),
Arc::new(RecentSalesValuationStrategy::new(
market_data_provider.clone(),
config.clone(),
)),
Arc::new(RarityValuationStrategy::new(config.clone())),
];
Ok(Self {
connection_pool,
cache_manager,
config,
strategies,
market_data_provider,
rate_limiter,
metrics,
})
}
pub async fn value_nft(&self, nft: &NftInfo) -> NftResult<ValuationResult> {
let start_time = Instant::now();
self.metrics.total_valuations.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let cache_key = CacheKey::valuation(&nft.mint_address);
if let Some(cached_result) = self.cache_manager.get_valuation(&cache_key).await {
self.metrics.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
debug!("Cache hit for NFT valuation: {}", nft.mint_address);
return Ok(cached_result);
}
self.metrics.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let _permit = self.rate_limiter.acquire().await.map_err(|e| {
NftError::ResourceExhausted {
message: format!("Failed to acquire rate limiter: {}", e),
resource_type: "rate_limiter".to_string(),
current_usage: None,
limit: Some(self.config.max_concurrent_valuations as u64),
}
})?;
let collection_id = self.get_collection_id(nft)?;
let market_data = self.market_data_provider.get_collection_market_data(&collection_id).await
.unwrap_or_else(|_| MarketData::default());
let mut strategy_results = Vec::new();
let mut total_weight = 0.0f64;
let mut weighted_value = 0.0f64;
let mut total_confidence = 0.0f64;
for strategy in &self.strategies {
if strategy.can_handle(nft) {
match strategy.calculate_value(nft, &market_data).await {
Ok(result) => {
let weight = self.config.method_weights.get(&strategy.method()).unwrap_or(&1.0);
weighted_value += result.value as f64 * weight;
total_weight += weight;
total_confidence += result.confidence * weight;
strategy_results.push(result);
self.metrics.valuations_by_method
.entry(strategy.method())
.or_insert(0)
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
Err(e) => {
warn!("Strategy {} failed for NFT {}: {}", strategy.name(), nft.mint_address, e);
}
}
}
}
if strategy_results.is_empty() {
return Err(NftError::Valuation {
message: "No applicable valuation strategies".to_string(),
mint_address: Some(nft.mint_address.clone()),
method: None,
});
}
let estimated_value = if total_weight > 0.0 {
(weighted_value / total_weight) as u64
} else {
strategy_results[0].value };
let confidence = if total_weight > 0.0 {
(total_confidence / total_weight).min(1.0)
} else {
strategy_results[0].confidence
};
let values: Vec<u64> = strategy_results.iter().map(|r| r.value).collect();
let min_value = *values.iter().min().unwrap_or(&estimated_value);
let max_value = *values.iter().max().unwrap_or(&estimated_value);
let risk_adjusted_value = self.apply_risk_adjustment(estimated_value, nft, &market_data);
let valuation_result = ValuationResult {
estimated_value_lamports: estimated_value,
confidence,
method: strategy_results[0].clone().method, strategy_results,
market_data,
risk_adjusted_value,
value_range: (min_value, max_value),
last_updated: chrono::Utc::now(),
metadata: HashMap::new(),
};
if confidence >= self.config.confidence_threshold {
self.cache_manager.set_valuation(&cache_key, &valuation_result).await;
}
self.metrics.successful_valuations.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let valuation_time_ms = start_time.elapsed().as_millis() as u64;
self.metrics.avg_valuation_time_ms.fetch_add(valuation_time_ms, std::sync::atomic::Ordering::Relaxed);
self.metrics.avg_confidence_score.fetch_add(confidence, std::sync::atomic::Ordering::Relaxed);
info!("Valued NFT {} at {} lamports with {} confidence in {}ms",
nft.mint_address, estimated_value, confidence, valuation_time_ms);
Ok(valuation_result)
}
pub async fn value_nfts(&self, nfts: &[NftInfo]) -> NftResult<Vec<ValuationResult>> {
let start_time = Instant::now();
let results: Vec<NftResult<ValuationResult>> = futures::stream::iter(nfts)
.map(|nft| async move {
self.value_nft(nft).await
})
.buffer_unordered(self.config.max_concurrent_valuations)
.collect()
.await;
let mut successful_results = Vec::new();
let mut failed_count = 0;
for result in results {
match result {
Ok(valuation) => successful_results.push(valuation),
Err(e) => {
error!("Failed to value NFT: {}", e);
failed_count += 1;
}
}
}
let total_time_ms = start_time.elapsed().as_millis() as u64;
info!(
"Batch valuation completed: {} successful, {} failed in {}ms",
successful_results.len(),
failed_count,
total_time_ms
);
Ok(successful_results)
}
fn get_collection_id(&self, nft: &NftInfo) -> NftResult<String> {
if let Some(collection) = &nft.collection {
Ok(collection.name.clone())
} else if let Some(symbol) = &nft.symbol {
Ok(symbol.clone())
} else {
Ok(nft.mint_address.clone())
}
}
fn apply_risk_adjustment(&self, base_value: u64, nft: &NftInfo, market_data: &MarketData) -> u64 {
let mut adjustment_factor = 1.0;
match nft.security_assessment.risk_level {
RiskLevel::High => adjustment_factor *= 0.8,
RiskLevel::Medium => adjustment_factor *= 0.9,
RiskLevel::Critical => adjustment_factor *= 0.5,
_ => {}
}
if !nft.metadata_verified {
adjustment_factor *= 0.95;
}
match market_data.market_trends.price_trend {
TrendDirection::Decreasing => adjustment_factor *= 0.95,
TrendDirection::Increasing => adjustment_factor *= 1.05,
TrendDirection::Stable => {}
}
adjustment_factor *= (1.0 - self.config.risk_adjustment_factor);
(base_value as f64 * adjustment_factor) as u64
}
pub fn get_metrics(&self) -> &ValuationMetrics {
&self.metrics
}
}
impl FloorPriceValuationStrategy {
pub fn new(market_data_provider: Arc<dyn MarketDataProvider>, config: ValuationEngineConfig) -> Self {
Self { market_data_provider, config }
}
}
#[async_trait]
impl ValuationStrategy for FloorPriceValuationStrategy {
fn name(&self) -> &str {
"FloorPrice"
}
fn method(&self) -> ValuationMethod {
ValuationMethod::FloorPrice
}
async fn calculate_value(&self, nft: &NftInfo, market_data: &MarketData) -> NftResult<StrategyResult> {
let start_time = Instant::now();
let base_value = market_data.floor_price_lamports.unwrap_or(0);
let rarity_multiplier = if let Some(rarity_score) = nft.rarity_score {
if rarity_score > 80.0 {
2.0 } else if rarity_score > 60.0 {
1.5 } else if rarity_score > 40.0 {
1.2 } else {
1.0 }
} else {
1.0
};
let adjusted_value = (base_value as f64 * rarity_multiplier) as u64;
let confidence = if market_data.floor_price_lamports.is_some() { 0.8 } else { 0.3 };
Ok(StrategyResult {
strategy_name: self.name().to_string(),
value: adjusted_value,
confidence,
weight: 1.0,
data_points: 1,
processing_time_ms: start_time.elapsed().as_millis() as u64,
})
}
fn can_handle(&self, _nft: &NftInfo) -> bool {
true }
fn required_data_fields(&self) -> Vec<&'static str> {
vec!["floor_price"]
}
}
impl RecentSalesValuationStrategy {
pub fn new(market_data_provider: Arc<dyn MarketDataProvider>, config: ValuationEngineConfig) -> Self {
Self { market_data_provider, config }
}
}
#[async_trait]
impl ValuationStrategy for RecentSalesValuationStrategy {
fn name(&self) -> &str {
"RecentSales"
}
fn method(&self) -> ValuationMethod {
ValuationMethod::RecentSales
}
async fn calculate_value(&self, _nft: &NftInfo, market_data: &MarketData) -> NftResult<StrategyResult> {
let start_time = Instant::now();
if market_data.recent_sales.is_empty() {
return Ok(StrategyResult {
strategy_name: self.name().to_string(),
value: 0,
confidence: 0.0,
weight: 0.0,
data_points: 0,
processing_time_ms: start_time.elapsed().as_millis() as u64,
});
}
let mut weighted_sum = 0.0f64;
let mut total_weight = 0.0f64;
for sale in &market_data.recent_sales {
let age_hours = (chrono::Utc::now() - sale.timestamp).num_hours() as f64;
let weight = 1.0 / (1.0 + age_hours / 24.0);
weighted_sum += sale.price_lamports as f64 * weight;
total_weight += weight;
}
let avg_value = if total_weight > 0.0 {
(weighted_sum / total_weight) as u64
} else {
market_data.recent_sales.iter()
.map(|s| s.price_lamports)
.sum::<u64>() / market_data.recent_sales.len() as u64
};
let confidence = if market_data.recent_sales.len() >= 3 { 0.9 } else { 0.6 };
Ok(StrategyResult {
strategy_name: self.name().to_string(),
value: avg_value,
confidence,
weight: 1.0,
data_points: market_data.recent_sales.len() as u32,
processing_time_ms: start_time.elapsed().as_millis() as u64,
})
}
fn can_handle(&self, _nft: &NftInfo) -> bool {
true }
fn required_data_fields(&self) -> Vec<&'static str> {
vec!["recent_sales"]
}
}
impl RarityValuationStrategy {
pub fn new(config: ValuationEngineConfig) -> Self {
Self { config }
}
}
#[async_trait]
impl ValuationStrategy for RarityValuationStrategy {
fn name(&self) -> &str {
"RarityBased"
}
fn method(&self) -> ValuationMethod {
ValuationMethod::RarityBased
}
async fn calculate_value(&self, nft: &NftInfo, _market_data: &MarketData) -> NftResult<StrategyResult> {
let start_time = Instant::now();
let rarity_score = nft.rarity_score.unwrap_or(50.0);
let base_value = 1_000_000;
let rarity_multiplier = match rarity_score {
score if score >= 95.0 => 10.0, score if score >= 85.0 => 5.0, score if score >= 70.0 => 3.0, score if score >= 50.0 => 2.0, score if score >= 30.0 => 1.5, _ => 1.0, };
let estimated_value = (base_value as f64 * rarity_multiplier) as u64;
let confidence = if nft.rarity_score.is_some() { 0.7 } else { 0.2 };
Ok(StrategyResult {
strategy_name: self.name().to_string(),
value: estimated_value,
confidence,
weight: 0.5, data_points: 1,
processing_time_ms: start_time.elapsed().as_millis() as u64,
})
}
fn can_handle(&self, nft: &NftInfo) -> bool {
nft.rarity_score.is_some()
}
fn required_data_fields(&self) -> Vec<&'static str> {
vec!["rarity_score"]
}
}
impl MlValuationStrategy {
pub fn new(model_config: serde_json::Value, config: ValuationEngineConfig) -> Self {
Self { model_config, config }
}
}
#[async_trait]
impl ValuationStrategy for MlValuationStrategy {
fn name(&self) -> &str {
"MlModel"
}
fn method(&self) -> ValuationMethod {
ValuationMethod::MlModel
}
async fn calculate_value(&self, _nft: &NftInfo, _market_data: &MarketData) -> NftResult<StrategyResult> {
let start_time = Instant::now();
Ok(StrategyResult {
strategy_name: self.name().to_string(),
value: 0, confidence: 0.0,
weight: 0.0,
data_points: 0,
processing_time_ms: start_time.elapsed().as_millis() as u64,
})
}
fn can_handle(&self, _nft: &NftInfo) -> bool {
false }
fn required_data_fields(&self) -> Vec<&'static str> {
vec!["all"]
}
}
impl MockMarketDataProvider {
pub fn new() -> Self {
Self {
data_cache: Arc::new(DashMap::new()),
}
}
}
#[async_trait]
impl MarketDataProvider for MockMarketDataProvider {
async fn get_collection_market_data(&self, collection_id: &str) -> NftResult<MarketData> {
if let Some(data) = self.data_cache.get(collection_id) {
return Ok(data.clone());
}
let mock_data = MarketData {
floor_price_lamports: Some(5_000_000), recent_sales: vec![
SaleData {
price_lamports: 6_000_000,
timestamp: chrono::Utc::now() - chrono::Duration::hours(2),
marketplace: "Magic Eden".to_string(),
tx_hash: "mock_tx_hash".to_string(),
buyer: "mock_buyer".to_string(),
seller: "mock_seller".to_string(),
},
SaleData {
price_lamports: 5_500_000,
timestamp: chrono::Utc::now() - chrono::Duration::hours(6),
marketplace: "Magic Eden".to_string(),
tx_hash: "mock_tx_hash_2".to_string(),
buyer: "mock_buyer_2".to_string(),
seller: "mock_seller_2".to_string(),
},
],
listings: vec![
ListingData {
price_lamports: 7_000_000,
timestamp: chrono::Utc::now() - chrono::Duration::minutes(30),
marketplace: "Magic Eden".to_string(),
seller: "mock_seller_3".to_string(),
is_auction: false,
auction_end_time: None,
},
],
collection_stats: Some(CollectionStats {
total_supply: 10000,
num_owners: 3500,
volume_24h_lamports: 150_000_000,
sales_24h: 25,
avg_sale_price_24h: 6_000_000.0,
market_cap_lamports: 50_000_000_000,
holders_ratio: 0.35,
}),
market_trends: MarketTrends {
price_change_7d: 0.15,
price_change_30d: -0.05,
volume_trend: TrendDirection::Increasing,
price_trend: TrendDirection::Stable,
market_sentiment: Sentiment::Neutral,
},
timestamp: chrono::Utc::now(),
sources: vec!["MockProvider".to_string()],
};
self.data_cache.insert(collection_id.to_string(), mock_data.clone());
Ok(mock_data)
}
async fn get_nft_market_data(&self, mint_address: &str) -> NftResult<MarketData> {
self.get_collection_market_data(mint_address).await
}
async fn get_recent_sales(&self, collection_id: &str, _limit: u32) -> NftResult<Vec<SaleData>> {
let market_data = self.get_collection_market_data(collection_id).await?;
Ok(market_data.recent_sales)
}
async fn get_listings(&self, collection_id: &str, _limit: u32) -> NftResult<Vec<ListingData>> {
let market_data = self.get_collection_market_data(collection_id).await?;
Ok(market_data.listings)
}
async fn get_collection_stats(&self, collection_id: &str) -> NftResult<CollectionStats> {
let market_data = self.get_collection_market_data(collection_id).await?;
market_data.collection_stats.ok_or_else(|| NftError::Valuation {
message: "Collection statistics not available".to_string(),
mint_address: None,
method: Some(ValuationMethod::RecentSales.to_string()),
})
}
fn supports_collection(&self, _collection_id: &str) -> bool {
true }
}
impl Default for MarketData {
fn default() -> Self {
Self {
floor_price_lamports: None,
recent_sales: vec![],
listings: vec![],
collection_stats: None,
market_trends: MarketTrends {
price_change_7d: 0.0,
price_change_30d: 0.0,
volume_trend: TrendDirection::Stable,
price_trend: TrendDirection::Stable,
market_sentiment: Sentiment::Neutral,
},
timestamp: chrono::Utc::now(),
sources: vec![],
}
}
}