ant_quic/optimization/memory.rs

//! Memory optimization components for ant-quic
//!
//! This module provides memory-efficient resource management including:
//! - Connection pooling for Quinn connections
//! - Candidate caching with TTL
//! - Automatic cleanup of expired sessions and state
//! - Frame batching for reduced packet overhead
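//!
//! A minimal usage sketch (hedged: the exact crate path for this module is
//! assumed; not compiled as a doctest):
//!
//! ```ignore
//! use ant_quic::optimization::memory::MemoryOptimizationManager;
//!
//! let mut manager = MemoryOptimizationManager::new();
//! manager.start().await?;
//! // ... drive traffic through the pool, cache, and batching coordinator ...
//! manager.shutdown().await;
//! ```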

use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use tracing::{debug, info};

#[cfg(feature = "production-ready")]
use crate::{Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection};

// `VarInt` is only used when closing pooled connections, which is gated on
// the `production-ready` feature; gate the import to avoid an unused-import
// warning in other builds.
#[cfg(feature = "production-ready")]
use crate::VarInt;

use crate::nat_traversal_api::{CandidateAddress, PeerId};

/// Connection pool for reusing Quinn connections
#[derive(Debug)]
pub struct ConnectionPool {
    /// Active connections by peer ID
    active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
    /// Connection pool configuration
    config: ConnectionPoolConfig,
    /// Pool statistics
    stats: Arc<Mutex<ConnectionPoolStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for connection pooling
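///
/// # Example
///
/// A sketch of overriding a single default with struct update syntax
/// (values are illustrative, not tuned recommendations):
///
/// ```ignore
/// let config = ConnectionPoolConfig {
///     max_connections: 256,
///     ..ConnectionPoolConfig::default()
/// };
/// let pool = ConnectionPool::new(config);
/// ```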
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections to pool
    pub max_connections: usize,
    /// Maximum idle time before connection cleanup
    pub max_idle_time: Duration,
    /// Cleanup interval for expired connections
    pub cleanup_interval: Duration,
    /// Enable connection reuse
    pub enable_reuse: bool,
    /// Maximum connection age before forced refresh
    pub max_connection_age: Duration,
}

/// A pooled connection with metadata
#[derive(Debug)]
struct PooledConnection {
    #[cfg(feature = "production-ready")]
    connection: Arc<QuinnConnection>,
    #[cfg(not(feature = "production-ready"))]
    _placeholder: (),
    peer_id: PeerId,
    remote_address: SocketAddr,
    created_at: Instant,
    last_used: Instant,
    use_count: u64,
    is_active: bool,
}

/// Statistics for connection pool
#[derive(Debug, Default, Clone)]
pub struct ConnectionPoolStats {
    /// Total connections created
    pub connections_created: u64,
    /// Total connections reused
    pub connections_reused: u64,
    /// Total connections expired
    pub connections_expired: u64,
    /// Current active connections
    pub active_connections: usize,
    /// Pool hit rate (reuse / total requests)
    pub hit_rate: f64,
    /// Average connection age
    pub avg_connection_age: Duration,
}

/// Candidate cache with TTL for efficient candidate management
#[derive(Debug)]
pub struct CandidateCache {
    /// Cached candidates by peer ID
    cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
    /// Cache configuration
    config: CandidateCacheConfig,
    /// Cache statistics
    stats: Arc<Mutex<CandidateCacheStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for candidate caching
#[derive(Debug, Clone)]
pub struct CandidateCacheConfig {
    /// Default TTL for cached candidates
    pub default_ttl: Duration,
    /// Maximum number of candidate sets to cache
    pub max_cache_size: usize,
    /// Cleanup interval for expired entries
    pub cleanup_interval: Duration,
    /// Enable candidate validation caching
    pub enable_validation_cache: bool,
    /// TTL for validation results
    pub validation_ttl: Duration,
}

/// Cached candidate set with metadata
#[derive(Debug, Clone)]
struct CachedCandidateSet {
    candidates: Vec<CandidateAddress>,
    cached_at: Instant,
    ttl: Duration,
    access_count: u64,
    last_accessed: Instant,
    validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
}

/// Cached validation result
#[derive(Debug, Clone)]
struct ValidationCacheEntry {
    is_valid: bool,
    rtt: Option<Duration>,
    cached_at: Instant,
    ttl: Duration,
}

/// Statistics for candidate cache
#[derive(Debug, Default, Clone)]
pub struct CandidateCacheStats {
    /// Total cache hits
    pub cache_hits: u64,
    /// Total cache misses
    pub cache_misses: u64,
    /// Total entries expired
    pub entries_expired: u64,
    /// Current cache size
    pub current_size: usize,
    /// Cache hit rate
    pub hit_rate: f64,
    /// Average entry age
    pub avg_entry_age: Duration,
}

/// Session state cleanup coordinator
#[derive(Debug)]
pub struct SessionCleanupCoordinator {
    /// Active sessions by peer ID
    active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
    /// Cleanup configuration
    config: SessionCleanupConfig,
    /// Cleanup statistics
    stats: Arc<Mutex<SessionCleanupStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for session cleanup
#[derive(Debug, Clone)]
pub struct SessionCleanupConfig {
    /// Maximum session idle time
    pub max_idle_time: Duration,
    /// Maximum session age
    pub max_session_age: Duration,
    /// Cleanup interval
    pub cleanup_interval: Duration,
    /// Enable aggressive cleanup under memory pressure
    pub enable_aggressive_cleanup: bool,
    /// Memory pressure threshold (MB)
    pub memory_pressure_threshold: usize,
}

/// Session state for cleanup tracking
#[derive(Debug)]
struct SessionState {
    peer_id: PeerId,
    created_at: Instant,
    last_activity: Instant,
    memory_usage: usize,
    is_active: bool,
    cleanup_priority: CleanupPriority,
}

/// Priority for session cleanup
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    /// Keep as long as possible
    Low,
    /// Standard cleanup rules
    Normal,
    /// Clean up aggressively
    High,
}

/// Statistics for session cleanup
#[derive(Debug, Default, Clone)]
pub struct SessionCleanupStats {
    /// Total sessions cleaned up
    pub sessions_cleaned: u64,
    /// Memory freed (bytes)
    pub memory_freed: u64,
    /// Current active sessions
    pub active_sessions: usize,
    /// Average session lifetime
    pub avg_session_lifetime: Duration,
}

/// Frame batching coordinator for reduced packet overhead
#[derive(Debug)]
pub struct FrameBatchingCoordinator {
    /// Pending frames by destination
    pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
    /// Batching configuration
    config: FrameBatchingConfig,
    /// Batching statistics
    stats: Arc<Mutex<FrameBatchingStats>>,
    /// Flush task handle
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for frame batching
#[derive(Debug, Clone)]
pub struct FrameBatchingConfig {
    /// Maximum batch size (bytes)
    pub max_batch_size: usize,
    /// Maximum batch delay
    pub max_batch_delay: Duration,
    /// Maximum frames per batch
    pub max_frames_per_batch: usize,
    /// Enable adaptive batching based on network conditions
    pub enable_adaptive_batching: bool,
    /// Minimum batch size for efficiency
    pub min_batch_size: usize,
}

/// Batched frame set
#[derive(Debug)]
struct BatchedFrameSet {
    frames: Vec<BatchedFrame>,
    total_size: usize,
    created_at: Instant,
    destination: SocketAddr,
    priority: BatchPriority,
}

/// Individual batched frame
#[derive(Debug)]
struct BatchedFrame {
    frame_type: u8,
    payload: Vec<u8>,
    size: usize,
    priority: FramePriority,
    created_at: Instant,
}

/// Priority for frame batching
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    /// Can wait for a full batch
    Low,
    /// Standard batching rules
    Normal,
    /// Send quickly, minimal batching
    High,
}

/// Priority for individual frames
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    /// Low priority, can be delayed
    Background,
    /// Standard priority
    Normal,
    /// High priority, minimal delay
    Urgent,
}

/// Statistics for frame batching
#[derive(Debug, Default, Clone)]
pub struct FrameBatchingStats {
    /// Total frames batched
    pub frames_batched: u64,
    /// Total batches sent
    pub batches_sent: u64,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Bytes saved through batching
    pub bytes_saved: u64,
    /// Batching efficiency (0.0 - 1.0)
    pub batching_efficiency: f64,
}

impl Default for ConnectionPoolConfig {
    fn default() -> Self {
        Self {
            max_connections: 1000,
            max_idle_time: Duration::from_secs(300), // 5 minutes
            cleanup_interval: Duration::from_secs(60), // 1 minute
            enable_reuse: true,
            max_connection_age: Duration::from_secs(3600), // 1 hour
        }
    }
}

impl Default for CandidateCacheConfig {
    fn default() -> Self {
        Self {
            default_ttl: Duration::from_secs(300), // 5 minutes
            max_cache_size: 10000,
            cleanup_interval: Duration::from_secs(60), // 1 minute
            enable_validation_cache: true,
            validation_ttl: Duration::from_secs(60), // 1 minute
        }
    }
}

impl Default for SessionCleanupConfig {
    fn default() -> Self {
        Self {
            max_idle_time: Duration::from_secs(600), // 10 minutes
            max_session_age: Duration::from_secs(3600), // 1 hour
            cleanup_interval: Duration::from_secs(120), // 2 minutes
            enable_aggressive_cleanup: true,
            memory_pressure_threshold: 512, // 512 MB
        }
    }
}

impl Default for FrameBatchingConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1200, // Just under a typical MTU
            max_batch_delay: Duration::from_millis(10),
            max_frames_per_batch: 10,
            enable_adaptive_batching: true,
            min_batch_size: 200, // Minimum size worth batching
        }
    }
}

impl ConnectionPool {
    /// Create a new connection pool
    pub fn new(config: ConnectionPoolConfig) -> Self {
        Self {
            active_connections: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the connection pool with cleanup task
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let connections = Arc::clone(&self.active_connections);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_connections(&connections, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!(
            "Connection pool started with max_connections={}",
            self.config.max_connections
        );
        Ok(())
    }

    /// Get or create a connection for a peer
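    ///
    /// # Example
    ///
    /// A sketch of the reuse-or-connect flow (requires the `production-ready`
    /// feature; `pool`, `peer_id`, `addr`, and `endpoint` are assumed to exist):
    ///
    /// ```ignore
    /// let conn = pool.get_connection(peer_id, addr, &endpoint).await?;
    /// // `conn` is either a reused pooled connection or a freshly established one.
    /// ```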
    #[cfg(feature = "production-ready")]
    pub async fn get_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
        endpoint: &QuinnEndpoint,
    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
        // Try to reuse an existing pooled connection first
        if let Some(connection) = self.try_get_existing_connection(peer_id, remote_address).await {
            self.update_stats_hit().await;
            return Ok(connection);
        }

        // Otherwise establish a new connection
        let connection = self
            .create_new_connection(peer_id, remote_address, endpoint)
            .await?;
        self.update_stats_miss().await;
        Ok(connection)
    }

    /// Try to get an existing connection from the pool
    #[cfg(feature = "production-ready")]
    async fn try_get_existing_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
    ) -> Option<Arc<QuinnConnection>> {
        // Honor the `enable_reuse` configuration flag
        if !self.config.enable_reuse {
            return None;
        }

        let mut connections = self.active_connections.write().unwrap();

        if let Some(pooled) = connections.get_mut(&peer_id) {
            if pooled.is_active && pooled.remote_address == remote_address {
                // Check that the connection has not been closed
                if pooled.connection.close_reason().is_none() {
                    pooled.last_used = Instant::now();
                    pooled.use_count += 1;
                    debug!("Reusing pooled connection for peer {:?}", peer_id);
                    return Some(Arc::clone(&pooled.connection));
                } else {
                    // Connection is closed, remove it
                    connections.remove(&peer_id);
                }
            }
        }

        None
    }

    /// Create a new connection and add it to the pool
    #[cfg(feature = "production-ready")]
    async fn create_new_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
        endpoint: &QuinnEndpoint,
    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
        // Enforce the pool size limit, evicting the least recently used
        // connection if necessary
        {
            let connections = self.active_connections.read().unwrap();
            if connections.len() >= self.config.max_connections {
                drop(connections);
                self.evict_lru_connection().await;
            }
        }

        // Build the client crypto configuration
        let rustls_config = rustls::ClientConfig::builder()
            .with_root_certificates(rustls::RootCertStore::empty())
            .with_no_client_auth();

        let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
            .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;

        let client_config = crate::ClientConfig::new(Arc::new(client_crypto));

        let connecting = endpoint
            .connect_with(client_config, remote_address, "ant-quic")
            .map_err(|e| format!("Failed to initiate connection: {}", e))?;

        // Wait for the connection to be established
        let connection = connecting
            .await
            .map_err(|e| format!("Connection failed: {}", e))?;

        let connection_arc = Arc::new(connection);

        let pooled = PooledConnection {
            connection: Arc::clone(&connection_arc),
            peer_id,
            remote_address,
            created_at: Instant::now(),
            last_used: Instant::now(),
            use_count: 1,
            is_active: true,
        };

        // Add to the pool, capturing the resulting pool size so the stats
        // stay accurate even when an existing entry is replaced
        let pool_size = {
            let mut connections = self.active_connections.write().unwrap();
            connections.insert(peer_id, pooled);
            connections.len()
        };

        // Update stats
        {
            let mut stats = self.stats.lock().unwrap();
            stats.connections_created += 1;
            stats.active_connections = pool_size;
        }

        info!("Created new pooled connection for peer {:?}", peer_id);
        Ok(connection_arc)
    }

    /// Evict the least recently used connection
    async fn evict_lru_connection(&self) {
        let mut connections = self.active_connections.write().unwrap();

        if let Some((lru_peer_id, _)) = connections
            .iter()
            .min_by_key(|(_, pooled)| pooled.last_used)
            .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
        {
            connections.remove(&lru_peer_id);
            debug!("Evicted LRU connection for peer {:?}", lru_peer_id);

            // Update stats
            let mut stats = self.stats.lock().unwrap();
            stats.active_connections = stats.active_connections.saturating_sub(1);
        }
    }

    /// Update stats for a pool hit
    async fn update_stats_hit(&self) {
        let mut stats = self.stats.lock().unwrap();
        stats.connections_reused += 1;
        let total_requests = stats.connections_created + stats.connections_reused;
        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
    }

    /// Update stats for a pool miss
    async fn update_stats_miss(&self) {
        // Recompute the hit rate under a single lock
        let mut stats = self.stats.lock().unwrap();
        let total_requests = stats.connections_created + stats.connections_reused;
        if total_requests > 0 {
            stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
        }
    }

    /// Clean up expired connections
    async fn cleanup_expired_connections(
        connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
        stats: &Arc<Mutex<ConnectionPoolStats>>,
        config: &ConnectionPoolConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();

        // Find expired connections, pushing each peer at most once so the
        // expiry counter stays accurate
        {
            let connections_read = connections.read().unwrap();
            for (peer_id, pooled) in connections_read.iter() {
                let idle_time = now.duration_since(pooled.last_used);
                let age = now.duration_since(pooled.created_at);

                if idle_time > config.max_idle_time || age > config.max_connection_age {
                    to_remove.push(*peer_id);
                    continue;
                }

                #[cfg(feature = "production-ready")]
                {
                    // Also remove connections that have been closed remotely
                    if pooled.connection.close_reason().is_some() {
                        to_remove.push(*peer_id);
                    }
                }
            }
        }

        // Remove expired connections
        if !to_remove.is_empty() {
            let mut connections_write = connections.write().unwrap();
            for peer_id in &to_remove {
                connections_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.connections_expired += to_remove.len() as u64;
            stats_guard.active_connections = connections_write.len();

            debug!("Cleaned up {} expired connections", to_remove.len());
        }
    }

    /// Get connection pool statistics
    pub async fn get_stats(&self) -> ConnectionPoolStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the connection pool
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Close and drop all pooled connections
        #[cfg(feature = "production-ready")]
        {
            let mut connections = self.active_connections.write().unwrap();
            for (_, pooled) in connections.iter() {
                pooled.connection.close(VarInt::from_u32(0), b"shutdown");
            }
            connections.clear();
        }

        info!("Connection pool shutdown complete");
    }
}

impl CandidateCache {
    /// Create a new candidate cache
    pub fn new(config: CandidateCacheConfig) -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the candidate cache with cleanup task
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let cache = Arc::clone(&self.cache);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_entries(&cache, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!("Candidate cache started with max_size={}", self.config.max_cache_size);
        Ok(())
    }

    /// Get cached candidates for a peer
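    ///
    /// # Example
    ///
    /// A round-trip sketch with `cache_candidates` (not compiled as a
    /// doctest; `cache`, `peer_id`, and `candidates` are assumed to exist):
    ///
    /// ```ignore
    /// cache.cache_candidates(peer_id, candidates, None).await?;
    /// if let Some(cached) = cache.get_candidates(peer_id).await {
    ///     println!("got {} cached candidates", cached.len());
    /// }
    /// ```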
    pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
        let (is_valid, candidates) = {
            let cache = self.cache.read().unwrap();

            if let Some(cached_set) = cache.get(&peer_id) {
                let now = Instant::now();

                // Check whether the entry is still within its TTL
                if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
                    (true, Some(cached_set.candidates.clone()))
                } else {
                    (false, None)
                }
            } else {
                (false, None)
            }
        };

        if is_valid {
            // Update access statistics
            self.update_access_stats(peer_id, true).await;

            if let Some(ref candidates) = candidates {
                debug!("Cache hit for peer {:?}, {} candidates", peer_id, candidates.len());
            }
            return candidates;
        }

        // Cache miss
        self.update_access_stats(peer_id, false).await;
        debug!("Cache miss for peer {:?}", peer_id);
        None
    }

    /// Cache candidates for a peer
    pub async fn cache_candidates(
        &self,
        peer_id: PeerId,
        candidates: Vec<CandidateAddress>,
        ttl: Option<Duration>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let ttl = ttl.unwrap_or(self.config.default_ttl);

        // Enforce the cache size limit, evicting the least recently used
        // entry if necessary
        {
            let cache = self.cache.read().unwrap();
            if cache.len() >= self.config.max_cache_size {
                drop(cache);
                self.evict_lru_entry().await;
            }
        }

        let candidate_count = candidates.len();
        let cached_set = CachedCandidateSet {
            candidates,
            cached_at: Instant::now(),
            ttl,
            access_count: 0,
            last_accessed: Instant::now(),
            validation_results: HashMap::new(),
        };

        // Add to the cache, capturing the resulting size so the stats stay
        // accurate even when an existing entry is replaced
        let cache_size = {
            let mut cache = self.cache.write().unwrap();
            cache.insert(peer_id, cached_set);
            cache.len()
        };

        // Update stats
        {
            let mut stats = self.stats.lock().unwrap();
            stats.current_size = cache_size;
        }

        debug!(
            "Cached {} candidates for peer {:?} with TTL {:?}",
            candidate_count, peer_id, ttl
        );
        Ok(())
    }

    /// Cache a validation result for a candidate
    pub async fn cache_validation_result(
        &self,
        peer_id: PeerId,
        address: SocketAddr,
        is_valid: bool,
        rtt: Option<Duration>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if !self.config.enable_validation_cache {
            return Ok(());
        }

        let mut cache = self.cache.write().unwrap();

        if let Some(cached_set) = cache.get_mut(&peer_id) {
            let validation_entry = ValidationCacheEntry {
                is_valid,
                rtt,
                cached_at: Instant::now(),
                ttl: self.config.validation_ttl,
            };

            cached_set.validation_results.insert(address, validation_entry);
            debug!(
                "Cached validation result for peer {:?} at {} -> {}",
                peer_id, address, is_valid
            );
        }

        Ok(())
    }

    /// Get a cached validation result
    pub async fn get_validation_result(
        &self,
        peer_id: PeerId,
        address: SocketAddr,
    ) -> Option<(bool, Option<Duration>)> {
        if !self.config.enable_validation_cache {
            return None;
        }

        let cache = self.cache.read().unwrap();

        if let Some(cached_set) = cache.get(&peer_id) {
            if let Some(validation_entry) = cached_set.validation_results.get(&address) {
                let now = Instant::now();

                // Check whether the validation result is still within its TTL
                if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
                    return Some((validation_entry.is_valid, validation_entry.rtt));
                }
            }
        }

        None
    }

    /// Update access statistics
    async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
        // Update cache-level stats
        {
            let mut stats = self.stats.lock().unwrap();
            if hit {
                stats.cache_hits += 1;
            } else {
                stats.cache_misses += 1;
            }

            let total_accesses = stats.cache_hits + stats.cache_misses;
            stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
        }

        // Update entry-level stats
        if hit {
            let mut cache = self.cache.write().unwrap();
            if let Some(cached_set) = cache.get_mut(&peer_id) {
                cached_set.access_count += 1;
                cached_set.last_accessed = Instant::now();
            }
        }
    }

    /// Evict the least recently used entry
    async fn evict_lru_entry(&self) {
        let mut cache = self.cache.write().unwrap();

        if let Some((lru_peer_id, _)) = cache
            .iter()
            .min_by_key(|(_, cached_set)| cached_set.last_accessed)
            .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
        {
            cache.remove(&lru_peer_id);
            debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);

            // Update stats
            let mut stats = self.stats.lock().unwrap();
            stats.current_size = stats.current_size.saturating_sub(1);
        }
    }

    /// Clean up expired cache entries
    async fn cleanup_expired_entries(
        cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
        stats: &Arc<Mutex<CandidateCacheStats>>,
        _config: &CandidateCacheConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();

        // Find expired entries
        {
            let cache_read = cache.read().unwrap();
            for (peer_id, cached_set) in cache_read.iter() {
                let age = now.duration_since(cached_set.cached_at);
                if age > cached_set.ttl {
                    to_remove.push(*peer_id);
                }
            }
        }

        // Remove expired entries
        if !to_remove.is_empty() {
            let mut cache_write = cache.write().unwrap();
            for peer_id in &to_remove {
                cache_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.entries_expired += to_remove.len() as u64;
            stats_guard.current_size = cache_write.len();

            debug!("Cleaned up {} expired cache entries", to_remove.len());
        }

        // Also clean up expired validation results
        {
            let mut cache_write = cache.write().unwrap();
            for cached_set in cache_write.values_mut() {
                cached_set.validation_results.retain(|_, validation_entry| {
                    now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
                });
            }
        }
    }

    /// Get cache statistics
    pub async fn get_stats(&self) -> CandidateCacheStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the candidate cache
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Clear the cache
        {
            let mut cache = self.cache.write().unwrap();
            cache.clear();
        }

        info!("Candidate cache shutdown complete");
    }
}

impl SessionCleanupCoordinator {
    /// Create a new session cleanup coordinator
    pub fn new(config: SessionCleanupConfig) -> Self {
        Self {
            active_sessions: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the session cleanup coordinator
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let sessions = Arc::clone(&self.active_sessions);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!("Session cleanup coordinator started");
        Ok(())
    }

    /// Register a new session
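    ///
    /// # Example
    ///
    /// A sketch of registering a session with an estimated memory footprint
    /// (the 64 KiB figure is illustrative, not measured):
    ///
    /// ```ignore
    /// coordinator
    ///     .register_session(peer_id, 64 * 1024, CleanupPriority::Normal)
    ///     .await?;
    /// // Refresh the idle timer whenever the peer shows activity:
    /// coordinator.update_session_activity(peer_id).await;
    /// ```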
    pub async fn register_session(
        &self,
        peer_id: PeerId,
        memory_usage: usize,
        priority: CleanupPriority,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let session_state = SessionState {
            peer_id,
            created_at: Instant::now(),
            last_activity: Instant::now(),
            memory_usage,
            is_active: true,
            cleanup_priority: priority,
        };

        // Insert the session, capturing the resulting count so the stats
        // stay accurate even when an existing entry is replaced
        let session_count = {
            let mut sessions = self.active_sessions.write().unwrap();
            sessions.insert(peer_id, session_state);
            sessions.len()
        };

        // Update stats
        {
            let mut stats = self.stats.lock().unwrap();
            stats.active_sessions = session_count;
        }

        debug!("Registered session for peer {:?} with {} bytes", peer_id, memory_usage);
        Ok(())
    }

    /// Update session activity
    pub async fn update_session_activity(&self, peer_id: PeerId) {
        let mut sessions = self.active_sessions.write().unwrap();
        if let Some(session) = sessions.get_mut(&peer_id) {
            session.last_activity = Instant::now();
        }
    }

    /// Clean up expired sessions
    async fn cleanup_expired_sessions(
        sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
        stats: &Arc<Mutex<SessionCleanupStats>>,
        config: &SessionCleanupConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();
        let mut memory_freed = 0u64;

        // Check for memory pressure
        let memory_pressure = Self::check_memory_pressure(config);

        // Find sessions to clean up
        {
            let sessions_read = sessions.read().unwrap();
            for (peer_id, session) in sessions_read.iter() {
                let idle_time = now.duration_since(session.last_activity);
                let age = now.duration_since(session.created_at);

                let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
                    // Aggressive cleanup under memory pressure: higher
                    // cleanup priority means a shorter idle allowance
                    match session.cleanup_priority {
                        CleanupPriority::High => idle_time > Duration::from_secs(30),
                        CleanupPriority::Normal => idle_time > Duration::from_secs(60),
                        CleanupPriority::Low => idle_time > config.max_idle_time / 2,
                    }
                } else {
                    // Normal cleanup rules
                    idle_time > config.max_idle_time || age > config.max_session_age
                };

                if should_cleanup {
                    to_remove.push(*peer_id);
                    memory_freed += session.memory_usage as u64;
                }
            }
        }

        // Remove expired sessions
        if !to_remove.is_empty() {
            let mut sessions_write = sessions.write().unwrap();
            for peer_id in &to_remove {
                sessions_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.sessions_cleaned += to_remove.len() as u64;
            stats_guard.memory_freed += memory_freed;
            stats_guard.active_sessions = sessions_write.len();

            if memory_pressure {
                info!(
                    "Aggressive cleanup: removed {} sessions, freed {} bytes",
                    to_remove.len(),
                    memory_freed
                );
            } else {
                debug!(
                    "Regular cleanup: removed {} sessions, freed {} bytes",
                    to_remove.len(),
                    memory_freed
                );
            }
        }
    }

    /// Check if the system is under memory pressure
    fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
        // Simplified memory pressure detection: in production this would
        // sample actual system memory usage and compare it against
        // `memory_pressure_threshold`; for now, report no pressure.
        let _ = config.memory_pressure_threshold;
        false
    }

    /// Get session cleanup statistics
    pub async fn get_stats(&self) -> SessionCleanupStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the session cleanup coordinator
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Clear sessions
        {
            let mut sessions = self.active_sessions.write().unwrap();
            sessions.clear();
        }

        info!("Session cleanup coordinator shutdown complete");
    }
}

impl FrameBatchingCoordinator {
    /// Create a new frame batching coordinator
    pub fn new(config: FrameBatchingConfig) -> Self {
        Self {
            pending_frames: Arc::new(Mutex::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
            flush_handle: None,
        }
    }

    /// Start the frame batching coordinator
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let pending_frames = Arc::clone(&self.pending_frames);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let flush_handle = tokio::spawn(async move {
            // Poll at a fraction of the maximum batch delay so expired
            // batches are flushed promptly
            let mut interval = tokio::time::interval(config.max_batch_delay / 4);

            loop {
                interval.tick().await;
                Self::flush_expired_batches(&pending_frames, &stats, &config).await;
            }
        });

        self.flush_handle = Some(flush_handle);
        info!("Frame batching coordinator started");
        Ok(())
    }

    /// Add a frame to the batch for `destination`
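    ///
    /// Returns `Ok(Some(bytes))` when adding this frame caused the batch to
    /// flush; the caller is responsible for actually sending those bytes.
    ///
    /// # Example
    ///
    /// A sketch of the caller-driven send loop (`coordinator`, `socket`, and
    /// `dest` are assumed to exist; `0x40` is an illustrative frame type):
    ///
    /// ```ignore
    /// if let Some(batch) = coordinator
    ///     .add_frame(dest, 0x40, payload, FramePriority::Normal)
    ///     .await?
    /// {
    ///     socket.send_to(&batch, dest).await?;
    /// }
    /// ```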
    pub async fn add_frame(
        &self,
        destination: SocketAddr,
        frame_type: u8,
        payload: Vec<u8>,
        priority: FramePriority,
    ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
        let frame_size = payload.len();
        let frame = BatchedFrame {
            frame_type,
            size: frame_size,
            payload,
            priority,
            created_at: Instant::now(),
        };

        let mut pending = self.pending_frames.lock().unwrap();

        // Append the frame and check whether the batch should be flushed
        let (should_flush, frames_count, total_size) = {
            let batch_set = pending.entry(destination).or_insert_with(|| BatchedFrameSet {
                frames: Vec::new(),
                total_size: 0,
                created_at: Instant::now(),
                destination,
                priority: BatchPriority::Normal,
            });

            batch_set.frames.push(frame);
            batch_set.total_size += frame_size;

            // Escalate the batch priority if this frame is urgent
            if priority == FramePriority::Urgent {
                batch_set.priority = BatchPriority::High;
            }

            let should_flush = self.should_flush_batch(batch_set);
            (should_flush, batch_set.frames.len(), batch_set.total_size)
        };

        if should_flush {
            // Remove and serialize the batch
            if let Some(batch_set) = pending.remove(&destination) {
                let batch_data = self.serialize_batch(&batch_set);

                // Update stats
                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.batches_sent += 1;
                    stats.frames_batched += frames_count as u64;
                    stats.avg_batch_size = (stats.avg_batch_size
                        * (stats.batches_sent - 1) as f64
                        + total_size as f64)
                        / stats.batches_sent as f64;
                }

                debug!(
                    "Flushed batch to {} with {} frames ({} bytes)",
                    destination, frames_count, total_size
                );
                return Ok(Some(batch_data));
            }
        }

        Ok(None)
    }

    /// Check whether a batch should be flushed
    fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
        let now = Instant::now();
        let age = now.duration_since(batch_set.created_at);

        // Flush if the batch is full (by bytes or frame count), old, or
        // high priority
        batch_set.total_size >= self.config.max_batch_size
            || batch_set.frames.len() >= self.config.max_frames_per_batch
            || age >= self.config.max_batch_delay
            || batch_set.priority == BatchPriority::High
    }

    /// Serialize a batch into packet data
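    ///
    /// Each frame is written as a simple length-prefixed record (an internal
    /// framing assumed by this module, not a QUIC wire format):
    ///
    /// ```text
    /// [frame_type: u8][payload_len: u16, big-endian][payload: payload_len bytes]
    /// ```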
    fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
        // Reserve room for the payloads plus a 3-byte header per frame
        let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 3);

        for frame in &batch_set.frames {
            // The length field is a u16, so payloads must fit in 65535 bytes
            debug_assert!(frame.payload.len() <= u16::MAX as usize);

            // Frame type and payload length
            data.push(frame.frame_type);
            data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
            // Frame payload
            data.extend_from_slice(&frame.payload);
        }

        data
    }

    /// Flush expired batches
    ///
    /// Expired batches are removed and counted here, but their payloads are
    /// dropped; serialized batch data is only handed back to callers through
    /// `add_frame`.
    async fn flush_expired_batches(
        pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
        stats: &Arc<Mutex<FrameBatchingStats>>,
        config: &FrameBatchingConfig,
    ) {
        let now = Instant::now();
        let mut to_flush = Vec::new();

        // Find expired batches
        {
            let pending = pending_frames.lock().unwrap();
            for (destination, batch_set) in pending.iter() {
                let age = now.duration_since(batch_set.created_at);
                if age >= config.max_batch_delay {
                    to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
                }
            }
        }

        // Flush expired batches
        if !to_flush.is_empty() {
            let mut pending = pending_frames.lock().unwrap();
            let flush_count = to_flush.len();
            for (destination, frame_count, total_size) in to_flush {
                pending.remove(&destination);

                // Update stats
                let mut stats_guard = stats.lock().unwrap();
                stats_guard.batches_sent += 1;
                stats_guard.frames_batched += frame_count as u64;
                stats_guard.avg_batch_size = (stats_guard.avg_batch_size
                    * (stats_guard.batches_sent - 1) as f64
                    + total_size as f64)
                    / stats_guard.batches_sent as f64;
            }

            debug!("Flushed {} expired batches", flush_count);
        }
    }

    /// Get batching statistics
    pub async fn get_stats(&self) -> FrameBatchingStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the frame batching coordinator
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.flush_handle.take() {
            handle.abort();
        }

        // Drop any still-pending batches; they are not sent on shutdown
        {
            let mut pending = self.pending_frames.lock().unwrap();
            pending.clear();
        }

        info!("Frame batching coordinator shutdown complete");
    }
}

/// Memory optimization manager that coordinates all memory optimization components
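///
/// # Example
///
/// A sketch of driving the manager end to end inside a tokio runtime (the
/// `cleanup_interval` override is illustrative):
///
/// ```ignore
/// let mut manager = MemoryOptimizationManager::with_configs(
///     ConnectionPoolConfig::default(),
///     CandidateCacheConfig::default(),
///     SessionCleanupConfig {
///         cleanup_interval: Duration::from_secs(30),
///         ..SessionCleanupConfig::default()
///     },
///     FrameBatchingConfig::default(),
/// );
/// manager.start().await?;
/// let stats = manager.get_comprehensive_stats().await;
/// println!("pool hit rate: {:.2}", stats.connection_pool.hit_rate);
/// manager.shutdown().await;
/// ```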
#[derive(Debug)]
pub struct MemoryOptimizationManager {
    connection_pool: ConnectionPool,
    candidate_cache: CandidateCache,
    session_cleanup: SessionCleanupCoordinator,
    frame_batching: FrameBatchingCoordinator,
    is_running: bool,
}

impl MemoryOptimizationManager {
    /// Create a new memory optimization manager with default configurations
    pub fn new() -> Self {
        Self {
            connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
            candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
            session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
            frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
            is_running: false,
        }
    }

    /// Create a new memory optimization manager with custom configurations
    pub fn with_configs(
        pool_config: ConnectionPoolConfig,
        cache_config: CandidateCacheConfig,
        cleanup_config: SessionCleanupConfig,
        batching_config: FrameBatchingConfig,
    ) -> Self {
        Self {
            connection_pool: ConnectionPool::new(pool_config),
            candidate_cache: CandidateCache::new(cache_config),
            session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
            frame_batching: FrameBatchingCoordinator::new(batching_config),
            is_running: false,
        }
    }

    /// Start all memory optimization components
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if self.is_running {
            return Ok(());
        }

        self.connection_pool.start().await?;
        self.candidate_cache.start().await?;
        self.session_cleanup.start().await?;
        self.frame_batching.start().await?;

        self.is_running = true;
        info!("Memory optimization manager started");
        Ok(())
    }

    /// Get connection pool reference
    pub fn connection_pool(&self) -> &ConnectionPool {
        &self.connection_pool
    }

    /// Get candidate cache reference
    pub fn candidate_cache(&self) -> &CandidateCache {
        &self.candidate_cache
    }

    /// Get session cleanup coordinator reference
    pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
        &self.session_cleanup
    }

    /// Get frame batching coordinator reference
    pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
        &self.frame_batching
    }

    /// Get comprehensive memory optimization statistics
    pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
        MemoryOptimizationStats {
            connection_pool: self.connection_pool.get_stats().await,
            candidate_cache: self.candidate_cache.get_stats().await,
            session_cleanup: self.session_cleanup.get_stats().await,
            frame_batching: self.frame_batching.get_stats().await,
        }
    }

    /// Shutdown all memory optimization components
    pub async fn shutdown(&mut self) {
        if !self.is_running {
            return;
        }

        self.connection_pool.shutdown().await;
        self.candidate_cache.shutdown().await;
        self.session_cleanup.shutdown().await;
        self.frame_batching.shutdown().await;

        self.is_running = false;
        info!("Memory optimization manager shutdown complete");
    }
}

/// Comprehensive memory optimization statistics
#[derive(Debug, Clone)]
pub struct MemoryOptimizationStats {
    /// Connection pool statistics
    pub connection_pool: ConnectionPoolStats,
    /// Candidate cache statistics
    pub candidate_cache: CandidateCacheStats,
    /// Session cleanup statistics
    pub session_cleanup: SessionCleanupStats,
    /// Frame batching statistics
    pub frame_batching: FrameBatchingStats,
}

impl Default for MemoryOptimizationManager {
    fn default() -> Self {
        Self::new()
    }
}