use crate::core::{Result, SolanaRecoverError, ScanResult, ScanStatus, EmptyAccount, BatchScanRequest, BatchScanResult};
use crate::core::adaptive_parallel_processor::AdaptiveParallelProcessor;
use crate::rpc::{ConnectionPoolTrait};
use crate::utils::memory_integration::{MemoryIntegrationLayer, ScannerMemoryManager};
use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;
use uuid::Uuid;
use std::time::Instant;
use chrono::Utc;
use std::str::FromStr;
use tracing::{info, debug, error};
use serde::{Deserialize, Serialize};
/// Wallet scanner that combines an RPC connection pool with pooled scan
/// objects and, optionally, an adaptive parallel batch processor.
#[derive(Clone)]
pub struct EnhancedWalletScanner {
/// Pool that hands out the RPC clients used to list a wallet's accounts.
connection_pool: Arc<dyn ConnectionPoolTrait>,
/// `Some` only when the scanner was built through one of the
/// parallel-processing constructors; `None` selects the sequential path.
parallel_processor: Option<Arc<AdaptiveParallelProcessor>>,
/// Handle obtained from `get_global_memory_integration()`; queried for
/// memory statistics, GC scheduling and buffer-pool cleanup.
memory_integration: Arc<MemoryIntegrationLayer>,
/// Source of the `acquire_*` objects (wallet info, scan results, empty
/// accounts, batch results) filled in while scanning.
scanner_memory_manager: ScannerMemoryManager,
/// Feature switches and sizing limits for this scanner.
config: EnhancedScannerConfig,
}
/// Top-level configuration for [`EnhancedWalletScanner`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnhancedScannerConfig {
/// Within this file this flag is only consulted when generating
/// recommendations; it does not gate any pooling call here.
pub enable_memory_pooling: bool,
/// When `true`, `scan_batch_enhanced` emits a metrics log line after each
/// batch. Also consulted when generating recommendations.
pub enable_performance_tracking: bool,
/// Batch sizing options.
pub batch_config: BatchProcessingConfig,
/// Memory pool and memory-maintenance options.
pub memory_config: ScannerMemoryConfig,
}
/// Options controlling how a batch request is sized before it is processed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingConfig {
/// When `true`, `scan_batch_enhanced` resizes the request according to the
/// current memory pressure before processing it.
pub enable_intelligent_sizing: bool,
/// Lower bound applied when clamping the resized batch.
pub min_batch_size: usize,
/// Upper bound applied when clamping the resized batch.
pub max_batch_size: usize,
/// Not read anywhere in this file.
/// NOTE(review): presumably the desired wall-clock time per batch — confirm.
pub target_batch_time_ms: u64,
/// Not read anywhere in this file; the processor configuration built here
/// always enables work stealing regardless of this flag.
pub enable_work_stealing: bool,
}
/// Memory-related options for the scanner.
///
/// The three `*_pool_size` fields and `memory_optimization_interval_seconds`
/// are not read anywhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScannerMemoryConfig {
/// NOTE(review): presumably the capacity of the wallet-info pool — confirm against the consumer.
pub wallet_info_pool_size: usize,
/// NOTE(review): presumably the capacity of the empty-account pool — confirm against the consumer.
pub empty_account_pool_size: usize,
/// NOTE(review): presumably the capacity of the scan-result pool — confirm against the consumer.
pub scan_result_pool_size: usize,
/// When `true`, `scan_batch_enhanced` runs a memory optimization pass
/// before each batch.
pub enable_scan_tracking: bool,
/// NOTE(review): name suggests a period in seconds between optimization passes — confirm.
pub memory_optimization_interval_seconds: u64,
}
impl Default for EnhancedScannerConfig {
fn default() -> Self {
Self {
enable_memory_pooling: true,
enable_performance_tracking: true,
batch_config: BatchProcessingConfig::default(),
memory_config: ScannerMemoryConfig::default(),
}
}
}
impl Default for BatchProcessingConfig {
fn default() -> Self {
Self {
enable_intelligent_sizing: true,
min_batch_size: 10,
max_batch_size: 1000,
target_batch_time_ms: 5000, enable_work_stealing: true,
}
}
}
impl Default for ScannerMemoryConfig {
fn default() -> Self {
Self {
wallet_info_pool_size: 10000,
empty_account_pool_size: 50000,
scan_result_pool_size: 10000,
enable_scan_tracking: true,
memory_optimization_interval_seconds: 300, }
}
}
impl EnhancedWalletScanner {
/// Creates a sequential scanner (no parallel processor) using
/// `EnhancedScannerConfig::default()`.
pub fn new(connection_pool: Arc<dyn ConnectionPoolTrait>) -> Result<Self> {
    let default_config = EnhancedScannerConfig::default();
    Self::with_config(connection_pool, default_config)
}
/// Creates a sequential scanner (no parallel processor) with the given
/// configuration.
///
/// The memory integration layer is the process-wide instance returned by
/// `get_global_memory_integration()`, and the scanner's memory manager is
/// created from it.
pub fn with_config(
    connection_pool: Arc<dyn ConnectionPoolTrait>,
    config: EnhancedScannerConfig,
) -> Result<Self> {
    let integration = crate::utils::memory_integration::get_global_memory_integration();
    let pooled_objects = integration.create_scanner_memory_manager();

    let scanner = Self {
        connection_pool,
        parallel_processor: None,
        memory_integration: integration,
        scanner_memory_manager: pooled_objects,
        config,
    };
    Ok(scanner)
}
/// Creates a scanner backed by an adaptive parallel processor, using
/// `EnhancedScannerConfig::default()` for everything else.
///
/// `max_workers` and `max_concurrent_tasks` are forwarded unchanged to
/// `with_parallel_processing_and_config`.
pub fn new_with_parallel_processing(
    connection_pool: Arc<dyn ConnectionPoolTrait>,
    max_workers: Option<usize>,
    max_concurrent_tasks: usize,
) -> Result<Self> {
    let default_config = EnhancedScannerConfig::default();
    Self::with_parallel_processing_and_config(
        connection_pool,
        max_workers,
        max_concurrent_tasks,
        default_config,
    )
}
/// Creates a scanner backed by an adaptive parallel processor.
///
/// * `max_workers` — worker count for the processor; `None` falls back to 4.
/// * `max_concurrent_tasks` — forwarded to the processor configuration.
/// * `config` — scanner configuration stored on the returned instance.
///
/// The processor is configured with work stealing, adaptive batching and
/// resource monitoring enabled, CPU affinity disabled, a 30 s task timeout
/// and a 60 s worker idle timeout.
///
/// # Errors
/// Propagates any error returned by `AdaptiveParallelProcessor::new`.
pub fn with_parallel_processing_and_config(
    connection_pool: Arc<dyn ConnectionPoolTrait>,
    max_workers: Option<usize>,
    max_concurrent_tasks: usize,
    config: EnhancedScannerConfig,
) -> Result<Self> {
    let processor_config = crate::core::adaptive_parallel_processor::ProcessorConfig {
        max_workers: max_workers.unwrap_or(4),
        max_concurrent_tasks,
        work_stealing_enabled: true,
        cpu_affinity_enabled: false,
        adaptive_batching: true,
        resource_monitoring: true,
        load_balancing_strategy: crate::core::adaptive_parallel_processor::LoadBalancingStrategy::WorkStealing,
        task_timeout: std::time::Duration::from_secs(30),
        worker_idle_timeout: std::time::Duration::from_secs(60),
    };

    // Build the processor first so a failure returns before any scanner
    // memory manager has been created.
    let base_scanner = crate::core::scanner::WalletScanner::new(connection_pool.clone());
    let parallel_processor = Arc::new(AdaptiveParallelProcessor::new(
        Arc::new(base_scanner),
        processor_config,
    )?);

    // Fetch the global integration handle once and derive the memory manager
    // from it, mirroring `with_config`. No throwaway scanner (and no second,
    // discarded memory manager) is constructed.
    let memory_integration = crate::utils::memory_integration::get_global_memory_integration();
    let scanner_memory_manager = memory_integration.create_scanner_memory_manager();

    Ok(Self {
        connection_pool,
        parallel_processor: Some(parallel_processor),
        memory_integration,
        scanner_memory_manager,
        config,
    })
}
pub async fn scan_batch_enhanced(&mut self, request: &BatchScanRequest) -> Result<BatchScanResult> {
let start_time = Instant::now();
info!("Starting enhanced batch scan for {} wallets", request.wallet_addresses.len());
if self.config.memory_config.enable_scan_tracking {
self.optimize_memory_for_batch().await;
}
let processed_request = if self.config.batch_config.enable_intelligent_sizing {
self.optimize_batch_size(request).await?
} else {
request.clone()
};
let result = if let Some(processor) = &self.parallel_processor {
self.process_batch_with_memory_tracking(processor, &processed_request).await?
} else {
self.scan_batch_sequential_enhanced(&processed_request).await?
};
let duration = start_time.elapsed();
info!("Enhanced batch scan completed in {}ms: {} successful, {} failed",
duration.as_millis(), result.successful_scans, result.failed_scans);
if self.config.enable_performance_tracking {
self.update_performance_metrics(&result, duration).await;
}
Ok(result)
}
/// Returns a copy of `request` whose wallet list is sized according to the
/// current memory pressure.
///
/// The requested wallet count is scaled by 0.5 when memory pressure is above
/// 80, by 1.5 when it is below 40, and left unchanged otherwise. The scaled
/// value is clamped to the configured `[min_batch_size, max_batch_size]`
/// range and then capped at the number of wallets actually present, because a
/// batch can only ever be shortened here, never extended.
///
/// When the target equals the current size the request is cloned unchanged.
/// Otherwise only the first `target_size` addresses are kept; the remaining
/// addresses are not carried anywhere by this function.
async fn optimize_batch_size(&self, request: &BatchScanRequest) -> Result<BatchScanRequest> {
    let memory_stats = self.memory_integration.get_memory_manager().get_memory_stats();
    let current_memory_pressure = memory_stats.memory_pressure;

    let size_multiplier = if current_memory_pressure > 80.0 {
        0.5
    } else if current_memory_pressure < 40.0 {
        1.5
    } else {
        1.0
    };

    let requested = request.wallet_addresses.len();
    let batch_config = &self.config.batch_config;

    // `clamp` panics when min > max, and both bounds come from (deserializable)
    // user configuration, so normalise the lower bound first.
    let lower_bound = batch_config.min_batch_size.min(batch_config.max_batch_size);
    let target_size = ((requested as f64 * size_multiplier) as usize)
        .clamp(lower_bound, batch_config.max_batch_size)
        // Never report a target larger than what the request contains.
        .min(requested);

    if target_size == requested {
        return Ok(request.clone());
    }

    debug!("Adjusting batch size from {} to {} based on memory pressure: {:.1}%",
        requested, target_size, current_memory_pressure);

    Ok(BatchScanRequest {
        id: request.id,
        wallet_addresses: request.wallet_addresses.iter().take(target_size).cloned().collect(),
        user_id: request.user_id.clone(),
        fee_percentage: request.fee_percentage,
        created_at: request.created_at,
    })
}
/// Runs `request` through `processor` and tracks how much the allocated
/// memory grew while doing so.
///
/// `processor` is the scanner's configured parallel processor, so the worker
/// and concurrency limits chosen at construction time are the ones that
/// apply. (Previously a fresh processor with hard-coded limits was built for
/// every batch and the argument was ignored.)
///
/// If the batch grew allocated memory by more than 100 MiB, a GC pass is
/// scheduled with a pressure value of 75.0.
///
/// # Errors
/// Propagates any error from `process_batch_adaptive`.
async fn process_batch_with_memory_tracking(
    &self,
    processor: &Arc<AdaptiveParallelProcessor>,
    request: &BatchScanRequest,
) -> Result<BatchScanResult> {
    let initial_memory = self.memory_integration.get_memory_manager().get_memory_stats().total_allocated_bytes;

    let result = processor.process_batch_adaptive(request).await?;

    let final_memory = self.memory_integration.get_memory_manager().get_memory_stats().total_allocated_bytes;
    // Allocation can shrink during the batch; saturate instead of underflowing.
    let memory_used = final_memory.saturating_sub(initial_memory);
    debug!("Batch processing used {}MB of memory", memory_used / 1024 / 1024);

    if memory_used > 100 * 1024 * 1024 {
        debug!("Triggering memory optimization after batch processing");
        self.memory_integration.get_gc_scheduler().schedule_gc(75.0).await;
    }
    Ok(result)
}
/// Scans the wallets of `request` one after another and aggregates the
/// outcomes into a `BatchScanResult`.
///
/// A wallet counts as successful when its scan returns `Ok` with status
/// `ScanStatus::Completed`; any other status, or an `Err`, counts as failed.
/// For an `Err`, a `Failed` scan result carrying the error text is recorded
/// so that every requested wallet has an entry in `results`.
///
/// The estimated fee is `total_recoverable_sol` multiplied by the request's
/// `fee_percentage`, falling back to 0.15 when the request does not set one.
///
/// This function itself never returns `Err`; per-wallet failures are folded
/// into the result.
async fn scan_batch_sequential_enhanced(&self, request: &BatchScanRequest) -> Result<BatchScanResult> {
    let start_time = Instant::now();
    // One entry is pushed per requested wallet, so the final size is known.
    let mut results = Vec::with_capacity(request.wallet_addresses.len());
    let mut successful_scans = 0;
    let mut failed_scans = 0;
    let mut total_recoverable_sol = 0.0;

    for wallet_address in &request.wallet_addresses {
        match self.scan_wallet_enhanced(wallet_address).await {
            Ok(scan_result) => {
                if scan_result.status == ScanStatus::Completed {
                    successful_scans += 1;
                    if let Some(wallet_info) = &scan_result.result {
                        total_recoverable_sol += wallet_info.recoverable_sol;
                    }
                } else {
                    failed_scans += 1;
                }
                results.push(scan_result);
            }
            Err(e) => {
                error!("Failed to scan wallet {}: {}", wallet_address, e);
                failed_scans += 1;
                let mut error_result = self.scanner_memory_manager.acquire_scan_result();
                error_result.id = Uuid::new_v4();
                error_result.wallet_address = wallet_address.clone();
                error_result.status = ScanStatus::Failed;
                error_result.error_message = Some(e.to_string());
                error_result.created_at = Utc::now();
                results.push(error_result.into_inner());
            }
        }
    }

    let duration = start_time.elapsed();

    // Honour the per-request fee instead of always charging the default.
    // NOTE(review): assumes `fee_percentage` is a fraction on the same scale
    // as the 0.15 default (0.15 == 15%) — confirm against its producers.
    let fee_rate = request.fee_percentage.unwrap_or(0.15);

    let mut batch_result = self.scanner_memory_manager.acquire_batch_scan_result();
    batch_result.request_id = request.id;
    batch_result.total_wallets = request.wallet_addresses.len();
    batch_result.successful_scans = successful_scans;
    batch_result.failed_scans = failed_scans;
    batch_result.completed_wallets = successful_scans;
    batch_result.failed_wallets = failed_scans;
    batch_result.total_recoverable_sol = total_recoverable_sol;
    batch_result.estimated_fee_sol = total_recoverable_sol * fee_rate;
    batch_result.results = results;
    batch_result.created_at = request.created_at;
    batch_result.completed_at = Some(Utc::now());
    batch_result.duration_ms = Some(duration.as_millis() as u64);
    Ok(batch_result.into_inner())
}
/// Scans a single wallet and returns a `Completed` scan result describing
/// its empty accounts.
///
/// `total_accounts` is set to the same value as `empty_accounts`, because the
/// lookup used here only yields the empty ones. Recoverable SOL is the lamport
/// total divided by 1e9.
///
/// # Errors
/// * `SolanaRecoverError::InvalidWalletAddress` when `wallet_address` does not
///   parse as a public key.
/// * Any error from the empty-account lookup.
async fn scan_wallet_enhanced(&self, wallet_address: &str) -> Result<ScanResult> {
    let timer = Instant::now();
    debug!("Starting enhanced scan for wallet: {}", wallet_address);

    let pubkey = Pubkey::from_str(wallet_address).map_err(|parse_err| {
        SolanaRecoverError::InvalidWalletAddress(format!("Invalid wallet address: {}", parse_err))
    })?;

    let mut info_slot = self.scanner_memory_manager.acquire_wallet_info();
    info_slot.address = wallet_address.to_string();
    info_slot.pubkey = pubkey;

    let found = self.scan_empty_accounts_enhanced(&pubkey).await?;
    let found_count = found.len() as u64;
    let lamports_sum = found.iter().map(|entry| entry.lamports).sum();

    info_slot.empty_accounts = found_count;
    info_slot.total_accounts = found_count;
    info_slot.recoverable_lamports = lamports_sum;
    info_slot.recoverable_sol = lamports_sum as f64 / 1_000_000_000.0;
    info_slot.empty_account_addresses = found.iter().map(|entry| entry.address.clone()).collect();

    let elapsed = timer.elapsed();
    info_slot.scan_time_ms = elapsed.as_millis() as u64;

    let mut outcome = self.scanner_memory_manager.acquire_scan_result();
    outcome.id = Uuid::new_v4();
    outcome.wallet_address = wallet_address.to_string();
    outcome.status = ScanStatus::Completed;
    outcome.result = Some(info_slot.into_inner());
    outcome.created_at = Utc::now();

    debug!("Enhanced scan completed for {} in {}ms", wallet_address, elapsed.as_millis());
    Ok(outcome.into_inner())
}
/// Fetches the recoverable accounts of `pubkey` and returns those whose
/// lamport balance is zero as `EmptyAccount` records.
///
/// For each returned account the mint is filled in when the account data can
/// be decoded to at least 165 bytes (the SPL Token account size, whose first
/// 32 bytes hold the mint); otherwise `mint` is left as acquired from the pool.
///
/// # Errors
/// Propagates errors from obtaining an RPC client and from the account query.
async fn scan_empty_accounts_enhanced(&self, pubkey: &Pubkey) -> Result<Vec<EmptyAccount>> {
    let start_time = Instant::now();
    let client = self.connection_pool.get_client().await?;
    let token_accounts = client.get_all_recoverable_accounts(pubkey).await?;

    let mut empty_accounts = Vec::new();
    for keyed_account in token_accounts {
        // NOTE(review): only accounts with exactly 0 lamports pass this filter,
        // so the lamport totals callers sum from this list are always 0.
        // Confirm whether the intended predicate is "zero token balance"
        // rather than "zero lamports".
        if keyed_account.account.lamports != 0 {
            continue;
        }

        let mut empty_account = self.scanner_memory_manager.acquire_empty_account();
        empty_account.address = keyed_account.pubkey.to_string();
        empty_account.lamports = keyed_account.account.lamports;
        empty_account.owner = keyed_account.account.owner.to_string();

        // The RPC payload is an *encoded* string (base58/base64), so it must be
        // decoded to raw bytes before its length or layout can be inspected.
        // Slicing the encoded text directly yields a meaningless mint.
        if let Some(raw) = keyed_account.account.data.decode() {
            if raw.len() >= 165 {
                if let Ok(mint_pubkey) = Pubkey::try_from(&raw[..32]) {
                    empty_account.mint = Some(mint_pubkey.to_string());
                }
            }
        }

        empty_accounts.push(empty_account.into_inner());
    }

    debug!("Found {} empty accounts in {}ms", empty_accounts.len(), start_time.elapsed().as_millis());
    Ok(empty_accounts)
}
/// Pre-batch memory maintenance: schedules a GC pass when memory pressure is
/// above 70, then asks the buffer pool to clean up old buffers.
async fn optimize_memory_for_batch(&self) {
    debug!("Optimizing memory for batch processing");

    let pressure = self
        .memory_integration
        .get_memory_manager()
        .get_memory_stats()
        .memory_pressure;
    if pressure > 70.0 {
        let gc_scheduler = self.memory_integration.get_gc_scheduler();
        gc_scheduler.schedule_gc(pressure).await;
    }

    self.memory_integration.get_buffer_pool().cleanup_old_buffers().await;
}
/// Logs a debug line with the batch's wallet count and elapsed time.
/// No metrics are stored by this function.
async fn update_performance_metrics(&self, result: &BatchScanResult, duration: std::time::Duration) {
    let wallet_count = result.total_wallets;
    let elapsed_ms = duration.as_millis();
    debug!("Performance metrics updated: {} wallets in {}ms", wallet_count, elapsed_ms);
}
/// Returns a JSON object with the scanner configuration, the memory manager
/// and memory integration statistics, and whether a parallel processor is
/// configured.
pub fn get_scanner_stats(&self) -> serde_json::Value {
    let manager_stats = self.scanner_memory_manager.get_scanner_stats();
    let integration_stats = self.memory_integration.get_integration_stats();
    let has_parallel_processor = self.parallel_processor.is_some();

    serde_json::json!({
        "config": self.config,
        "memory_manager_stats": manager_stats,
        "memory_integration_stats": integration_stats,
        "parallel_processor_enabled": has_parallel_processor,
    })
}
pub async fn get_comprehensive_report(&self) -> serde_json::Value {
let scanner_stats = self.get_scanner_stats();
let memory_report = self.memory_integration.generate_integration_report().await;
serde_json::json!({
"timestamp": chrono::Utc::now(),
"scanner_stats": scanner_stats,
"memory_integration_report": memory_report,
"recommendations": self.generate_scanner_recommendations(),
})
}
/// Builds the list of human-readable tuning recommendations.
///
/// Each rule below pairs a trigger condition with its message; messages are
/// emitted in rule order. When no rule triggers, a single "configured
/// optimally" message is returned instead.
fn generate_scanner_recommendations(&self) -> Vec<String> {
    let stats = self.memory_integration.get_integration_stats();

    let rules = [
        (
            stats.scanner_pool_operations == 0,
            "Scanner memory pooling is not being utilized. Consider enabling memory pooling for better performance.",
        ),
        (
            // Threshold: less than 10 MiB saved.
            stats.memory_saved_bytes < 10 * 1024 * 1024,
            "Low memory savings detected. Consider increasing pool sizes or optimizing allocation patterns.",
        ),
        (
            !self.config.enable_memory_pooling,
            "Memory pooling is disabled. Enable it for improved performance.",
        ),
        (
            !self.config.enable_performance_tracking,
            "Performance tracking is disabled. Enable it for better monitoring and optimization.",
        ),
    ];

    let mut recommendations: Vec<String> = rules
        .iter()
        .filter(|(triggered, _)| *triggered)
        .map(|(_, advice)| advice.to_string())
        .collect();

    if recommendations.is_empty() {
        recommendations.push("Scanner is configured optimally. No immediate action required.".to_string());
    }
    recommendations
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::mock::MockConnectionPool;
// A freshly built scanner must expose its config and memory manager stats.
#[tokio::test]
async fn test_enhanced_scanner_creation() {
    let endpoints = vec!["https://api.mainnet-beta.solana.com".to_string()];
    let connection_pool = Arc::new(MockConnectionPool::new(endpoints, 10).unwrap());
    let scanner = EnhancedWalletScanner::new(connection_pool).unwrap();

    let stats = scanner.get_scanner_stats();
    for key in ["config", "memory_manager_stats"] {
        assert!(stats.get(key).is_some());
    }
}
// A successful scan must echo the wallet address; a scan error is tolerated
// here and does not fail the test.
#[tokio::test]
async fn test_enhanced_wallet_scan() {
    let endpoints = vec!["https://api.mainnet-beta.solana.com".to_string()];
    let connection_pool = Arc::new(MockConnectionPool::new(endpoints, 10).unwrap());
    let scanner = EnhancedWalletScanner::new(connection_pool).unwrap();

    let outcome = scanner.scan_wallet_enhanced("11111111111111111111111111111112").await;
    if let Ok(scan_result) = outcome {
        assert_eq!(scan_result.wallet_address, "11111111111111111111111111111112");
    }
}
// Resizing a 100-wallet request must keep the size within [10, 100].
#[tokio::test]
async fn test_batch_size_optimization() {
    let endpoints = vec!["https://api.mainnet-beta.solana.com".to_string()];
    let connection_pool = Arc::new(MockConnectionPool::new(endpoints, 10).unwrap());
    let scanner = EnhancedWalletScanner::new(connection_pool).unwrap();

    let wallet_addresses = (0..100).map(|i| format!("wallet_{}", i)).collect();
    let request = BatchScanRequest {
        id: Uuid::new_v4(),
        wallet_addresses,
        user_id: None,
        fee_percentage: None,
        created_at: Utc::now(),
    };

    let optimized = scanner.optimize_batch_size(&request).await.unwrap();
    let optimized_len = optimized.wallet_addresses.len();
    assert!(optimized_len <= 100);
    assert!(optimized_len >= 10);
}
// The comprehensive report must contain all four top-level sections.
#[tokio::test]
async fn test_comprehensive_report() {
    let endpoints = vec!["https://api.mainnet-beta.solana.com".to_string()];
    let connection_pool = Arc::new(MockConnectionPool::new(endpoints, 10).unwrap());
    let scanner = EnhancedWalletScanner::new(connection_pool).unwrap();

    let report = scanner.get_comprehensive_report().await;
    for section in ["timestamp", "scanner_stats", "memory_integration_report", "recommendations"] {
        assert!(report.get(section).is_some());
    }
}
}