1use std::{
10 collections::HashMap,
11 net::SocketAddr,
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18#[cfg(feature = "production-ready")]
19use crate::{HighLevelConnection as QuinnConnection, Endpoint as QuinnEndpoint};
20
21use crate::{
22 nat_traversal_api::{CandidateAddress, PeerId},
23 VarInt,
24};
25
26#[derive(Debug)]
28pub struct ConnectionPool {
29 active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
31 config: ConnectionPoolConfig,
33 stats: Arc<Mutex<ConnectionPoolStats>>,
35 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
37}
38
39#[derive(Debug, Clone)]
41pub struct ConnectionPoolConfig {
42 pub max_connections: usize,
44 pub max_idle_time: Duration,
46 pub cleanup_interval: Duration,
48 pub enable_reuse: bool,
50 pub max_connection_age: Duration,
52}
53
54#[derive(Debug)]
56struct PooledConnection {
57 #[cfg(feature = "production-ready")]
58 connection: Arc<QuinnConnection>,
59 #[cfg(not(feature = "production-ready"))]
60 _placeholder: (),
61 peer_id: PeerId,
62 remote_address: SocketAddr,
63 created_at: Instant,
64 last_used: Instant,
65 use_count: u64,
66 is_active: bool,
67}
68
69#[derive(Debug, Default, Clone)]
71pub struct ConnectionPoolStats {
72 pub connections_created: u64,
74 pub connections_reused: u64,
76 pub connections_expired: u64,
78 pub active_connections: usize,
80 pub hit_rate: f64,
82 pub avg_connection_age: Duration,
84}
85
86#[derive(Debug)]
88pub struct CandidateCache {
89 cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
91 config: CandidateCacheConfig,
93 stats: Arc<Mutex<CandidateCacheStats>>,
95 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
97}
98
99#[derive(Debug, Clone)]
101pub struct CandidateCacheConfig {
102 pub default_ttl: Duration,
104 pub max_cache_size: usize,
106 pub cleanup_interval: Duration,
108 pub enable_validation_cache: bool,
110 pub validation_ttl: Duration,
112}
113
114#[derive(Debug, Clone)]
116struct CachedCandidateSet {
117 candidates: Vec<CandidateAddress>,
118 cached_at: Instant,
119 ttl: Duration,
120 access_count: u64,
121 last_accessed: Instant,
122 validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
123}
124
125#[derive(Debug, Clone)]
127struct ValidationCacheEntry {
128 is_valid: bool,
129 rtt: Option<Duration>,
130 cached_at: Instant,
131 ttl: Duration,
132}
133
134#[derive(Debug, Default, Clone)]
136pub struct CandidateCacheStats {
137 pub cache_hits: u64,
139 pub cache_misses: u64,
141 pub entries_expired: u64,
143 pub current_size: usize,
145 pub hit_rate: f64,
147 pub avg_entry_age: Duration,
149}
150
151#[derive(Debug)]
153pub struct SessionCleanupCoordinator {
154 active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
156 config: SessionCleanupConfig,
158 stats: Arc<Mutex<SessionCleanupStats>>,
160 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
162}
163
164#[derive(Debug, Clone)]
166pub struct SessionCleanupConfig {
167 pub max_idle_time: Duration,
169 pub max_session_age: Duration,
171 pub cleanup_interval: Duration,
173 pub enable_aggressive_cleanup: bool,
175 pub memory_pressure_threshold: usize,
177}
178
179#[derive(Debug)]
181struct SessionState {
182 peer_id: PeerId,
183 created_at: Instant,
184 last_activity: Instant,
185 memory_usage: usize,
186 is_active: bool,
187 cleanup_priority: CleanupPriority,
188}
189
190#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
192pub enum CleanupPriority {
193 Low, Normal, High, }
197
198#[derive(Debug, Default, Clone)]
200pub struct SessionCleanupStats {
201 pub sessions_cleaned: u64,
203 pub memory_freed: u64,
205 pub active_sessions: usize,
207 pub avg_session_lifetime: Duration,
209}
210
211#[derive(Debug)]
213pub struct FrameBatchingCoordinator {
214 pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
216 config: FrameBatchingConfig,
218 stats: Arc<Mutex<FrameBatchingStats>>,
220 flush_handle: Option<tokio::task::JoinHandle<()>>,
222}
223
224#[derive(Debug, Clone)]
226pub struct FrameBatchingConfig {
227 pub max_batch_size: usize,
229 pub max_batch_delay: Duration,
231 pub max_frames_per_batch: usize,
233 pub enable_adaptive_batching: bool,
235 pub min_batch_size: usize,
237}
238
239#[derive(Debug)]
241struct BatchedFrameSet {
242 frames: Vec<BatchedFrame>,
243 total_size: usize,
244 created_at: Instant,
245 destination: SocketAddr,
246 priority: BatchPriority,
247}
248
249#[derive(Debug)]
251struct BatchedFrame {
252 frame_type: u8,
253 payload: Vec<u8>,
254 size: usize,
255 priority: FramePriority,
256 created_at: Instant,
257}
258
259#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
261enum BatchPriority {
262 Low, Normal, High, }
266
267#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
269pub enum FramePriority {
270 Background, Normal, Urgent, }
274
275#[derive(Debug, Default, Clone)]
277pub struct FrameBatchingStats {
278 pub frames_batched: u64,
280 pub batches_sent: u64,
282 pub avg_batch_size: f64,
284 pub bytes_saved: u64,
286 pub batching_efficiency: f64,
288}
289
290impl Default for ConnectionPoolConfig {
291 fn default() -> Self {
292 Self {
293 max_connections: 1000,
294 max_idle_time: Duration::from_secs(300), cleanup_interval: Duration::from_secs(60), enable_reuse: true,
297 max_connection_age: Duration::from_secs(3600), }
299 }
300}
301
302impl Default for CandidateCacheConfig {
303 fn default() -> Self {
304 Self {
305 default_ttl: Duration::from_secs(300), max_cache_size: 10000,
307 cleanup_interval: Duration::from_secs(60), enable_validation_cache: true,
309 validation_ttl: Duration::from_secs(60), }
311 }
312}
313
314impl Default for SessionCleanupConfig {
315 fn default() -> Self {
316 Self {
317 max_idle_time: Duration::from_secs(600), max_session_age: Duration::from_secs(3600), cleanup_interval: Duration::from_secs(120), enable_aggressive_cleanup: true,
321 memory_pressure_threshold: 512, }
323 }
324}
325
326impl Default for FrameBatchingConfig {
327 fn default() -> Self {
328 Self {
329 max_batch_size: 1200, max_batch_delay: Duration::from_millis(10), max_frames_per_batch: 10,
332 enable_adaptive_batching: true,
333 min_batch_size: 200, }
335 }
336}
337
338impl ConnectionPool {
339 pub fn new(config: ConnectionPoolConfig) -> Self {
341 let pool = Self {
342 active_connections: Arc::new(RwLock::new(HashMap::new())),
343 config,
344 stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
345 cleanup_handle: None,
346 };
347
348 pool
349 }
350
351 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
353 let connections = Arc::clone(&self.active_connections);
354 let stats = Arc::clone(&self.stats);
355 let config = self.config.clone();
356
357 let cleanup_handle = tokio::spawn(async move {
358 let mut interval = tokio::time::interval(config.cleanup_interval);
359
360 loop {
361 interval.tick().await;
362 Self::cleanup_expired_connections(&connections, &stats, &config).await;
363 }
364 });
365
366 self.cleanup_handle = Some(cleanup_handle);
367 info!("Connection pool started with max_connections={}", self.config.max_connections);
368 Ok(())
369 }
370
371 #[cfg(feature = "production-ready")]
373 pub async fn get_connection(
374 &self,
375 peer_id: PeerId,
376 remote_address: SocketAddr,
377 endpoint: &QuinnEndpoint,
378 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
379 if let Some(connection) = self.try_get_existing_connection(peer_id, remote_address).await {
381 self.update_stats_hit().await;
382 return Ok(connection);
383 }
384
385 let connection = self.create_new_connection(peer_id, remote_address, endpoint).await?;
387 self.update_stats_miss().await;
388 Ok(connection)
389 }
390
391 #[cfg(feature = "production-ready")]
393 async fn try_get_existing_connection(
394 &self,
395 peer_id: PeerId,
396 remote_address: SocketAddr,
397 ) -> Option<Arc<QuinnConnection>> {
398 let mut connections = self.active_connections.write().unwrap();
399
400 if let Some(pooled) = connections.get_mut(&peer_id) {
401 if pooled.is_active && pooled.remote_address == remote_address {
402 if pooled.connection.close_reason().is_none() {
404 pooled.last_used = Instant::now();
405 pooled.use_count += 1;
406 debug!("Reusing pooled connection for peer {:?}", peer_id);
407 return Some(Arc::clone(&pooled.connection));
408 } else {
409 connections.remove(&peer_id);
411 }
412 }
413 }
414
415 None
416 }
417
418 #[cfg(feature = "production-ready")]
420 async fn create_new_connection(
421 &self,
422 peer_id: PeerId,
423 remote_address: SocketAddr,
424 endpoint: &QuinnEndpoint,
425 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
426 {
428 let connections = self.active_connections.read().unwrap();
429 if connections.len() >= self.config.max_connections {
430 drop(connections);
432 self.evict_lru_connection().await;
433 }
434 }
435
436 let rustls_config = rustls::ClientConfig::builder()
438 .with_root_certificates(rustls::RootCertStore::empty())
439 .with_no_client_auth();
440
441 let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
442 .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;
443
444 let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
445
446 let connecting = endpoint.connect_with(client_config, remote_address, "ant-quic")
447 .map_err(|e| format!("Failed to initiate connection: {}", e))?;
448
449 let connection = connecting.await
451 .map_err(|e| format!("Connection failed: {}", e))?;
452
453 let connection_arc = Arc::new(connection);
454
455 let pooled = PooledConnection {
456 connection: Arc::clone(&connection_arc),
457 peer_id,
458 remote_address,
459 created_at: Instant::now(),
460 last_used: Instant::now(),
461 use_count: 1,
462 is_active: true,
463 };
464
465 {
467 let mut connections = self.active_connections.write().unwrap();
468 connections.insert(peer_id, pooled);
469 }
470
471 {
473 let mut stats = self.stats.lock().unwrap();
474 stats.connections_created += 1;
475 stats.active_connections += 1;
476 }
477
478 info!("Created new pooled connection for peer {:?}", peer_id);
479 Ok(connection_arc)
480 }
481
482 async fn evict_lru_connection(&self) {
484 let mut connections = self.active_connections.write().unwrap();
485
486 if let Some((lru_peer_id, _)) = connections
487 .iter()
488 .min_by_key(|(_, pooled)| pooled.last_used)
489 .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
490 {
491 connections.remove(&lru_peer_id);
492 debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
493
494 let mut stats = self.stats.lock().unwrap();
496 stats.active_connections = stats.active_connections.saturating_sub(1);
497 }
498 }
499
500 async fn update_stats_hit(&self) {
502 let mut stats = self.stats.lock().unwrap();
503 stats.connections_reused += 1;
504 let total_requests = stats.connections_created + stats.connections_reused;
505 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
506 }
507
508 async fn update_stats_miss(&self) {
510 let stats = self.stats.lock().unwrap();
511 let total_requests = stats.connections_created + stats.connections_reused;
512 drop(stats);
513
514 let mut stats = self.stats.lock().unwrap();
515 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
516 }
517
518 async fn cleanup_expired_connections(
520 connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
521 stats: &Arc<Mutex<ConnectionPoolStats>>,
522 config: &ConnectionPoolConfig,
523 ) {
524 let now = Instant::now();
525 let mut to_remove = Vec::new();
526
527 {
529 let connections_read = connections.read().unwrap();
530 for (peer_id, pooled) in connections_read.iter() {
531 let idle_time = now.duration_since(pooled.last_used);
532 let age = now.duration_since(pooled.created_at);
533
534 if idle_time > config.max_idle_time || age > config.max_connection_age {
535 to_remove.push(*peer_id);
536 }
537
538 #[cfg(feature = "production-ready")]
539 {
540 if pooled.connection.close_reason().is_some() {
542 to_remove.push(*peer_id);
543 }
544 }
545 }
546 }
547
548 if !to_remove.is_empty() {
550 let mut connections_write = connections.write().unwrap();
551 for peer_id in &to_remove {
552 connections_write.remove(peer_id);
553 }
554
555 let mut stats_guard = stats.lock().unwrap();
557 stats_guard.connections_expired += to_remove.len() as u64;
558 stats_guard.active_connections = connections_write.len();
559
560 debug!("Cleaned up {} expired connections", to_remove.len());
561 }
562 }
563
564 pub async fn get_stats(&self) -> ConnectionPoolStats {
566 self.stats.lock().unwrap().clone()
567 }
568
569 pub async fn shutdown(&mut self) {
571 if let Some(handle) = self.cleanup_handle.take() {
572 handle.abort();
573 }
574
575 #[cfg(feature = "production-ready")]
577 {
578 let connections = self.active_connections.read().unwrap();
579 for (_, pooled) in connections.iter() {
580 pooled.connection.close(VarInt::from_u32(0), b"shutdown");
581 }
582 }
583
584 info!("Connection pool shutdown complete");
585 }
586}
587
588impl CandidateCache {
589 pub fn new(config: CandidateCacheConfig) -> Self {
591 let cache = Self {
592 cache: Arc::new(RwLock::new(HashMap::new())),
593 config,
594 stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
595 cleanup_handle: None,
596 };
597
598 cache
599 }
600
601 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
603 let cache = Arc::clone(&self.cache);
604 let stats = Arc::clone(&self.stats);
605 let config = self.config.clone();
606
607 let cleanup_handle = tokio::spawn(async move {
608 let mut interval = tokio::time::interval(config.cleanup_interval);
609
610 loop {
611 interval.tick().await;
612 Self::cleanup_expired_entries(&cache, &stats, &config).await;
613 }
614 });
615
616 self.cleanup_handle = Some(cleanup_handle);
617 info!("Candidate cache started with max_size={}", self.config.max_cache_size);
618 Ok(())
619 }
620
621 pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
623 let (is_valid, candidates) = {
624 let cache = self.cache.read().unwrap();
625
626 if let Some(cached_set) = cache.get(&peer_id) {
627 let now = Instant::now();
628
629 if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
631 (true, Some(cached_set.candidates.clone()))
632 } else {
633 (false, None)
634 }
635 } else {
636 (false, None)
637 }
638 };
639
640 if is_valid {
641 self.update_access_stats(peer_id, true).await;
643
644 if let Some(ref candidates) = candidates {
645 debug!("Cache hit for peer {:?}, {} candidates", peer_id, candidates.len());
646 }
647 return candidates;
648 }
649
650 self.update_access_stats(peer_id, false).await;
652 debug!("Cache miss for peer {:?}", peer_id);
653 None
654 }
655
656 pub async fn cache_candidates(
658 &self,
659 peer_id: PeerId,
660 candidates: Vec<CandidateAddress>,
661 ttl: Option<Duration>,
662 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
663 let ttl = ttl.unwrap_or(self.config.default_ttl);
664
665 {
667 let cache = self.cache.read().unwrap();
668 if cache.len() >= self.config.max_cache_size {
669 drop(cache);
670 self.evict_lru_entry().await;
671 }
672 }
673
674 let candidate_count = candidates.len();
675 let cached_set = CachedCandidateSet {
676 candidates,
677 cached_at: Instant::now(),
678 ttl,
679 access_count: 0,
680 last_accessed: Instant::now(),
681 validation_results: HashMap::new(),
682 };
683
684 {
686 let mut cache = self.cache.write().unwrap();
687 cache.insert(peer_id, cached_set);
688 }
689
690 {
692 let mut stats = self.stats.lock().unwrap();
693 stats.current_size += 1;
694 }
695
696 debug!("Cached {} candidates for peer {:?} with TTL {:?}",
697 candidate_count, peer_id, ttl);
698 Ok(())
699 }
700
701 pub async fn cache_validation_result(
703 &self,
704 peer_id: PeerId,
705 address: SocketAddr,
706 is_valid: bool,
707 rtt: Option<Duration>,
708 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
709 if !self.config.enable_validation_cache {
710 return Ok(());
711 }
712
713 let mut cache = self.cache.write().unwrap();
714
715 if let Some(cached_set) = cache.get_mut(&peer_id) {
716 let validation_entry = ValidationCacheEntry {
717 is_valid,
718 rtt,
719 cached_at: Instant::now(),
720 ttl: self.config.validation_ttl,
721 };
722
723 cached_set.validation_results.insert(address, validation_entry);
724 debug!("Cached validation result for {}:{} -> {}", peer_id.0[0], address, is_valid);
725 }
726
727 Ok(())
728 }
729
730 pub async fn get_validation_result(
732 &self,
733 peer_id: PeerId,
734 address: SocketAddr,
735 ) -> Option<(bool, Option<Duration>)> {
736 if !self.config.enable_validation_cache {
737 return None;
738 }
739
740 let cache = self.cache.read().unwrap();
741
742 if let Some(cached_set) = cache.get(&peer_id) {
743 if let Some(validation_entry) = cached_set.validation_results.get(&address) {
744 let now = Instant::now();
745
746 if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
748 return Some((validation_entry.is_valid, validation_entry.rtt));
749 }
750 }
751 }
752
753 None
754 }
755
756 async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
758 {
760 let mut stats = self.stats.lock().unwrap();
761 if hit {
762 stats.cache_hits += 1;
763 } else {
764 stats.cache_misses += 1;
765 }
766
767 let total_accesses = stats.cache_hits + stats.cache_misses;
768 stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
769 }
770
771 if hit {
773 let mut cache = self.cache.write().unwrap();
774 if let Some(cached_set) = cache.get_mut(&peer_id) {
775 cached_set.access_count += 1;
776 cached_set.last_accessed = Instant::now();
777 }
778 }
779 }
780
781 async fn evict_lru_entry(&self) {
783 let mut cache = self.cache.write().unwrap();
784
785 if let Some((lru_peer_id, _)) = cache
786 .iter()
787 .min_by_key(|(_, cached_set)| cached_set.last_accessed)
788 .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
789 {
790 cache.remove(&lru_peer_id);
791 debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
792
793 let mut stats = self.stats.lock().unwrap();
795 stats.current_size = stats.current_size.saturating_sub(1);
796 }
797 }
798
799 async fn cleanup_expired_entries(
801 cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
802 stats: &Arc<Mutex<CandidateCacheStats>>,
803 _config: &CandidateCacheConfig,
804 ) {
805 let now = Instant::now();
806 let mut to_remove = Vec::new();
807
808 {
810 let cache_read = cache.read().unwrap();
811 for (peer_id, cached_set) in cache_read.iter() {
812 let age = now.duration_since(cached_set.cached_at);
813 if age > cached_set.ttl {
814 to_remove.push(*peer_id);
815 }
816 }
817 }
818
819 if !to_remove.is_empty() {
821 let mut cache_write = cache.write().unwrap();
822 for peer_id in &to_remove {
823 cache_write.remove(peer_id);
824 }
825
826 let mut stats_guard = stats.lock().unwrap();
828 stats_guard.entries_expired += to_remove.len() as u64;
829 stats_guard.current_size = cache_write.len();
830
831 debug!("Cleaned up {} expired cache entries", to_remove.len());
832 }
833
834 {
836 let mut cache_write = cache.write().unwrap();
837 for cached_set in cache_write.values_mut() {
838 cached_set.validation_results.retain(|_, validation_entry| {
839 now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
840 });
841 }
842 }
843 }
844
845 pub async fn get_stats(&self) -> CandidateCacheStats {
847 self.stats.lock().unwrap().clone()
848 }
849
850 pub async fn shutdown(&mut self) {
852 if let Some(handle) = self.cleanup_handle.take() {
853 handle.abort();
854 }
855
856 {
858 let mut cache = self.cache.write().unwrap();
859 cache.clear();
860 }
861
862 info!("Candidate cache shutdown complete");
863 }
864}
865
866impl SessionCleanupCoordinator {
867 pub fn new(config: SessionCleanupConfig) -> Self {
869 Self {
870 active_sessions: Arc::new(RwLock::new(HashMap::new())),
871 config,
872 stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
873 cleanup_handle: None,
874 }
875 }
876
877 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
879 let sessions = Arc::clone(&self.active_sessions);
880 let stats = Arc::clone(&self.stats);
881 let config = self.config.clone();
882
883 let cleanup_handle = tokio::spawn(async move {
884 let mut interval = tokio::time::interval(config.cleanup_interval);
885
886 loop {
887 interval.tick().await;
888 Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
889 }
890 });
891
892 self.cleanup_handle = Some(cleanup_handle);
893 info!("Session cleanup coordinator started");
894 Ok(())
895 }
896
897 pub async fn register_session(
899 &self,
900 peer_id: PeerId,
901 memory_usage: usize,
902 priority: CleanupPriority,
903 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
904 let session_state = SessionState {
905 peer_id,
906 created_at: Instant::now(),
907 last_activity: Instant::now(),
908 memory_usage,
909 is_active: true,
910 cleanup_priority: priority,
911 };
912
913 {
914 let mut sessions = self.active_sessions.write().unwrap();
915 sessions.insert(peer_id, session_state);
916 }
917
918 {
920 let mut stats = self.stats.lock().unwrap();
921 stats.active_sessions += 1;
922 }
923
924 debug!("Registered session for peer {:?} with {} bytes", peer_id, memory_usage);
925 Ok(())
926 }
927
928 pub async fn update_session_activity(&self, peer_id: PeerId) {
930 let mut sessions = self.active_sessions.write().unwrap();
931 if let Some(session) = sessions.get_mut(&peer_id) {
932 session.last_activity = Instant::now();
933 }
934 }
935
936 async fn cleanup_expired_sessions(
938 sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
939 stats: &Arc<Mutex<SessionCleanupStats>>,
940 config: &SessionCleanupConfig,
941 ) {
942 let now = Instant::now();
943 let mut to_remove = Vec::new();
944 let mut memory_freed = 0u64;
945
946 let memory_pressure = Self::check_memory_pressure(config);
948
949 {
951 let sessions_read = sessions.read().unwrap();
952 for (peer_id, session) in sessions_read.iter() {
953 let idle_time = now.duration_since(session.last_activity);
954 let age = now.duration_since(session.created_at);
955
956 let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
957 match session.cleanup_priority {
959 CleanupPriority::High => idle_time > Duration::from_secs(30),
960 CleanupPriority::Normal => idle_time > Duration::from_secs(60),
961 CleanupPriority::Low => idle_time > config.max_idle_time / 2,
962 }
963 } else {
964 idle_time > config.max_idle_time || age > config.max_session_age
966 };
967
968 if should_cleanup {
969 to_remove.push(*peer_id);
970 memory_freed += session.memory_usage as u64;
971 }
972 }
973 }
974
975 if !to_remove.is_empty() {
977 let mut sessions_write = sessions.write().unwrap();
978 for peer_id in &to_remove {
979 sessions_write.remove(peer_id);
980 }
981
982 let mut stats_guard = stats.lock().unwrap();
984 stats_guard.sessions_cleaned += to_remove.len() as u64;
985 stats_guard.memory_freed += memory_freed;
986 stats_guard.active_sessions = sessions_write.len();
987
988 if memory_pressure {
989 info!("Aggressive cleanup: removed {} sessions, freed {} bytes",
990 to_remove.len(), memory_freed);
991 } else {
992 debug!("Regular cleanup: removed {} sessions, freed {} bytes",
993 to_remove.len(), memory_freed);
994 }
995 }
996 }
997
998 fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
1000 let _ = config.memory_pressure_threshold;
1005 false
1006 }
1007
1008 pub async fn get_stats(&self) -> SessionCleanupStats {
1010 self.stats.lock().unwrap().clone()
1011 }
1012
1013 pub async fn shutdown(&mut self) {
1015 if let Some(handle) = self.cleanup_handle.take() {
1016 handle.abort();
1017 }
1018
1019 {
1021 let mut sessions = self.active_sessions.write().unwrap();
1022 sessions.clear();
1023 }
1024
1025 info!("Session cleanup coordinator shutdown complete");
1026 }
1027}
1028
1029impl FrameBatchingCoordinator {
1030 pub fn new(config: FrameBatchingConfig) -> Self {
1032 Self {
1033 pending_frames: Arc::new(Mutex::new(HashMap::new())),
1034 config,
1035 stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1036 flush_handle: None,
1037 }
1038 }
1039
1040 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1042 let pending_frames = Arc::clone(&self.pending_frames);
1043 let stats = Arc::clone(&self.stats);
1044 let config = self.config.clone();
1045
1046 let flush_handle = tokio::spawn(async move {
1047 let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1048
1049 loop {
1050 interval.tick().await;
1051 Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1052 }
1053 });
1054
1055 self.flush_handle = Some(flush_handle);
1056 info!("Frame batching coordinator started");
1057 Ok(())
1058 }
1059
1060 pub async fn add_frame(
1062 &self,
1063 destination: SocketAddr,
1064 frame_type: u8,
1065 payload: Vec<u8>,
1066 priority: FramePriority,
1067 ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1068 let frame = BatchedFrame {
1069 frame_type,
1070 size: payload.len(),
1071 payload,
1072 priority,
1073 created_at: Instant::now(),
1074 };
1075
1076 let mut pending = self.pending_frames.lock().unwrap();
1077
1078 let (should_flush, frames_count, total_size) = {
1080 let batch_set = pending.entry(destination).or_insert_with(|| {
1081 BatchedFrameSet {
1082 frames: Vec::new(),
1083 total_size: 0,
1084 created_at: Instant::now(),
1085 destination,
1086 priority: BatchPriority::Normal,
1087 }
1088 });
1089
1090 batch_set.frames.push(frame);
1091 batch_set.total_size += batch_set.frames.last().unwrap().size;
1092
1093 if priority == FramePriority::Urgent {
1095 batch_set.priority = BatchPriority::High;
1096 }
1097
1098 let should_flush = self.should_flush_batch(batch_set);
1100 (should_flush, batch_set.frames.len(), batch_set.total_size)
1101 };
1102
1103 if should_flush {
1104 if let Some(batch_set) = pending.remove(&destination) {
1106 let batch_data = self.serialize_batch(&batch_set);
1107
1108 {
1110 let mut stats = self.stats.lock().unwrap();
1111 stats.batches_sent += 1;
1112 stats.frames_batched += frames_count as u64;
1113 stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64 +
1114 total_size as f64) / stats.batches_sent as f64;
1115 }
1116
1117 debug!("Flushed batch to {} with {} frames ({} bytes)",
1118 destination, frames_count, total_size);
1119 return Ok(Some(batch_data));
1120 }
1121 }
1122
1123 Ok(None)
1124 }
1125
1126 fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1128 let now = Instant::now();
1129 let age = now.duration_since(batch_set.created_at);
1130
1131 batch_set.total_size >= self.config.max_batch_size ||
1133 batch_set.frames.len() >= self.config.max_frames_per_batch ||
1134 age >= self.config.max_batch_delay ||
1135 batch_set.priority == BatchPriority::High
1136 }
1137
1138 fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1140 let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1141
1142 for frame in &batch_set.frames {
1143 data.push(frame.frame_type);
1145 data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1146 data.extend_from_slice(&frame.payload);
1148 }
1149
1150 data
1151 }
1152
1153 async fn flush_expired_batches(
1155 pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1156 stats: &Arc<Mutex<FrameBatchingStats>>,
1157 config: &FrameBatchingConfig,
1158 ) {
1159 let now = Instant::now();
1160 let mut to_flush = Vec::new();
1161
1162 {
1164 let pending = pending_frames.lock().unwrap();
1165 for (destination, batch_set) in pending.iter() {
1166 let age = now.duration_since(batch_set.created_at);
1167 if age >= config.max_batch_delay {
1168 to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1169 }
1170 }
1171 }
1172
1173 if !to_flush.is_empty() {
1175 let mut pending = pending_frames.lock().unwrap();
1176 let flush_count = to_flush.len();
1177 for (destination, frame_count, total_size) in to_flush {
1178 pending.remove(&destination);
1179
1180 let mut stats_guard = stats.lock().unwrap();
1182 stats_guard.batches_sent += 1;
1183 stats_guard.frames_batched += frame_count as u64;
1184 stats_guard.avg_batch_size = (stats_guard.avg_batch_size * (stats_guard.batches_sent - 1) as f64 +
1185 total_size as f64) / stats_guard.batches_sent as f64;
1186 }
1187
1188 debug!("Flushed {} expired batches", flush_count);
1189 }
1190 }
1191
1192 pub async fn get_stats(&self) -> FrameBatchingStats {
1194 self.stats.lock().unwrap().clone()
1195 }
1196
1197 pub async fn shutdown(&mut self) {
1199 if let Some(handle) = self.flush_handle.take() {
1200 handle.abort();
1201 }
1202
1203 {
1205 let mut pending = self.pending_frames.lock().unwrap();
1206 pending.clear();
1207 }
1208
1209 info!("Frame batching coordinator shutdown complete");
1210 }
1211}
1212
1213#[derive(Debug)]
1215pub struct MemoryOptimizationManager {
1216 connection_pool: ConnectionPool,
1217 candidate_cache: CandidateCache,
1218 session_cleanup: SessionCleanupCoordinator,
1219 frame_batching: FrameBatchingCoordinator,
1220 is_running: bool,
1221}
1222
1223impl MemoryOptimizationManager {
1224 pub fn new() -> Self {
1226 Self {
1227 connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1228 candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1229 session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1230 frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1231 is_running: false,
1232 }
1233 }
1234
1235 pub fn with_configs(
1237 pool_config: ConnectionPoolConfig,
1238 cache_config: CandidateCacheConfig,
1239 cleanup_config: SessionCleanupConfig,
1240 batching_config: FrameBatchingConfig,
1241 ) -> Self {
1242 Self {
1243 connection_pool: ConnectionPool::new(pool_config),
1244 candidate_cache: CandidateCache::new(cache_config),
1245 session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1246 frame_batching: FrameBatchingCoordinator::new(batching_config),
1247 is_running: false,
1248 }
1249 }
1250
1251 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1253 if self.is_running {
1254 return Ok(());
1255 }
1256
1257 self.connection_pool.start().await?;
1258 self.candidate_cache.start().await?;
1259 self.session_cleanup.start().await?;
1260 self.frame_batching.start().await?;
1261
1262 self.is_running = true;
1263 info!("Memory optimization manager started");
1264 Ok(())
1265 }
1266
1267 pub fn connection_pool(&self) -> &ConnectionPool {
1269 &self.connection_pool
1270 }
1271
1272 pub fn candidate_cache(&self) -> &CandidateCache {
1274 &self.candidate_cache
1275 }
1276
1277 pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1279 &self.session_cleanup
1280 }
1281
1282 pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1284 &self.frame_batching
1285 }
1286
1287 pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1289 MemoryOptimizationStats {
1290 connection_pool: self.connection_pool.get_stats().await,
1291 candidate_cache: self.candidate_cache.get_stats().await,
1292 session_cleanup: self.session_cleanup.get_stats().await,
1293 frame_batching: self.frame_batching.get_stats().await,
1294 }
1295 }
1296
1297 pub async fn shutdown(&mut self) {
1299 if !self.is_running {
1300 return;
1301 }
1302
1303 self.connection_pool.shutdown().await;
1304 self.candidate_cache.shutdown().await;
1305 self.session_cleanup.shutdown().await;
1306 self.frame_batching.shutdown().await;
1307
1308 self.is_running = false;
1309 info!("Memory optimization manager shutdown complete");
1310 }
1311}
1312
1313#[derive(Debug, Clone)]
1315pub struct MemoryOptimizationStats {
1316 pub connection_pool: ConnectionPoolStats,
1317 pub candidate_cache: CandidateCacheStats,
1318 pub session_cleanup: SessionCleanupStats,
1319 pub frame_batching: FrameBatchingStats,
1320}
1321
1322impl Default for MemoryOptimizationManager {
1323 fn default() -> Self {
1324 Self::new()
1325 }
1326}