ant_quic/optimization/
memory.rs

1//! Memory optimization components for ant-quic
2//!
3//! This module provides memory-efficient resource management including:
4//! - Connection pooling for Quinn connections
5//! - Candidate caching with TTL
6//! - Automatic cleanup of expired sessions and state
7//! - Frame batching for reduced packet overhead
8
9use std::{
10    collections::HashMap,
11    net::SocketAddr,
12    sync::{Arc, Mutex, RwLock},
13    time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18use crate::{Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection};
19
20use crate::{
21    VarInt,
22    nat_traversal_api::{CandidateAddress, PeerId},
23};
24
/// Connection pool for reusing Quinn connections
///
/// The connection map and the statistics are each held behind an `Arc` so
/// that the background cleanup task spawned by `start` can share them with
/// the pool itself.
#[derive(Debug)]
pub struct ConnectionPool {
    /// Active connections by peer ID
    active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
    /// Connection pool configuration
    config: ConnectionPoolConfig,
    /// Pool statistics
    stats: Arc<Mutex<ConnectionPoolStats>>,
    /// Cleanup task handle; `None` until `start` spawns the task, and `None`
    /// again once `shutdown` has taken and aborted it
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
37
/// Configuration for connection pooling
///
/// See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections to pool; reaching it triggers eviction
    /// of the least recently used entry when a new connection is created
    pub max_connections: usize,
    /// Maximum idle time before connection cleanup (measured from the last
    /// time the pooled connection was handed out)
    pub max_idle_time: Duration,
    /// Cleanup interval for expired connections (period of the background
    /// cleanup task)
    pub cleanup_interval: Duration,
    /// Enable connection reuse
    pub enable_reuse: bool,
    /// Maximum connection age before forced refresh (measured from creation,
    /// regardless of how recently the connection was used)
    pub max_connection_age: Duration,
}
52
/// A pooled connection with metadata
#[derive(Debug)]
struct PooledConnection {
    /// Shared handle to the connection; callers receive clones of this `Arc`
    connection: Arc<QuinnConnection>,
    /// Peer this connection was created for (also the key in the pool map)
    peer_id: PeerId,
    /// Address the connection was opened to; reuse requires the requested
    /// address to be equal to this one
    remote_address: SocketAddr,
    /// When the entry was created; compared against `max_connection_age`
    created_at: Instant,
    /// When the entry was last handed out; drives idle expiry and LRU eviction
    last_used: Instant,
    /// Number of times the connection has been handed out (starts at 1)
    use_count: u64,
    /// Set to `true` on creation and required for reuse.
    /// NOTE(review): nothing in this section ever clears it — confirm whether
    /// other code does.
    is_active: bool,
}
64
/// Statistics for connection pool
///
/// `get_stats` returns a cloned snapshot of this struct.
#[derive(Debug, Default, Clone)]
pub struct ConnectionPoolStats {
    /// Total connections created
    pub connections_created: u64,
    /// Total connections reused
    pub connections_reused: u64,
    /// Total connections expired (removed by the periodic cleanup)
    pub connections_expired: u64,
    /// Current active connections
    pub active_connections: usize,
    /// Pool hit rate (reuse / total requests), where total requests is
    /// `connections_created + connections_reused`
    pub hit_rate: f64,
    /// Average connection age.
    /// NOTE(review): no code in this section writes this field — confirm
    /// whether it is maintained elsewhere.
    pub avg_connection_age: Duration,
}
81
/// Candidate cache with TTL for efficient candidate management
///
/// The entry map and the statistics are each held behind an `Arc` so that the
/// background cleanup task spawned by `start` can share them with the cache.
#[derive(Debug)]
pub struct CandidateCache {
    /// Cached candidates by peer ID
    cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
    /// Cache configuration
    config: CandidateCacheConfig,
    /// Cache statistics
    stats: Arc<Mutex<CandidateCacheStats>>,
    /// Cleanup task handle; `None` until `start` spawns the task, and `None`
    /// again once `shutdown` has taken and aborted it
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
94
/// Configuration for candidate caching
///
/// See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct CandidateCacheConfig {
    /// Default TTL for cached candidates (used when `cache_candidates` is
    /// called without an explicit TTL)
    pub default_ttl: Duration,
    /// Maximum number of candidate sets to cache; reaching it triggers
    /// eviction of the least recently accessed entry on insert
    pub max_cache_size: usize,
    /// Cleanup interval for expired entries (period of the background
    /// cleanup task)
    pub cleanup_interval: Duration,
    /// Enable candidate validation caching; when `false`, storing a
    /// validation result is a no-op and lookups return `None`
    pub enable_validation_cache: bool,
    /// TTL for validation results (applied to every stored result)
    pub validation_ttl: Duration,
}
109
/// Cached candidate set with metadata
#[derive(Debug, Clone)]
struct CachedCandidateSet {
    /// Candidates stored for the peer; cloned out to callers on a cache hit
    candidates: Vec<CandidateAddress>,
    /// When the set was stored; the set is served while the elapsed time is
    /// at most `ttl`
    cached_at: Instant,
    /// Lifetime of this set, counted from `cached_at`
    ttl: Duration,
    /// Number of cache hits served from this set (starts at 0)
    access_count: u64,
    /// When the set was last served; key used for LRU eviction
    last_accessed: Instant,
    /// Validation outcomes recorded per candidate address
    validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
}
120
/// Cached validation result
#[derive(Debug, Clone)]
struct ValidationCacheEntry {
    /// Validation outcome as supplied by the caller
    is_valid: bool,
    /// Optional duration supplied with the outcome (presumably the measured
    /// round-trip time — confirm against callers)
    rtt: Option<Duration>,
    /// When the result was stored
    cached_at: Instant,
    /// Lifetime of this result, counted from `cached_at`
    ttl: Duration,
}
129
/// Statistics for candidate cache
///
/// `get_stats` returns a cloned snapshot of this struct.
#[derive(Debug, Default, Clone)]
pub struct CandidateCacheStats {
    /// Total cache hits
    pub cache_hits: u64,
    /// Total cache misses (absent or expired entries)
    pub cache_misses: u64,
    /// Total entries expired (removed by the periodic cleanup)
    pub entries_expired: u64,
    /// Current cache size
    pub current_size: usize,
    /// Cache hit rate, computed as `cache_hits / (cache_hits + cache_misses)`
    pub hit_rate: f64,
    /// Average entry age.
    /// NOTE(review): no code in this section writes this field — confirm
    /// whether it is maintained elsewhere.
    pub avg_entry_age: Duration,
}
146
/// Session state cleanup coordinator
///
/// The session map and the statistics are each held behind an `Arc` so that
/// the background cleanup task spawned by `start` can share them.
#[derive(Debug)]
pub struct SessionCleanupCoordinator {
    /// Active sessions by peer ID
    active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
    /// Cleanup configuration
    config: SessionCleanupConfig,
    /// Cleanup statistics
    stats: Arc<Mutex<SessionCleanupStats>>,
    /// Cleanup task handle; `None` until `start` spawns the task
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
159
/// Configuration for session cleanup
///
/// See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct SessionCleanupConfig {
    /// Maximum session idle time; half of this value is the idle limit for
    /// `CleanupPriority::Low` sessions during aggressive cleanup
    pub max_idle_time: Duration,
    /// Maximum session age (measured from registration)
    pub max_session_age: Duration,
    /// Cleanup interval (period of the background cleanup task)
    pub cleanup_interval: Duration,
    /// Enable aggressive cleanup under memory pressure; when set and pressure
    /// is detected, per-priority idle limits replace the normal rules
    pub enable_aggressive_cleanup: bool,
    /// Memory pressure threshold (MB)
    pub memory_pressure_threshold: usize,
}
174
/// Session state for cleanup tracking
#[derive(Debug)]
struct SessionState {
    /// Peer the session belongs to (also the key in the session map)
    peer_id: PeerId,
    /// When the session was registered; compared against `max_session_age`
    created_at: Instant,
    /// Last recorded activity; refreshed by `update_session_activity`
    last_activity: Instant,
    /// Caller-reported memory footprint (logged as bytes at registration)
    memory_usage: usize,
    /// Set to `true` at registration
    is_active: bool,
    /// Selects the idle limit applied during aggressive cleanup
    cleanup_priority: CleanupPriority,
}
185
/// Priority for session cleanup
///
/// The derived ordering follows declaration order: `Low < Normal < High`.
/// During aggressive cleanup the idle limits are 30 s for `High`, 60 s for
/// `Normal`, and half of the configured `max_idle_time` for `Low`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    Low,    // Keep as long as possible
    Normal, // Standard cleanup rules
    High,   // Clean up aggressively
}
193
/// Statistics for session cleanup
///
/// All fields start at their zero values via the derived `Default`.
#[derive(Debug, Default, Clone)]
pub struct SessionCleanupStats {
    /// Total sessions cleaned up
    pub sessions_cleaned: u64,
    /// Memory freed (bytes)
    pub memory_freed: u64,
    /// Current active sessions
    pub active_sessions: usize,
    /// Average session lifetime
    pub avg_session_lifetime: Duration,
}
206
/// Frame batching coordinator for reduced packet overhead
///
/// NOTE(review): the implementation is outside this section; the field notes
/// below restate the declarations only.
#[derive(Debug)]
pub struct FrameBatchingCoordinator {
    /// Pending frames by destination
    pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
    /// Batching configuration
    config: FrameBatchingConfig,
    /// Batching statistics
    stats: Arc<Mutex<FrameBatchingStats>>,
    /// Flush task handle (optional, like the cleanup handles of the other
    /// coordinators in this file)
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}
219
/// Configuration for frame batching
///
/// See the `Default` impl for the stock values (1200-byte batches, 10 ms
/// maximum delay, 10 frames per batch, 200-byte minimum).
#[derive(Debug, Clone)]
pub struct FrameBatchingConfig {
    /// Maximum batch size (bytes)
    pub max_batch_size: usize,
    /// Maximum batch delay
    pub max_batch_delay: Duration,
    /// Maximum frames per batch
    pub max_frames_per_batch: usize,
    /// Enable adaptive batching based on network conditions
    pub enable_adaptive_batching: bool,
    /// Minimum batch size for efficiency (bytes, presumably the same unit as
    /// `max_batch_size` — confirm)
    pub min_batch_size: usize,
}
234
/// Batched frame set
///
/// NOTE(review): the code that fills and flushes these sets is outside this
/// section; the field notes are inferred from names and types — confirm.
#[derive(Debug)]
struct BatchedFrameSet {
    /// Frames queued in this batch
    frames: Vec<BatchedFrame>,
    /// Presumably the accumulated size of `frames` in bytes
    total_size: usize,
    /// Presumably when the batch was opened
    created_at: Instant,
    /// Address the batch is destined for
    destination: SocketAddr,
    /// Batch-level priority
    priority: BatchPriority,
}
244
/// Individual batched frame
///
/// NOTE(review): producers and consumers of this type are outside this
/// section; the field notes are inferred from names and types — confirm.
#[derive(Debug)]
struct BatchedFrame {
    /// Frame type tag (one byte)
    frame_type: u8,
    /// Raw frame payload
    payload: Vec<u8>,
    /// Presumably the frame size in bytes
    size: usize,
    /// Per-frame priority
    priority: FramePriority,
    /// Presumably when the frame was queued
    created_at: Instant,
}
254
/// Priority for frame batching
///
/// The derived ordering follows declaration order: `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    Low,    // Can wait for full batch
    Normal, // Standard batching rules
    High,   // Send quickly, minimal batching
}
262
/// Priority for individual frames
///
/// The derived ordering follows declaration order:
/// `Background < Normal < Urgent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    Background, // Low priority, can be delayed
    Normal,     // Standard priority
    Urgent,     // High priority, minimal delay
}
270
/// Statistics for frame batching
///
/// All fields start at their zero values via the derived `Default`.
/// NOTE(review): the code that maintains these counters is outside this
/// section.
#[derive(Debug, Default, Clone)]
pub struct FrameBatchingStats {
    /// Total frames batched
    pub frames_batched: u64,
    /// Total batches sent
    pub batches_sent: u64,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Bytes saved through batching
    pub bytes_saved: u64,
    /// Batching efficiency (0.0 - 1.0)
    pub batching_efficiency: f64,
}
285
286impl Default for ConnectionPoolConfig {
287    fn default() -> Self {
288        Self {
289            max_connections: 1000,
290            max_idle_time: Duration::from_secs(300), // 5 minutes
291            cleanup_interval: Duration::from_secs(60), // 1 minute
292            enable_reuse: true,
293            max_connection_age: Duration::from_secs(3600), // 1 hour
294        }
295    }
296}
297
298impl Default for CandidateCacheConfig {
299    fn default() -> Self {
300        Self {
301            default_ttl: Duration::from_secs(300), // 5 minutes
302            max_cache_size: 10000,
303            cleanup_interval: Duration::from_secs(60), // 1 minute
304            enable_validation_cache: true,
305            validation_ttl: Duration::from_secs(60), // 1 minute
306        }
307    }
308}
309
310impl Default for SessionCleanupConfig {
311    fn default() -> Self {
312        Self {
313            max_idle_time: Duration::from_secs(600),    // 10 minutes
314            max_session_age: Duration::from_secs(3600), // 1 hour
315            cleanup_interval: Duration::from_secs(120), // 2 minutes
316            enable_aggressive_cleanup: true,
317            memory_pressure_threshold: 512, // 512 MB
318        }
319    }
320}
321
322impl Default for FrameBatchingConfig {
323    fn default() -> Self {
324        Self {
325            max_batch_size: 1200,                       // Just under typical MTU
326            max_batch_delay: Duration::from_millis(10), // 10ms max delay
327            max_frames_per_batch: 10,
328            enable_adaptive_batching: true,
329            min_batch_size: 200, // Minimum size for efficiency
330        }
331    }
332}
333
334impl ConnectionPool {
335    /// Create a new connection pool
336    pub fn new(config: ConnectionPoolConfig) -> Self {
337        let pool = Self {
338            active_connections: Arc::new(RwLock::new(HashMap::new())),
339            config,
340            stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
341            cleanup_handle: None,
342        };
343
344        pool
345    }
346
347    /// Start the connection pool with cleanup task
348    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
349        let connections = Arc::clone(&self.active_connections);
350        let stats = Arc::clone(&self.stats);
351        let config = self.config.clone();
352
353        let cleanup_handle = tokio::spawn(async move {
354            let mut interval = tokio::time::interval(config.cleanup_interval);
355
356            loop {
357                interval.tick().await;
358                Self::cleanup_expired_connections(&connections, &stats, &config).await;
359            }
360        });
361
362        self.cleanup_handle = Some(cleanup_handle);
363        info!(
364            "Connection pool started with max_connections={}",
365            self.config.max_connections
366        );
367        Ok(())
368    }
369
370    /// Get or create a connection for a peer
371    pub async fn get_connection(
372        &self,
373        peer_id: PeerId,
374        remote_address: SocketAddr,
375        endpoint: &QuinnEndpoint,
376    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
377        // Try to get existing connection
378        if let Some(connection) = self
379            .try_get_existing_connection(peer_id, remote_address)
380            .await
381        {
382            self.update_stats_hit().await;
383            return Ok(connection);
384        }
385
386        // Create new connection
387        let connection = self
388            .create_new_connection(peer_id, remote_address, endpoint)
389            .await?;
390        self.update_stats_miss().await;
391        Ok(connection)
392    }
393
394    /// Try to get existing connection from pool
395    async fn try_get_existing_connection(
396        &self,
397        peer_id: PeerId,
398        remote_address: SocketAddr,
399    ) -> Option<Arc<QuinnConnection>> {
400        let mut connections = self.active_connections.write().unwrap();
401
402        if let Some(pooled) = connections.get_mut(&peer_id) {
403            if pooled.is_active && pooled.remote_address == remote_address {
404                // Check if connection is still valid
405                if pooled.connection.close_reason().is_none() {
406                    pooled.last_used = Instant::now();
407                    pooled.use_count += 1;
408                    debug!("Reusing pooled connection for peer {:?}", peer_id);
409                    return Some(Arc::clone(&pooled.connection));
410                } else {
411                    // Connection is closed, remove it
412                    connections.remove(&peer_id);
413                }
414            }
415        }
416
417        None
418    }
419
420    /// Create a new connection and add to pool
421    async fn create_new_connection(
422        &self,
423        peer_id: PeerId,
424        remote_address: SocketAddr,
425        endpoint: &QuinnEndpoint,
426    ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
427        // Check pool size limit
428        {
429            let connections = self.active_connections.read().unwrap();
430            if connections.len() >= self.config.max_connections {
431                // Pool is full, need to evict least recently used
432                drop(connections);
433                self.evict_lru_connection().await;
434            }
435        }
436
437        // Create new connection
438        let rustls_config = rustls::ClientConfig::builder()
439            .with_root_certificates(rustls::RootCertStore::empty())
440            .with_no_client_auth();
441
442        let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
443            .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;
444
445        let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
446
447        let connecting = endpoint
448            .connect_with(client_config, remote_address, "ant-quic")
449            .map_err(|e| format!("Failed to initiate connection: {}", e))?;
450
451        // Wait for the connection to be established
452        let connection = connecting
453            .await
454            .map_err(|e| format!("Connection failed: {}", e))?;
455
456        let connection_arc = Arc::new(connection);
457
458        let pooled = PooledConnection {
459            connection: Arc::clone(&connection_arc),
460            peer_id,
461            remote_address,
462            created_at: Instant::now(),
463            last_used: Instant::now(),
464            use_count: 1,
465            is_active: true,
466        };
467
468        // Add to pool
469        {
470            let mut connections = self.active_connections.write().unwrap();
471            connections.insert(peer_id, pooled);
472        }
473
474        // Update stats
475        {
476            let mut stats = self.stats.lock().unwrap();
477            stats.connections_created += 1;
478            stats.active_connections += 1;
479        }
480
481        info!("Created new pooled connection for peer {:?}", peer_id);
482        Ok(connection_arc)
483    }
484
485    /// Evict least recently used connection
486    async fn evict_lru_connection(&self) {
487        let mut connections = self.active_connections.write().unwrap();
488
489        if let Some((lru_peer_id, _)) = connections
490            .iter()
491            .min_by_key(|(_, pooled)| pooled.last_used)
492            .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
493        {
494            connections.remove(&lru_peer_id);
495            debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
496
497            // Update stats
498            let mut stats = self.stats.lock().unwrap();
499            stats.active_connections = stats.active_connections.saturating_sub(1);
500        }
501    }
502
503    /// Update stats for cache hit
504    async fn update_stats_hit(&self) {
505        let mut stats = self.stats.lock().unwrap();
506        stats.connections_reused += 1;
507        let total_requests = stats.connections_created + stats.connections_reused;
508        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
509    }
510
511    /// Update stats for cache miss
512    async fn update_stats_miss(&self) {
513        let stats = self.stats.lock().unwrap();
514        let total_requests = stats.connections_created + stats.connections_reused;
515        drop(stats);
516
517        let mut stats = self.stats.lock().unwrap();
518        stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
519    }
520
521    /// Cleanup expired connections
522    async fn cleanup_expired_connections(
523        connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
524        stats: &Arc<Mutex<ConnectionPoolStats>>,
525        config: &ConnectionPoolConfig,
526    ) {
527        let now = Instant::now();
528        let mut to_remove = Vec::new();
529
530        // Find expired connections
531        {
532            let connections_read = connections.read().unwrap();
533            for (peer_id, pooled) in connections_read.iter() {
534                let idle_time = now.duration_since(pooled.last_used);
535                let age = now.duration_since(pooled.created_at);
536
537                if idle_time > config.max_idle_time || age > config.max_connection_age {
538                    to_remove.push(*peer_id);
539                }
540
541                // Also remove closed connections
542                if pooled.connection.close_reason().is_some() {
543                    to_remove.push(*peer_id);
544                }
545            }
546        }
547
548        // Remove expired connections
549        if !to_remove.is_empty() {
550            let mut connections_write = connections.write().unwrap();
551            for peer_id in &to_remove {
552                connections_write.remove(peer_id);
553            }
554
555            // Update stats
556            let mut stats_guard = stats.lock().unwrap();
557            stats_guard.connections_expired += to_remove.len() as u64;
558            stats_guard.active_connections = connections_write.len();
559
560            debug!("Cleaned up {} expired connections", to_remove.len());
561        }
562    }
563
564    /// Get connection pool statistics
565    pub async fn get_stats(&self) -> ConnectionPoolStats {
566        self.stats.lock().unwrap().clone()
567    }
568
569    /// Shutdown the connection pool
570    pub async fn shutdown(&mut self) {
571        if let Some(handle) = self.cleanup_handle.take() {
572            handle.abort();
573        }
574
575        // Close all connections
576        {
577            let connections = self.active_connections.read().unwrap();
578            for (_, pooled) in connections.iter() {
579                pooled.connection.close(VarInt::from_u32(0), b"shutdown");
580            }
581        }
582
583        info!("Connection pool shutdown complete");
584    }
585}
586
587impl CandidateCache {
588    /// Create a new candidate cache
589    pub fn new(config: CandidateCacheConfig) -> Self {
590        let cache = Self {
591            cache: Arc::new(RwLock::new(HashMap::new())),
592            config,
593            stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
594            cleanup_handle: None,
595        };
596
597        cache
598    }
599
600    /// Start the candidate cache with cleanup task
601    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
602        let cache = Arc::clone(&self.cache);
603        let stats = Arc::clone(&self.stats);
604        let config = self.config.clone();
605
606        let cleanup_handle = tokio::spawn(async move {
607            let mut interval = tokio::time::interval(config.cleanup_interval);
608
609            loop {
610                interval.tick().await;
611                Self::cleanup_expired_entries(&cache, &stats, &config).await;
612            }
613        });
614
615        self.cleanup_handle = Some(cleanup_handle);
616        info!(
617            "Candidate cache started with max_size={}",
618            self.config.max_cache_size
619        );
620        Ok(())
621    }
622
623    /// Get cached candidates for a peer
624    pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
625        let (is_valid, candidates) = {
626            let cache = self.cache.read().unwrap();
627
628            if let Some(cached_set) = cache.get(&peer_id) {
629                let now = Instant::now();
630
631                // Check if entry is still valid
632                if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
633                    (true, Some(cached_set.candidates.clone()))
634                } else {
635                    (false, None)
636                }
637            } else {
638                (false, None)
639            }
640        };
641
642        if is_valid {
643            // Update access statistics
644            self.update_access_stats(peer_id, true).await;
645
646            if let Some(ref candidates) = candidates {
647                debug!(
648                    "Cache hit for peer {:?}, {} candidates",
649                    peer_id,
650                    candidates.len()
651                );
652            }
653            return candidates;
654        }
655
656        // Cache miss
657        self.update_access_stats(peer_id, false).await;
658        debug!("Cache miss for peer {:?}", peer_id);
659        None
660    }
661
662    /// Cache candidates for a peer
663    pub async fn cache_candidates(
664        &self,
665        peer_id: PeerId,
666        candidates: Vec<CandidateAddress>,
667        ttl: Option<Duration>,
668    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
669        let ttl = ttl.unwrap_or(self.config.default_ttl);
670
671        // Check cache size limit
672        {
673            let cache = self.cache.read().unwrap();
674            if cache.len() >= self.config.max_cache_size {
675                drop(cache);
676                self.evict_lru_entry().await;
677            }
678        }
679
680        let candidate_count = candidates.len();
681        let cached_set = CachedCandidateSet {
682            candidates,
683            cached_at: Instant::now(),
684            ttl,
685            access_count: 0,
686            last_accessed: Instant::now(),
687            validation_results: HashMap::new(),
688        };
689
690        // Add to cache
691        {
692            let mut cache = self.cache.write().unwrap();
693            cache.insert(peer_id, cached_set);
694        }
695
696        // Update stats
697        {
698            let mut stats = self.stats.lock().unwrap();
699            stats.current_size += 1;
700        }
701
702        debug!(
703            "Cached {} candidates for peer {:?} with TTL {:?}",
704            candidate_count, peer_id, ttl
705        );
706        Ok(())
707    }
708
709    /// Cache validation result for a candidate
710    pub async fn cache_validation_result(
711        &self,
712        peer_id: PeerId,
713        address: SocketAddr,
714        is_valid: bool,
715        rtt: Option<Duration>,
716    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
717        if !self.config.enable_validation_cache {
718            return Ok(());
719        }
720
721        let mut cache = self.cache.write().unwrap();
722
723        if let Some(cached_set) = cache.get_mut(&peer_id) {
724            let validation_entry = ValidationCacheEntry {
725                is_valid,
726                rtt,
727                cached_at: Instant::now(),
728                ttl: self.config.validation_ttl,
729            };
730
731            cached_set
732                .validation_results
733                .insert(address, validation_entry);
734            debug!(
735                "Cached validation result for {}:{} -> {}",
736                peer_id.0[0], address, is_valid
737            );
738        }
739
740        Ok(())
741    }
742
743    /// Get cached validation result
744    pub async fn get_validation_result(
745        &self,
746        peer_id: PeerId,
747        address: SocketAddr,
748    ) -> Option<(bool, Option<Duration>)> {
749        if !self.config.enable_validation_cache {
750            return None;
751        }
752
753        let cache = self.cache.read().unwrap();
754
755        if let Some(cached_set) = cache.get(&peer_id) {
756            if let Some(validation_entry) = cached_set.validation_results.get(&address) {
757                let now = Instant::now();
758
759                // Check if validation result is still valid
760                if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
761                    return Some((validation_entry.is_valid, validation_entry.rtt));
762                }
763            }
764        }
765
766        None
767    }
768
769    /// Update access statistics
770    async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
771        // Update cache-level stats
772        {
773            let mut stats = self.stats.lock().unwrap();
774            if hit {
775                stats.cache_hits += 1;
776            } else {
777                stats.cache_misses += 1;
778            }
779
780            let total_accesses = stats.cache_hits + stats.cache_misses;
781            stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
782        }
783
784        // Update entry-level stats
785        if hit {
786            let mut cache = self.cache.write().unwrap();
787            if let Some(cached_set) = cache.get_mut(&peer_id) {
788                cached_set.access_count += 1;
789                cached_set.last_accessed = Instant::now();
790            }
791        }
792    }
793
794    /// Evict least recently used entry
795    async fn evict_lru_entry(&self) {
796        let mut cache = self.cache.write().unwrap();
797
798        if let Some((lru_peer_id, _)) = cache
799            .iter()
800            .min_by_key(|(_, cached_set)| cached_set.last_accessed)
801            .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
802        {
803            cache.remove(&lru_peer_id);
804            debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
805
806            // Update stats
807            let mut stats = self.stats.lock().unwrap();
808            stats.current_size = stats.current_size.saturating_sub(1);
809        }
810    }
811
812    /// Cleanup expired cache entries
813    async fn cleanup_expired_entries(
814        cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
815        stats: &Arc<Mutex<CandidateCacheStats>>,
816        _config: &CandidateCacheConfig,
817    ) {
818        let now = Instant::now();
819        let mut to_remove = Vec::new();
820
821        // Find expired entries
822        {
823            let cache_read = cache.read().unwrap();
824            for (peer_id, cached_set) in cache_read.iter() {
825                let age = now.duration_since(cached_set.cached_at);
826                if age > cached_set.ttl {
827                    to_remove.push(*peer_id);
828                }
829            }
830        }
831
832        // Remove expired entries
833        if !to_remove.is_empty() {
834            let mut cache_write = cache.write().unwrap();
835            for peer_id in &to_remove {
836                cache_write.remove(peer_id);
837            }
838
839            // Update stats
840            let mut stats_guard = stats.lock().unwrap();
841            stats_guard.entries_expired += to_remove.len() as u64;
842            stats_guard.current_size = cache_write.len();
843
844            debug!("Cleaned up {} expired cache entries", to_remove.len());
845        }
846
847        // Also cleanup expired validation results
848        {
849            let mut cache_write = cache.write().unwrap();
850            for cached_set in cache_write.values_mut() {
851                cached_set.validation_results.retain(|_, validation_entry| {
852                    now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
853                });
854            }
855        }
856    }
857
858    /// Get cache statistics
859    pub async fn get_stats(&self) -> CandidateCacheStats {
860        self.stats.lock().unwrap().clone()
861    }
862
863    /// Shutdown the candidate cache
864    pub async fn shutdown(&mut self) {
865        if let Some(handle) = self.cleanup_handle.take() {
866            handle.abort();
867        }
868
869        // Clear cache
870        {
871            let mut cache = self.cache.write().unwrap();
872            cache.clear();
873        }
874
875        info!("Candidate cache shutdown complete");
876    }
877}
878
879impl SessionCleanupCoordinator {
880    /// Create a new session cleanup coordinator
881    pub fn new(config: SessionCleanupConfig) -> Self {
882        Self {
883            active_sessions: Arc::new(RwLock::new(HashMap::new())),
884            config,
885            stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
886            cleanup_handle: None,
887        }
888    }
889
890    /// Start the session cleanup coordinator
891    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
892        let sessions = Arc::clone(&self.active_sessions);
893        let stats = Arc::clone(&self.stats);
894        let config = self.config.clone();
895
896        let cleanup_handle = tokio::spawn(async move {
897            let mut interval = tokio::time::interval(config.cleanup_interval);
898
899            loop {
900                interval.tick().await;
901                Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
902            }
903        });
904
905        self.cleanup_handle = Some(cleanup_handle);
906        info!("Session cleanup coordinator started");
907        Ok(())
908    }
909
910    /// Register a new session
911    pub async fn register_session(
912        &self,
913        peer_id: PeerId,
914        memory_usage: usize,
915        priority: CleanupPriority,
916    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
917        let session_state = SessionState {
918            peer_id,
919            created_at: Instant::now(),
920            last_activity: Instant::now(),
921            memory_usage,
922            is_active: true,
923            cleanup_priority: priority,
924        };
925
926        {
927            let mut sessions = self.active_sessions.write().unwrap();
928            sessions.insert(peer_id, session_state);
929        }
930
931        // Update stats
932        {
933            let mut stats = self.stats.lock().unwrap();
934            stats.active_sessions += 1;
935        }
936
937        debug!(
938            "Registered session for peer {:?} with {} bytes",
939            peer_id, memory_usage
940        );
941        Ok(())
942    }
943
944    /// Update session activity
945    pub async fn update_session_activity(&self, peer_id: PeerId) {
946        let mut sessions = self.active_sessions.write().unwrap();
947        if let Some(session) = sessions.get_mut(&peer_id) {
948            session.last_activity = Instant::now();
949        }
950    }
951
952    /// Cleanup expired sessions
953    async fn cleanup_expired_sessions(
954        sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
955        stats: &Arc<Mutex<SessionCleanupStats>>,
956        config: &SessionCleanupConfig,
957    ) {
958        let now = Instant::now();
959        let mut to_remove = Vec::new();
960        let mut memory_freed = 0u64;
961
962        // Check for memory pressure
963        let memory_pressure = Self::check_memory_pressure(config);
964
965        // Find sessions to cleanup
966        {
967            let sessions_read = sessions.read().unwrap();
968            for (peer_id, session) in sessions_read.iter() {
969                let idle_time = now.duration_since(session.last_activity);
970                let age = now.duration_since(session.created_at);
971
972                let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
973                    // Aggressive cleanup under memory pressure
974                    match session.cleanup_priority {
975                        CleanupPriority::High => idle_time > Duration::from_secs(30),
976                        CleanupPriority::Normal => idle_time > Duration::from_secs(60),
977                        CleanupPriority::Low => idle_time > config.max_idle_time / 2,
978                    }
979                } else {
980                    // Normal cleanup rules
981                    idle_time > config.max_idle_time || age > config.max_session_age
982                };
983
984                if should_cleanup {
985                    to_remove.push(*peer_id);
986                    memory_freed += session.memory_usage as u64;
987                }
988            }
989        }
990
991        // Remove expired sessions
992        if !to_remove.is_empty() {
993            let mut sessions_write = sessions.write().unwrap();
994            for peer_id in &to_remove {
995                sessions_write.remove(peer_id);
996            }
997
998            // Update stats
999            let mut stats_guard = stats.lock().unwrap();
1000            stats_guard.sessions_cleaned += to_remove.len() as u64;
1001            stats_guard.memory_freed += memory_freed;
1002            stats_guard.active_sessions = sessions_write.len();
1003
1004            if memory_pressure {
1005                info!(
1006                    "Aggressive cleanup: removed {} sessions, freed {} bytes",
1007                    to_remove.len(),
1008                    memory_freed
1009                );
1010            } else {
1011                debug!(
1012                    "Regular cleanup: removed {} sessions, freed {} bytes",
1013                    to_remove.len(),
1014                    memory_freed
1015                );
1016            }
1017        }
1018    }
1019
1020    /// Check if system is under memory pressure
1021    fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
1022        // Simplified memory pressure detection
1023        // In production, this would check actual system memory usage
1024        // For now, return false (no memory pressure)
1025        // This can be enhanced with system memory monitoring
1026        let _ = config.memory_pressure_threshold;
1027        false
1028    }
1029
1030    /// Get session cleanup statistics
1031    pub async fn get_stats(&self) -> SessionCleanupStats {
1032        self.stats.lock().unwrap().clone()
1033    }
1034
1035    /// Shutdown the session cleanup coordinator
1036    pub async fn shutdown(&mut self) {
1037        if let Some(handle) = self.cleanup_handle.take() {
1038            handle.abort();
1039        }
1040
1041        // Clear sessions
1042        {
1043            let mut sessions = self.active_sessions.write().unwrap();
1044            sessions.clear();
1045        }
1046
1047        info!("Session cleanup coordinator shutdown complete");
1048    }
1049}
1050
1051impl FrameBatchingCoordinator {
1052    /// Create a new frame batching coordinator
1053    pub fn new(config: FrameBatchingConfig) -> Self {
1054        Self {
1055            pending_frames: Arc::new(Mutex::new(HashMap::new())),
1056            config,
1057            stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1058            flush_handle: None,
1059        }
1060    }
1061
1062    /// Start the frame batching coordinator
1063    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1064        let pending_frames = Arc::clone(&self.pending_frames);
1065        let stats = Arc::clone(&self.stats);
1066        let config = self.config.clone();
1067
1068        let flush_handle = tokio::spawn(async move {
1069            let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1070
1071            loop {
1072                interval.tick().await;
1073                Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1074            }
1075        });
1076
1077        self.flush_handle = Some(flush_handle);
1078        info!("Frame batching coordinator started");
1079        Ok(())
1080    }
1081
1082    /// Add frame to batch
1083    pub async fn add_frame(
1084        &self,
1085        destination: SocketAddr,
1086        frame_type: u8,
1087        payload: Vec<u8>,
1088        priority: FramePriority,
1089    ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1090        let frame = BatchedFrame {
1091            frame_type,
1092            size: payload.len(),
1093            payload,
1094            priority,
1095            created_at: Instant::now(),
1096        };
1097
1098        let mut pending = self.pending_frames.lock().unwrap();
1099
1100        // Check if we need to flush
1101        let (should_flush, frames_count, total_size) = {
1102            let batch_set = pending
1103                .entry(destination)
1104                .or_insert_with(|| BatchedFrameSet {
1105                    frames: Vec::new(),
1106                    total_size: 0,
1107                    created_at: Instant::now(),
1108                    destination,
1109                    priority: BatchPriority::Normal,
1110                });
1111
1112            batch_set.frames.push(frame);
1113            batch_set.total_size += batch_set.frames.last().unwrap().size;
1114
1115            // Update batch priority based on frame priority
1116            if priority == FramePriority::Urgent {
1117                batch_set.priority = BatchPriority::High;
1118            }
1119
1120            // Check if batch should be flushed immediately
1121            let should_flush = self.should_flush_batch(batch_set);
1122            (should_flush, batch_set.frames.len(), batch_set.total_size)
1123        };
1124
1125        if should_flush {
1126            // Remove and serialize the batch
1127            if let Some(batch_set) = pending.remove(&destination) {
1128                let batch_data = self.serialize_batch(&batch_set);
1129
1130                // Update stats
1131                {
1132                    let mut stats = self.stats.lock().unwrap();
1133                    stats.batches_sent += 1;
1134                    stats.frames_batched += frames_count as u64;
1135                    stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64
1136                        + total_size as f64)
1137                        / stats.batches_sent as f64;
1138                }
1139
1140                debug!(
1141                    "Flushed batch to {} with {} frames ({} bytes)",
1142                    destination, frames_count, total_size
1143                );
1144                return Ok(Some(batch_data));
1145            }
1146        }
1147
1148        Ok(None)
1149    }
1150
1151    /// Check if batch should be flushed
1152    fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1153        let now = Instant::now();
1154        let age = now.duration_since(batch_set.created_at);
1155
1156        // Flush if batch is full, old, or high priority
1157        batch_set.total_size >= self.config.max_batch_size
1158            || batch_set.frames.len() >= self.config.max_frames_per_batch
1159            || age >= self.config.max_batch_delay
1160            || batch_set.priority == BatchPriority::High
1161    }
1162
1163    /// Serialize batch into packet data
1164    fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1165        let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1166
1167        for frame in &batch_set.frames {
1168            // Add frame type and length
1169            data.push(frame.frame_type);
1170            data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1171            // Add frame payload
1172            data.extend_from_slice(&frame.payload);
1173        }
1174
1175        data
1176    }
1177
1178    /// Flush expired batches
1179    async fn flush_expired_batches(
1180        pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1181        stats: &Arc<Mutex<FrameBatchingStats>>,
1182        config: &FrameBatchingConfig,
1183    ) {
1184        let now = Instant::now();
1185        let mut to_flush = Vec::new();
1186
1187        // Find expired batches
1188        {
1189            let pending = pending_frames.lock().unwrap();
1190            for (destination, batch_set) in pending.iter() {
1191                let age = now.duration_since(batch_set.created_at);
1192                if age >= config.max_batch_delay {
1193                    to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1194                }
1195            }
1196        }
1197
1198        // Flush expired batches
1199        if !to_flush.is_empty() {
1200            let mut pending = pending_frames.lock().unwrap();
1201            let flush_count = to_flush.len();
1202            for (destination, frame_count, total_size) in to_flush {
1203                pending.remove(&destination);
1204
1205                // Update stats
1206                let mut stats_guard = stats.lock().unwrap();
1207                stats_guard.batches_sent += 1;
1208                stats_guard.frames_batched += frame_count as u64;
1209                stats_guard.avg_batch_size = (stats_guard.avg_batch_size
1210                    * (stats_guard.batches_sent - 1) as f64
1211                    + total_size as f64)
1212                    / stats_guard.batches_sent as f64;
1213            }
1214
1215            debug!("Flushed {} expired batches", flush_count);
1216        }
1217    }
1218
1219    /// Get batching statistics
1220    pub async fn get_stats(&self) -> FrameBatchingStats {
1221        self.stats.lock().unwrap().clone()
1222    }
1223
1224    /// Shutdown the frame batching coordinator
1225    pub async fn shutdown(&mut self) {
1226        if let Some(handle) = self.flush_handle.take() {
1227            handle.abort();
1228        }
1229
1230        // Flush all pending batches
1231        {
1232            let mut pending = self.pending_frames.lock().unwrap();
1233            pending.clear();
1234        }
1235
1236        info!("Frame batching coordinator shutdown complete");
1237    }
1238}
1239
1240/// Memory optimization manager that coordinates all memory optimization components
1241#[derive(Debug)]
1242pub struct MemoryOptimizationManager {
1243    connection_pool: ConnectionPool,
1244    candidate_cache: CandidateCache,
1245    session_cleanup: SessionCleanupCoordinator,
1246    frame_batching: FrameBatchingCoordinator,
1247    is_running: bool,
1248}
1249
1250impl MemoryOptimizationManager {
1251    /// Create a new memory optimization manager with default configurations
1252    pub fn new() -> Self {
1253        Self {
1254            connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1255            candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1256            session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1257            frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1258            is_running: false,
1259        }
1260    }
1261
1262    /// Create a new memory optimization manager with custom configurations
1263    pub fn with_configs(
1264        pool_config: ConnectionPoolConfig,
1265        cache_config: CandidateCacheConfig,
1266        cleanup_config: SessionCleanupConfig,
1267        batching_config: FrameBatchingConfig,
1268    ) -> Self {
1269        Self {
1270            connection_pool: ConnectionPool::new(pool_config),
1271            candidate_cache: CandidateCache::new(cache_config),
1272            session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1273            frame_batching: FrameBatchingCoordinator::new(batching_config),
1274            is_running: false,
1275        }
1276    }
1277
1278    /// Start all memory optimization components
1279    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280        if self.is_running {
1281            return Ok(());
1282        }
1283
1284        self.connection_pool.start().await?;
1285        self.candidate_cache.start().await?;
1286        self.session_cleanup.start().await?;
1287        self.frame_batching.start().await?;
1288
1289        self.is_running = true;
1290        info!("Memory optimization manager started");
1291        Ok(())
1292    }
1293
1294    /// Get connection pool reference
1295    pub fn connection_pool(&self) -> &ConnectionPool {
1296        &self.connection_pool
1297    }
1298
1299    /// Get candidate cache reference
1300    pub fn candidate_cache(&self) -> &CandidateCache {
1301        &self.candidate_cache
1302    }
1303
1304    /// Get session cleanup coordinator reference
1305    pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1306        &self.session_cleanup
1307    }
1308
1309    /// Get frame batching coordinator reference
1310    pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1311        &self.frame_batching
1312    }
1313
1314    /// Get comprehensive memory optimization statistics
1315    pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1316        MemoryOptimizationStats {
1317            connection_pool: self.connection_pool.get_stats().await,
1318            candidate_cache: self.candidate_cache.get_stats().await,
1319            session_cleanup: self.session_cleanup.get_stats().await,
1320            frame_batching: self.frame_batching.get_stats().await,
1321        }
1322    }
1323
1324    /// Shutdown all memory optimization components
1325    pub async fn shutdown(&mut self) {
1326        if !self.is_running {
1327            return;
1328        }
1329
1330        self.connection_pool.shutdown().await;
1331        self.candidate_cache.shutdown().await;
1332        self.session_cleanup.shutdown().await;
1333        self.frame_batching.shutdown().await;
1334
1335        self.is_running = false;
1336        info!("Memory optimization manager shutdown complete");
1337    }
1338}
1339
1340/// Comprehensive memory optimization statistics
1341#[derive(Debug, Clone)]
1342pub struct MemoryOptimizationStats {
1343    pub connection_pool: ConnectionPoolStats,
1344    pub candidate_cache: CandidateCacheStats,
1345    pub session_cleanup: SessionCleanupStats,
1346    pub frame_batching: FrameBatchingStats,
1347}
1348
1349impl Default for MemoryOptimizationManager {
1350    fn default() -> Self {
1351        Self::new()
1352    }
1353}