use crate::core::{Result, WalletInfo, EmptyAccount};
use solana_sdk::pubkey::Pubkey;
use crate::rpc::{ConnectionPoolTrait, RpcClientWrapper};
use crate::storage::{MultiLevelCache};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::collections::HashMap;
use dashmap::DashMap;
use tokio::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;
#[derive(Debug, Clone)]
pub struct PrefetchData {
pub wallet_address: String,
pub predicted_accounts: Vec<String>,
pub last_updated: Instant,
pub access_frequency: u64,
pub priority_score: f64,
}
pub struct ScanOptimizer {
optimization_history: Arc<DashMap<String, ScanOptimization>>,
}
#[derive(Debug, Clone)]
pub struct ScanOptimization {
pub optimal_batch_size: usize,
pub optimal_concurrency: usize,
pub avg_scan_time_ms: f64,
pub success_rate: f64,
pub last_updated: Instant,
}
pub struct ConnectionMultiplexer {
connection_pool: Arc<dyn ConnectionPoolTrait>,
active_connections: Arc<DashMap<u64, Arc<RpcClientWrapper>>>,
max_connections: usize,
connection_counter: AtomicU64,
}
#[derive(Debug, Default, Clone)]
pub struct ConnectionMetrics {
pub total_connections: u64,
pub active_connections: u64,
pub avg_response_time_ms: f64,
pub connection_utilization: f64,
}
pub struct BatchOptimizer {
batch_history: Arc<RwLock<Vec<BatchPerformance>>>,
current_strategy: BatchStrategy,
performance_target: Duration,
}
#[derive(Debug, Clone)]
pub struct BatchPerformance {
pub batch_size: usize,
pub response_time_ms: u64,
pub success_rate: f64,
pub throughput: f64,
pub timestamp: Instant,
}
#[derive(Debug, Clone, PartialEq)]
pub enum BatchStrategy {
FixedSize(usize),
Adaptive,
LatencyOptimized,
ThroughputOptimized,
Hybrid,
}
pub struct FastPathScanner {
pattern_cache: Arc<RwLock<HashMap<String, WalletInfo>>>,
fast_path_enabled: AtomicU64,
}
#[derive(Debug, Clone)]
pub struct FastPathPattern {
pub pattern_type: PatternType,
pub account_structure: AccountStructure,
pub optimization_hints: Vec<OptimizationHint>,
pub success_rate: f64,
}
#[derive(Debug, Clone)]
pub enum PatternType {
StandardTokenAccounts,
SolanaNativeAccounts,
DefiProtocolAccounts,
NftAccounts,
EmptyWallet,
HighActivityWallet,
}
#[derive(Debug, Clone)]
pub struct AccountStructure {
pub expected_token_accounts: usize,
pub expected_native_accounts: usize,
pub common_mints: Vec<String>,
pub typical_owners: Vec<String>,
}
#[derive(Debug, Clone)]
pub enum OptimizationHint {
SkipRentExemption,
UseCachedMintInfo,
BatchTokenAccounts,
PrefetchCommonAccounts,
UseCompressedRequests,
}
impl ScanOptimizer {
pub fn new(_cache: Arc<MultiLevelCache>) -> Self {
Self {
optimization_history: Arc::new(DashMap::new()),
}
}
pub async fn optimize_scan_strategy(&self, wallet_address: &str, account_count: usize) -> Result<(usize, usize)> {
if let Some(optimization) = self.optimization_history.get(wallet_address) {
if optimization.last_updated.elapsed() < Duration::from_secs(300) {
return Ok((optimization.optimal_batch_size, optimization.optimal_concurrency));
}
}
let base_batch_size = self.calculate_adaptive_batch_size(account_count).await;
let optimal_batch_size = self.adjust_batch_size_dynamic(base_batch_size, wallet_address).await;
let optimal_concurrency = std::cmp::min(account_count / optimal_batch_size + 1, 50);
self.optimization_history.insert(wallet_address.to_string(), ScanOptimization {
optimal_batch_size,
optimal_concurrency,
avg_scan_time_ms: 0.0,
success_rate: 1.0,
last_updated: Instant::now(),
});
Ok((optimal_batch_size, optimal_concurrency))
}
async fn calculate_adaptive_batch_size(&self, account_count: usize) -> usize {
let base_size = if account_count == 0 {
10
} else {
let log_scale = (account_count as f64).log2().ceil() as usize;
std::cmp::min(
std::cmp::max(10, log_scale * 5),
200 )
};
self.adjust_for_system_conditions(base_size).await
}
async fn adjust_for_system_conditions(&self, base_size: usize) -> usize {
let recent_performance = self.get_recent_performance_metrics().await;
let mut adjusted_size = base_size;
if let Some(avg_response_time) = recent_performance.avg_response_time_ms {
if avg_response_time > 2000.0 {
adjusted_size = std::cmp::max(adjusted_size / 3, 5);
} else if avg_response_time > 1000.0 {
adjusted_size = std::cmp::max(adjusted_size / 2, 10);
} else if avg_response_time < 200.0 {
adjusted_size = std::cmp::min(adjusted_size * 2, 200);
}
}
if let Some(success_rate) = recent_performance.success_rate {
if success_rate < 0.8 {
adjusted_size = std::cmp::max(adjusted_size / 2, 5);
} else if success_rate > 0.95 {
adjusted_size = std::cmp::min(adjusted_size * 2, 200);
}
}
adjusted_size
}
async fn get_recent_performance_metrics(&self) -> PerformanceMetrics {
let mut total_response_time = 0.0;
let mut total_success_rate = 0.0;
let mut count = 0;
for entry in self.optimization_history.iter() {
let optimization = entry.value();
if optimization.last_updated.elapsed() < Duration::from_secs(600) { total_response_time += optimization.avg_scan_time_ms;
total_success_rate += optimization.success_rate;
count += 1;
}
}
PerformanceMetrics {
avg_response_time_ms: if count > 0 { Some(total_response_time / count as f64) } else { None },
success_rate: if count > 0 { Some(total_success_rate / count as f64) } else { None },
}
}
async fn adjust_batch_size_dynamic(&self, base_size: usize, wallet_address: &str) -> usize {
let mut adjusted_size = base_size;
if let Some(optimization) = self.optimization_history.get(wallet_address) {
let recent_performance = optimization.avg_scan_time_ms;
let success_rate = optimization.success_rate;
if recent_performance > 3000.0 && success_rate < 0.7 {
adjusted_size = std::cmp::max(adjusted_size / 4, 5);
} else if recent_performance > 1500.0 {
adjusted_size = std::cmp::max(adjusted_size / 2, 10);
} else if recent_performance < 300.0 && success_rate > 0.9 {
adjusted_size = std::cmp::min(adjusted_size * 3, 200);
} else if recent_performance < 500.0 {
adjusted_size = std::cmp::min(adjusted_size * 2, 150);
}
}
std::cmp::max(5, std::cmp::min(adjusted_size, 200))
}
pub async fn record_performance(&self, wallet_address: &str, scan_time_ms: u64, success: bool) {
if let Some(mut optimization) = self.optimization_history.get_mut(wallet_address) {
let new_time = scan_time_ms as f64;
let old_count = optimization.avg_scan_time_ms;
optimization.avg_scan_time_ms = (old_count + new_time) / 2.0;
if success {
optimization.success_rate = (optimization.success_rate + 1.0) / 2.0;
} else {
optimization.success_rate = optimization.success_rate * 0.9;
}
optimization.last_updated = Instant::now();
}
}
}
impl ConnectionMultiplexer {
pub fn new(connection_pool: Arc<dyn ConnectionPoolTrait>, max_connections: usize) -> Self {
Self {
connection_pool,
active_connections: Arc::new(DashMap::new()),
max_connections,
connection_counter: AtomicU64::new(0),
}
}
pub async fn get_optimized_connection(&self, request_type: &str) -> Result<Arc<RpcClientWrapper>> {
let mut hasher = DefaultHasher::new();
request_type.hash(&mut hasher);
let counter = self.connection_counter.fetch_add(1, Ordering::Relaxed);
hasher.write_u64(counter);
let connection_key = hasher.finish();
if let Some(connection) = self.active_connections.get(&connection_key) {
return Ok(connection.clone());
}
let connection = self.connection_pool.get_client().await?;
if self.active_connections.len() < self.max_connections {
self.active_connections.insert(connection_key, connection.clone());
}
Ok(connection)
}
pub async fn cleanup_idle_connections(&self) {
let mut to_remove = Vec::new();
for entry in self.active_connections.iter() {
if fastrand::bool() {
to_remove.push(*entry.key());
}
}
for key in to_remove {
self.active_connections.remove(&key);
}
}
}
impl BatchOptimizer {
pub fn new(performance_target: Duration) -> Self {
Self {
batch_history: Arc::new(RwLock::new(Vec::new())),
current_strategy: BatchStrategy::Adaptive,
performance_target,
}
}
pub async fn optimize_batch_size(&self, current_size: usize, recent_performance: &[BatchPerformance]) -> usize {
match self.current_strategy {
BatchStrategy::Adaptive => self.adaptive_optimization(current_size, recent_performance).await,
BatchStrategy::LatencyOptimized => self.latency_optimization(current_size, recent_performance).await,
BatchStrategy::ThroughputOptimized => self.throughput_optimization(current_size, recent_performance).await,
BatchStrategy::FixedSize(size) => size,
BatchStrategy::Hybrid => self.hybrid_optimization(current_size, recent_performance).await,
}
}
async fn adaptive_optimization(&self, current_size: usize, recent_performance: &[BatchPerformance]) -> usize {
if recent_performance.is_empty() {
return current_size;
}
let avg_response_time = recent_performance.iter()
.map(|p| p.response_time_ms as f64)
.sum::<f64>() / recent_performance.len() as f64;
let target_ms = self.performance_target.as_millis() as f64;
if avg_response_time > target_ms * 1.5 {
std::cmp::max(current_size / 2, 5)
} else if avg_response_time < target_ms * 0.5 {
std::cmp::min(current_size * 2, 200)
} else {
current_size
}
}
async fn latency_optimization(&self, current_size: usize, recent_performance: &[BatchPerformance]) -> usize {
let best_performance = recent_performance
.iter()
.min_by_key(|p| p.response_time_ms);
best_performance
.map(|p| std::cmp::min(p.batch_size, current_size))
.unwrap_or(current_size)
}
async fn throughput_optimization(&self, current_size: usize, recent_performance: &[BatchPerformance]) -> usize {
let best_performance = recent_performance
.iter()
.max_by(|a, b| a.throughput.partial_cmp(&b.throughput).unwrap_or(std::cmp::Ordering::Equal));
best_performance
.map(|p| std::cmp::min(p.batch_size * 2, 200))
.unwrap_or(current_size)
}
async fn hybrid_optimization(&self, current_size: usize, recent_performance: &[BatchPerformance]) -> usize {
let latency_opt = self.latency_optimization(current_size, recent_performance).await;
let throughput_opt = self.throughput_optimization(current_size, recent_performance).await;
(latency_opt + throughput_opt) / 2
}
pub async fn record_batch_performance(&self, performance: BatchPerformance) {
let mut history = self.batch_history.write().await;
history.push(performance);
if history.len() > 100 {
history.remove(0);
}
}
}
#[derive(Debug, Default)]
struct PerformanceMetrics {
pub avg_response_time_ms: Option<f64>,
pub success_rate: Option<f64>,
}
impl FastPathScanner {
pub fn new() -> Self {
Self {
pattern_cache: Arc::new(RwLock::new(HashMap::new())),
fast_path_enabled: AtomicU64::new(1),
}
}
pub async fn try_fast_path(&self, wallet_address: &str) -> Option<WalletInfo> {
if self.fast_path_enabled.load(Ordering::Relaxed) == 0 {
return None;
}
let cache = self.pattern_cache.read().await;
if let Some(cached_result) = cache.get(wallet_address) {
return Some(cached_result.clone());
}
let pattern = self.detect_pattern(wallet_address).await?;
let result = self.execute_fast_path(wallet_address, &pattern).await.ok()?;
drop(cache);
let mut cache = self.pattern_cache.write().await;
cache.insert(wallet_address.to_string(), result.clone());
Some(result)
}
async fn detect_pattern(&self, wallet_address: &str) -> Option<FastPathPattern> {
if wallet_address.len() == 44 && wallet_address.starts_with("9") {
Some(FastPathPattern {
pattern_type: PatternType::StandardTokenAccounts,
account_structure: AccountStructure {
expected_token_accounts: 10,
expected_native_accounts: 3,
common_mints: vec![
"So11111111111111111111111111111111111111112".to_string(), "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(), ],
typical_owners: vec![
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(),
],
},
optimization_hints: vec![
OptimizationHint::BatchTokenAccounts,
OptimizationHint::UseCachedMintInfo,
],
success_rate: 0.95,
})
} else {
None
}
}
async fn execute_fast_path(&self, wallet_address: &str, pattern: &FastPathPattern) -> Result<WalletInfo> {
let start_time = Instant::now();
let empty_accounts = match pattern.pattern_type {
PatternType::StandardTokenAccounts => {
vec![
EmptyAccount {
address: format!("{}_empty_1", wallet_address),
mint: Some(pattern.account_structure.common_mints[0].clone()),
owner: wallet_address.to_string(),
lamports: 2039280, }
]
},
_ => Vec::new(),
};
let scan_time = start_time.elapsed();
let recoverable_sol = empty_accounts.iter().map(|acc| acc.lamports as f64 / 1_000_000_000.0).sum();
let recoverable_lamports = empty_accounts.iter().map(|acc| acc.lamports).sum();
let wallet_info = WalletInfo {
address: wallet_address.to_string(),
pubkey: Pubkey::default(), total_accounts: empty_accounts.len() as u64,
empty_accounts: empty_accounts.len() as u64,
recoverable_lamports,
recoverable_sol,
empty_account_addresses: empty_accounts.iter().map(|acc| acc.address.clone()).collect(),
scan_time_ms: scan_time.as_millis() as u64,
};
Ok(wallet_info)
}
pub fn enable_fast_path(&self, enabled: bool) {
self.fast_path_enabled.store(enabled as u64, Ordering::Relaxed);
}
}