ant_quic/optimization/memory.rs

//! Memory optimization components for ant-quic
//!
//! This module provides memory-efficient resource management including:
//! - Connection pooling for Quinn connections
//! - Candidate caching with TTL
//! - Automatic cleanup of expired sessions and state
//! - Frame batching for reduced packet overhead
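//!
//! A minimal end-to-end sketch (illustrative only; it assumes this module is
//! exported as `ant_quic::optimization::memory` and that a tokio runtime is
//! available):
//!
//! ```no_run
//! use ant_quic::optimization::memory::MemoryOptimizationManager;
//!
//! # async fn demo() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//! // Start all four components (pool, cache, cleanup, batching) with defaults.
//! let mut manager = MemoryOptimizationManager::new();
//! manager.start().await?;
//!
//! // ... run traffic, then inspect the aggregate statistics ...
//! let stats = manager.get_comprehensive_stats().await;
//! println!("pool hit rate: {:.2}", stats.connection_pool.hit_rate);
//!
//! manager.shutdown().await;
//! # Ok(())
//! # }
//! ```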

use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use tracing::{debug, info};

use crate::{
    nat_traversal_api::{CandidateAddress, PeerId},
    Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection, VarInt,
};

/// Connection pool for reusing Quinn connections
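///
/// A minimal sketch of the intended call pattern (illustrative; `endpoint`,
/// `peer_id`, and `addr` are hypothetical caller-supplied values, and the
/// `ant_quic::...` paths assume the crate exports these types publicly):
///
/// ```no_run
/// # use ant_quic::optimization::memory::{ConnectionPool, ConnectionPoolConfig};
/// # async fn demo(
/// #     endpoint: &ant_quic::Endpoint,
/// #     peer_id: ant_quic::nat_traversal_api::PeerId,
/// #     addr: std::net::SocketAddr,
/// # ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let mut pool = ConnectionPool::new(ConnectionPoolConfig::default());
/// pool.start().await?; // spawns the periodic cleanup task
///
/// // Reuses a pooled connection when one exists; dials otherwise.
/// let _conn = pool.get_connection(peer_id, addr, endpoint).await?;
/// # Ok(())
/// # }
/// ```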
#[derive(Debug)]
pub struct ConnectionPool {
    /// Active connections by peer ID
    active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
    /// Connection pool configuration
    config: ConnectionPoolConfig,
    /// Pool statistics
    stats: Arc<Mutex<ConnectionPoolStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for connection pooling
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections to pool
    pub max_connections: usize,
    /// Maximum idle time before connection cleanup
    pub max_idle_time: Duration,
    /// Cleanup interval for expired connections
    pub cleanup_interval: Duration,
    /// Enable connection reuse
    pub enable_reuse: bool,
    /// Maximum connection age before forced refresh
    pub max_connection_age: Duration,
}

/// A pooled connection with metadata
#[derive(Debug)]
struct PooledConnection {
    connection: Arc<QuinnConnection>,
    peer_id: PeerId,
    remote_address: SocketAddr,
    created_at: Instant,
    last_used: Instant,
    use_count: u64,
    is_active: bool,
}

/// Statistics for connection pool
#[derive(Debug, Default, Clone)]
pub struct ConnectionPoolStats {
    /// Total connections created
    pub connections_created: u64,
    /// Total connections reused
    pub connections_reused: u64,
    /// Total connections expired
    pub connections_expired: u64,
    /// Current active connections
    pub active_connections: usize,
    /// Pool hit rate (reuse / total requests)
    pub hit_rate: f64,
    /// Average connection age
    pub avg_connection_age: Duration,
}

/// Candidate cache with TTL for efficient candidate management
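///
/// A minimal sketch (illustrative; `peer_id` and `candidates` are hypothetical
/// caller-supplied values):
///
/// ```no_run
/// # use ant_quic::optimization::memory::{CandidateCache, CandidateCacheConfig};
/// # async fn demo(
/// #     peer_id: ant_quic::nat_traversal_api::PeerId,
/// #     candidates: Vec<ant_quic::nat_traversal_api::CandidateAddress>,
/// # ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let mut cache = CandidateCache::new(CandidateCacheConfig::default());
/// cache.start().await?;
///
/// // Store with the default TTL, then read back before the TTL lapses.
/// cache.cache_candidates(peer_id, candidates, None).await?;
/// let _cached = cache.get_candidates(peer_id).await; // Some(..) while fresh
/// # Ok(())
/// # }
/// ```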
#[derive(Debug)]
pub struct CandidateCache {
    /// Cached candidates by peer ID
    cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
    /// Cache configuration
    config: CandidateCacheConfig,
    /// Cache statistics
    stats: Arc<Mutex<CandidateCacheStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for candidate caching
#[derive(Debug, Clone)]
pub struct CandidateCacheConfig {
    /// Default TTL for cached candidates
    pub default_ttl: Duration,
    /// Maximum number of candidate sets to cache
    pub max_cache_size: usize,
    /// Cleanup interval for expired entries
    pub cleanup_interval: Duration,
    /// Enable candidate validation caching
    pub enable_validation_cache: bool,
    /// TTL for validation results
    pub validation_ttl: Duration,
}

/// Cached candidate set with metadata
#[derive(Debug, Clone)]
struct CachedCandidateSet {
    candidates: Vec<CandidateAddress>,
    cached_at: Instant,
    ttl: Duration,
    access_count: u64,
    last_accessed: Instant,
    validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
}

/// Cached validation result
#[derive(Debug, Clone)]
struct ValidationCacheEntry {
    is_valid: bool,
    rtt: Option<Duration>,
    cached_at: Instant,
    ttl: Duration,
}

/// Statistics for candidate cache
#[derive(Debug, Default, Clone)]
pub struct CandidateCacheStats {
    /// Total cache hits
    pub cache_hits: u64,
    /// Total cache misses
    pub cache_misses: u64,
    /// Total entries expired
    pub entries_expired: u64,
    /// Current cache size
    pub current_size: usize,
    /// Cache hit rate
    pub hit_rate: f64,
    /// Average entry age
    pub avg_entry_age: Duration,
}

/// Session state cleanup coordinator
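///
/// A minimal sketch (illustrative; `peer_id` is a hypothetical caller-supplied
/// value and the memory figure is whatever the caller estimates):
///
/// ```no_run
/// # use ant_quic::optimization::memory::{
/// #     CleanupPriority, SessionCleanupConfig, SessionCleanupCoordinator,
/// # };
/// # async fn demo(peer_id: ant_quic::nat_traversal_api::PeerId)
/// #     -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let mut cleanup = SessionCleanupCoordinator::new(SessionCleanupConfig::default());
/// cleanup.start().await?;
///
/// // Register with an estimated footprint, then record activity so the
/// // session is not reaped by the idle timeout.
/// cleanup.register_session(peer_id, 64 * 1024, CleanupPriority::Normal).await?;
/// cleanup.update_session_activity(peer_id).await;
/// # Ok(())
/// # }
/// ```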
#[derive(Debug)]
pub struct SessionCleanupCoordinator {
    /// Active sessions by peer ID
    active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
    /// Cleanup configuration
    config: SessionCleanupConfig,
    /// Cleanup statistics
    stats: Arc<Mutex<SessionCleanupStats>>,
    /// Cleanup task handle
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for session cleanup
#[derive(Debug, Clone)]
pub struct SessionCleanupConfig {
    /// Maximum session idle time
    pub max_idle_time: Duration,
    /// Maximum session age
    pub max_session_age: Duration,
    /// Cleanup interval
    pub cleanup_interval: Duration,
    /// Enable aggressive cleanup under memory pressure
    pub enable_aggressive_cleanup: bool,
    /// Memory pressure threshold (MB)
    pub memory_pressure_threshold: usize,
}

/// Session state for cleanup tracking
#[derive(Debug)]
struct SessionState {
    peer_id: PeerId,
    created_at: Instant,
    last_activity: Instant,
    memory_usage: usize,
    is_active: bool,
    cleanup_priority: CleanupPriority,
}

/// Priority for session cleanup
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    /// Keep as long as possible
    Low,
    /// Standard cleanup rules
    Normal,
    /// Clean up aggressively
    High,
}

/// Statistics for session cleanup
#[derive(Debug, Default, Clone)]
pub struct SessionCleanupStats {
    /// Total sessions cleaned up
    pub sessions_cleaned: u64,
    /// Memory freed (bytes)
    pub memory_freed: u64,
    /// Current active sessions
    pub active_sessions: usize,
    /// Average session lifetime
    pub avg_session_lifetime: Duration,
}

/// Frame batching coordinator for reduced packet overhead
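///
/// A minimal sketch (illustrative; the destination address and frame type
/// `0x40` are arbitrary example values):
///
/// ```no_run
/// # use ant_quic::optimization::memory::{
/// #     FrameBatchingConfig, FrameBatchingCoordinator, FramePriority,
/// # };
/// # async fn demo() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let mut batcher = FrameBatchingCoordinator::new(FrameBatchingConfig::default());
/// batcher.start().await?;
///
/// let dest: std::net::SocketAddr = "203.0.113.7:9000".parse()?;
/// // `add_frame` returns Some(bytes) when this frame causes a flush;
/// // urgent frames flush immediately.
/// if let Some(packet) = batcher
///     .add_frame(dest, 0x40, b"payload".to_vec(), FramePriority::Urgent)
///     .await?
/// {
///     // hand `packet` to the transport here
///     let _ = packet;
/// }
/// # Ok(())
/// # }
/// ```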
#[derive(Debug)]
pub struct FrameBatchingCoordinator {
    /// Pending frames by destination
    pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
    /// Batching configuration
    config: FrameBatchingConfig,
    /// Batching statistics
    stats: Arc<Mutex<FrameBatchingStats>>,
    /// Flush task handle
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}

/// Configuration for frame batching
#[derive(Debug, Clone)]
pub struct FrameBatchingConfig {
    /// Maximum batch size (bytes)
    pub max_batch_size: usize,
    /// Maximum batch delay
    pub max_batch_delay: Duration,
    /// Maximum frames per batch
    pub max_frames_per_batch: usize,
    /// Enable adaptive batching based on network conditions
    pub enable_adaptive_batching: bool,
    /// Minimum batch size for efficiency
    pub min_batch_size: usize,
}

/// Batched frame set
#[derive(Debug)]
struct BatchedFrameSet {
    frames: Vec<BatchedFrame>,
    total_size: usize,
    created_at: Instant,
    destination: SocketAddr,
    priority: BatchPriority,
}

/// Individual batched frame
#[derive(Debug)]
struct BatchedFrame {
    frame_type: u8,
    payload: Vec<u8>,
    size: usize,
    priority: FramePriority,
    created_at: Instant,
}

/// Priority for frame batching
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    /// Can wait for a full batch
    Low,
    /// Standard batching rules
    Normal,
    /// Send quickly, with minimal batching
    High,
}

/// Priority for individual frames
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    /// Low priority, can be delayed
    Background,
    /// Standard priority
    Normal,
    /// High priority, minimal delay
    Urgent,
}

/// Statistics for frame batching
#[derive(Debug, Default, Clone)]
pub struct FrameBatchingStats {
    /// Total frames batched
    pub frames_batched: u64,
    /// Total batches sent
    pub batches_sent: u64,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Bytes saved through batching
    pub bytes_saved: u64,
    /// Batching efficiency (0.0 - 1.0)
    pub batching_efficiency: f64,
}

impl Default for ConnectionPoolConfig {
    fn default() -> Self {
        Self {
            max_connections: 1000,
            max_idle_time: Duration::from_secs(300), // 5 minutes
            cleanup_interval: Duration::from_secs(60), // 1 minute
            enable_reuse: true,
            max_connection_age: Duration::from_secs(3600), // 1 hour
        }
    }
}

impl Default for CandidateCacheConfig {
    fn default() -> Self {
        Self {
            default_ttl: Duration::from_secs(300), // 5 minutes
            max_cache_size: 10_000,
            cleanup_interval: Duration::from_secs(60), // 1 minute
            enable_validation_cache: true,
            validation_ttl: Duration::from_secs(60), // 1 minute
        }
    }
}

impl Default for SessionCleanupConfig {
    fn default() -> Self {
        Self {
            max_idle_time: Duration::from_secs(600), // 10 minutes
            max_session_age: Duration::from_secs(3600), // 1 hour
            cleanup_interval: Duration::from_secs(120), // 2 minutes
            enable_aggressive_cleanup: true,
            memory_pressure_threshold: 512, // 512 MB
        }
    }
}

impl Default for FrameBatchingConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1200, // just under a typical MTU
            max_batch_delay: Duration::from_millis(10),
            max_frames_per_batch: 10,
            enable_adaptive_batching: true,
            min_batch_size: 200, // minimum size worth batching
        }
    }
}

impl ConnectionPool {
    /// Create a new connection pool
    pub fn new(config: ConnectionPoolConfig) -> Self {
        Self {
            active_connections: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the connection pool with cleanup task
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let connections = Arc::clone(&self.active_connections);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_connections(&connections, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!("Connection pool started with max_connections={}", self.config.max_connections);
        Ok(())
    }

    /// Get or create a connection for a peer
    pub async fn get_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
        endpoint: &QuinnEndpoint,
    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
        // Try to reuse an existing pooled connection first
        if let Some(connection) = self.try_get_existing_connection(peer_id, remote_address).await {
            self.update_stats_hit().await;
            return Ok(connection);
        }

        // Otherwise create a new connection
        let connection = self.create_new_connection(peer_id, remote_address, endpoint).await?;
        self.update_stats_miss().await;
        Ok(connection)
    }

    /// Try to get existing connection from pool
    async fn try_get_existing_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
    ) -> Option<Arc<QuinnConnection>> {
        let mut connections = self.active_connections.write().unwrap();

        if let Some(pooled) = connections.get_mut(&peer_id) {
            if pooled.is_active && pooled.remote_address == remote_address {
                // Check if connection is still valid
                if pooled.connection.close_reason().is_none() {
                    pooled.last_used = Instant::now();
                    pooled.use_count += 1;
                    debug!("Reusing pooled connection for peer {:?}", peer_id);
                    return Some(Arc::clone(&pooled.connection));
                } else {
                    // Connection is closed, remove it
                    connections.remove(&peer_id);
                }
            }
        }

        None
    }

    /// Create a new connection and add to pool
    async fn create_new_connection(
        &self,
        peer_id: PeerId,
        remote_address: SocketAddr,
        endpoint: &QuinnEndpoint,
    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
        // Check pool size limit
        {
            let connections = self.active_connections.read().unwrap();
            if connections.len() >= self.config.max_connections {
                // Pool is full; evict the least recently used entry
                drop(connections);
                self.evict_lru_connection().await;
            }
        }

        // Create new connection.
        // NOTE: the empty root store below is a placeholder; it will reject any
        // server certificate that needs verification against real trust roots.
        let rustls_config = rustls::ClientConfig::builder()
            .with_root_certificates(rustls::RootCertStore::empty())
            .with_no_client_auth();

        let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
            .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;

        let client_config = crate::ClientConfig::new(Arc::new(client_crypto));

        let connecting = endpoint.connect_with(client_config, remote_address, "ant-quic")
            .map_err(|e| format!("Failed to initiate connection: {}", e))?;

        // Wait for the connection to be established
        let connection = connecting.await
            .map_err(|e| format!("Connection failed: {}", e))?;

        let connection_arc = Arc::new(connection);

        let pooled = PooledConnection {
            connection: Arc::clone(&connection_arc),
            peer_id,
            remote_address,
            created_at: Instant::now(),
            last_used: Instant::now(),
            use_count: 1,
            is_active: true,
        };

        // Add to pool
        {
            let mut connections = self.active_connections.write().unwrap();
            connections.insert(peer_id, pooled);
        }

        // Update stats
        {
            let mut stats = self.stats.lock().unwrap();
            stats.connections_created += 1;
            stats.active_connections += 1;
        }

        info!("Created new pooled connection for peer {:?}", peer_id);
        Ok(connection_arc)
    }

    /// Evict least recently used connection
    async fn evict_lru_connection(&self) {
        let mut connections = self.active_connections.write().unwrap();

        if let Some((lru_peer_id, _)) = connections
            .iter()
            .min_by_key(|(_, pooled)| pooled.last_used)
            .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
        {
            connections.remove(&lru_peer_id);
            debug!("Evicted LRU connection for peer {:?}", lru_peer_id);

            // Update stats
            let mut stats = self.stats.lock().unwrap();
            stats.active_connections = stats.active_connections.saturating_sub(1);
        }
    }

    /// Update stats for a pool hit
    async fn update_stats_hit(&self) {
        let mut stats = self.stats.lock().unwrap();
        stats.connections_reused += 1;
        let total_requests = stats.connections_created + stats.connections_reused;
        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
    }

    /// Update stats for a pool miss
    async fn update_stats_miss(&self) {
        // Single lock acquisition; guard against division by zero
        let mut stats = self.stats.lock().unwrap();
        let total_requests = stats.connections_created + stats.connections_reused;
        if total_requests > 0 {
            stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
        }
    }

    /// Cleanup expired connections
    async fn cleanup_expired_connections(
        connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
        stats: &Arc<Mutex<ConnectionPoolStats>>,
        config: &ConnectionPoolConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();

        // Find connections that are idle, aged out, or already closed
        {
            let connections_read = connections.read().unwrap();
            for (peer_id, pooled) in connections_read.iter() {
                let idle_time = now.duration_since(pooled.last_used);
                let age = now.duration_since(pooled.created_at);

                if idle_time > config.max_idle_time
                    || age > config.max_connection_age
                    || pooled.connection.close_reason().is_some()
                {
                    to_remove.push(*peer_id);
                }
            }
        }

        // Remove expired connections
        if !to_remove.is_empty() {
            let mut connections_write = connections.write().unwrap();
            for peer_id in &to_remove {
                connections_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.connections_expired += to_remove.len() as u64;
            stats_guard.active_connections = connections_write.len();

            debug!("Cleaned up {} expired connections", to_remove.len());
        }
    }

    /// Get connection pool statistics
    pub async fn get_stats(&self) -> ConnectionPoolStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the connection pool
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Close and drop all pooled connections
        {
            let mut connections = self.active_connections.write().unwrap();
            for (_, pooled) in connections.iter() {
                pooled.connection.close(VarInt::from_u32(0), b"shutdown");
            }
            connections.clear();
        }

        info!("Connection pool shutdown complete");
    }
}

impl CandidateCache {
    /// Create a new candidate cache
    pub fn new(config: CandidateCacheConfig) -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the candidate cache with cleanup task
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let cache = Arc::clone(&self.cache);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_entries(&cache, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!("Candidate cache started with max_size={}", self.config.max_cache_size);
        Ok(())
    }

    /// Get cached candidates for a peer
    pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
        let candidates = {
            let cache = self.cache.read().unwrap();

            cache.get(&peer_id).and_then(|cached_set| {
                // Only entries within their TTL count as hits
                if Instant::now().duration_since(cached_set.cached_at) <= cached_set.ttl {
                    Some(cached_set.candidates.clone())
                } else {
                    None
                }
            })
        };

        if let Some(candidates) = candidates {
            // Update access statistics
            self.update_access_stats(peer_id, true).await;
            debug!("Cache hit for peer {:?}, {} candidates", peer_id, candidates.len());
            return Some(candidates);
        }

        // Cache miss
        self.update_access_stats(peer_id, false).await;
        debug!("Cache miss for peer {:?}", peer_id);
        None
    }

    /// Cache candidates for a peer
    pub async fn cache_candidates(
        &self,
        peer_id: PeerId,
        candidates: Vec<CandidateAddress>,
        ttl: Option<Duration>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let ttl = ttl.unwrap_or(self.config.default_ttl);

        // Check cache size limit
        {
            let cache = self.cache.read().unwrap();
            if cache.len() >= self.config.max_cache_size {
                drop(cache);
                self.evict_lru_entry().await;
            }
        }

        let candidate_count = candidates.len();
        let cached_set = CachedCandidateSet {
            candidates,
            cached_at: Instant::now(),
            ttl,
            access_count: 0,
            last_accessed: Instant::now(),
            validation_results: HashMap::new(),
        };

        // Add to cache; only count the entry if the peer wasn't already cached
        let is_new = {
            let mut cache = self.cache.write().unwrap();
            cache.insert(peer_id, cached_set).is_none()
        };

        // Update stats
        if is_new {
            let mut stats = self.stats.lock().unwrap();
            stats.current_size += 1;
        }

        debug!(
            "Cached {} candidates for peer {:?} with TTL {:?}",
            candidate_count, peer_id, ttl
        );
        Ok(())
    }

    /// Cache validation result for a candidate
    pub async fn cache_validation_result(
        &self,
        peer_id: PeerId,
        address: SocketAddr,
        is_valid: bool,
        rtt: Option<Duration>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if !self.config.enable_validation_cache {
            return Ok(());
        }

        let mut cache = self.cache.write().unwrap();

        if let Some(cached_set) = cache.get_mut(&peer_id) {
            let validation_entry = ValidationCacheEntry {
                is_valid,
                rtt,
                cached_at: Instant::now(),
                ttl: self.config.validation_ttl,
            };

            cached_set.validation_results.insert(address, validation_entry);
            debug!(
                "Cached validation result for peer {:?} at {} -> {}",
                peer_id, address, is_valid
            );
        }

        Ok(())
    }

    /// Get cached validation result
    pub async fn get_validation_result(
        &self,
        peer_id: PeerId,
        address: SocketAddr,
    ) -> Option<(bool, Option<Duration>)> {
        if !self.config.enable_validation_cache {
            return None;
        }

        let cache = self.cache.read().unwrap();

        if let Some(cached_set) = cache.get(&peer_id) {
            if let Some(validation_entry) = cached_set.validation_results.get(&address) {
                // Check if the validation result is still within its TTL
                if Instant::now().duration_since(validation_entry.cached_at) <= validation_entry.ttl {
                    return Some((validation_entry.is_valid, validation_entry.rtt));
                }
            }
        }

        None
    }

    /// Update access statistics
    async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
        // Update cache-level stats
        {
            let mut stats = self.stats.lock().unwrap();
            if hit {
                stats.cache_hits += 1;
            } else {
                stats.cache_misses += 1;
            }

            let total_accesses = stats.cache_hits + stats.cache_misses;
            stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
        }

        // Update entry-level stats
        if hit {
            let mut cache = self.cache.write().unwrap();
            if let Some(cached_set) = cache.get_mut(&peer_id) {
                cached_set.access_count += 1;
                cached_set.last_accessed = Instant::now();
            }
        }
    }

    /// Evict least recently used entry
    async fn evict_lru_entry(&self) {
        let mut cache = self.cache.write().unwrap();

        if let Some((lru_peer_id, _)) = cache
            .iter()
            .min_by_key(|(_, cached_set)| cached_set.last_accessed)
            .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
        {
            cache.remove(&lru_peer_id);
            debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);

            // Update stats
            let mut stats = self.stats.lock().unwrap();
            stats.current_size = stats.current_size.saturating_sub(1);
        }
    }

    /// Cleanup expired cache entries
    async fn cleanup_expired_entries(
        cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
        stats: &Arc<Mutex<CandidateCacheStats>>,
        _config: &CandidateCacheConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();

        // Find expired entries
        {
            let cache_read = cache.read().unwrap();
            for (peer_id, cached_set) in cache_read.iter() {
                let age = now.duration_since(cached_set.cached_at);
                if age > cached_set.ttl {
                    to_remove.push(*peer_id);
                }
            }
        }

        // Remove expired entries
        if !to_remove.is_empty() {
            let mut cache_write = cache.write().unwrap();
            for peer_id in &to_remove {
                cache_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.entries_expired += to_remove.len() as u64;
            stats_guard.current_size = cache_write.len();

            debug!("Cleaned up {} expired cache entries", to_remove.len());
        }

        // Also cleanup expired validation results
        {
            let mut cache_write = cache.write().unwrap();
            for cached_set in cache_write.values_mut() {
                cached_set.validation_results.retain(|_, validation_entry| {
                    now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
                });
            }
        }
    }

    /// Get cache statistics
    pub async fn get_stats(&self) -> CandidateCacheStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the candidate cache
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Clear cache
        {
            let mut cache = self.cache.write().unwrap();
            cache.clear();
        }

        info!("Candidate cache shutdown complete");
    }
}

impl SessionCleanupCoordinator {
    /// Create a new session cleanup coordinator
    pub fn new(config: SessionCleanupConfig) -> Self {
        Self {
            active_sessions: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
            cleanup_handle: None,
        }
    }

    /// Start the session cleanup coordinator
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let sessions = Arc::clone(&self.active_sessions);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let cleanup_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.cleanup_interval);

            loop {
                interval.tick().await;
                Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
            }
        });

        self.cleanup_handle = Some(cleanup_handle);
        info!("Session cleanup coordinator started");
        Ok(())
    }

    /// Register a new session
    pub async fn register_session(
        &self,
        peer_id: PeerId,
        memory_usage: usize,
        priority: CleanupPriority,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let session_state = SessionState {
            peer_id,
            created_at: Instant::now(),
            last_activity: Instant::now(),
            memory_usage,
            is_active: true,
            cleanup_priority: priority,
        };

        // Only count the session if the peer wasn't already registered
        let is_new = {
            let mut sessions = self.active_sessions.write().unwrap();
            sessions.insert(peer_id, session_state).is_none()
        };

        // Update stats
        if is_new {
            let mut stats = self.stats.lock().unwrap();
            stats.active_sessions += 1;
        }

        debug!("Registered session for peer {:?} with {} bytes", peer_id, memory_usage);
        Ok(())
    }

    /// Update session activity
    pub async fn update_session_activity(&self, peer_id: PeerId) {
        let mut sessions = self.active_sessions.write().unwrap();
        if let Some(session) = sessions.get_mut(&peer_id) {
            session.last_activity = Instant::now();
        }
    }

    /// Cleanup expired sessions
    async fn cleanup_expired_sessions(
        sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
        stats: &Arc<Mutex<SessionCleanupStats>>,
        config: &SessionCleanupConfig,
    ) {
        let now = Instant::now();
        let mut to_remove = Vec::new();
        let mut memory_freed = 0u64;

        // Check for memory pressure
        let memory_pressure = Self::check_memory_pressure(config);

        // Find sessions to clean up
        {
            let sessions_read = sessions.read().unwrap();
            for (peer_id, session) in sessions_read.iter() {
                let idle_time = now.duration_since(session.last_activity);
                let age = now.duration_since(session.created_at);

                let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
                    // Aggressive cleanup under memory pressure
                    match session.cleanup_priority {
                        CleanupPriority::High => idle_time > Duration::from_secs(30),
                        CleanupPriority::Normal => idle_time > Duration::from_secs(60),
                        CleanupPriority::Low => idle_time > config.max_idle_time / 2,
                    }
                } else {
                    // Normal cleanup rules
                    idle_time > config.max_idle_time || age > config.max_session_age
                };

                if should_cleanup {
                    to_remove.push(*peer_id);
                    memory_freed += session.memory_usage as u64;
                }
            }
        }

        // Remove expired sessions
        if !to_remove.is_empty() {
            let mut sessions_write = sessions.write().unwrap();
            for peer_id in &to_remove {
                sessions_write.remove(peer_id);
            }

            // Update stats
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.sessions_cleaned += to_remove.len() as u64;
            stats_guard.memory_freed += memory_freed;
            stats_guard.active_sessions = sessions_write.len();

            if memory_pressure {
                info!(
                    "Aggressive cleanup: removed {} sessions, freed {} bytes",
                    to_remove.len(),
                    memory_freed
                );
            } else {
                debug!(
                    "Regular cleanup: removed {} sessions, freed {} bytes",
                    to_remove.len(),
                    memory_freed
                );
            }
        }
    }

    /// Check if the system is under memory pressure
    fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
        // Simplified placeholder: always reports no pressure. A production
        // implementation would compare actual process memory usage against
        // `config.memory_pressure_threshold` (MB).
        let _ = config.memory_pressure_threshold;
        false
    }

    /// Get session cleanup statistics
    pub async fn get_stats(&self) -> SessionCleanupStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the session cleanup coordinator
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.cleanup_handle.take() {
            handle.abort();
        }

        // Clear sessions
        {
            let mut sessions = self.active_sessions.write().unwrap();
            sessions.clear();
        }

        info!("Session cleanup coordinator shutdown complete");
    }
}

impl FrameBatchingCoordinator {
    /// Create a new frame batching coordinator
    pub fn new(config: FrameBatchingConfig) -> Self {
        Self {
            pending_frames: Arc::new(Mutex::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
            flush_handle: None,
        }
    }

    /// Start the frame batching coordinator
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let pending_frames = Arc::clone(&self.pending_frames);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let flush_handle = tokio::spawn(async move {
            // Poll at a quarter of the max delay, clamped away from zero
            // (tokio's interval panics on a zero period)
            let period = (config.max_batch_delay / 4).max(Duration::from_millis(1));
            let mut interval = tokio::time::interval(period);

            loop {
                interval.tick().await;
                Self::flush_expired_batches(&pending_frames, &stats, &config).await;
            }
        });

        self.flush_handle = Some(flush_handle);
        info!("Frame batching coordinator started");
        Ok(())
    }

    /// Add frame to batch
    ///
    /// Returns `Some(serialized_batch)` when adding this frame causes the
    /// destination's batch to flush, and `None` if the frame was queued.
    pub async fn add_frame(
        &self,
        destination: SocketAddr,
        frame_type: u8,
        payload: Vec<u8>,
        priority: FramePriority,
    ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
        let frame = BatchedFrame {
            frame_type,
            size: payload.len(),
            payload,
            priority,
            created_at: Instant::now(),
        };

        let mut pending = self.pending_frames.lock().unwrap();

        // Append the frame and decide whether the batch must be flushed
        let (should_flush, frames_count, total_size) = {
            let batch_set = pending.entry(destination).or_insert_with(|| BatchedFrameSet {
                frames: Vec::new(),
                total_size: 0,
                created_at: Instant::now(),
                destination,
                priority: BatchPriority::Normal,
            });

            let frame_size = frame.size;
            batch_set.frames.push(frame);
            batch_set.total_size += frame_size;

            // Update batch priority based on frame priority
            if priority == FramePriority::Urgent {
                batch_set.priority = BatchPriority::High;
            }

            // Check if batch should be flushed immediately
            let should_flush = self.should_flush_batch(batch_set);
            (should_flush, batch_set.frames.len(), batch_set.total_size)
        };

        if should_flush {
            // Remove and serialize the batch
            if let Some(batch_set) = pending.remove(&destination) {
                let batch_data = self.serialize_batch(&batch_set);

                // Update stats
                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.batches_sent += 1;
                    stats.frames_batched += frames_count as u64;
                    stats.avg_batch_size = (stats.avg_batch_size
                        * (stats.batches_sent - 1) as f64
                        + total_size as f64)
                        / stats.batches_sent as f64;
                }

                debug!(
                    "Flushed batch to {} with {} frames ({} bytes)",
                    destination, frames_count, total_size
                );
                return Ok(Some(batch_data));
            }
        }

        Ok(None)
    }

    /// Check if batch should be flushed
    fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
        let age = Instant::now().duration_since(batch_set.created_at);

        // Flush if the batch is full, old, or high priority
        batch_set.total_size >= self.config.max_batch_size
            || batch_set.frames.len() >= self.config.max_frames_per_batch
            || age >= self.config.max_batch_delay
            || batch_set.priority == BatchPriority::High
    }

    /// Serialize batch into packet data
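    ///
    /// Each frame becomes a length-prefixed record matching the loop below:
    /// a `u8` frame type, a big-endian `u16` payload length, then the payload
    /// bytes, i.e. 3 header bytes per frame on top of the payloads (payload
    /// lengths are assumed to fit in a `u16`).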
    fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
        let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);

        for frame in &batch_set.frames {
            debug_assert!(frame.payload.len() <= u16::MAX as usize);
            // Add frame type and length
            data.push(frame.frame_type);
            data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
            // Add frame payload
            data.extend_from_slice(&frame.payload);
        }

        data
    }

    /// Flush expired batches
    async fn flush_expired_batches(
        pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
        stats: &Arc<Mutex<FrameBatchingStats>>,
        config: &FrameBatchingConfig,
    ) {
        let now = Instant::now();
        let mut to_flush = Vec::new();

        // Find expired batches
        {
            let pending = pending_frames.lock().unwrap();
            for (destination, batch_set) in pending.iter() {
                let age = now.duration_since(batch_set.created_at);
                if age >= config.max_batch_delay {
                    to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
                }
            }
        }

        // Flush expired batches. NOTE: the batches are removed and dropped
        // here; wiring the serialized data into an actual send path is left
        // to the caller of this module.
        if !to_flush.is_empty() {
            let mut pending = pending_frames.lock().unwrap();
            let flush_count = to_flush.len();
            let mut stats_guard = stats.lock().unwrap();
            for (destination, frame_count, total_size) in to_flush {
                pending.remove(&destination);

                // Update stats
                stats_guard.batches_sent += 1;
                stats_guard.frames_batched += frame_count as u64;
                stats_guard.avg_batch_size = (stats_guard.avg_batch_size
                    * (stats_guard.batches_sent - 1) as f64
                    + total_size as f64)
                    / stats_guard.batches_sent as f64;
            }

            debug!("Flushed {} expired batches", flush_count);
        }
    }

    /// Get batching statistics
    pub async fn get_stats(&self) -> FrameBatchingStats {
        self.stats.lock().unwrap().clone()
    }

    /// Shutdown the frame batching coordinator
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.flush_handle.take() {
            handle.abort();
        }

        // Drop any batches still pending (nothing is sent on shutdown)
        {
            let mut pending = self.pending_frames.lock().unwrap();
            pending.clear();
        }

        info!("Frame batching coordinator shutdown complete");
    }
}

/// Memory optimization manager that coordinates all memory optimization components
#[derive(Debug)]
pub struct MemoryOptimizationManager {
    connection_pool: ConnectionPool,
    candidate_cache: CandidateCache,
    session_cleanup: SessionCleanupCoordinator,
    frame_batching: FrameBatchingCoordinator,
    is_running: bool,
}

impl MemoryOptimizationManager {
    /// Create a new memory optimization manager with default configurations
    pub fn new() -> Self {
        Self {
            connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
            candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
            session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
            frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
            is_running: false,
        }
    }

    /// Create a new memory optimization manager with custom configurations
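    ///
    /// A minimal sketch tightening one knob while keeping the other defaults:
    ///
    /// ```no_run
    /// # use ant_quic::optimization::memory::*;
    /// let _manager = MemoryOptimizationManager::with_configs(
    ///     ConnectionPoolConfig { max_connections: 100, ..Default::default() },
    ///     CandidateCacheConfig::default(),
    ///     SessionCleanupConfig::default(),
    ///     FrameBatchingConfig::default(),
    /// );
    /// ```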
    pub fn with_configs(
        pool_config: ConnectionPoolConfig,
        cache_config: CandidateCacheConfig,
        cleanup_config: SessionCleanupConfig,
        batching_config: FrameBatchingConfig,
    ) -> Self {
        Self {
            connection_pool: ConnectionPool::new(pool_config),
            candidate_cache: CandidateCache::new(cache_config),
            session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
            frame_batching: FrameBatchingCoordinator::new(batching_config),
            is_running: false,
        }
    }

    /// Start all memory optimization components
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if self.is_running {
            return Ok(());
        }

        self.connection_pool.start().await?;
        self.candidate_cache.start().await?;
        self.session_cleanup.start().await?;
        self.frame_batching.start().await?;

        self.is_running = true;
        info!("Memory optimization manager started");
        Ok(())
    }

    /// Get connection pool reference
    pub fn connection_pool(&self) -> &ConnectionPool {
        &self.connection_pool
    }

    /// Get candidate cache reference
    pub fn candidate_cache(&self) -> &CandidateCache {
        &self.candidate_cache
    }

    /// Get session cleanup coordinator reference
    pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
        &self.session_cleanup
    }

    /// Get frame batching coordinator reference
    pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
        &self.frame_batching
    }

    /// Get comprehensive memory optimization statistics
    pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
        MemoryOptimizationStats {
            connection_pool: self.connection_pool.get_stats().await,
            candidate_cache: self.candidate_cache.get_stats().await,
            session_cleanup: self.session_cleanup.get_stats().await,
            frame_batching: self.frame_batching.get_stats().await,
        }
    }

    /// Shutdown all memory optimization components
    pub async fn shutdown(&mut self) {
        if !self.is_running {
            return;
        }

        self.connection_pool.shutdown().await;
        self.candidate_cache.shutdown().await;
        self.session_cleanup.shutdown().await;
        self.frame_batching.shutdown().await;

        self.is_running = false;
        info!("Memory optimization manager shutdown complete");
    }
}

/// Comprehensive memory optimization statistics
#[derive(Debug, Clone)]
pub struct MemoryOptimizationStats {
    pub connection_pool: ConnectionPoolStats,
    pub candidate_cache: CandidateCacheStats,
    pub session_cleanup: SessionCleanupStats,
    pub frame_batching: FrameBatchingStats,
}

impl Default for MemoryOptimizationManager {
    fn default() -> Self {
        Self::new()
    }
}
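
// A small self-contained sanity test for the batch wire format; it exercises
// only items defined in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serialize_batch_length_prefixes_each_frame() {
        let coordinator = FrameBatchingCoordinator::new(FrameBatchingConfig::default());
        let destination: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let batch = BatchedFrameSet {
            frames: vec![BatchedFrame {
                frame_type: 0x40,
                payload: vec![0xAA, 0xBB],
                size: 2,
                priority: FramePriority::Normal,
                created_at: Instant::now(),
            }],
            total_size: 2,
            created_at: Instant::now(),
            destination,
            priority: BatchPriority::Normal,
        };

        // Expect: type byte, big-endian u16 length, then the payload itself.
        let data = coordinator.serialize_batch(&batch);
        assert_eq!(data, vec![0x40, 0x00, 0x02, 0xAA, 0xBB]);
    }
}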