use std::sync::Arc;
use parking_lot::RwLock;
use tracing::{error};
use serde::{Serialize, Deserialize};
use crate::core::types::{WalletInfo, EmptyAccount, ScanResult, BatchScanResult};
use super::enhanced_memory_manager::{EnhancedMemoryManager, MemoryManagerConfig};
use super::advanced_buffer_pools::{AdvancedBufferPool, BufferPoolConfig};
use super::gc_scheduler::GcScheduler;
use super::memory_monitor::{MemoryMonitor, MemoryMonitorConfig};
/// Facade that owns the memory subsystems (object-pool manager, buffer pool,
/// GC scheduler and memory monitor) together with shared integration counters.
///
/// Instances are built by `MemoryIntegrationLayer::new` / `with_config`, which
/// return the layer wrapped in an `Arc`.
#[derive(Debug)]
pub struct MemoryIntegrationLayer {
/// Object-pool manager; handed to the scanner and RPC managers.
memory_manager: Arc<EnhancedMemoryManager>,
/// Buffer pool; handed to the RPC manager.
buffer_pool: Arc<AdvancedBufferPool>,
/// GC scheduler; exposed via `get_gc_scheduler` and included in reports.
gc_scheduler: Arc<GcScheduler>,
/// Monitor whose events feed `handle_memory_event` when monitoring is enabled.
memory_monitor: Arc<MemoryMonitor>,
/// Configuration this layer was built from.
config: MemoryIntegrationConfig,
/// Counters shared with every manager returned by the `create_*_memory_manager` methods.
stats: Arc<RwLock<IntegrationStats>>,
}
/// Top-level switches and per-area settings used to build a `MemoryIntegrationLayer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryIntegrationConfig {
/// OR-ed with `enable_rpc_pooling` to set the memory manager's `enable_object_pooling`.
pub enable_scanner_pooling: bool,
/// OR-ed with `enable_scanner_pooling` to set the memory manager's `enable_object_pooling`.
pub enable_rpc_pooling: bool,
/// NOTE(review): not read by the code visible in this file — confirm where it is consumed.
pub enable_buffer_pooling: bool,
/// Copied to the memory manager's `enable_auto_optimization`.
pub enable_auto_gc: bool,
/// Turns on the monitor features, buffer-pool stats collection, memory-manager
/// monitoring and the background event tasks.
pub enable_monitoring: bool,
/// Pool sizes and tracking switch for scanner objects.
pub scanner_config: ScannerMemoryConfig,
/// Pool sizes, size limits and tracking switch for RPC buffers.
pub rpc_config: RpcMemoryConfig,
/// Buffer-pool housekeeping settings.
pub buffer_config: BufferIntegrationConfig,
}
/// Scanner-side pool sizing; each size is copied into the memory manager's
/// `max_pool_sizes` by `build_memory_manager_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScannerMemoryConfig {
/// Value for `max_pool_sizes.wallet_info_pool`.
pub wallet_info_pool_size: usize,
/// Value for `max_pool_sizes.empty_account_pool`.
pub empty_account_pool_size: usize,
/// Value for `max_pool_sizes.scan_result_pool`.
pub scan_result_pool_size: usize,
/// Value for `max_pool_sizes.batch_scan_result_pool`.
pub batch_scan_result_pool_size: usize,
/// When true, every `ScannerMemoryManager::acquire_*` call increments
/// `IntegrationStats::scanner_pool_operations`.
pub enable_scan_tracking: bool,
}
/// RPC-side buffer pool sizing and limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcMemoryConfig {
/// Copied to the buffer pool's `pool_sizes.rpc_request_pool_size`.
pub request_buffer_pool_size: usize,
/// Copied to the buffer pool's `pool_sizes.rpc_response_pool_size`.
pub response_buffer_pool_size: usize,
/// Copied to the buffer pool's `pool_sizes.account_data_pool_size`.
pub account_data_buffer_pool_size: usize,
/// When true, every `RpcMemoryManager::acquire_*` call increments
/// `IntegrationStats::rpc_pool_operations`.
pub enable_rpc_tracking: bool,
/// Upper bound applied to the size requested from `RpcMemoryManager::acquire_request_buffer`.
pub max_request_size: usize,
/// NOTE(review): not read by the code visible in this file — confirm where it is enforced.
pub max_response_size: usize,
}
/// Buffer-pool housekeeping settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferIntegrationConfig {
/// NOTE(review): not read by the code visible in this file — confirm where it is consumed.
pub enable_size_tiered_pools: bool,
/// NOTE(review): not read by the code visible in this file — confirm where it is consumed.
pub enable_rpc_specialized_buffers: bool,
/// Copied to the buffer pool's `cleanup_interval_seconds`.
pub cleanup_interval_seconds: u64,
/// Copied to the buffer pool's `max_buffer_age_seconds`.
pub max_buffer_age_seconds: u64,
}
/// Counters accumulated by the integration layer and the managers it creates.
/// All fields start at zero (derived `Default`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IntegrationStats {
/// Incremented by each tracked `ScannerMemoryManager::acquire_*` call.
pub scanner_pool_operations: u64,
/// Incremented by each tracked `RpcMemoryManager::acquire_*` call.
pub rpc_pool_operations: u64,
/// Incremented for each monitor `Allocation` event that names a pool.
pub buffer_operations: u64,
/// Running estimate: half of each pooled allocation / request buffer size is added.
pub memory_saved_bytes: usize,
/// Incremented for each monitor `GcCollection` event.
pub integration_gc_triggers: u64,
/// Incremented for each `MemoryPressure` event with severity `Warning` or above.
pub integration_alerts: u64,
/// NOTE(review): never written by the code visible in this file.
pub scanner_time_saved_ms: u64,
/// NOTE(review): never written by the code visible in this file.
pub rpc_time_saved_ms: u64,
}
impl Default for MemoryIntegrationConfig {
fn default() -> Self {
Self {
enable_scanner_pooling: true,
enable_rpc_pooling: true,
enable_buffer_pooling: true,
enable_auto_gc: true,
enable_monitoring: true,
scanner_config: ScannerMemoryConfig::default(),
rpc_config: RpcMemoryConfig::default(),
buffer_config: BufferIntegrationConfig::default(),
}
}
}
impl Default for ScannerMemoryConfig {
    /// Default scanner pool capacities with operation tracking enabled.
    fn default() -> Self {
        const WALLET_INFO_POOL: usize = 10_000;
        const EMPTY_ACCOUNT_POOL: usize = 50_000;
        const SCAN_RESULT_POOL: usize = 10_000;
        const BATCH_SCAN_RESULT_POOL: usize = 1_000;

        Self {
            wallet_info_pool_size: WALLET_INFO_POOL,
            empty_account_pool_size: EMPTY_ACCOUNT_POOL,
            scan_result_pool_size: SCAN_RESULT_POOL,
            batch_scan_result_pool_size: BATCH_SCAN_RESULT_POOL,
            enable_scan_tracking: true,
        }
    }
}
impl Default for RpcMemoryConfig {
    /// Default RPC buffer pool capacities, tracking enabled, and size limits of
    /// 64 KiB per request and 1 MiB per response.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        Self {
            request_buffer_pool_size: 1_000,
            response_buffer_pool_size: 1_000,
            account_data_buffer_pool_size: 2_000,
            enable_rpc_tracking: true,
            max_request_size: 64 * KIB,
            max_response_size: MIB,
        }
    }
}
impl Default for BufferIntegrationConfig {
    /// Both buffer features enabled; cleanup every 60 seconds, maximum buffer
    /// age of 300 seconds.
    fn default() -> Self {
        const CLEANUP_INTERVAL_SECS: u64 = 60;
        const MAX_BUFFER_AGE_SECS: u64 = 5 * 60;

        Self {
            enable_size_tiered_pools: true,
            enable_rpc_specialized_buffers: true,
            cleanup_interval_seconds: CLEANUP_INTERVAL_SECS,
            max_buffer_age_seconds: MAX_BUFFER_AGE_SECS,
        }
    }
}
impl MemoryIntegrationLayer {
pub fn new() -> Arc<Self> {
Self::with_config(MemoryIntegrationConfig::default())
}
pub fn with_config(config: MemoryIntegrationConfig) -> Arc<Self> {
let memory_config = Self::build_memory_manager_config(&config);
let memory_manager = EnhancedMemoryManager::with_config(memory_config);
let buffer_config = Self::build_buffer_pool_config(&config);
let buffer_pool = AdvancedBufferPool::with_config(buffer_config);
let gc_scheduler = GcScheduler::new();
let monitor_config = Self::build_monitor_config(&config);
let memory_monitor = MemoryMonitor::with_config(monitor_config);
let integration = Arc::new(Self {
memory_manager,
buffer_pool,
gc_scheduler,
memory_monitor,
config,
stats: Arc::new(RwLock::new(IntegrationStats::default())),
});
integration.start_background_services();
integration
}
fn build_memory_manager_config(config: &MemoryIntegrationConfig) -> MemoryManagerConfig {
let mut memory_config = MemoryManagerConfig::default();
memory_config.max_pool_sizes.wallet_info_pool = config.scanner_config.wallet_info_pool_size;
memory_config.max_pool_sizes.empty_account_pool = config.scanner_config.empty_account_pool_size;
memory_config.max_pool_sizes.scan_result_pool = config.scanner_config.scan_result_pool_size;
memory_config.max_pool_sizes.batch_scan_result_pool = config.scanner_config.batch_scan_result_pool_size;
memory_config.enable_object_pooling = config.enable_scanner_pooling || config.enable_rpc_pooling;
memory_config.enable_memory_monitoring = config.enable_monitoring;
memory_config.enable_auto_optimization = config.enable_auto_gc;
memory_config
}
fn build_buffer_pool_config(config: &MemoryIntegrationConfig) -> BufferPoolConfig {
let mut buffer_config = BufferPoolConfig::default();
buffer_config.pool_sizes.rpc_request_pool_size = config.rpc_config.request_buffer_pool_size;
buffer_config.pool_sizes.rpc_response_pool_size = config.rpc_config.response_buffer_pool_size;
buffer_config.pool_sizes.account_data_pool_size = config.rpc_config.account_data_buffer_pool_size;
buffer_config.cleanup_interval_seconds = config.buffer_config.cleanup_interval_seconds;
buffer_config.max_buffer_age_seconds = config.buffer_config.max_buffer_age_seconds;
buffer_config.enable_stats_collection = config.enable_monitoring;
buffer_config
}
fn build_monitor_config(config: &MemoryIntegrationConfig) -> MemoryMonitorConfig {
let mut monitor_config = MemoryMonitorConfig::default();
monitor_config.enable_profiling = config.enable_monitoring;
monitor_config.enable_leak_detection = config.enable_monitoring;
monitor_config.enable_performance_monitoring = config.enable_monitoring;
monitor_config.enable_real_time_events = config.enable_monitoring;
monitor_config
}
fn start_background_services(self: &Arc<Self>) {
if self.config.enable_monitoring {
let monitor = self.memory_monitor.clone();
tokio::spawn(async move {
if let Err(e) = monitor.start_monitoring().await {
error!("Failed to start memory monitoring: {}", e);
}
});
}
if self.config.enable_monitoring {
let integration = self.clone();
let mut receiver = self.memory_monitor.subscribe_events();
tokio::spawn(async move {
while let Ok(event) = receiver.recv().await {
integration.handle_memory_event(event).await;
}
});
}
}
async fn handle_memory_event(&self, event: super::memory_monitor::MemoryEvent) {
use super::memory_monitor::{MemoryEventType, EventSeverity};
match event.event_type {
MemoryEventType::Allocation { size, pool } => {
if pool.is_some() {
let mut stats = self.stats.write();
stats.buffer_operations += 1;
stats.memory_saved_bytes += size / 2; }
}
MemoryEventType::GcCollection { .. } => {
let mut stats = self.stats.write();
stats.integration_gc_triggers += 1;
}
MemoryEventType::MemoryPressure { .. } => {
if event.severity >= EventSeverity::Warning {
let mut stats = self.stats.write();
stats.integration_alerts += 1;
}
}
_ => {}
}
}
pub fn get_memory_manager(&self) -> Arc<EnhancedMemoryManager> {
self.memory_manager.clone()
}
pub fn get_buffer_pool(&self) -> Arc<AdvancedBufferPool> {
self.buffer_pool.clone()
}
pub fn get_gc_scheduler(&self) -> Arc<GcScheduler> {
self.gc_scheduler.clone()
}
pub fn get_memory_monitor(&self) -> Arc<MemoryMonitor> {
self.memory_monitor.clone()
}
pub fn get_integration_stats(&self) -> IntegrationStats {
self.stats.read().clone()
}
pub fn create_scanner_memory_manager(&self) -> ScannerMemoryManager {
ScannerMemoryManager::new(
self.memory_manager.clone(),
self.config.scanner_config.clone(),
self.stats.clone(),
)
}
pub fn create_rpc_memory_manager(&self) -> RpcMemoryManager {
RpcMemoryManager::new(
self.memory_manager.clone(),
self.buffer_pool.clone(),
self.config.rpc_config.clone(),
self.stats.clone(),
)
}
pub async fn generate_integration_report(&self) -> serde_json::Value {
let stats = self.get_integration_stats();
let memory_report = self.memory_manager.get_comprehensive_report().await;
let buffer_report = self.buffer_pool.get_performance_report();
let gc_report = self.gc_scheduler.get_comprehensive_report();
let monitor_report = self.memory_monitor.generate_report();
serde_json::json!({
"timestamp": chrono::Utc::now(),
"integration_stats": format!("{:?}", stats),
"memory_manager_report": memory_report,
"buffer_pool_report": buffer_report,
"gc_scheduler_report": gc_report,
"memory_monitor_report": monitor_report,
"config": format!("{:?}", self.config),
"recommendations": self.generate_integration_recommendations(&stats),
})
}
fn generate_integration_recommendations(&self, stats: &IntegrationStats) -> Vec<String> {
let mut recommendations = Vec::new();
if stats.scanner_pool_operations == 0 {
recommendations.push("Scanner object pooling is not being utilized. Consider enabling scanner pooling for better performance.".to_string());
}
if stats.rpc_pool_operations == 0 {
recommendations.push("RPC object pooling is not being utilized. Consider enabling RPC pooling for better performance.".to_string());
}
if stats.memory_saved_bytes < 1024 * 1024 { recommendations.push("Low memory savings detected. Consider increasing pool sizes or optimizing allocation patterns.".to_string());
}
if stats.integration_alerts > 10 {
recommendations.push("High number of memory alerts detected. Review memory usage patterns and consider optimization.".to_string());
}
if recommendations.is_empty() {
recommendations.push("Memory integration is performing optimally. No immediate action required.".to_string());
}
recommendations
}
}
/// Scanner-facing handle for acquiring pooled scanner objects.
///
/// Created by `MemoryIntegrationLayer::create_scanner_memory_manager`; clones
/// share the same memory manager and counters.
#[derive(Debug, Clone)]
pub struct ScannerMemoryManager {
/// Memory manager the pooled objects are acquired from.
memory_manager: Arc<EnhancedMemoryManager>,
/// Scanner settings; `enable_scan_tracking` gates counter updates.
config: ScannerMemoryConfig,
/// Counters shared with the integration layer that created this manager.
stats: Arc<RwLock<IntegrationStats>>,
}
impl ScannerMemoryManager {
    /// Bundles the shared memory manager, scanner settings and shared counters.
    fn new(
        memory_manager: Arc<EnhancedMemoryManager>,
        config: ScannerMemoryConfig,
        stats: Arc<RwLock<IntegrationStats>>,
    ) -> Self {
        Self { memory_manager, config, stats }
    }

    /// Counts one scanner pool operation when scan tracking is enabled.
    fn record_scanner_operation(&self) {
        if self.config.enable_scan_tracking {
            self.stats.write().scanner_pool_operations += 1;
        }
    }

    /// Acquires a pooled `WalletInfo` and records the operation.
    pub fn acquire_wallet_info(&self) -> crate::utils::memory_pool::PooledItem<WalletInfo> {
        let pooled = self.memory_manager.acquire_wallet_info();
        self.record_scanner_operation();
        pooled
    }

    /// Acquires a pooled `EmptyAccount` and records the operation.
    pub fn acquire_empty_account(&self) -> crate::utils::memory_pool::PooledItem<EmptyAccount> {
        let pooled = self.memory_manager.acquire_empty_account();
        self.record_scanner_operation();
        pooled
    }

    /// Acquires a pooled `ScanResult` and records the operation.
    pub fn acquire_scan_result(&self) -> crate::utils::memory_pool::PooledItem<ScanResult> {
        let pooled = self.memory_manager.acquire_scan_result();
        self.record_scanner_operation();
        pooled
    }

    /// Acquires a pooled `BatchScanResult` and records the operation.
    pub fn acquire_batch_scan_result(&self) -> crate::utils::memory_pool::PooledItem<BatchScanResult> {
        let pooled = self.memory_manager.acquire_batch_scan_result();
        self.record_scanner_operation();
        pooled
    }

    /// Reports the scanner operation count, the `Debug`-formatted scanner
    /// settings and the memory manager's own statistics as JSON.
    pub fn get_scanner_stats(&self) -> serde_json::Value {
        let operations = self.stats.read().scanner_pool_operations;
        let config_text = format!("{:?}", self.config);
        serde_json::json!({
            "scanner_pool_operations": operations,
            "scanner_config": config_text,
            "memory_manager_stats": self.memory_manager.get_memory_stats(),
        })
    }
}
/// RPC-facing handle for acquiring pooled request, response and account-data
/// buffers.
///
/// Created by `MemoryIntegrationLayer::create_rpc_memory_manager`. Cloning is
/// cheap and, as with `ScannerMemoryManager`, clones share the same underlying
/// memory manager, buffer pool and counters.
#[derive(Debug, Clone)]
pub struct RpcMemoryManager {
    /// Memory manager whose statistics are included in `get_rpc_stats`.
    memory_manager: Arc<EnhancedMemoryManager>,
    /// Buffer pool the RPC buffers are acquired from.
    buffer_pool: Arc<AdvancedBufferPool>,
    /// RPC settings; `enable_rpc_tracking` gates counter updates and
    /// `max_request_size` caps raw request buffers.
    config: RpcMemoryConfig,
    /// Counters shared with the integration layer that created this manager.
    stats: Arc<RwLock<IntegrationStats>>,
}
impl RpcMemoryManager {
    /// Bundles the shared memory manager, buffer pool, RPC settings and counters.
    fn new(
        memory_manager: Arc<EnhancedMemoryManager>,
        buffer_pool: Arc<AdvancedBufferPool>,
        config: RpcMemoryConfig,
        stats: Arc<RwLock<IntegrationStats>>,
    ) -> Self {
        Self { memory_manager, buffer_pool, config, stats }
    }

    /// When RPC tracking is enabled, counts one RPC pool operation and adds
    /// `saved_bytes` to the estimated memory savings.
    fn record_rpc_operation(&self, saved_bytes: usize) {
        if !self.config.enable_rpc_tracking {
            return;
        }
        let mut counters = self.stats.write();
        counters.rpc_pool_operations += 1;
        counters.memory_saved_bytes += saved_bytes;
    }

    /// Acquires a raw byte buffer for an RPC request.
    ///
    /// The requested `size` is capped at `max_request_size`; half of the
    /// resulting size is booked as estimated savings when tracking is on.
    pub fn acquire_request_buffer(&self, size: usize) -> super::memory_pool::PooledItem<Vec<u8>> {
        let capped = std::cmp::min(size, self.config.max_request_size);
        let pooled = self.buffer_pool.get_buffer(capped);
        self.record_rpc_operation(capped / 2);
        pooled
    }

    /// Acquires a specialized request buffer for `method` / `request_id`.
    pub fn acquire_rpc_request_buffer(&self, method: &str, request_id: &str) -> super::memory_pool::PooledItem<super::advanced_buffer_pools::RpcRequestBuffer> {
        let pooled = self.buffer_pool.get_rpc_request_buffer(method, request_id);
        self.record_rpc_operation(0);
        pooled
    }

    /// Acquires a specialized response buffer for `request_id`.
    pub fn acquire_rpc_response_buffer(&self, request_id: &str) -> super::memory_pool::PooledItem<super::advanced_buffer_pools::RpcResponseBuffer> {
        let pooled = self.buffer_pool.get_rpc_response_buffer(request_id);
        self.record_rpc_operation(0);
        pooled
    }

    /// Acquires a specialized account-data buffer for `address` at `slot`.
    pub fn acquire_account_data_buffer(&self, address: &str, slot: u64) -> super::memory_pool::PooledItem<super::advanced_buffer_pools::AccountDataBuffer> {
        let pooled = self.buffer_pool.get_account_data_buffer(address, slot);
        self.record_rpc_operation(0);
        pooled
    }

    /// Reports the RPC operation count, the `Debug`-formatted RPC settings and
    /// the buffer pool's and memory manager's statistics as JSON.
    pub fn get_rpc_stats(&self) -> serde_json::Value {
        let operations = self.stats.read().rpc_pool_operations;
        let config_text = format!("{:?}", self.config);
        serde_json::json!({
            "rpc_pool_operations": operations,
            "rpc_config": config_text,
            "buffer_pool_stats": self.buffer_pool.get_stats(),
            "memory_manager_stats": self.memory_manager.get_memory_stats(),
        })
    }
}
use std::sync::OnceLock;
/// Process-wide integration layer; set at most once, either lazily by
/// `get_global_memory_integration` or explicitly by `init_global_memory_integration`.
static GLOBAL_MEMORY_INTEGRATION: OnceLock<Arc<MemoryIntegrationLayer>> = OnceLock::new();
/// Returns the process-wide integration layer, creating it with the default
/// configuration on first use.
///
/// The first call constructs the layer through `MemoryIntegrationLayer::new`,
/// which starts background services; make that first call from within a Tokio
/// runtime. To use a custom configuration, call
/// `init_global_memory_integration` before the first call to this function.
pub fn get_global_memory_integration() -> Arc<MemoryIntegrationLayer> {
    Arc::clone(GLOBAL_MEMORY_INTEGRATION.get_or_init(MemoryIntegrationLayer::new))
}
/// Installs a layer built from `config` as the process-wide integration layer
/// and returns it.
///
/// The layer is constructed only if the global slot is still empty, so a
/// rejected initialization never builds a second layer or starts its
/// background services.
///
/// # Panics
///
/// Panics if the global layer has already been initialized, either by an
/// earlier call to this function or lazily by `get_global_memory_integration`.
pub fn init_global_memory_integration(config: MemoryIntegrationConfig) -> Arc<MemoryIntegrationLayer> {
    // `get_or_init` runs the closure only for the caller that wins
    // initialization; `created` stays `None` for everyone else.
    let mut created = None;
    GLOBAL_MEMORY_INTEGRATION.get_or_init(|| {
        let integration = MemoryIntegrationLayer::with_config(config);
        created = Some(Arc::clone(&integration));
        integration
    });
    created.expect("Global memory integration already initialized")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built layer starts with zeroed operation counters.
    #[tokio::test]
    async fn test_memory_integration_creation() {
        let integration = MemoryIntegrationLayer::new();
        let stats = integration.get_integration_stats();
        assert_eq!(stats.scanner_pool_operations, 0);
        assert_eq!(stats.rpc_pool_operations, 0);
    }

    /// Acquiring one scanner object is reflected in the shared counters.
    #[tokio::test]
    async fn test_scanner_memory_manager() {
        let integration = MemoryIntegrationLayer::new();
        let scanner_manager = integration.create_scanner_memory_manager();
        let wallet_info = scanner_manager.acquire_wallet_info();
        assert!(wallet_info.address.is_empty());
        drop(wallet_info);
        let stats = integration.get_integration_stats();
        assert_eq!(stats.scanner_pool_operations, 1);
    }

    /// Acquiring one request buffer is reflected in the shared counters.
    #[tokio::test]
    async fn test_rpc_memory_manager() {
        let integration = MemoryIntegrationLayer::new();
        let rpc_manager = integration.create_rpc_memory_manager();
        let buffer = rpc_manager.acquire_request_buffer(1024);
        assert_eq!(buffer.len(), 1024);
        drop(buffer);
        let stats = integration.get_integration_stats();
        assert_eq!(stats.rpc_pool_operations, 1);
    }

    /// The report contains the expected top-level sections.
    #[tokio::test]
    async fn test_integration_report() {
        let integration = MemoryIntegrationLayer::new();
        let scanner_manager = integration.create_scanner_memory_manager();
        let _wallet_info = scanner_manager.acquire_wallet_info();
        let rpc_manager = integration.create_rpc_memory_manager();
        let _buffer = rpc_manager.acquire_request_buffer(512);
        let report = integration.generate_integration_report().await;
        assert!(report.get("timestamp").is_some());
        assert!(report.get("integration_stats").is_some());
        assert!(report.get("memory_manager_report").is_some());
        assert!(report.get("recommendations").is_some());
    }

    /// Operations performed through the global layer are visible in its counters.
    #[tokio::test]
    async fn test_global_memory_integration() {
        let global_integration = get_global_memory_integration();
        // The global layer is independent of the layers built in other tests,
        // so perform an operation on it before inspecting its counters.
        let scanner_manager = global_integration.create_scanner_memory_manager();
        let wallet_info = scanner_manager.acquire_wallet_info();
        drop(wallet_info);
        let stats = global_integration.get_integration_stats();
        // `>=` because the global layer may be shared with other tests in this process.
        assert!(stats.scanner_pool_operations >= 1);
    }
}