use crate::core::{Result, WalletInfo, ScanResult, ScanStatus, EmptyAccount, BatchScanRequest, BatchScanResult, SolanaRecoverError};
use crate::core::scanner::TokenAccountInfo;
use crate::rpc::{EnhancedConnectionPool, BatchRpcClient, ConnectionPoolTrait};
use crate::storage::{MultiLevelCache, CachedAccount, AccountData, CachePriority};
use crate::storage::multi_level_cache::CacheConfig as MultiLevelCacheConfig;
use crate::core::adaptive_parallel_processor::{AdaptiveParallelProcessor, ProcessorConfig};
use crate::utils::{MemoryManager as ObjectMemoryManager, MemoryManagerConfig as ObjectMemoryManagerConfig};
use crate::rpc::RpcClientWrapper;
use crate::core::ultra_fast_scanner::{PrefetchData, ScanOptimizer, ConnectionMultiplexer, BatchOptimizer, FastPathScanner};
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use uuid::Uuid;
use chrono::Utc;
use tracing::{info, debug, warn};
use rayon::prelude::*;
use dashmap::DashMap;
use base64::Engine;
pub struct OptimizedWalletScanner {
connection_pool: Arc<EnhancedConnectionPool>,
batch_client: Arc<BatchRpcClient>,
cache: Arc<MultiLevelCache>,
parallel_processor: Arc<AdaptiveParallelProcessor>,
memory_manager: Arc<ObjectMemoryManager>,
config: OptimizedScannerConfig,
metrics: Arc<RwLock<OptimizedScannerMetrics>>,
prefetch_cache: Arc<DashMap<String, PrefetchData>>,
scan_optimizer: Arc<ScanOptimizer>,
connection_multiplexer: Arc<ConnectionMultiplexer>,
batch_optimizer: Arc<BatchOptimizer>,
fast_path: Arc<FastPathScanner>,
}
#[derive(Debug, Clone)]
pub struct OptimizedScannerConfig {
pub connection_pool_config: crate::rpc::EnhancedPoolConfig,
pub batch_config: crate::rpc::BatchConfig,
pub cache_config: MultiLevelCacheConfig,
pub processor_config: ProcessorConfig,
pub memory_config: ObjectMemoryManagerConfig,
pub enable_all_optimizations: bool,
pub performance_mode: PerformanceMode,
pub enable_predictive_prefetch: bool,
pub enable_connection_multiplexing: bool,
pub enable_smart_batching: bool,
pub enable_fast_path: bool,
pub max_concurrent_scans: usize,
pub scan_timeout: Duration,
pub prefetch_window_size: usize,
pub batch_size_multiplier: f64,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PerformanceMode {
Throughput, Latency, Balanced, ResourceEfficient, UltraFast, }
#[derive(Debug, Clone, serde::Serialize)]
pub struct OptimizedScannerMetrics {
pub total_scans: u64,
pub successful_scans: u64,
pub failed_scans: u64,
pub avg_scan_time_ms: f64,
pub scans_per_second: f64,
pub cache_hit_rate: f64,
pub cache_efficiency: f64,
pub l1_hits: u64,
pub l2_hits: u64,
pub l3_hits: u64,
pub avg_batch_size: f64,
pub batch_efficiency: f64,
pub rpc_calls_saved: u64,
pub memory_efficiency: f64,
pub object_pool_hit_rate: f64,
pub gc_pressure: f64,
pub connection_efficiency: f64,
pub avg_response_time_ms: f64,
pub circuit_breaker_activations: u64,
pub parallel_efficiency: f64,
pub worker_utilization: f64,
pub load_balancing_score: f64,
pub last_updated: Option<chrono::DateTime<chrono::Utc>>,
}
impl Default for OptimizedScannerConfig {
fn default() -> Self {
Self {
connection_pool_config: crate::rpc::EnhancedPoolConfig::default(),
batch_config: crate::rpc::BatchConfig::default(),
cache_config: MultiLevelCacheConfig::default(),
processor_config: ProcessorConfig::default(),
memory_config: ObjectMemoryManagerConfig::default(),
enable_all_optimizations: true,
performance_mode: PerformanceMode::UltraFast,
enable_predictive_prefetch: true,
enable_connection_multiplexing: true,
enable_smart_batching: true,
enable_fast_path: true,
max_concurrent_scans: 500,
scan_timeout: Duration::from_secs(2),
prefetch_window_size: 50,
batch_size_multiplier: 2.0,
}
}
}
impl OptimizedWalletScanner {
pub fn new(_endpoints: Vec<crate::core::RpcEndpoint>, config: OptimizedScannerConfig) -> Result<Self> {
let connection_pool = Arc::new(EnhancedConnectionPool::with_config(
config.connection_pool_config.clone(),
));
let batch_client = Arc::new(BatchRpcClient::new(
connection_pool.clone(),
config.batch_config.clone(),
));
let cache = Arc::new(MultiLevelCache::new(config.cache_config.clone())?);
let memory_manager = Arc::new(ObjectMemoryManager::new());
let scanner = Arc::new(crate::core::scanner::WalletScanner::new(connection_pool.clone()));
let parallel_processor = Arc::new(AdaptiveParallelProcessor::new(
scanner,
config.processor_config.clone(),
)?);
let prefetch_cache = Arc::new(DashMap::new());
let scan_optimizer = Arc::new(ScanOptimizer::new(cache.clone()));
let connection_multiplexer = Arc::new(ConnectionMultiplexer::new(
connection_pool.clone(),
config.connection_pool_config.max_connections_per_endpoint,
));
let batch_optimizer = Arc::new(BatchOptimizer::new(config.scan_timeout));
let fast_path = Arc::new(FastPathScanner::new());
let optimized_scanner = Self {
connection_pool,
batch_client,
cache,
parallel_processor,
memory_manager,
config,
metrics: Arc::new(tokio::sync::RwLock::new(OptimizedScannerMetrics::default())),
prefetch_cache,
scan_optimizer,
connection_multiplexer,
batch_optimizer,
fast_path,
};
optimized_scanner.start_background_tasks();
Ok(optimized_scanner)
}
pub async fn scan_wallet_ultra_fast(&self, _wallet_address: &str) -> Result<ScanResult> {
let start_time = Instant::now();
let scan_id = Uuid::new_v4();
info!("Starting ultra-fast scan for wallet: {}", _wallet_address);
if self.config.enable_fast_path {
if let Some(wallet_info) = self.fast_path.try_fast_path(_wallet_address).await {
let scan_time = start_time.elapsed().as_millis() as u64;
info!("Fast path scan completed in {}ms", scan_time);
return Ok(ScanResult {
id: scan_id,
wallet_address: _wallet_address.to_string(),
status: ScanStatus::Completed,
result: Some(wallet_info.clone()),
empty_accounts_found: wallet_info.empty_accounts,
recoverable_sol: wallet_info.recoverable_sol,
scan_time_ms: scan_time,
created_at: Utc::now(),
completed_at: Some(Utc::now()),
error_message: None,
});
}
}
if self.config.enable_predictive_prefetch {
self.prefetch_related_data(_wallet_address).await;
}
let connection = self.connection_multiplexer.get_optimized_connection("wallet_scan").await?;
let (batch_size, concurrency) = self.scan_optimizer.optimize_scan_strategy(
_wallet_address,
50 ).await?;
let scan_result = self.perform_ultra_fast_scan(
_wallet_address,
scan_id,
start_time,
batch_size,
concurrency,
connection
).await?;
let scan_time = start_time.elapsed().as_millis() as u64;
self.scan_optimizer.record_performance(_wallet_address, scan_time, true).await;
Ok(scan_result)
}
pub async fn scan_wallet_optimized(&self, wallet_address: &str) -> Result<ScanResult> {
let start_time = Instant::now();
let scan_id = Uuid::new_v4();
info!("Starting optimized scan for wallet: {}", wallet_address);
let scan_result = self.perform_optimized_scan(wallet_address, scan_id, start_time).await?;
let duration = start_time.elapsed();
info!("Optimized scan completed in {}ms", duration.as_millis());
Ok(scan_result)
}
pub async fn scan_wallet_simple(&self, wallet_address: &str) -> Result<ScanResult> {
let start_time = Instant::now();
let scan_id = Uuid::new_v4();
info!("Starting simple scan for wallet: {}", wallet_address);
let scan_result = self.perform_simple_scan(wallet_address, scan_id, start_time).await?;
let duration = start_time.elapsed();
info!("Simple scan completed in {}ms", duration.as_millis());
Ok(scan_result)
}
async fn perform_simple_scan(&self, wallet_address: &str, scan_id: Uuid, start_time: Instant) -> Result<ScanResult> {
let pubkey = Pubkey::from_str(wallet_address)
.map_err(|_| SolanaRecoverError::InvalidWalletAddress(wallet_address.to_string()))?;
let accounts = self.batch_client.scan_wallet_accounts_basic(&pubkey).await?;
let total_accounts = accounts.len();
debug!("Retrieved {} accounts for wallet {}", total_accounts, wallet_address);
let empty_accounts = self.process_accounts_sequential(&accounts, wallet_address).await?;
let total_recoverable_lamports: u64 = empty_accounts.iter().map(|acc| acc.lamports).sum();
let recoverable_sol = total_recoverable_lamports as f64 / 1_000_000_000.0;
let scan_time = start_time.elapsed().as_millis() as u64;
let wallet_info = WalletInfo {
address: wallet_address.to_string(),
pubkey,
total_accounts: total_accounts as u64,
empty_accounts: empty_accounts.len() as u64,
recoverable_lamports: total_recoverable_lamports,
recoverable_sol,
empty_account_addresses: empty_accounts.iter().map(|acc| acc.address.clone()).collect(),
scan_time_ms: scan_time,
};
Ok(ScanResult {
id: scan_id,
wallet_address: wallet_address.to_string(),
status: ScanStatus::Completed,
result: Some(wallet_info),
empty_accounts_found: empty_accounts.len() as u64,
recoverable_sol,
scan_time_ms: scan_time,
created_at: Utc::now(),
completed_at: Some(Utc::now()),
error_message: None,
})
}
async fn process_accounts_sequential(&self, accounts: &[solana_client::rpc_response::RpcKeyedAccount], wallet_address: &str) -> Result<Vec<EmptyAccount>> {
let mut empty_accounts = Vec::new();
let min_rent_exemption = self.get_rent_exemption().await?;
for keyed_account in accounts {
match self.check_empty_account_simple(keyed_account, wallet_address, min_rent_exemption).await {
Ok(Some(empty_account)) => empty_accounts.push(empty_account),
Ok(None) => {}, Err(e) => {
warn!("Error checking account: {}", e);
}
}
}
Ok(empty_accounts)
}
async fn check_empty_account_simple(&self, keyed_account: &solana_client::rpc_response::RpcKeyedAccount, _wallet_address: &str, min_rent_exemption: u64) -> Result<Option<EmptyAccount>> {
use base64::Engine;
let account = &keyed_account.account;
let account_lamports = account.lamports;
if account_lamports < min_rent_exemption {
return Ok(None);
}
match &account.data {
solana_account_decoder::UiAccountData::Binary(data_str, _encoding) => {
if let Ok(data) = base64::engine::general_purpose::STANDARD.decode(data_str) {
if data.len() >= 64 {
let mint_bytes = &data[0..32];
if let Ok(mint_pubkey) = Pubkey::try_from(mint_bytes) {
let owner_bytes = &data[32..64];
if let Ok(owner_pubkey) = Pubkey::try_from(owner_bytes) {
let amount_bytes = &data[64..72];
let amount = u64::from_le_bytes(amount_bytes.try_into().unwrap_or([0; 8]));
if amount == 0 && account_lamports >= min_rent_exemption {
let empty_account = EmptyAccount {
address: keyed_account.pubkey.to_string(),
mint: Some(mint_pubkey.to_string()),
owner: owner_pubkey.to_string(),
lamports: account_lamports - min_rent_exemption,
};
return Ok(Some(empty_account));
}
}
}
}
}
}
_ => {}
}
Ok(None)
}
async fn get_rent_exemption(&self) -> Result<u64> {
Ok(2_039_280) }
pub async fn scan_batch_optimized(&self, request: &BatchScanRequest) -> Result<BatchScanResult> {
let start_time = Instant::now();
info!("Starting optimized batch scan for {} wallets", request.wallet_addresses.len());
let result = self.parallel_processor.process_batch_adaptive(request).await?;
let total_time = start_time.elapsed();
info!("Batch scan completed in {}ms, throughput: {:.2} wallets/sec",
total_time.as_millis(),
request.wallet_addresses.len() as f64 / total_time.as_secs_f64());
self.update_batch_metrics(&result).await;
Ok(result)
}
async fn perform_optimized_scan(&self, _wallet_address: &str, scan_id: Uuid, start_time: Instant) -> Result<ScanResult> {
let pubkey = Pubkey::from_str(_wallet_address)
.map_err(|_| SolanaRecoverError::InvalidWalletAddress(_wallet_address.to_string()))?;
let accounts = self.batch_client.scan_wallet_accounts_optimized(&pubkey).await?;
let total_accounts = accounts.len();
debug!("Retrieved {} accounts for wallet {}", total_accounts, _wallet_address);
let empty_accounts = self.process_accounts_parallel(&accounts, _wallet_address).await?;
let total_recoverable_lamports: u64 = empty_accounts.iter().map(|acc| acc.lamports).sum();
let recoverable_sol = total_recoverable_lamports as f64 / 1_000_000_000.0;
let scan_time = start_time.elapsed().as_millis() as u64;
let wallet_info = WalletInfo {
address: _wallet_address.to_string(),
pubkey,
total_accounts: total_accounts as u64,
empty_accounts: empty_accounts.len() as u64,
recoverable_lamports: total_recoverable_lamports,
recoverable_sol,
empty_account_addresses: empty_accounts.iter().map(|acc| acc.address.clone()).collect(),
scan_time_ms: scan_time,
};
self.update_scan_metrics(true, scan_time).await;
Ok(ScanResult {
id: scan_id,
wallet_address: _wallet_address.to_string(),
status: ScanStatus::Completed,
result: Some(wallet_info),
empty_accounts_found: empty_accounts.len() as u64,
recoverable_sol,
scan_time_ms: scan_time,
created_at: Utc::now(),
completed_at: Some(Utc::now()),
error_message: None,
})
}
async fn process_accounts_parallel(&self, accounts: &[solana_client::rpc_response::RpcKeyedAccount], _wallet_address: &str) -> Result<Vec<EmptyAccount>> {
let batch_size = self.calculate_optimal_batch_size(accounts.len());
let mut empty_accounts = Vec::new();
let account_chunks: Vec<_> = accounts.chunks(batch_size).collect();
for chunk in account_chunks {
let chunk_results = self.process_account_chunk(chunk, _wallet_address).await?;
empty_accounts.extend(chunk_results);
if self.config.performance_mode == PerformanceMode::ResourceEfficient {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
Ok(empty_accounts)
}
async fn process_account_chunk(&self, accounts: &[solana_client::rpc_response::RpcKeyedAccount], wallet_address: &str) -> Result<Vec<EmptyAccount>> {
let mut empty_accounts = Vec::new();
let min_rent_exemption = self.get_rent_exemption().await?;
let process_futures = accounts.iter().map(|keyed_account| {
self.check_empty_account_simple(keyed_account, wallet_address, min_rent_exemption)
});
let results = futures::future::join_all(process_futures).await;
for result in results {
match result {
Ok(Some(empty_account)) => {
empty_accounts.push(empty_account);
}
Ok(None) => {
}
Err(e) => {
warn!("Error checking account: {}", e);
}
}
}
Ok(empty_accounts)
}
async fn get_rent_exemption_with_cache(&self, account_size: usize) -> Result<u64> {
let cache_key = format!("rent_exemption:{}", account_size);
if let Some(cached_account) = self.cache.get(&cache_key).await? {
if let AccountData::RentExemption(rent) = cached_account.data {
return Ok(rent);
}
}
let client = self.connection_pool.get_client().await?;
let rent_exemption = client.get_minimum_balance_for_rent_exemption(account_size).await?;
let cached_account = CachedAccount {
data: AccountData::RentExemption(rent_exemption),
timestamp: Instant::now(),
access_count: std::sync::atomic::AtomicU64::new(1),
priority: CachePriority::High, size_bytes: 8,
compressed: false,
};
self.cache.put(cache_key, cached_account).await?;
Ok(rent_exemption)
}
async fn check_token_account_optimized(&self, _keyed_account: &solana_client::rpc_response::RpcKeyedAccount, _account_pubkey_str: &str, account: &solana_account_decoder::UiAccount) -> Result<Option<EmptyAccount>> {
match &account.data {
solana_account_decoder::UiAccountData::Binary(data_str, encoding) => {
let _temp_buffer = self.memory_manager.get_buffer_blocking();
if let Ok(token_account) = self.parse_token_account_from_binary_optimized(&data_str, &encoding) {
if token_account.amount == 0 && account.lamports > 0 {
return Ok(Some(EmptyAccount {
address: _account_pubkey_str.to_string(),
lamports: account.lamports,
owner: account.owner.clone(),
mint: Some(token_account.mint),
}));
}
}
}
solana_account_decoder::UiAccountData::Json(parsed) => {
if let Some(token_amount) = parsed.parsed.get("info").and_then(|i| i.get("tokenAmount")) {
if let Some(amount_str) = token_amount.get("amount").and_then(|a| a.as_str()) {
if let Ok(amount) = amount_str.parse::<u64>() {
if amount == 0 && account.lamports > 0 {
let owner = parsed.parsed.get("info")
.and_then(|i| i.get("owner"))
.and_then(|o| o.as_str())
.unwrap_or("unknown")
.to_string();
let mint = parsed.parsed.get("info")
.and_then(|i| i.get("mint"))
.and_then(|m| m.as_str())
.map(|m| m.to_string());
return Ok(Some(EmptyAccount {
address: _account_pubkey_str.to_string(),
lamports: account.lamports,
owner,
mint,
}));
}
}
}
}
}
_ => {
debug!("Unsupported data format for token account: {}", _account_pubkey_str);
}
}
Ok(None)
}
async fn check_system_account_optimized(&self, _keyed_account: &solana_client::rpc_response::RpcKeyedAccount, _account_pubkey_str: &str, account: &solana_account_decoder::UiAccount, _min_rent_exemption: u64) -> Result<Option<EmptyAccount>> {
if !account.executable {
let is_data_empty = self.is_account_data_empty(&account.data);
if account.lamports >= _min_rent_exemption && is_data_empty && account.lamports > 0 {
return Ok(Some(EmptyAccount {
address: _account_pubkey_str.to_string(),
lamports: account.lamports,
owner: account.owner.clone(),
mint: None,
}));
}
}
Ok(None)
}
async fn check_other_account_optimized(&self, _keyed_account: &solana_client::rpc_response::RpcKeyedAccount, _account_pubkey_str: &str, account: &solana_account_decoder::UiAccount, _min_rent_exemption: u64) -> Result<Option<EmptyAccount>> {
if !account.executable && account.lamports > 0 {
let tolerance = _min_rent_exemption / 10;
let is_rent_exempt = account.lamports >= _min_rent_exemption.saturating_sub(tolerance)
&& account.lamports <= _min_rent_exemption.saturating_add(tolerance);
if is_rent_exempt {
let is_data_empty = self.is_account_data_empty(&account.data);
if is_data_empty {
return Ok(Some(EmptyAccount {
address: _account_pubkey_str.to_string(),
lamports: account.lamports,
owner: account.owner.clone(),
mint: None,
}));
}
}
}
Ok(None)
}
fn parse_token_account_from_binary_optimized(&self, data_str: &str, encoding: &solana_account_decoder::UiAccountEncoding) -> Result<crate::core::scanner::TokenAccountInfo> {
let _temp_buffer = self.memory_manager.get_buffer_blocking();
let decoded_data = match encoding {
solana_account_decoder::UiAccountEncoding::Base64 => {
use base64::{Engine as _, engine::general_purpose};
general_purpose::STANDARD.decode(data_str)
.map_err(|_| SolanaRecoverError::InternalError("Failed to decode Base64 data".to_string()))?
}
solana_account_decoder::UiAccountEncoding::Base58 => {
bs58::decode(data_str)
.into_vec()
.map_err(|_| SolanaRecoverError::InternalError("Failed to decode Base58 data".to_string()))?
}
_ => {
return Err(SolanaRecoverError::InternalError("Unsupported encoding for token account".to_string()));
}
};
if decoded_data.len() < 72 {
return Err(SolanaRecoverError::InternalError("Invalid token account data length".to_string()));
}
let mut mint_array = [0u8; 32];
mint_array.copy_from_slice(&decoded_data[0..32]);
let mint_pubkey = Pubkey::new_from_array(mint_array);
let amount_bytes = &decoded_data[64..72];
let mut amount_array = [0u8; 8];
amount_array.copy_from_slice(amount_bytes);
let amount = u64::from_le_bytes(amount_array);
Ok(crate::core::scanner::TokenAccountInfo {
mint: mint_pubkey.to_string(),
amount,
})
}
fn is_account_data_empty(&self, data: &solana_account_decoder::UiAccountData) -> bool {
match data {
solana_account_decoder::UiAccountData::Binary(data_str, _) => data_str.is_empty(),
solana_account_decoder::UiAccountData::Json(parsed) => {
parsed.parsed.is_null() ||
parsed.parsed.as_object().map_or(false, |obj| obj.is_empty()) ||
parsed.parsed.as_array().map_or(false, |arr| arr.is_empty())
},
solana_account_decoder::UiAccountData::LegacyBinary(_) => true,
}
}
async fn process_cached_accounts(&self, accounts: Vec<solana_client::rpc_response::RpcKeyedAccount>, wallet_address: &str) -> Result<WalletInfo> {
let total_accounts = accounts.len();
let empty_accounts = self.process_accounts_parallel(&accounts, wallet_address).await?;
let total_recoverable_lamports: u64 = empty_accounts.iter().map(|acc| acc.lamports).sum();
let recoverable_sol = total_recoverable_lamports as f64 / 1_000_000_000.0;
Ok(WalletInfo {
address: wallet_address.to_string(),
pubkey: Pubkey::from_str(wallet_address)
.map_err(|_| SolanaRecoverError::InvalidWalletAddress(wallet_address.to_string()))?,
total_accounts: total_accounts as u64,
empty_accounts: empty_accounts.len() as u64,
recoverable_lamports: total_recoverable_lamports,
recoverable_sol,
empty_account_addresses: empty_accounts.iter().map(|acc| acc.address.clone()).collect(),
scan_time_ms: 0, })
}
async fn cache_scan_result(&self, wallet_address: &str, _wallet_info: &WalletInfo) -> Result<()> {
let cache_key = format!("wallet_scan:{}", wallet_address);
let cached_account = CachedAccount {
data: AccountData::BatchAccounts(vec![]), timestamp: Instant::now(),
access_count: std::sync::atomic::AtomicU64::new(1),
priority: CachePriority::Medium,
size_bytes: 1024, compressed: false,
};
self.cache.put(cache_key, cached_account).await?;
Ok(())
}
fn calculate_optimal_batch_size(&self, total_accounts: usize) -> usize {
match self.config.performance_mode {
PerformanceMode::Throughput => std::cmp::min(100, total_accounts),
PerformanceMode::Latency => std::cmp::min(10, total_accounts),
PerformanceMode::Balanced => std::cmp::min(50, total_accounts),
PerformanceMode::ResourceEfficient => std::cmp::min(25, total_accounts),
PerformanceMode::UltraFast => std::cmp::min(200, total_accounts), }
}
fn start_background_tasks_original(&self) {
let pool = self.connection_pool.clone();
tokio::spawn(async move {
pool.start_health_checks().await;
});
if self.config.memory_config.enable_memory_monitoring {
let memory_manager = self.memory_manager.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = memory_manager.maybe_gc().await {
warn!("Memory GC failed: {}", e);
}
}
});
}
}
async fn update_scan_metrics(&self, success: bool, scan_time_ms: u64) {
let mut metrics = self.metrics.write().await;
metrics.total_scans += 1;
if success {
metrics.successful_scans += 1;
} else {
metrics.failed_scans += 1;
}
let total_scans = metrics.total_scans;
if total_scans > 0 {
metrics.avg_scan_time_ms =
(metrics.avg_scan_time_ms * (total_scans - 1) as f64 + scan_time_ms as f64) / total_scans as f64;
}
if scan_time_ms > 0 {
metrics.scans_per_second = 1000.0 / scan_time_ms as f64;
}
metrics.last_updated = Some(Utc::now());
}
async fn update_batch_metrics(&self, result: &BatchScanResult) {
let mut metrics = self.metrics.write().await;
if let Some(duration_ms) = result.duration_ms {
if duration_ms > 0 {
metrics.avg_batch_size = result.total_wallets as f64;
metrics.batch_efficiency = result.successful_scans as f64 / result.total_wallets as f64;
}
}
metrics.last_updated = Some(Utc::now());
}
pub async fn get_metrics(&self) -> Result<OptimizedScannerMetrics> {
let mut metrics = self.metrics.write().await;
let cache_metrics = self.cache.get_metrics().await;
metrics.cache_hit_rate = if cache_metrics.total_requests > 0 {
cache_metrics.total_hits as f64 / cache_metrics.total_requests as f64
} else {
0.0
};
metrics.l1_hits = cache_metrics.l1_hits;
metrics.l2_hits = cache_metrics.l2_hits;
metrics.l3_hits = cache_metrics.l3_hits;
let pool_metrics = self.connection_pool.get_metrics().await;
metrics.connection_efficiency = if pool_metrics.total_requests > 0 {
pool_metrics.successful_requests as f64 / pool_metrics.total_requests as f64
} else {
0.0
};
metrics.avg_response_time_ms = pool_metrics.avg_response_time_ms;
metrics.circuit_breaker_activations = pool_metrics.circuit_breaker_activations;
metrics.object_pool_hit_rate = 0.8; metrics.gc_pressure = 0.2;
let processor_metrics = self.parallel_processor.get_metrics().await;
metrics.parallel_efficiency = processor_metrics.worker_utilization / 100.0;
metrics.worker_utilization = processor_metrics.worker_utilization;
metrics.load_balancing_score = processor_metrics.load_balancing_efficiency;
Ok(OptimizedScannerMetrics {
total_scans: metrics.total_scans,
successful_scans: metrics.successful_scans,
failed_scans: metrics.failed_scans,
avg_scan_time_ms: metrics.avg_scan_time_ms,
scans_per_second: metrics.scans_per_second,
cache_hit_rate: metrics.cache_hit_rate,
cache_efficiency: metrics.cache_efficiency,
l1_hits: metrics.l1_hits,
l2_hits: metrics.l2_hits,
l3_hits: metrics.l3_hits,
avg_batch_size: metrics.avg_batch_size,
batch_efficiency: metrics.batch_efficiency,
rpc_calls_saved: metrics.rpc_calls_saved,
memory_efficiency: metrics.memory_efficiency,
object_pool_hit_rate: metrics.object_pool_hit_rate,
gc_pressure: metrics.gc_pressure,
connection_efficiency: metrics.connection_efficiency,
avg_response_time_ms: metrics.avg_response_time_ms,
circuit_breaker_activations: metrics.circuit_breaker_activations,
parallel_efficiency: metrics.parallel_efficiency,
worker_utilization: metrics.worker_utilization,
load_balancing_score: metrics.load_balancing_score,
last_updated: metrics.last_updated,
})
}
pub async fn get_performance_recommendations(&self) -> Vec<String> {
let metrics = self.get_metrics().await.unwrap_or_default();
let mut recommendations = Vec::new();
if metrics.cache_hit_rate < 0.7 {
recommendations.push("Consider increasing cache size or TTL to improve hit rate".to_string());
}
if metrics.memory_efficiency < 0.6 {
recommendations.push("Memory efficiency is low, consider tuning object pool sizes".to_string());
}
if metrics.connection_efficiency < 0.8 {
recommendations.push("Connection pool efficiency is low, check endpoint health".to_string());
}
if metrics.parallel_efficiency < 0.7 {
recommendations.push("Parallel processing efficiency is low, consider adjusting worker count".to_string());
}
if metrics.avg_scan_time_ms > 1000.0 {
recommendations.push("Scan times are high, consider enabling all optimizations".to_string());
}
if recommendations.is_empty() {
recommendations.push("Performance is optimal".to_string());
}
recommendations
}
}
impl Default for OptimizedScannerMetrics {
fn default() -> Self {
Self {
total_scans: 0,
successful_scans: 0,
failed_scans: 0,
avg_scan_time_ms: 0.0,
scans_per_second: 0.0,
cache_hit_rate: 0.0,
cache_efficiency: 0.0,
l1_hits: 0,
l2_hits: 0,
l3_hits: 0,
avg_batch_size: 0.0,
batch_efficiency: 0.0,
rpc_calls_saved: 0,
memory_efficiency: 0.0,
object_pool_hit_rate: 0.0,
gc_pressure: 0.0,
connection_efficiency: 0.0,
avg_response_time_ms: 0.0,
circuit_breaker_activations: 0,
parallel_efficiency: 0.0,
worker_utilization: 0.0,
load_balancing_score: 0.0,
last_updated: None,
}
}
}
impl Clone for OptimizedWalletScanner {
fn clone(&self) -> Self {
Self {
connection_pool: self.connection_pool.clone(),
batch_client: self.batch_client.clone(),
cache: self.cache.clone(),
parallel_processor: self.parallel_processor.clone(),
memory_manager: self.memory_manager.clone(),
config: self.config.clone(),
metrics: self.metrics.clone(),
prefetch_cache: self.prefetch_cache.clone(),
scan_optimizer: self.scan_optimizer.clone(),
connection_multiplexer: self.connection_multiplexer.clone(),
batch_optimizer: self.batch_optimizer.clone(),
fast_path: self.fast_path.clone(),
}
}
}
impl OptimizedWalletScanner {
async fn perform_ultra_fast_scan(
&self,
wallet_address: &str,
scan_id: Uuid,
start_time: Instant,
batch_size: usize,
concurrency: usize,
connection: Arc<RpcClientWrapper>,
) -> Result<ScanResult> {
let pubkey = Pubkey::from_str(wallet_address)
.map_err(|_| SolanaRecoverError::InvalidWalletAddress(wallet_address.to_string()))?;
let accounts = self.retrieve_accounts_ultra_fast(&pubkey, &connection, batch_size).await?;
let total_accounts = accounts.len();
debug!("Retrieved {} accounts for wallet {} in ultra-fast mode", total_accounts, wallet_address);
let empty_accounts = self.process_accounts_ultra_fast(&accounts, wallet_address, concurrency).await?;
let total_recoverable_lamports: u64 = empty_accounts.iter().map(|acc| acc.lamports).sum();
let recoverable_sol = total_recoverable_lamports as f64 / 1_000_000_000.0;
let scan_time = start_time.elapsed().as_millis() as u64;
let wallet_info = WalletInfo {
address: wallet_address.to_string(),
pubkey,
total_accounts: total_accounts as u64,
empty_accounts: empty_accounts.len() as u64,
recoverable_lamports: total_recoverable_lamports,
recoverable_sol,
empty_account_addresses: empty_accounts.iter().map(|acc| acc.address.clone()).collect(),
scan_time_ms: scan_time,
};
self.update_scan_metrics(true, scan_time).await;
Ok(ScanResult {
id: scan_id,
wallet_address: wallet_address.to_string(),
status: ScanStatus::Completed,
result: Some(wallet_info),
empty_accounts_found: empty_accounts.len() as u64,
recoverable_sol,
scan_time_ms: scan_time,
created_at: Utc::now(),
completed_at: Some(Utc::now()),
error_message: None,
})
}
async fn retrieve_accounts_ultra_fast(
&self,
pubkey: &Pubkey,
connection: &Arc<RpcClientWrapper>,
batch_size: usize,
) -> Result<Vec<solana_client::rpc_response::RpcKeyedAccount>> {
let accounts = connection.get_token_accounts_by_owner_ultra_fast(pubkey, batch_size).await?;
Ok(accounts)
}
async fn process_accounts_ultra_fast(
&self,
accounts: &[solana_client::rpc_response::RpcKeyedAccount],
_wallet_address: &str,
concurrency: usize,
) -> Result<Vec<EmptyAccount>> {
let empty_accounts: Vec<EmptyAccount> = accounts
.par_iter()
.with_min_len(accounts.len() / concurrency.max(1))
.filter_map(|keyed_account| {
let account_pubkey_str = keyed_account.pubkey.to_string();
if self.is_common_empty_account_pattern(&keyed_account.account) {
return Some(EmptyAccount {
address: account_pubkey_str,
lamports: keyed_account.account.lamports,
owner: keyed_account.account.owner.clone(),
mint: None,
});
}
if let Some(empty_account) = self.check_token_account_fast(&keyed_account.account, &account_pubkey_str) {
return Some(empty_account);
}
None
})
.collect();
Ok(empty_accounts)
}
fn is_common_empty_account_pattern(&self, account: &solana_account_decoder::UiAccount) -> bool {
account.lamports > 0
&& account.lamports < 2_000_000_000 && account.owner == "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
}
fn check_token_account_fast(&self, account: &solana_account_decoder::UiAccount, address: &str) -> Option<EmptyAccount> {
match &account.data {
solana_account_decoder::UiAccountData::Binary(data_str, encoding) => {
if let Ok(token_account) = self.parse_token_account_fast(&data_str, &encoding) {
if let Some(token_acc) = &token_account {
if token_acc.amount == 0 && account.lamports > 0 {
return Some(EmptyAccount {
address: address.to_string(),
lamports: account.lamports,
owner: account.owner.clone(),
mint: Some(token_acc.mint.clone()),
});
}
}
}
}
_ => {}
}
None
}
fn parse_token_account_fast(&self, data_str: &str, encoding: &solana_account_decoder::UiAccountEncoding) -> Result<Option<TokenAccountInfo>> {
if *encoding == solana_account_decoder::UiAccountEncoding::Base64 {
if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(data_str) {
if decoded.len() >= 64 { let mint_bytes = &decoded[32..64];
let amount_bytes = &decoded[64..72];
let mint = bs58::encode(mint_bytes).into_string();
let amount = u64::from_le_bytes(amount_bytes.try_into().unwrap_or([0; 8]));
return Ok(Some(TokenAccountInfo { mint, amount }));
}
}
}
Ok(None)
}
async fn prefetch_related_data(&self, wallet_address: &str) {
let common_mints = vec![
"So11111111111111111111111111111111111111112", "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", ];
for mint in &common_mints {
let _cache_key = format!("mint_info:{}", mint);
let _cache = self.cache.clone();
let connection_pool = self.connection_pool.clone();
let mint = mint.to_string();
tokio::spawn(async move {
if let Ok(client) = connection_pool.get_client().await {
if let Ok(_) = client.client.get_account(&Pubkey::from_str(&mint).unwrap_or_default()) {
}
}
});
}
self.prefetch_cache.insert(wallet_address.to_string(), PrefetchData {
wallet_address: wallet_address.to_string(),
predicted_accounts: common_mints.iter().map(|s| s.to_string()).collect(),
last_updated: Instant::now(),
access_frequency: 1,
priority_score: 1.0,
});
}
fn start_background_tasks(&self) {
let connection_multiplexer = self.connection_multiplexer.clone();
let _batch_optimizer = self.batch_optimizer.clone();
let _cache = self.cache.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
connection_multiplexer.cleanup_idle_connections().await;
}
});
}
}