use crate::nft::cache::{CacheManager, CacheKey};
use crate::nft::errors::{NftError, NftResult, RecoveryStrategy, RiskLevel};
use crate::nft::metadata::MetadataFetcher;
use crate::nft::portfolio::PortfolioAnalyzer;
use crate::nft::types::*;
use crate::nft::valuation::ValuationEngine;
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;
use tracing::{error, info};
/// Orchestrates large batches of NFT work (metadata fetch, valuation,
/// portfolio analysis, security validation) with shared caching, metrics,
/// and resource monitoring. Cheap to clone: every field is an `Arc` or a
/// small config value.
#[derive(Clone)]
pub struct BatchProcessor {
    // Fetches on-chain/off-chain NFT metadata (single and batched).
    metadata_fetcher: Arc<MetadataFetcher>,
    // Values sets of NFTs in bulk.
    valuation_engine: Arc<ValuationEngine>,
    // NOTE(review): not referenced by any method in this file yet — presumably
    // `analyze_wallet_portfolio` should delegate to it; confirm.
    portfolio_analyzer: Arc<PortfolioAnalyzer>,
    // Shared NFT cache consulted before refetching metadata.
    cache_manager: Arc<CacheManager>,
    // Tuning knobs; see `BatchProcessorConfig`.
    config: BatchProcessorConfig,
    // Process-wide atomic counters, shared across clones.
    metrics: Arc<BatchMetrics>,
    // Memory/CPU sampling used for adaptive batch sizing.
    resource_monitor: Arc<ResourceMonitor>,
}
/// Tuning parameters for the batch processor.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Maximum batch jobs processed concurrently (semaphore width).
    pub max_concurrent_batches: usize,
    /// Upper bound on items per chunk when splitting a job.
    pub max_items_per_batch: usize,
    /// Per-batch timeout. NOTE(review): not enforced anywhere in this file — confirm.
    pub batch_timeout_seconds: u64,
    /// When true, chunk size shrinks under memory/CPU pressure.
    pub enable_adaptive_batching: bool,
    /// NOTE(review): declared but unused in this file — confirm intent.
    pub enable_work_stealing: bool,
    /// NOTE(review): declared but unused in this file — confirm intent.
    pub enable_progress_reporting: bool,
    /// Interval between progress reports, in milliseconds.
    pub progress_report_interval_ms: u64,
    /// Memory level (MB) above which adaptive batching halves the chunk size.
    pub memory_threshold_mb: u64,
    /// CPU level (%) above which adaptive batching halves the chunk size.
    pub cpu_threshold_percent: f64,
    /// Whether failed items should be retried automatically.
    pub enable_auto_retry: bool,
    /// Maximum retry attempts per item.
    pub max_retry_attempts: u32,
    /// Delay between retries, in milliseconds.
    pub retry_delay_ms: u64,
    /// Whether per-item results are cached.
    pub enable_result_caching: bool,
    /// TTL for cached results, in seconds.
    pub cache_ttl_seconds: u64,
}
impl Default for BatchProcessorConfig {
fn default() -> Self {
Self {
max_concurrent_batches: 5,
max_items_per_batch: 100,
batch_timeout_seconds: 300, enable_adaptive_batching: true,
enable_work_stealing: true,
enable_progress_reporting: true,
progress_report_interval_ms: 1000,
memory_threshold_mb: 1024, cpu_threshold_percent: 80.0,
enable_auto_retry: true,
max_retry_attempts: 3,
retry_delay_ms: 1000,
enable_result_caching: true,
cache_ttl_seconds: 300,
}
}
}
/// Aggregate, process-wide batch metrics. Fields are atomics wrapped in `Arc`
/// so cloned processors share one set of counters.
///
/// NOTE(review): `std::sync::atomic` has no `AtomicF64` in the standard
/// library — the float-valued fields below presumably rely on a crate-local
/// type alias or an external crate (e.g. `atomic_float`); confirm this builds.
#[derive(Debug, Default)]
pub struct BatchMetrics {
    /// Jobs started (incremented at job start, not completion).
    pub total_batches: Arc<std::sync::atomic::AtomicU64>,
    /// Jobs that finished with status `Completed`.
    pub successful_batches: Arc<std::sync::atomic::AtomicU64>,
    /// Jobs that finished with status `Failed`.
    pub failed_batches: Arc<std::sync::atomic::AtomicU64>,
    /// Total items seen across all jobs.
    pub total_items_processed: Arc<std::sync::atomic::AtomicU64>,
    /// Accumulated batch wall-clock time in ms; divide by `total_batches`
    /// at read time to obtain the average (the name is misleading).
    pub avg_batch_time_ms: Arc<std::sync::atomic::AtomicU64>,
    /// Accumulated per-job throughput; likewise an accumulator, not an average.
    pub avg_throughput: Arc<std::sync::atomic::AtomicF64>,
    /// Peak memory observed, in MB.
    pub memory_peak_mb: Arc<std::sync::atomic::AtomicU64>,
    /// Peak CPU observed, in percent.
    pub cpu_peak_percent: Arc<std::sync::atomic::AtomicF64>,
    /// Cache hit rate. NOTE(review): never written in this file — confirm.
    pub cache_hit_rate: Arc<std::sync::atomic::AtomicF64>,
    /// Total retries performed.
    pub retry_count: Arc<std::sync::atomic::AtomicU64>,
    /// Times adaptive batching shrank the chunk size.
    pub adaptive_adjustments: Arc<std::sync::atomic::AtomicU64>,
}
/// Samples memory/CPU usage into bounded histories used for adaptive batching
/// and usage statistics. Clones share the same underlying histories.
#[derive(Clone)]
pub struct ResourceMonitor {
    // Memory samples in MB (cleared at each `start_monitoring`).
    memory_history: Arc<std::sync::Mutex<Vec<f64>>>,
    // CPU samples in percent.
    cpu_history: Arc<std::sync::Mutex<Vec<f64>>>,
    // Capacity hint for histories. NOTE(review): never enforced as a cap
    // in this file — confirm whether truncation happens elsewhere.
    max_history_length: usize,
}
/// A unit of batch work: a typed collection of items plus per-job overrides.
#[derive(Debug, Clone)]
pub struct BatchJob {
    /// Unique job identifier.
    pub id: uuid::Uuid,
    /// Selects which processing pipeline handles the items.
    pub job_type: BatchJobType,
    /// Items to process; their `data` variant must match `job_type`.
    pub items: Vec<BatchItem>,
    /// Optional per-job overrides of the processor config.
    pub config: BatchJobConfig,
    /// When the job was created (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Scheduling priority. NOTE(review): not consulted in this file — confirm.
    pub priority: JobPriority,
}
/// The kind of processing a batch job performs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchJobType {
    /// Fetch NFT metadata for mint addresses.
    MetadataFetch,
    /// Value NFTs (metadata is fetched or read from cache first).
    Valuation,
    /// Analyze wallet portfolios.
    PortfolioAnalysis,
    /// Run security checks on cached NFTs.
    SecurityValidation,
    /// Extension point; currently rejected by `process_batch_job`.
    Custom { job_name: String },
}
/// Per-job overrides; `None` means "use the processor-level setting".
#[derive(Debug, Clone)]
pub struct BatchJobConfig {
    /// Overrides item-level concurrency for this job.
    pub max_concurrent_items: Option<usize>,
    /// Overrides the job timeout.
    pub timeout_seconds: Option<u64>,
    /// Overrides whether failed items are retried.
    pub enable_retries: Option<bool>,
    /// Free-form parameters for custom job types.
    pub custom_params: HashMap<String, serde_json::Value>,
}
/// Job scheduling priority. Derived `Ord` follows declaration order, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobPriority {
    Low,
    Normal,
    High,
    Critical,
}
/// One work item within a batch job.
#[derive(Debug, Clone)]
pub struct BatchItem {
    /// Caller-supplied identifier echoed back in results/errors.
    pub id: String,
    /// The payload; its variant should match the job's `BatchJobType`.
    pub data: BatchItemData,
    /// Arbitrary key/value annotations carried alongside the item.
    pub metadata: HashMap<String, String>,
}
/// Payload of a batch item.
#[derive(Debug, Clone)]
pub enum BatchItemData {
    /// An NFT mint address (metadata fetch, valuation, security validation).
    MintAddress(String),
    /// A wallet address (portfolio analysis).
    WalletAddress(String),
    /// Free-form payload for custom job types.
    Custom { data_type: String, payload: serde_json::Value },
}
/// Outcome of a batch job: per-item successes and failures plus aggregates.
#[derive(Debug, Clone)]
pub struct BatchJobResult {
    /// Id of the job this result belongs to.
    pub job_id: uuid::Uuid,
    /// Final job status (set when processing completes).
    pub status: JobStatus,
    /// Results for items that processed successfully.
    pub successful_results: Vec<BatchItemResult>,
    /// Errors for items that failed.
    pub failed_results: Vec<BatchItemError>,
    /// Aggregate statistics derived from the two lists above.
    pub statistics: JobStatistics,
    /// When processing began (UTC).
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When processing finished; `None` while the job is in flight.
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Lifecycle state of a batch job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
    /// Created but not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished; at least one item succeeded (or there were no failures).
    Completed,
    /// Finished with no successful items.
    Failed,
    /// Cancelled before completion.
    Cancelled,
    /// Exceeded its time budget.
    Timeout,
}
/// Successful outcome for a single batch item.
#[derive(Debug, Clone)]
pub struct BatchItemResult {
    /// Echoes `BatchItem::id`.
    pub item_id: String,
    /// The typed result payload.
    pub result_data: BatchItemResultData,
    /// Wall-clock time spent on this item (may cover its whole chunk when
    /// items were processed in bulk).
    pub processing_time_ms: u64,
    /// Optional named quality/confidence scores.
    pub success_indicators: HashMap<String, f64>,
}
/// Typed payload of a successful item result, one variant per job type.
#[derive(Debug, Clone)]
pub enum BatchItemResultData {
    /// Metadata-fetch result.
    NftInfo(NftInfo),
    /// Valuation result.
    Valuation(crate::nft::valuation::ValuationResult),
    /// Portfolio-analysis result.
    Portfolio(NftPortfolio),
    /// Security-validation result.
    SecurityAssessment(SecurityAssessment),
    /// Free-form result for custom job types.
    Custom { result_type: String, data: serde_json::Value },
}
/// Failure record for a single batch item.
#[derive(Debug, Clone)]
pub struct BatchItemError {
    /// Echoes `BatchItem::id`.
    pub item_id: String,
    /// The error that caused the failure.
    pub error: NftError,
    /// Retries attempted before giving up.
    pub retry_count: u32,
    /// Wall-clock time spent before failing.
    pub processing_time_ms: u64,
}
/// Aggregate statistics for one completed batch job.
#[derive(Debug, Clone)]
pub struct JobStatistics {
    /// Total items submitted with the job.
    pub total_items: usize,
    /// Items that produced a result.
    pub successful_items: usize,
    /// Items that produced an error.
    pub failed_items: usize,
    /// successful_items / total_items (0.0 when the job was empty).
    pub success_rate: f64,
    /// Total job time divided by total items.
    pub avg_processing_time_ms: f64,
    /// Wall-clock duration of the whole job.
    pub total_processing_time_ms: u64,
    /// Successful items per second.
    pub throughput: f64,
    /// Resource usage sampled while the job ran.
    pub resource_usage: ResourceUsageStats,
}
/// Resource usage summary produced by `ResourceMonitor::get_usage_stats`.
#[derive(Debug, Clone)]
pub struct ResourceUsageStats {
    /// Highest memory sample, in MB.
    pub peak_memory_mb: f64,
    /// Mean of memory samples, in MB.
    pub avg_memory_mb: f64,
    /// Highest CPU sample, in percent.
    pub peak_cpu_percent: f64,
    /// Mean of CPU samples, in percent.
    pub avg_cpu_percent: f64,
    /// NOTE(review): the three counters below are always reported as 0 in
    /// this file — confirm whether they are populated elsewhere.
    pub network_requests: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
}
/// Point-in-time progress snapshot for a running job.
/// NOTE(review): defined but never constructed in this file — presumably
/// emitted by a progress-reporting task elsewhere; confirm.
#[derive(Debug, Clone)]
pub struct ProgressReport {
    /// Job being reported on.
    pub job_id: uuid::Uuid,
    /// Items finished so far.
    pub completed_items: usize,
    /// Total items in the job.
    pub total_items: usize,
    /// completed_items as a percentage of total_items.
    pub progress_percent: f64,
    /// Estimated seconds remaining, when a rate is available.
    pub estimated_remaining_seconds: Option<f64>,
    /// Current processing rate (items per second).
    pub current_rate: f64,
    /// When this snapshot was taken (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl BatchProcessor {
/// Build a processor from its collaborators and configuration.
///
/// Metrics start zeroed and the resource monitor keeps up to 100 samples
/// per history.
pub fn new(
    metadata_fetcher: Arc<MetadataFetcher>,
    valuation_engine: Arc<ValuationEngine>,
    portfolio_analyzer: Arc<PortfolioAnalyzer>,
    cache_manager: Arc<CacheManager>,
    config: BatchProcessorConfig,
) -> Self {
    Self {
        metadata_fetcher,
        valuation_engine,
        portfolio_analyzer,
        cache_manager,
        config,
        metrics: Arc::new(BatchMetrics::default()),
        resource_monitor: Arc::new(ResourceMonitor::new(100)),
    }
}
/// Execute a single batch job end to end and return its per-item results
/// plus aggregate statistics.
///
/// Dispatches to the handler matching `job.job_type`; each handler appends
/// to `successful_results` / `failed_results` in place. Resource monitoring
/// brackets the run.
///
/// # Errors
/// Returns an error for `Custom` job types (not yet implemented) or when a
/// type-specific handler fails at the job level.
pub async fn process_batch_job(&self, job: BatchJob) -> NftResult<BatchJobResult> {
    let start_time = Instant::now();
    self.metrics.total_batches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    info!("Starting batch job {} with {} items", job.id, job.items.len());
    let mut result = BatchJobResult {
        job_id: job.id,
        status: JobStatus::Running,
        successful_results: Vec::new(),
        failed_results: Vec::new(),
        statistics: JobStatistics {
            total_items: job.items.len(),
            successful_items: 0,
            failed_items: 0,
            success_rate: 0.0,
            avg_processing_time_ms: 0.0,
            total_processing_time_ms: 0,
            throughput: 0.0,
            resource_usage: ResourceUsageStats {
                peak_memory_mb: 0.0,
                avg_memory_mb: 0.0,
                peak_cpu_percent: 0.0,
                avg_cpu_percent: 0.0,
                network_requests: 0,
                cache_hits: 0,
                cache_misses: 0,
            },
        },
        started_at: chrono::Utc::now(),
        completed_at: None,
    };
    self.resource_monitor.start_monitoring().await;
    match job.job_type {
        BatchJobType::MetadataFetch => {
            self.process_metadata_fetch_batch(&job, &mut result).await?;
        }
        BatchJobType::Valuation => {
            self.process_valuation_batch(&job, &mut result).await?;
        }
        BatchJobType::PortfolioAnalysis => {
            self.process_portfolio_analysis_batch(&job, &mut result).await?;
        }
        BatchJobType::SecurityValidation => {
            self.process_security_validation_batch(&job, &mut result).await?;
        }
        BatchJobType::Custom { .. } => {
            return Err(NftError::Strategy {
                message: "Custom batch jobs not yet implemented".to_string(),
                strategy_name: Some("custom".to_string()),
                context: None,
            });
        }
    }
    // Derive aggregate statistics from the per-item outcomes.
    let total_time_ms = start_time.elapsed().as_millis() as u64;
    result.statistics.total_processing_time_ms = total_time_ms;
    result.statistics.successful_items = result.successful_results.len();
    result.statistics.failed_items = result.failed_results.len();
    result.statistics.success_rate = if result.statistics.total_items > 0 {
        result.statistics.successful_items as f64 / result.statistics.total_items as f64
    } else {
        0.0
    };
    result.statistics.avg_processing_time_ms = if result.statistics.total_items > 0 {
        total_time_ms as f64 / result.statistics.total_items as f64
    } else {
        0.0
    };
    result.statistics.throughput = if total_time_ms > 0 {
        // Successful items per second.
        (result.statistics.successful_items as f64 / total_time_ms as f64) * 1000.0
    } else {
        0.0
    };
    result.statistics.resource_usage = self.resource_monitor.get_usage_stats().await;
    self.resource_monitor.stop_monitoring().await;
    result.completed_at = Some(chrono::Utc::now());
    // Decide the final status BEFORE updating the success/failure counters.
    // Previously the counter update compared `result.status` while it was
    // still `Running` (the status was assigned afterwards), so
    // `successful_batches` was never incremented and every job was counted
    // as failed.
    result.status = if result.statistics.failed_items == 0 {
        JobStatus::Completed
    } else if result.statistics.successful_items == 0 {
        JobStatus::Failed
    } else {
        // Partial success is still reported as completed.
        JobStatus::Completed
    };
    if result.status == JobStatus::Completed {
        self.metrics.successful_batches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    } else {
        self.metrics.failed_batches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
    self.metrics.total_items_processed.fetch_add(
        result.statistics.total_items as u64,
        std::sync::atomic::Ordering::Relaxed
    );
    // NOTE(review): despite their names these two accumulate totals; divide
    // by `total_batches` at read time to get an average.
    self.metrics.avg_batch_time_ms.fetch_add(total_time_ms, std::sync::atomic::Ordering::Relaxed);
    self.metrics.avg_throughput.fetch_add(result.statistics.throughput, std::sync::atomic::Ordering::Relaxed);
    info!("Completed batch job {} in {}ms: {} successful, {} failed",
        result.job_id, total_time_ms, result.statistics.successful_items, result.statistics.failed_items);
    Ok(result)
}
/// Run many batch jobs concurrently, bounded by `max_concurrent_batches`.
///
/// Failed jobs are logged and dropped; only successful job results are
/// returned. Each spawned future owns a clone of the processor — the
/// original captured `&self` inside the `async move` block for the error
/// path even though it had cloned `self` precisely to avoid that borrow;
/// the clone is now used consistently.
pub async fn process_batch_jobs(&self, jobs: Vec<BatchJob>) -> NftResult<Vec<BatchJobResult>> {
    let max_concurrent = self.config.max_concurrent_batches;
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let results: Vec<NftResult<BatchJobResult>> = futures::stream::iter(jobs)
        .map(|job| {
            let semaphore = semaphore.clone();
            let processor = self.clone();
            async move {
                // acquire() only fails if the semaphore is closed, which we
                // never do; map it to a resource error just in case.
                let _permit = semaphore.acquire().await
                    .map_err(|_| NftError::ResourceExhausted {
                        message: "Failed to acquire semaphore permit".to_string(),
                        resource_type: "semaphore".to_string(),
                        current_usage: None,
                        limit: Some(processor.config.max_concurrent_batches as u64),
                    })?;
                processor.process_batch_job(job).await
            }
        })
        .buffer_unordered(max_concurrent)
        .collect()
        .await;
    let mut successful_results = Vec::new();
    let mut failed_count = 0;
    for result in results {
        match result {
            Ok(batch_result) => successful_results.push(batch_result),
            Err(e) => {
                error!("Batch job failed: {}", e);
                failed_count += 1;
            }
        }
    }
    info!("Batch processing completed: {} successful, {} failed",
        successful_results.len(), failed_count);
    Ok(successful_results)
}
/// Handle a `MetadataFetch` job: fetch metadata for every mint-address item
/// in adaptive-sized chunks, recording per-item successes and failures.
///
/// Bug fixed: the original indexed `job.items[chunk_index * batch_size + index]`,
/// which assumed every item was a `MintAddress`. Since non-mint items are
/// filtered out, a mixed job mapped results to the wrong item ids (or
/// panicked past the end). Addresses are now paired with their originating
/// item index before chunking.
///
/// # Errors
/// Returns a validation error when the job contains no mint-address items.
async fn process_metadata_fetch_batch(&self, job: &BatchJob, result: &mut BatchJobResult) -> NftResult<()> {
    // (original item index, mint address) pairs, preserving item order.
    let addressed_items: Vec<(usize, String)> = job.items.iter()
        .enumerate()
        .filter_map(|(index, item)| match &item.data {
            BatchItemData::MintAddress(addr) => Some((index, addr.clone())),
            _ => None,
        })
        .collect();
    if addressed_items.is_empty() {
        return Err(NftError::Validation {
            message: "No mint addresses found in batch job".to_string(),
            field: Some("items".to_string()),
            value: None,
        });
    }
    let batch_size = if self.config.enable_adaptive_batching {
        self.calculate_adaptive_batch_size().await
    } else {
        self.config.max_items_per_batch
    };
    for (chunk_index, chunk) in addressed_items.chunks(batch_size).enumerate() {
        let chunk_start_time = Instant::now();
        let addresses: Vec<String> = chunk.iter().map(|(_, addr)| addr.clone()).collect();
        match self.metadata_fetcher.batch_fetch_metadata(&addresses).await {
            Ok(nft_infos) => {
                // zip maps each fetched result back to its original item;
                // a short response simply leaves the trailing items unrecorded.
                for ((item_index, _), nft_info) in chunk.iter().zip(nft_infos) {
                    result.successful_results.push(BatchItemResult {
                        item_id: job.items[*item_index].id.clone(),
                        result_data: BatchItemResultData::NftInfo(nft_info),
                        // Chunk-level timing: items in a chunk share one fetch.
                        processing_time_ms: chunk_start_time.elapsed().as_millis() as u64,
                        success_indicators: HashMap::new(),
                    });
                }
            }
            Err(e) => {
                error!("Metadata fetch batch {} failed: {}", chunk_index, e);
                // A chunk-level failure fails every item in the chunk.
                for (item_index, _) in chunk {
                    result.failed_results.push(BatchItemError {
                        item_id: job.items[*item_index].id.clone(),
                        error: e.clone(),
                        retry_count: 0,
                        processing_time_ms: chunk_start_time.elapsed().as_millis() as u64,
                    });
                }
            }
        }
    }
    Ok(())
}
/// Handle a `Valuation` job: gather metadata for every mint-address item
/// (cache first, then fetch), then value the collected NFTs in one call.
///
/// Improvements over the original: freshly fetched metadata is written back
/// to the cache (matching the security-validation handler); valuations are
/// zipped with their item indices instead of indexed, so a length mismatch
/// from the valuation engine can no longer panic; an empty collection skips
/// the valuation call entirely.
async fn process_valuation_batch(&self, job: &BatchJob, result: &mut BatchJobResult) -> NftResult<()> {
    let mut nft_infos = Vec::new();
    // Original index of each entry in `nft_infos` within `job.items`.
    let mut item_indices = Vec::new();
    for (index, item) in job.items.iter().enumerate() {
        if let BatchItemData::MintAddress(mint_address) = &item.data {
            let cache_key = CacheKey::metadata(mint_address);
            if let Some(cached_nft) = self.cache_manager.get_nft(&cache_key).await {
                nft_infos.push(cached_nft);
                item_indices.push(index);
            } else {
                match self.metadata_fetcher.fetch_nft_metadata(mint_address).await {
                    Ok(nft_info) => {
                        // Populate the cache so later lookups can hit.
                        self.cache_manager.set_nft(&cache_key, &nft_info).await;
                        nft_infos.push(nft_info);
                        item_indices.push(index);
                    }
                    Err(e) => {
                        result.failed_results.push(BatchItemError {
                            item_id: item.id.clone(),
                            error: e,
                            retry_count: 0,
                            processing_time_ms: 0,
                        });
                    }
                }
            }
        }
    }
    if nft_infos.is_empty() {
        // Nothing resolvable to value; per-item fetch errors (if any) were
        // already recorded above.
        return Ok(());
    }
    match self.valuation_engine.value_nfts(&nft_infos).await {
        Ok(valuations) => {
            for (valuation, &item_index) in valuations.into_iter().zip(item_indices.iter()) {
                result.successful_results.push(BatchItemResult {
                    item_id: job.items[item_index].id.clone(),
                    result_data: BatchItemResultData::Valuation(valuation),
                    // Per-item timing is unavailable: valuation is batched.
                    processing_time_ms: 0,
                    success_indicators: HashMap::new(),
                });
            }
        }
        Err(e) => {
            error!("Valuation batch failed: {}", e);
            // An engine-level failure fails every item that reached valuation.
            for &item_index in &item_indices {
                result.failed_results.push(BatchItemError {
                    item_id: job.items[item_index].id.clone(),
                    error: e.clone(),
                    retry_count: 0,
                    processing_time_ms: 0,
                });
            }
        }
    }
    Ok(())
}
async fn process_portfolio_analysis_batch(&self, job: &BatchJob, result: &mut BatchJobResult) -> NftResult<()> {
for item in &job.items {
if let BatchItemData::WalletAddress(wallet_address) = &item.data {
let item_start_time = Instant::now();
match self.analyze_wallet_portfolio(wallet_address).await {
Ok(portfolio) => {
result.successful_results.push(BatchItemResult {
item_id: item.id.clone(),
result_data: BatchItemResultData::Portfolio(portfolio),
processing_time_ms: item_start_time.elapsed().as_millis() as u64,
success_indicators: HashMap::new(),
});
}
Err(e) => {
result.failed_results.push(BatchItemError {
item_id: item.id.clone(),
error: e,
retry_count: 0,
processing_time_ms: item_start_time.elapsed().as_millis() as u64,
});
}
}
}
}
Ok(())
}
/// Handle a `SecurityValidation` job: assess each cached NFT's security,
/// write the assessment back to the cache, and record per-item outcomes.
///
/// Bug fixed: the original used `?` on `validate_nft_security`, so a single
/// item's validation error aborted the entire batch and discarded all
/// results accumulated so far. Failures are now recorded per item and
/// processing continues.
async fn process_security_validation_batch(&self, job: &BatchJob, result: &mut BatchJobResult) -> NftResult<()> {
    for item in &job.items {
        let item_start_time = Instant::now();
        match &item.data {
            BatchItemData::MintAddress(mint_address) => {
                let cache_key = CacheKey::metadata(mint_address);
                if let Some(mut nft_info) = self.cache_manager.get_nft(&cache_key).await {
                    match self.validate_nft_security(&nft_info).await {
                        Ok(security_assessment) => {
                            // Persist the assessment on the cached NFT record.
                            nft_info.security_assessment = security_assessment.clone();
                            self.cache_manager.set_nft(&cache_key, &nft_info).await;
                            result.successful_results.push(BatchItemResult {
                                item_id: item.id.clone(),
                                result_data: BatchItemResultData::SecurityAssessment(security_assessment),
                                processing_time_ms: item_start_time.elapsed().as_millis() as u64,
                                success_indicators: HashMap::new(),
                            });
                        }
                        Err(e) => {
                            result.failed_results.push(BatchItemError {
                                item_id: item.id.clone(),
                                error: e,
                                retry_count: 0,
                                processing_time_ms: item_start_time.elapsed().as_millis() as u64,
                            });
                        }
                    }
                } else {
                    // This handler only validates NFTs already in the cache.
                    result.failed_results.push(BatchItemError {
                        item_id: item.id.clone(),
                        error: NftError::Validation {
                            message: "NFT not found in cache".to_string(),
                            field: Some("mint_address".to_string()),
                            value: Some(mint_address.clone()),
                        },
                        retry_count: 0,
                        processing_time_ms: item_start_time.elapsed().as_millis() as u64,
                    });
                }
            }
            _ => {
                result.failed_results.push(BatchItemError {
                    item_id: item.id.clone(),
                    error: NftError::Validation {
                        message: "Invalid item data type for security validation".to_string(),
                        field: Some("data".to_string()),
                        value: None,
                    },
                    retry_count: 0,
                    processing_time_ms: item_start_time.elapsed().as_millis() as u64,
                });
            }
        }
    }
    Ok(())
}
/// Placeholder portfolio analysis: returns an empty, all-zero portfolio for
/// the given wallet.
///
/// NOTE(review): `self.portfolio_analyzer` is never consulted here —
/// presumably this stub should delegate to it once wired up; confirm.
async fn analyze_wallet_portfolio(&self, wallet_address: &str) -> NftResult<NftPortfolio> {
    let value_distribution = ValueDistribution {
        highest_value: None,
        lowest_value: None,
        median_value: None,
        average_value: 0.0,
        percentiles: HashMap::new(),
        concentration: 0.0,
    };
    let risk_distribution = RiskDistribution {
        counts: HashMap::new(),
        value_by_risk: HashMap::new(),
        percentages: HashMap::new(),
        overall_risk_score: 0.0,
    };
    let quality_metrics = PortfolioQualityMetrics {
        average_rarity_score: None,
        average_quality_score: None,
        verification_rate: 0.0,
        metadata_completeness: 0.0,
        image_availability: 0.0,
        unique_collections: 0,
        diversity_score: 0.0,
    };
    Ok(NftPortfolio {
        id: uuid::Uuid::new_v4(),
        wallet_address: wallet_address.to_string(),
        nfts: Vec::new(),
        total_value_lamports: 0,
        total_count: 0,
        verified_count: 0,
        high_risk_count: 0,
        collection_breakdown: HashMap::new(),
        value_distribution,
        risk_distribution,
        quality_metrics,
        analyzed_at: chrono::Utc::now(),
        analysis_duration_ms: 0,
        analysis_config: "placeholder".to_string(),
    })
}
/// Placeholder security validation: every NFT gets the same permissive
/// default — no risk, full score, no issues, unverified, confidence 50.
async fn validate_nft_security(&self, _nft_info: &NftInfo) -> NftResult<SecurityAssessment> {
    let assessment = SecurityAssessment {
        risk_level: RiskLevel::None,
        security_score: 100,
        issues: Vec::new(),
        verified: false,
        assessed_at: chrono::Utc::now(),
        confidence: 50,
    };
    Ok(assessment)
}
/// Compute the chunk size for adaptive batching.
///
/// Starting from `max_items_per_batch`, the size is halved (to a floor of
/// 10) once for memory pressure and once for CPU pressure, with each
/// adjustment counted in the metrics. Returns the configured maximum
/// unchanged when adaptive batching is disabled.
async fn calculate_adaptive_batch_size(&self) -> usize {
    if !self.config.enable_adaptive_batching {
        return self.config.max_items_per_batch;
    }
    let memory_pressure =
        self.resource_monitor.get_current_memory_mb().await > self.config.memory_threshold_mb as f64;
    let cpu_pressure =
        self.resource_monitor.get_current_cpu_percent().await > self.config.cpu_threshold_percent;
    let mut size = self.config.max_items_per_batch;
    for over_threshold in [memory_pressure, cpu_pressure] {
        if over_threshold {
            size = (size / 2).max(10);
            self.metrics.adaptive_adjustments.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
    }
    size
}
/// Borrow the shared batch metrics (atomic counters updated across clones).
pub fn get_metrics(&self) -> &BatchMetrics {
    &self.metrics
}
/// Borrow the resource monitor used for adaptive batching and usage stats.
pub fn get_resource_monitor(&self) -> &ResourceMonitor {
    &self.resource_monitor
}
}
impl ResourceMonitor {
/// Create a monitor whose sample histories preallocate `max_history_length`
/// entries.
pub fn new(max_history_length: usize) -> Self {
    let new_history = || Arc::new(std::sync::Mutex::new(Vec::with_capacity(max_history_length)));
    Self {
        memory_history: new_history(),
        cpu_history: new_history(),
        max_history_length,
    }
}
/// Begin a monitoring window by clearing both sample histories.
/// Best-effort: a poisoned mutex is skipped rather than panicking.
pub async fn start_monitoring(&self) {
    for history in [&self.memory_history, &self.cpu_history] {
        if let Ok(mut samples) = history.lock() {
            samples.clear();
        }
    }
}
/// End the monitoring window. Currently a no-op placeholder — sampling
/// shutdown, if any, presumably belongs here; confirm intended behavior.
pub async fn stop_monitoring(&self) {
}
/// Current memory usage in MB.
/// NOTE(review): hardcoded stub — always returns 512.0; replace with a real
/// sampler before relying on adaptive batching thresholds.
pub async fn get_current_memory_mb(&self) -> f64 {
    512.0
}
/// Current CPU usage in percent.
/// NOTE(review): hardcoded stub — always returns 45.0; replace with a real
/// sampler before relying on adaptive batching thresholds.
pub async fn get_current_cpu_percent(&self) -> f64 {
    45.0
}
/// Summarize the collected memory/CPU samples as peak and average values.
///
/// Empty histories yield zeros. A poisoned mutex is recovered via
/// `PoisonError::into_inner` instead of panicking (the original
/// `.lock().unwrap()` aborted stats collection on poison, which was also
/// inconsistent with `start_monitoring`'s best-effort locking); the sample
/// data remains usable even if a writer panicked mid-update.
pub async fn get_usage_stats(&self) -> ResourceUsageStats {
    let mem_hist = self.memory_history.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let cpu_hist = self.cpu_history.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    // (peak, average) of a sample series; (0.0, 0.0) when empty.
    fn summarize(samples: &[f64]) -> (f64, f64) {
        let peak = samples.iter().copied().fold(0.0, f64::max);
        let avg = if samples.is_empty() {
            0.0
        } else {
            samples.iter().sum::<f64>() / samples.len() as f64
        };
        (peak, avg)
    }
    let (peak_memory_mb, avg_memory_mb) = summarize(&mem_hist);
    let (peak_cpu_percent, avg_cpu_percent) = summarize(&cpu_hist);
    ResourceUsageStats {
        peak_memory_mb,
        avg_memory_mb,
        peak_cpu_percent,
        avg_cpu_percent,
        // Network/cache counters are not yet wired into the monitor.
        network_requests: 0,
        cache_hits: 0,
        cache_misses: 0,
    }
}
}
impl Default for JobPriority {
fn default() -> Self {
Self::Normal
}
}
impl Default for BatchJobConfig {
fn default() -> Self {
Self {
max_concurrent_items: None,
timeout_seconds: None,
enable_retries: None,
custom_params: HashMap::new(),
}
}
}