ant_quic/optimization/
memory.rs

1//! Memory optimization components for ant-quic
2//!
3//! This module provides memory-efficient resource management including:
4//! - Connection pooling for Quinn connections
5//! - Candidate caching with TTL
6//! - Automatic cleanup of expired sessions and state
7//! - Frame batching for reduced packet overhead
8
9use std::{
10    collections::HashMap,
11    net::SocketAddr,
12    sync::{Arc, Mutex, RwLock},
13    time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18use crate::{Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection};
19
20use crate::{
21    VarInt,
22    nat_traversal_api::{CandidateAddress, PeerId},
23};
24
/// Connection pool for reusing Quinn connections
///
/// Holds at most one pooled connection per [`PeerId`]. The connection map and
/// the statistics are wrapped in `Arc` so the background cleanup task spawned
/// by `start` can share them with the pool.
#[derive(Debug)]
pub struct ConnectionPool {
    /// Active connections by peer ID
    active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
    /// Connection pool configuration
    config: ConnectionPoolConfig,
    /// Pool statistics
    stats: Arc<Mutex<ConnectionPoolStats>>,
    /// Cleanup task handle; `None` until `start` has spawned the task
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
37
/// Configuration for connection pooling
///
/// The `Default` implementation uses 1000 connections, a 5-minute idle limit,
/// a 1-minute cleanup interval, reuse enabled and a 1-hour maximum age.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections to pool; once reached, the least
    /// recently used connection is evicted before a new one is created
    pub max_connections: usize,
    /// Maximum idle time before connection cleanup
    pub max_idle_time: Duration,
    /// Cleanup interval for expired connections (period of the background task)
    pub cleanup_interval: Duration,
    /// Enable connection reuse
    pub enable_reuse: bool,
    /// Maximum connection age before forced refresh
    pub max_connection_age: Duration,
}
52
/// A pooled connection with metadata
///
/// One value of the pool's map; the bookkeeping fields drive reuse, LRU
/// eviction and expiry.
#[derive(Debug)]
struct PooledConnection {
    /// Shared handle to the underlying connection; clones are handed to callers
    connection: Arc<QuinnConnection>,
    /// Peer this connection belongs to (also the key in the pool map)
    peer_id: PeerId,
    /// Address the connection was opened to; reuse requires an exact match
    remote_address: SocketAddr,
    /// When the entry was created (compared against `max_connection_age`)
    created_at: Instant,
    /// When the connection was last handed out (idle expiry and LRU eviction)
    last_used: Instant,
    /// Number of times the connection has been handed out, creation included
    use_count: u64,
    /// Reuse is only attempted while this is `true`
    is_active: bool,
}
64
/// Statistics for connection pool
///
/// A snapshot of these counters is returned by `ConnectionPool::get_stats`.
#[derive(Debug, Default, Clone)]
pub struct ConnectionPoolStats {
    /// Total connections created
    pub connections_created: u64,
    /// Total connections reused
    pub connections_reused: u64,
    /// Total connections expired (removed by the periodic cleanup)
    pub connections_expired: u64,
    /// Current active connections
    pub active_connections: usize,
    /// Pool hit rate (reuse / total requests), where total requests is
    /// `connections_created + connections_reused`
    pub hit_rate: f64,
    /// Average connection age
    // NOTE(review): no code visible in this part of the file writes this
    // field — confirm it is maintained elsewhere.
    pub avg_connection_age: Duration,
}
81
/// Candidate cache with TTL for efficient candidate management
///
/// Stores one candidate set per [`PeerId`]. The map and the statistics are
/// wrapped in `Arc` so the background cleanup task spawned by `start` can
/// share them with the cache.
#[derive(Debug)]
pub struct CandidateCache {
    /// Cached candidates by peer ID
    cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
    /// Cache configuration
    config: CandidateCacheConfig,
    /// Cache statistics
    stats: Arc<Mutex<CandidateCacheStats>>,
    /// Cleanup task handle; `None` until `start` has spawned the task
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
94
/// Configuration for candidate caching
///
/// The `Default` implementation uses a 5-minute TTL, 10000 entries, a
/// 1-minute cleanup interval, validation caching enabled and a 1-minute
/// validation TTL.
#[derive(Debug, Clone)]
pub struct CandidateCacheConfig {
    /// Default TTL for cached candidates (used when the caller passes no TTL)
    pub default_ttl: Duration,
    /// Maximum number of candidate sets to cache; once reached, the least
    /// recently accessed set is evicted before a new one is stored
    pub max_cache_size: usize,
    /// Cleanup interval for expired entries (period of the background task)
    pub cleanup_interval: Duration,
    /// Enable candidate validation caching; when `false`, validation results
    /// are neither stored nor returned
    pub enable_validation_cache: bool,
    /// TTL for validation results
    pub validation_ttl: Duration,
}
109
/// Cached candidate set with metadata
#[derive(Debug, Clone)]
struct CachedCandidateSet {
    /// The cached candidate addresses for one peer
    candidates: Vec<CandidateAddress>,
    /// When the set was stored; the entry is valid while `now - cached_at <= ttl`
    cached_at: Instant,
    /// Lifetime of this entry
    ttl: Duration,
    /// Number of cache hits served from this entry
    access_count: u64,
    /// Time of the most recent hit (used to pick the LRU eviction victim)
    last_accessed: Instant,
    /// Per-address validation results attached to this peer's entry
    validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
}
120
/// Cached validation result
#[derive(Debug, Clone)]
struct ValidationCacheEntry {
    /// Outcome reported by the caller that cached this result
    is_valid: bool,
    /// Round-trip time supplied with the result, if any
    rtt: Option<Duration>,
    /// When the result was stored; valid while `now - cached_at <= ttl`
    cached_at: Instant,
    /// Lifetime of this result (taken from `validation_ttl` when stored)
    ttl: Duration,
}
129
/// Statistics for candidate cache
///
/// A snapshot of these counters is returned by `CandidateCache::get_stats`.
#[derive(Debug, Default, Clone)]
pub struct CandidateCacheStats {
    /// Total cache hits
    pub cache_hits: u64,
    /// Total cache misses (absent or expired entries)
    pub cache_misses: u64,
    /// Total entries expired (removed by the periodic cleanup)
    pub entries_expired: u64,
    /// Current cache size
    pub current_size: usize,
    /// Cache hit rate: `cache_hits / (cache_hits + cache_misses)`
    pub hit_rate: f64,
    /// Average entry age
    // NOTE(review): no code visible in this part of the file writes this
    // field — confirm it is maintained elsewhere.
    pub avg_entry_age: Duration,
}
146
/// Session state cleanup coordinator
///
/// Tracks one [`SessionState`] per [`PeerId`]. The map and the statistics are
/// wrapped in `Arc` so the background cleanup task spawned by `start` can
/// share them with the coordinator.
#[derive(Debug)]
pub struct SessionCleanupCoordinator {
    /// Active sessions by peer ID
    active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
    /// Cleanup configuration
    config: SessionCleanupConfig,
    /// Cleanup statistics
    stats: Arc<Mutex<SessionCleanupStats>>,
    /// Cleanup task handle; `None` until `start` has spawned the task
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
159
/// Configuration for session cleanup
///
/// The `Default` implementation uses a 10-minute idle limit, a 1-hour maximum
/// age, a 2-minute cleanup interval, aggressive cleanup enabled and a
/// threshold of 512.
#[derive(Debug, Clone)]
pub struct SessionCleanupConfig {
    /// Maximum session idle time
    pub max_idle_time: Duration,
    /// Maximum session age
    pub max_session_age: Duration,
    /// Cleanup interval (period of the background task)
    pub cleanup_interval: Duration,
    /// Enable aggressive cleanup under memory pressure; shorter,
    /// priority-dependent idle limits apply while pressure is detected
    pub enable_aggressive_cleanup: bool,
    /// Memory pressure threshold (MB)
    pub memory_pressure_threshold: usize,
}
174
/// Session state for cleanup tracking
#[derive(Debug)]
struct SessionState {
    /// Peer that owns the session (also the key in the session map)
    peer_id: PeerId,
    /// When the session was registered (compared against `max_session_age`)
    created_at: Instant,
    /// Last recorded activity; refreshed by `update_session_activity`
    last_activity: Instant,
    /// Memory attributed to the session at registration, in bytes; added to
    /// the freed-memory tally when the session is selected for cleanup
    memory_usage: usize,
    /// Set to `true` when the session is registered
    is_active: bool,
    /// Selects the idle limit applied during aggressive cleanup
    cleanup_priority: CleanupPriority,
}
185
/// Priority for session cleanup
///
/// Only consulted while memory pressure is detected and aggressive cleanup is
/// enabled; otherwise the configured idle and age limits apply to every
/// session. The derived ordering follows declaration order:
/// `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    /// Keep as long as possible (removed once idle for more than half of
    /// `max_idle_time`)
    Low,
    /// Standard cleanup rules (removed once idle for more than 60 seconds)
    Normal,
    /// Clean up aggressively (removed once idle for more than 30 seconds)
    High,
}
193
/// Statistics for session cleanup
#[derive(Debug, Default, Clone)]
pub struct SessionCleanupStats {
    /// Total sessions cleaned up
    pub sessions_cleaned: u64,
    /// Memory freed (bytes)
    pub memory_freed: u64,
    /// Current active sessions
    pub active_sessions: usize,
    /// Average session lifetime
    // NOTE(review): no code visible in this part of the file writes this
    // field — confirm it is maintained elsewhere.
    pub avg_session_lifetime: Duration,
}
206
/// Frame batching coordinator for reduced packet overhead
///
/// Keeps one pending [`BatchedFrameSet`] per destination address.
// NOTE(review): the methods of this type are outside the visible part of the
// file; the field notes below restate the declarations only.
#[derive(Debug)]
pub struct FrameBatchingCoordinator {
    /// Pending frames by destination
    pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
    /// Batching configuration
    config: FrameBatchingConfig,
    /// Batching statistics
    stats: Arc<Mutex<FrameBatchingStats>>,
    /// Flush task handle (a Tokio `JoinHandle`, absent when `None`)
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}
219
/// Configuration for frame batching
///
/// The `Default` implementation uses a 1200-byte maximum batch, a 10 ms
/// maximum delay, 10 frames per batch, adaptive batching enabled and a
/// 200-byte minimum batch.
#[derive(Debug, Clone)]
pub struct FrameBatchingConfig {
    /// Maximum batch size (bytes)
    pub max_batch_size: usize,
    /// Maximum batch delay
    pub max_batch_delay: Duration,
    /// Maximum frames per batch
    pub max_frames_per_batch: usize,
    /// Enable adaptive batching based on network conditions
    pub enable_adaptive_batching: bool,
    /// Minimum batch size for efficiency
    pub min_batch_size: usize,
}
234
/// Batched frame set
///
/// The frames queued for a single destination.
#[derive(Debug)]
struct BatchedFrameSet {
    /// Frames queued in this batch
    frames: Vec<BatchedFrame>,
    /// Size of the batch — presumably the sum of the frames' `size` values in
    /// bytes; TODO confirm against the batching code
    total_size: usize,
    /// Creation time of the batch
    created_at: Instant,
    /// Address the batch is destined for
    destination: SocketAddr,
    /// Priority of the batch as a whole
    priority: BatchPriority,
}
244
/// Individual batched frame
#[derive(Debug)]
struct BatchedFrame {
    /// Frame type identifier
    frame_type: u8,
    /// Raw frame payload
    payload: Vec<u8>,
    /// Size of the frame — presumably in bytes; TODO confirm whether it
    /// includes header overhead
    size: usize,
    /// Priority of this frame
    priority: FramePriority,
    /// Creation time of the frame
    created_at: Instant,
}
254
/// Priority for frame batching
///
/// The derived ordering follows declaration order: `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    /// Can wait for full batch
    Low,
    /// Standard batching rules
    Normal,
    /// Send quickly, minimal batching
    High,
}
262
/// Priority for individual frames
///
/// The derived ordering follows declaration order:
/// `Background < Normal < Urgent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    /// Low priority, can be delayed
    Background,
    /// Standard priority
    Normal,
    /// High priority, minimal delay
    Urgent,
}
270
/// Statistics for frame batching
// NOTE(review): the code that maintains these counters is outside the visible
// part of the file; units below are taken from the existing field comments.
#[derive(Debug, Default, Clone)]
pub struct FrameBatchingStats {
    /// Total frames batched
    pub frames_batched: u64,
    /// Total batches sent
    pub batches_sent: u64,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Bytes saved through batching
    pub bytes_saved: u64,
    /// Batching efficiency (0.0 - 1.0)
    pub batching_efficiency: f64,
}
285
286impl Default for ConnectionPoolConfig {
287    fn default() -> Self {
288        Self {
289            max_connections: 1000,
290            max_idle_time: Duration::from_secs(300), // 5 minutes
291            cleanup_interval: Duration::from_secs(60), // 1 minute
292            enable_reuse: true,
293            max_connection_age: Duration::from_secs(3600), // 1 hour
294        }
295    }
296}
297
298impl Default for CandidateCacheConfig {
299    fn default() -> Self {
300        Self {
301            default_ttl: Duration::from_secs(300), // 5 minutes
302            max_cache_size: 10000,
303            cleanup_interval: Duration::from_secs(60), // 1 minute
304            enable_validation_cache: true,
305            validation_ttl: Duration::from_secs(60), // 1 minute
306        }
307    }
308}
309
310impl Default for SessionCleanupConfig {
311    fn default() -> Self {
312        Self {
313            max_idle_time: Duration::from_secs(600),    // 10 minutes
314            max_session_age: Duration::from_secs(3600), // 1 hour
315            cleanup_interval: Duration::from_secs(120), // 2 minutes
316            enable_aggressive_cleanup: true,
317            memory_pressure_threshold: 512, // 512 MB
318        }
319    }
320}
321
322impl Default for FrameBatchingConfig {
323    fn default() -> Self {
324        Self {
325            max_batch_size: 1200,                       // Just under typical MTU
326            max_batch_delay: Duration::from_millis(10), // 10ms max delay
327            max_frames_per_batch: 10,
328            enable_adaptive_batching: true,
329            min_batch_size: 200, // Minimum size for efficiency
330        }
331    }
332}
333
334impl ConnectionPool {
335    /// Create a new connection pool
336    pub fn new(config: ConnectionPoolConfig) -> Self {
337        Self {
338            active_connections: Arc::new(RwLock::new(HashMap::new())),
339            config,
340            stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
341            cleanup_handle: None,
342        }
343    }
344
345    /// Start the connection pool with cleanup task
346    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
347        let connections = Arc::clone(&self.active_connections);
348        let stats = Arc::clone(&self.stats);
349        let config = self.config.clone();
350
351        let cleanup_handle = tokio::spawn(async move {
352            let mut interval = tokio::time::interval(config.cleanup_interval);
353
354            loop {
355                interval.tick().await;
356                Self::cleanup_expired_connections(&connections, &stats, &config).await;
357            }
358        });
359
360        self.cleanup_handle = Some(cleanup_handle);
361        info!(
362            "Connection pool started with max_connections={}",
363            self.config.max_connections
364        );
365        Ok(())
366    }
367
368    /// Get or create a connection for a peer
369    pub async fn get_connection(
370        &self,
371        peer_id: PeerId,
372        remote_address: SocketAddr,
373        endpoint: &QuinnEndpoint,
374    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
375        // Try to get existing connection
376        if let Some(connection) = self
377            .try_get_existing_connection(peer_id, remote_address)
378            .await
379        {
380            self.update_stats_hit().await;
381            return Ok(connection);
382        }
383
384        // Create new connection
385        let connection = self
386            .create_new_connection(peer_id, remote_address, endpoint)
387            .await?;
388        self.update_stats_miss().await;
389        Ok(connection)
390    }
391
392    /// Try to get existing connection from pool
393    async fn try_get_existing_connection(
394        &self,
395        peer_id: PeerId,
396        remote_address: SocketAddr,
397    ) -> Option<Arc<QuinnConnection>> {
398        let mut connections = self.active_connections.write().unwrap();
399
400        if let Some(pooled) = connections.get_mut(&peer_id) {
401            if pooled.is_active && pooled.remote_address == remote_address {
402                // Check if connection is still valid
403                if pooled.connection.close_reason().is_none() {
404                    pooled.last_used = Instant::now();
405                    pooled.use_count += 1;
406                    debug!("Reusing pooled connection for peer {:?}", peer_id);
407                    return Some(Arc::clone(&pooled.connection));
408                } else {
409                    // Connection is closed, remove it
410                    connections.remove(&peer_id);
411                }
412            }
413        }
414
415        None
416    }
417
418    /// Create a new connection and add to pool
419    async fn create_new_connection(
420        &self,
421        peer_id: PeerId,
422        remote_address: SocketAddr,
423        endpoint: &QuinnEndpoint,
424    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
425        // Check pool size limit
426        {
427            let connections = self.active_connections.read().unwrap();
428            if connections.len() >= self.config.max_connections {
429                // Pool is full, need to evict least recently used
430                drop(connections);
431                self.evict_lru_connection().await;
432            }
433        }
434
435        // Create new connection
436        let rustls_config = rustls::ClientConfig::builder()
437            .with_root_certificates(rustls::RootCertStore::empty())
438            .with_no_client_auth();
439
440        let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
441            .map_err(|e| format!("Failed to create QUIC client config: {e}"))?;
442
443        let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
444
445        let connecting = endpoint
446            .connect_with(client_config, remote_address, "ant-quic")
447            .map_err(|e| format!("Failed to initiate connection: {e}"))?;
448
449        // Wait for the connection to be established
450        let connection = connecting
451            .await
452            .map_err(|e| format!("Connection failed: {e}"))?;
453
454        let connection_arc = Arc::new(connection);
455
456        let pooled = PooledConnection {
457            connection: Arc::clone(&connection_arc),
458            peer_id,
459            remote_address,
460            created_at: Instant::now(),
461            last_used: Instant::now(),
462            use_count: 1,
463            is_active: true,
464        };
465
466        // Add to pool
467        {
468            let mut connections = self.active_connections.write().unwrap();
469            connections.insert(peer_id, pooled);
470        }
471
472        // Update stats
473        {
474            let mut stats = self.stats.lock().unwrap();
475            stats.connections_created += 1;
476            stats.active_connections += 1;
477        }
478
479        info!("Created new pooled connection for peer {:?}", peer_id);
480        Ok(connection_arc)
481    }
482
483    /// Evict least recently used connection
484    async fn evict_lru_connection(&self) {
485        let mut connections = self.active_connections.write().unwrap();
486
487        if let Some((lru_peer_id, _)) = connections
488            .iter()
489            .min_by_key(|(_, pooled)| pooled.last_used)
490            .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
491        {
492            connections.remove(&lru_peer_id);
493            debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
494
495            // Update stats
496            let mut stats = self.stats.lock().unwrap();
497            stats.active_connections = stats.active_connections.saturating_sub(1);
498        }
499    }
500
501    /// Update stats for cache hit
502    async fn update_stats_hit(&self) {
503        let mut stats = self.stats.lock().unwrap();
504        stats.connections_reused += 1;
505        let total_requests = stats.connections_created + stats.connections_reused;
506        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
507    }
508
509    /// Update stats for cache miss
510    async fn update_stats_miss(&self) {
511        let stats = self.stats.lock().unwrap();
512        let total_requests = stats.connections_created + stats.connections_reused;
513        drop(stats);
514
515        let mut stats = self.stats.lock().unwrap();
516        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
517    }
518
519    /// Cleanup expired connections
520    async fn cleanup_expired_connections(
521        connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
522        stats: &Arc<Mutex<ConnectionPoolStats>>,
523        config: &ConnectionPoolConfig,
524    ) {
525        let now = Instant::now();
526        let mut to_remove = Vec::new();
527
528        // Find expired connections
529        {
530            let connections_read = connections.read().unwrap();
531            for (peer_id, pooled) in connections_read.iter() {
532                let idle_time = now.duration_since(pooled.last_used);
533                let age = now.duration_since(pooled.created_at);
534
535                if idle_time > config.max_idle_time || age > config.max_connection_age {
536                    to_remove.push(*peer_id);
537                }
538
539                // Also remove closed connections
540                if pooled.connection.close_reason().is_some() {
541                    to_remove.push(*peer_id);
542                }
543            }
544        }
545
546        // Remove expired connections
547        if !to_remove.is_empty() {
548            let mut connections_write = connections.write().unwrap();
549            for peer_id in &to_remove {
550                connections_write.remove(peer_id);
551            }
552
553            // Update stats
554            let mut stats_guard = stats.lock().unwrap();
555            stats_guard.connections_expired += to_remove.len() as u64;
556            stats_guard.active_connections = connections_write.len();
557
558            debug!("Cleaned up {} expired connections", to_remove.len());
559        }
560    }
561
562    /// Get connection pool statistics
563    pub async fn get_stats(&self) -> ConnectionPoolStats {
564        self.stats.lock().unwrap().clone()
565    }
566
567    /// Shutdown the connection pool
568    pub async fn shutdown(&mut self) {
569        if let Some(handle) = self.cleanup_handle.take() {
570            handle.abort();
571        }
572
573        // Close all connections
574        {
575            let connections = self.active_connections.read().unwrap();
576            for (_, pooled) in connections.iter() {
577                pooled.connection.close(VarInt::from_u32(0), b"shutdown");
578            }
579        }
580
581        info!("Connection pool shutdown complete");
582    }
583}
584
585impl CandidateCache {
586    /// Create a new candidate cache
587    pub fn new(config: CandidateCacheConfig) -> Self {
588        Self {
589            cache: Arc::new(RwLock::new(HashMap::new())),
590            config,
591            stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
592            cleanup_handle: None,
593        }
594    }
595
596    /// Start the candidate cache with cleanup task
597    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
598        let cache = Arc::clone(&self.cache);
599        let stats = Arc::clone(&self.stats);
600        let config = self.config.clone();
601
602        let cleanup_handle = tokio::spawn(async move {
603            let mut interval = tokio::time::interval(config.cleanup_interval);
604
605            loop {
606                interval.tick().await;
607                Self::cleanup_expired_entries(&cache, &stats, &config).await;
608            }
609        });
610
611        self.cleanup_handle = Some(cleanup_handle);
612        info!(
613            "Candidate cache started with max_size={}",
614            self.config.max_cache_size
615        );
616        Ok(())
617    }
618
619    /// Get cached candidates for a peer
620    pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
621        let (is_valid, candidates) = {
622            let cache = self.cache.read().unwrap();
623
624            if let Some(cached_set) = cache.get(&peer_id) {
625                let now = Instant::now();
626
627                // Check if entry is still valid
628                if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
629                    (true, Some(cached_set.candidates.clone()))
630                } else {
631                    (false, None)
632                }
633            } else {
634                (false, None)
635            }
636        };
637
638        if is_valid {
639            // Update access statistics
640            self.update_access_stats(peer_id, true).await;
641
642            if let Some(ref candidates) = candidates {
643                debug!(
644                    "Cache hit for peer {:?}, {} candidates",
645                    peer_id,
646                    candidates.len()
647                );
648            }
649            return candidates;
650        }
651
652        // Cache miss
653        self.update_access_stats(peer_id, false).await;
654        debug!("Cache miss for peer {:?}", peer_id);
655        None
656    }
657
658    /// Cache candidates for a peer
659    pub async fn cache_candidates(
660        &self,
661        peer_id: PeerId,
662        candidates: Vec<CandidateAddress>,
663        ttl: Option<Duration>,
664    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
665        let ttl = ttl.unwrap_or(self.config.default_ttl);
666
667        // Check cache size limit
668        {
669            let cache = self.cache.read().unwrap();
670            if cache.len() >= self.config.max_cache_size {
671                drop(cache);
672                self.evict_lru_entry().await;
673            }
674        }
675
676        let candidate_count = candidates.len();
677        let cached_set = CachedCandidateSet {
678            candidates,
679            cached_at: Instant::now(),
680            ttl,
681            access_count: 0,
682            last_accessed: Instant::now(),
683            validation_results: HashMap::new(),
684        };
685
686        // Add to cache
687        {
688            let mut cache = self.cache.write().unwrap();
689            cache.insert(peer_id, cached_set);
690        }
691
692        // Update stats
693        {
694            let mut stats = self.stats.lock().unwrap();
695            stats.current_size += 1;
696        }
697
698        debug!(
699            "Cached {} candidates for peer {:?} with TTL {:?}",
700            candidate_count, peer_id, ttl
701        );
702        Ok(())
703    }
704
705    /// Cache validation result for a candidate
706    pub async fn cache_validation_result(
707        &self,
708        peer_id: PeerId,
709        address: SocketAddr,
710        is_valid: bool,
711        rtt: Option<Duration>,
712    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
713        if !self.config.enable_validation_cache {
714            return Ok(());
715        }
716
717        let mut cache = self.cache.write().unwrap();
718
719        if let Some(cached_set) = cache.get_mut(&peer_id) {
720            let validation_entry = ValidationCacheEntry {
721                is_valid,
722                rtt,
723                cached_at: Instant::now(),
724                ttl: self.config.validation_ttl,
725            };
726
727            cached_set
728                .validation_results
729                .insert(address, validation_entry);
730            debug!(
731                "Cached validation result for {}:{} -> {}",
732                peer_id.0[0], address, is_valid
733            );
734        }
735
736        Ok(())
737    }
738
739    /// Get cached validation result
740    pub async fn get_validation_result(
741        &self,
742        peer_id: PeerId,
743        address: SocketAddr,
744    ) -> Option<(bool, Option<Duration>)> {
745        if !self.config.enable_validation_cache {
746            return None;
747        }
748
749        let cache = self.cache.read().unwrap();
750
751        if let Some(cached_set) = cache.get(&peer_id) {
752            if let Some(validation_entry) = cached_set.validation_results.get(&address) {
753                let now = Instant::now();
754
755                // Check if validation result is still valid
756                if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
757                    return Some((validation_entry.is_valid, validation_entry.rtt));
758                }
759            }
760        }
761
762        None
763    }
764
765    /// Update access statistics
766    async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
767        // Update cache-level stats
768        {
769            let mut stats = self.stats.lock().unwrap();
770            if hit {
771                stats.cache_hits += 1;
772            } else {
773                stats.cache_misses += 1;
774            }
775
776            let total_accesses = stats.cache_hits + stats.cache_misses;
777            stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
778        }
779
780        // Update entry-level stats
781        if hit {
782            let mut cache = self.cache.write().unwrap();
783            if let Some(cached_set) = cache.get_mut(&peer_id) {
784                cached_set.access_count += 1;
785                cached_set.last_accessed = Instant::now();
786            }
787        }
788    }
789
790    /// Evict least recently used entry
791    async fn evict_lru_entry(&self) {
792        let mut cache = self.cache.write().unwrap();
793
794        if let Some((lru_peer_id, _)) = cache
795            .iter()
796            .min_by_key(|(_, cached_set)| cached_set.last_accessed)
797            .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
798        {
799            cache.remove(&lru_peer_id);
800            debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
801
802            // Update stats
803            let mut stats = self.stats.lock().unwrap();
804            stats.current_size = stats.current_size.saturating_sub(1);
805        }
806    }
807
808    /// Cleanup expired cache entries
809    async fn cleanup_expired_entries(
810        cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
811        stats: &Arc<Mutex<CandidateCacheStats>>,
812        _config: &CandidateCacheConfig,
813    ) {
814        let now = Instant::now();
815        let mut to_remove = Vec::new();
816
817        // Find expired entries
818        {
819            let cache_read = cache.read().unwrap();
820            for (peer_id, cached_set) in cache_read.iter() {
821                let age = now.duration_since(cached_set.cached_at);
822                if age > cached_set.ttl {
823                    to_remove.push(*peer_id);
824                }
825            }
826        }
827
828        // Remove expired entries
829        if !to_remove.is_empty() {
830            let mut cache_write = cache.write().unwrap();
831            for peer_id in &to_remove {
832                cache_write.remove(peer_id);
833            }
834
835            // Update stats
836            let mut stats_guard = stats.lock().unwrap();
837            stats_guard.entries_expired += to_remove.len() as u64;
838            stats_guard.current_size = cache_write.len();
839
840            debug!("Cleaned up {} expired cache entries", to_remove.len());
841        }
842
843        // Also cleanup expired validation results
844        {
845            let mut cache_write = cache.write().unwrap();
846            for cached_set in cache_write.values_mut() {
847                cached_set.validation_results.retain(|_, validation_entry| {
848                    now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
849                });
850            }
851        }
852    }
853
854    /// Get cache statistics
855    pub async fn get_stats(&self) -> CandidateCacheStats {
856        self.stats.lock().unwrap().clone()
857    }
858
859    /// Shutdown the candidate cache
860    pub async fn shutdown(&mut self) {
861        if let Some(handle) = self.cleanup_handle.take() {
862            handle.abort();
863        }
864
865        // Clear cache
866        {
867            let mut cache = self.cache.write().unwrap();
868            cache.clear();
869        }
870
871        info!("Candidate cache shutdown complete");
872    }
873}
874
875impl SessionCleanupCoordinator {
876    /// Create a new session cleanup coordinator
877    pub fn new(config: SessionCleanupConfig) -> Self {
878        Self {
879            active_sessions: Arc::new(RwLock::new(HashMap::new())),
880            config,
881            stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
882            cleanup_handle: None,
883        }
884    }
885
886    /// Start the session cleanup coordinator
887    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
888        let sessions = Arc::clone(&self.active_sessions);
889        let stats = Arc::clone(&self.stats);
890        let config = self.config.clone();
891
892        let cleanup_handle = tokio::spawn(async move {
893            let mut interval = tokio::time::interval(config.cleanup_interval);
894
895            loop {
896                interval.tick().await;
897                Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
898            }
899        });
900
901        self.cleanup_handle = Some(cleanup_handle);
902        info!("Session cleanup coordinator started");
903        Ok(())
904    }
905
906    /// Register a new session
907    pub async fn register_session(
908        &self,
909        peer_id: PeerId,
910        memory_usage: usize,
911        priority: CleanupPriority,
912    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
913        let session_state = SessionState {
914            peer_id,
915            created_at: Instant::now(),
916            last_activity: Instant::now(),
917            memory_usage,
918            is_active: true,
919            cleanup_priority: priority,
920        };
921
922        {
923            let mut sessions = self.active_sessions.write().unwrap();
924            sessions.insert(peer_id, session_state);
925        }
926
927        // Update stats
928        {
929            let mut stats = self.stats.lock().unwrap();
930            stats.active_sessions += 1;
931        }
932
933        debug!(
934            "Registered session for peer {:?} with {} bytes",
935            peer_id, memory_usage
936        );
937        Ok(())
938    }
939
940    /// Update session activity
941    pub async fn update_session_activity(&self, peer_id: PeerId) {
942        let mut sessions = self.active_sessions.write().unwrap();
943        if let Some(session) = sessions.get_mut(&peer_id) {
944            session.last_activity = Instant::now();
945        }
946    }
947
948    /// Cleanup expired sessions
949    async fn cleanup_expired_sessions(
950        sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
951        stats: &Arc<Mutex<SessionCleanupStats>>,
952        config: &SessionCleanupConfig,
953    ) {
954        let now = Instant::now();
955        let mut to_remove = Vec::new();
956        let mut memory_freed = 0u64;
957
958        // Check for memory pressure
959        let memory_pressure = Self::check_memory_pressure(config);
960
961        // Find sessions to cleanup
962        {
963            let sessions_read = sessions.read().unwrap();
964            for (peer_id, session) in sessions_read.iter() {
965                let idle_time = now.duration_since(session.last_activity);
966                let age = now.duration_since(session.created_at);
967
968                let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
969                    // Aggressive cleanup under memory pressure
970                    match session.cleanup_priority {
971                        CleanupPriority::High => idle_time > Duration::from_secs(30),
972                        CleanupPriority::Normal => idle_time > Duration::from_secs(60),
973                        CleanupPriority::Low => idle_time > config.max_idle_time / 2,
974                    }
975                } else {
976                    // Normal cleanup rules
977                    idle_time > config.max_idle_time || age > config.max_session_age
978                };
979
980                if should_cleanup {
981                    to_remove.push(*peer_id);
982                    memory_freed += session.memory_usage as u64;
983                }
984            }
985        }
986
987        // Remove expired sessions
988        if !to_remove.is_empty() {
989            let mut sessions_write = sessions.write().unwrap();
990            for peer_id in &to_remove {
991                sessions_write.remove(peer_id);
992            }
993
994            // Update stats
995            let mut stats_guard = stats.lock().unwrap();
996            stats_guard.sessions_cleaned += to_remove.len() as u64;
997            stats_guard.memory_freed += memory_freed;
998            stats_guard.active_sessions = sessions_write.len();
999
1000            if memory_pressure {
1001                info!(
1002                    "Aggressive cleanup: removed {} sessions, freed {} bytes",
1003                    to_remove.len(),
1004                    memory_freed
1005                );
1006            } else {
1007                debug!(
1008                    "Regular cleanup: removed {} sessions, freed {} bytes",
1009                    to_remove.len(),
1010                    memory_freed
1011                );
1012            }
1013        }
1014    }
1015
1016    /// Check if system is under memory pressure
1017    fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
1018        // Simplified memory pressure detection
1019        // In production, this would check actual system memory usage
1020        // For now, return false (no memory pressure)
1021        // This can be enhanced with system memory monitoring
1022        let _ = config.memory_pressure_threshold;
1023        false
1024    }
1025
1026    /// Get session cleanup statistics
1027    pub async fn get_stats(&self) -> SessionCleanupStats {
1028        self.stats.lock().unwrap().clone()
1029    }
1030
1031    /// Shutdown the session cleanup coordinator
1032    pub async fn shutdown(&mut self) {
1033        if let Some(handle) = self.cleanup_handle.take() {
1034            handle.abort();
1035        }
1036
1037        // Clear sessions
1038        {
1039            let mut sessions = self.active_sessions.write().unwrap();
1040            sessions.clear();
1041        }
1042
1043        info!("Session cleanup coordinator shutdown complete");
1044    }
1045}
1046
1047impl FrameBatchingCoordinator {
1048    /// Create a new frame batching coordinator
1049    pub fn new(config: FrameBatchingConfig) -> Self {
1050        Self {
1051            pending_frames: Arc::new(Mutex::new(HashMap::new())),
1052            config,
1053            stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1054            flush_handle: None,
1055        }
1056    }
1057
1058    /// Start the frame batching coordinator
1059    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1060        let pending_frames = Arc::clone(&self.pending_frames);
1061        let stats = Arc::clone(&self.stats);
1062        let config = self.config.clone();
1063
1064        let flush_handle = tokio::spawn(async move {
1065            let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1066
1067            loop {
1068                interval.tick().await;
1069                Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1070            }
1071        });
1072
1073        self.flush_handle = Some(flush_handle);
1074        info!("Frame batching coordinator started");
1075        Ok(())
1076    }
1077
1078    /// Add frame to batch
1079    pub async fn add_frame(
1080        &self,
1081        destination: SocketAddr,
1082        frame_type: u8,
1083        payload: Vec<u8>,
1084        priority: FramePriority,
1085    ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1086        let frame = BatchedFrame {
1087            frame_type,
1088            size: payload.len(),
1089            payload,
1090            priority,
1091            created_at: Instant::now(),
1092        };
1093
1094        let mut pending = self.pending_frames.lock().unwrap();
1095
1096        // Check if we need to flush
1097        let (should_flush, frames_count, total_size) = {
1098            let batch_set = pending
1099                .entry(destination)
1100                .or_insert_with(|| BatchedFrameSet {
1101                    frames: Vec::new(),
1102                    total_size: 0,
1103                    created_at: Instant::now(),
1104                    destination,
1105                    priority: BatchPriority::Normal,
1106                });
1107
1108            batch_set.frames.push(frame);
1109            batch_set.total_size += batch_set.frames.last().unwrap().size;
1110
1111            // Update batch priority based on frame priority
1112            if priority == FramePriority::Urgent {
1113                batch_set.priority = BatchPriority::High;
1114            }
1115
1116            // Check if batch should be flushed immediately
1117            let should_flush = self.should_flush_batch(batch_set);
1118            (should_flush, batch_set.frames.len(), batch_set.total_size)
1119        };
1120
1121        if should_flush {
1122            // Remove and serialize the batch
1123            if let Some(batch_set) = pending.remove(&destination) {
1124                let batch_data = self.serialize_batch(&batch_set);
1125
1126                // Update stats
1127                {
1128                    let mut stats = self.stats.lock().unwrap();
1129                    stats.batches_sent += 1;
1130                    stats.frames_batched += frames_count as u64;
1131                    stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64
1132                        + total_size as f64)
1133                        / stats.batches_sent as f64;
1134                }
1135
1136                debug!(
1137                    "Flushed batch to {} with {} frames ({} bytes)",
1138                    destination, frames_count, total_size
1139                );
1140                return Ok(Some(batch_data));
1141            }
1142        }
1143
1144        Ok(None)
1145    }
1146
1147    /// Check if batch should be flushed
1148    fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1149        let now = Instant::now();
1150        let age = now.duration_since(batch_set.created_at);
1151
1152        // Flush if batch is full, old, or high priority
1153        batch_set.total_size >= self.config.max_batch_size
1154            || batch_set.frames.len() >= self.config.max_frames_per_batch
1155            || age >= self.config.max_batch_delay
1156            || batch_set.priority == BatchPriority::High
1157    }
1158
1159    /// Serialize batch into packet data
1160    fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1161        let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1162
1163        for frame in &batch_set.frames {
1164            // Add frame type and length
1165            data.push(frame.frame_type);
1166            data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1167            // Add frame payload
1168            data.extend_from_slice(&frame.payload);
1169        }
1170
1171        data
1172    }
1173
1174    /// Flush expired batches
1175    async fn flush_expired_batches(
1176        pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1177        stats: &Arc<Mutex<FrameBatchingStats>>,
1178        config: &FrameBatchingConfig,
1179    ) {
1180        let now = Instant::now();
1181        let mut to_flush = Vec::new();
1182
1183        // Find expired batches
1184        {
1185            let pending = pending_frames.lock().unwrap();
1186            for (destination, batch_set) in pending.iter() {
1187                let age = now.duration_since(batch_set.created_at);
1188                if age >= config.max_batch_delay {
1189                    to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1190                }
1191            }
1192        }
1193
1194        // Flush expired batches
1195        if !to_flush.is_empty() {
1196            let mut pending = pending_frames.lock().unwrap();
1197            let flush_count = to_flush.len();
1198            for (destination, frame_count, total_size) in to_flush {
1199                pending.remove(&destination);
1200
1201                // Update stats
1202                let mut stats_guard = stats.lock().unwrap();
1203                stats_guard.batches_sent += 1;
1204                stats_guard.frames_batched += frame_count as u64;
1205                stats_guard.avg_batch_size = (stats_guard.avg_batch_size
1206                    * (stats_guard.batches_sent - 1) as f64
1207                    + total_size as f64)
1208                    / stats_guard.batches_sent as f64;
1209            }
1210
1211            debug!("Flushed {} expired batches", flush_count);
1212        }
1213    }
1214
1215    /// Get batching statistics
1216    pub async fn get_stats(&self) -> FrameBatchingStats {
1217        self.stats.lock().unwrap().clone()
1218    }
1219
1220    /// Shutdown the frame batching coordinator
1221    pub async fn shutdown(&mut self) {
1222        if let Some(handle) = self.flush_handle.take() {
1223            handle.abort();
1224        }
1225
1226        // Flush all pending batches
1227        {
1228            let mut pending = self.pending_frames.lock().unwrap();
1229            pending.clear();
1230        }
1231
1232        info!("Frame batching coordinator shutdown complete");
1233    }
1234}
1235
/// Memory optimization manager that coordinates all memory optimization components
///
/// Owns one instance of each component and starts and shuts them down
/// together through `start` and `shutdown`.
#[derive(Debug)]
pub struct MemoryOptimizationManager {
    /// Connection pool component.
    connection_pool: ConnectionPool,
    /// Candidate cache component.
    candidate_cache: CandidateCache,
    /// Session cleanup component.
    session_cleanup: SessionCleanupCoordinator,
    /// Frame batching component.
    frame_batching: FrameBatchingCoordinator,
    /// Set once `start` has succeeded and cleared by `shutdown`; both methods
    /// check it so that repeated calls are no-ops.
    is_running: bool,
}
1245
1246impl MemoryOptimizationManager {
1247    /// Create a new memory optimization manager with default configurations
1248    pub fn new() -> Self {
1249        Self {
1250            connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1251            candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1252            session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1253            frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1254            is_running: false,
1255        }
1256    }
1257
1258    /// Create a new memory optimization manager with custom configurations
1259    pub fn with_configs(
1260        pool_config: ConnectionPoolConfig,
1261        cache_config: CandidateCacheConfig,
1262        cleanup_config: SessionCleanupConfig,
1263        batching_config: FrameBatchingConfig,
1264    ) -> Self {
1265        Self {
1266            connection_pool: ConnectionPool::new(pool_config),
1267            candidate_cache: CandidateCache::new(cache_config),
1268            session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1269            frame_batching: FrameBatchingCoordinator::new(batching_config),
1270            is_running: false,
1271        }
1272    }
1273
1274    /// Start all memory optimization components
1275    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1276        if self.is_running {
1277            return Ok(());
1278        }
1279
1280        self.connection_pool.start().await?;
1281        self.candidate_cache.start().await?;
1282        self.session_cleanup.start().await?;
1283        self.frame_batching.start().await?;
1284
1285        self.is_running = true;
1286        info!("Memory optimization manager started");
1287        Ok(())
1288    }
1289
1290    /// Get connection pool reference
1291    pub fn connection_pool(&self) -> &ConnectionPool {
1292        &self.connection_pool
1293    }
1294
1295    /// Get candidate cache reference
1296    pub fn candidate_cache(&self) -> &CandidateCache {
1297        &self.candidate_cache
1298    }
1299
1300    /// Get session cleanup coordinator reference
1301    pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1302        &self.session_cleanup
1303    }
1304
1305    /// Get frame batching coordinator reference
1306    pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1307        &self.frame_batching
1308    }
1309
1310    /// Get comprehensive memory optimization statistics
1311    pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1312        MemoryOptimizationStats {
1313            connection_pool: self.connection_pool.get_stats().await,
1314            candidate_cache: self.candidate_cache.get_stats().await,
1315            session_cleanup: self.session_cleanup.get_stats().await,
1316            frame_batching: self.frame_batching.get_stats().await,
1317        }
1318    }
1319
1320    /// Shutdown all memory optimization components
1321    pub async fn shutdown(&mut self) {
1322        if !self.is_running {
1323            return;
1324        }
1325
1326        self.connection_pool.shutdown().await;
1327        self.candidate_cache.shutdown().await;
1328        self.session_cleanup.shutdown().await;
1329        self.frame_batching.shutdown().await;
1330
1331        self.is_running = false;
1332        info!("Memory optimization manager shutdown complete");
1333    }
1334}
1335
/// Comprehensive memory optimization statistics
///
/// One statistics snapshot per component, as assembled by
/// `MemoryOptimizationManager::get_comprehensive_stats`.
#[derive(Debug, Clone)]
pub struct MemoryOptimizationStats {
    /// Snapshot returned by the connection pool's `get_stats`.
    pub connection_pool: ConnectionPoolStats,
    /// Snapshot returned by the candidate cache's `get_stats`.
    pub candidate_cache: CandidateCacheStats,
    /// Snapshot returned by the session cleanup coordinator's `get_stats`.
    pub session_cleanup: SessionCleanupStats,
    /// Snapshot returned by the frame batching coordinator's `get_stats`.
    pub frame_batching: FrameBatchingStats,
}
1344
1345impl Default for MemoryOptimizationManager {
1346    fn default() -> Self {
1347        Self::new()
1348    }
1349}