1use std::{
10 collections::HashMap,
11 net::SocketAddr,
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18use crate::{Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection};
19
20use crate::{
21 VarInt,
22 nat_traversal_api::{CandidateAddress, PeerId},
23};
24
/// Pool of QUIC connections keyed by peer ID.
///
/// Connections are created on demand by `get_connection`, handed out again
/// while they are still open, and removed by a periodic task spawned in
/// `start`.
#[derive(Debug)]
pub struct ConnectionPool {
    /// Pooled connections, at most one entry per peer.
    active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
    /// Limits and timeouts applied by the pool.
    config: ConnectionPoolConfig,
    /// Usage counters, shared with the cleanup task.
    stats: Arc<Mutex<ConnectionPoolStats>>,
    /// Handle of the periodic cleanup task; `None` until `start` has run.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
37
/// Tunables for [`ConnectionPool`].
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Pool size at which creating a connection first evicts the least
    /// recently used entry.
    pub max_connections: usize,
    /// Connections unused for longer than this are removed by the cleanup task.
    pub max_idle_time: Duration,
    /// Period of the background cleanup task.
    pub cleanup_interval: Duration,
    /// NOTE(review): not read by any code in this file; presumably meant to
    /// toggle connection reuse — confirm.
    pub enable_reuse: bool,
    /// Connections older than this are removed by the cleanup task.
    pub max_connection_age: Duration,
}
52
/// A connection held by [`ConnectionPool`] together with its bookkeeping.
#[derive(Debug)]
struct PooledConnection {
    /// The shared connection handle returned to callers.
    connection: Arc<QuinnConnection>,
    /// Peer this connection belongs to (also the key in the pool map).
    peer_id: PeerId,
    /// Address the connection was dialled to; reuse requires an exact match.
    remote_address: SocketAddr,
    /// When the entry was inserted into the pool.
    created_at: Instant,
    /// When the entry was last handed out; drives idle expiry and LRU eviction.
    last_used: Instant,
    /// Number of times the connection has been handed out (starts at 1).
    use_count: u64,
    /// Set to `true` on creation and checked before reuse; nothing in this
    /// file ever clears it.
    is_active: bool,
}
64
/// Counters reported by [`ConnectionPool::get_stats`].
#[derive(Debug, Default, Clone)]
pub struct ConnectionPoolStats {
    /// Connections dialled and inserted into the pool.
    pub connections_created: u64,
    /// Requests served from an already pooled connection.
    pub connections_reused: u64,
    /// Connections removed by the periodic cleanup.
    pub connections_expired: u64,
    /// Number of connections the pool currently accounts for.
    pub active_connections: usize,
    /// `connections_reused / (connections_created + connections_reused)`.
    pub hit_rate: f64,
    /// Never written by the code in this file; stays at its default.
    pub avg_connection_age: Duration,
}
81
/// TTL-based cache of candidate addresses per peer, with optional cached
/// validation results per address.
#[derive(Debug)]
pub struct CandidateCache {
    /// Cached candidate sets, one per peer.
    cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
    /// TTLs and size limit applied by the cache.
    config: CandidateCacheConfig,
    /// Hit/miss counters, shared with the cleanup task.
    stats: Arc<Mutex<CandidateCacheStats>>,
    /// Handle of the periodic cleanup task; `None` until `start` has run.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
94
/// Tunables for [`CandidateCache`].
#[derive(Debug, Clone)]
pub struct CandidateCacheConfig {
    /// TTL used by `cache_candidates` when the caller passes `None`.
    pub default_ttl: Duration,
    /// Entry count at which caching a new set first evicts the least
    /// recently accessed entry.
    pub max_cache_size: usize,
    /// Period of the background cleanup task.
    pub cleanup_interval: Duration,
    /// When `false`, validation results are neither stored nor returned.
    pub enable_validation_cache: bool,
    /// TTL assigned to every stored validation result.
    pub validation_ttl: Duration,
}
109
/// One peer's cached candidates plus access bookkeeping.
#[derive(Debug, Clone)]
struct CachedCandidateSet {
    /// The cached candidate addresses.
    candidates: Vec<CandidateAddress>,
    /// When the set was stored; compared against `ttl` for expiry.
    cached_at: Instant,
    /// Lifetime of this set.
    ttl: Duration,
    /// Number of cache hits served from this set.
    access_count: u64,
    /// Time of the last hit; drives LRU eviction.
    last_accessed: Instant,
    /// Validation outcomes per address, each with its own TTL.
    validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
}
120
/// Cached outcome of validating a single address.
#[derive(Debug, Clone)]
struct ValidationCacheEntry {
    /// Validation outcome as reported by the caller.
    is_valid: bool,
    /// Round-trip time supplied with the outcome, if any.
    rtt: Option<Duration>,
    /// When the outcome was stored; compared against `ttl` for expiry.
    cached_at: Instant,
    /// Lifetime of this outcome.
    ttl: Duration,
}
129
/// Counters reported by [`CandidateCache::get_stats`].
#[derive(Debug, Default, Clone)]
pub struct CandidateCacheStats {
    /// Lookups answered from a non-expired entry.
    pub cache_hits: u64,
    /// Lookups that found no entry or only an expired one.
    pub cache_misses: u64,
    /// Entries removed by the periodic cleanup.
    pub entries_expired: u64,
    /// Number of entries the cache currently accounts for.
    pub current_size: usize,
    /// `cache_hits / (cache_hits + cache_misses)`.
    pub hit_rate: f64,
    /// Never written by the code in this file; stays at its default.
    pub avg_entry_age: Duration,
}
146
/// Tracks registered sessions and periodically removes idle or aged ones.
#[derive(Debug)]
pub struct SessionCleanupCoordinator {
    /// Registered sessions, one per peer.
    active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
    /// Timeouts and cleanup policy.
    config: SessionCleanupConfig,
    /// Cleanup counters, shared with the cleanup task.
    stats: Arc<Mutex<SessionCleanupStats>>,
    /// Handle of the periodic cleanup task; `None` until `start` has run.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
159
/// Tunables for [`SessionCleanupCoordinator`].
#[derive(Debug, Clone)]
pub struct SessionCleanupConfig {
    /// Sessions idle for longer than this are removed by regular cleanup.
    pub max_idle_time: Duration,
    /// Sessions older than this are removed by regular cleanup.
    pub max_session_age: Duration,
    /// Period of the background cleanup task.
    pub cleanup_interval: Duration,
    /// Enables the priority-based idle limits; these only apply when memory
    /// pressure is detected, and `check_memory_pressure` currently always
    /// reports none.
    pub enable_aggressive_cleanup: bool,
    /// Read by `check_memory_pressure` but not acted upon.
    /// NOTE(review): units are not shown in this file — confirm.
    pub memory_pressure_threshold: usize,
}
174
/// Bookkeeping for one registered session.
#[derive(Debug)]
struct SessionState {
    /// Peer the session belongs to (also the key in the session map).
    peer_id: PeerId,
    /// When the session was registered; compared against `max_session_age`.
    created_at: Instant,
    /// Last time `update_session_activity` was called for this peer.
    last_activity: Instant,
    /// Caller-reported memory footprint, added to `memory_freed` on removal.
    memory_usage: usize,
    /// Set to `true` on registration; not read by the code in this file.
    is_active: bool,
    /// Selects the idle limit used during aggressive cleanup.
    cleanup_priority: CleanupPriority,
}
185
/// How eagerly a session is removed during aggressive cleanup.
///
/// The derived ordering follows declaration order: `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    /// Removed after being idle for more than half of `max_idle_time`.
    Low,
    /// Removed after being idle for more than 60 seconds.
    Normal,
    /// Removed after being idle for more than 30 seconds.
    High,
}
193
/// Counters reported by [`SessionCleanupCoordinator::get_stats`].
#[derive(Debug, Default, Clone)]
pub struct SessionCleanupStats {
    /// Sessions removed by the periodic cleanup.
    pub sessions_cleaned: u64,
    /// Sum of `memory_usage` over all removed sessions.
    pub memory_freed: u64,
    /// Number of sessions the coordinator currently accounts for.
    pub active_sessions: usize,
    /// Never written by the code in this file; stays at its default.
    pub avg_session_lifetime: Duration,
}
206
/// Collects frames per destination and hands them back as serialized batches.
#[derive(Debug)]
pub struct FrameBatchingCoordinator {
    /// Open batches, one per destination address.
    pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
    /// Batch size, count and delay limits.
    config: FrameBatchingConfig,
    /// Batching counters, shared with the flush task.
    stats: Arc<Mutex<FrameBatchingStats>>,
    /// Handle of the periodic flush task; `None` until `start` has run.
    flush_handle: Option<tokio::task::JoinHandle<()>>,
}
219
/// Tunables for [`FrameBatchingCoordinator`].
#[derive(Debug, Clone)]
pub struct FrameBatchingConfig {
    /// Total payload bytes at which a batch is flushed.
    pub max_batch_size: usize,
    /// Age at which a batch is flushed; the flush task ticks at a quarter of
    /// this value.
    pub max_batch_delay: Duration,
    /// Frame count at which a batch is flushed.
    pub max_frames_per_batch: usize,
    /// Not read by any code in this file.
    pub enable_adaptive_batching: bool,
    /// Not read by any code in this file.
    pub min_batch_size: usize,
}
234
/// The frames queued for one destination.
#[derive(Debug)]
struct BatchedFrameSet {
    /// Queued frames in insertion order.
    frames: Vec<BatchedFrame>,
    /// Sum of the payload lengths of `frames`.
    total_size: usize,
    /// When the batch was opened; compared against `max_batch_delay`.
    created_at: Instant,
    /// Destination address (also the key in the pending map).
    destination: SocketAddr,
    /// `Normal` on creation, raised to `High` once an urgent frame is added.
    priority: BatchPriority,
}
244
/// A single frame waiting in a batch.
#[derive(Debug)]
struct BatchedFrame {
    /// Type byte written in front of the frame when serialized.
    frame_type: u8,
    /// Frame contents.
    payload: Vec<u8>,
    /// Length of `payload` at the time the frame was queued.
    size: usize,
    /// Priority supplied by the caller.
    priority: FramePriority,
    /// When the frame was queued.
    created_at: Instant,
}
254
/// Priority of a whole batch.
///
/// The derived ordering follows declaration order: `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    /// Never assigned by the code in this file.
    Low,
    /// Priority of a freshly opened batch.
    Normal,
    /// Set when an urgent frame is added; forces an immediate flush.
    High,
}
262
/// Priority a caller assigns to a frame passed to `add_frame`.
///
/// The derived ordering follows declaration order:
/// `Background < Normal < Urgent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    /// Treated like `Normal` by the code in this file.
    Background,
    /// Queued until one of the batch limits triggers a flush.
    Normal,
    /// Raises the batch to high priority, which flushes it immediately.
    Urgent,
}
270
/// Counters reported by [`FrameBatchingCoordinator::get_stats`].
#[derive(Debug, Default, Clone)]
pub struct FrameBatchingStats {
    /// Frames contained in flushed batches.
    pub frames_batched: u64,
    /// Batches flushed, either by `add_frame` or by the flush task.
    pub batches_sent: u64,
    /// Running mean of the payload bytes per flushed batch.
    pub avg_batch_size: f64,
    /// Never written by the code in this file; stays at its default.
    pub bytes_saved: u64,
    /// Never written by the code in this file; stays at its default.
    pub batching_efficiency: f64,
}
285
286impl Default for ConnectionPoolConfig {
287 fn default() -> Self {
288 Self {
289 max_connections: 1000,
290 max_idle_time: Duration::from_secs(300), cleanup_interval: Duration::from_secs(60), enable_reuse: true,
293 max_connection_age: Duration::from_secs(3600), }
295 }
296}
297
298impl Default for CandidateCacheConfig {
299 fn default() -> Self {
300 Self {
301 default_ttl: Duration::from_secs(300), max_cache_size: 10000,
303 cleanup_interval: Duration::from_secs(60), enable_validation_cache: true,
305 validation_ttl: Duration::from_secs(60), }
307 }
308}
309
310impl Default for SessionCleanupConfig {
311 fn default() -> Self {
312 Self {
313 max_idle_time: Duration::from_secs(600), max_session_age: Duration::from_secs(3600), cleanup_interval: Duration::from_secs(120), enable_aggressive_cleanup: true,
317 memory_pressure_threshold: 512, }
319 }
320}
321
322impl Default for FrameBatchingConfig {
323 fn default() -> Self {
324 Self {
325 max_batch_size: 1200, max_batch_delay: Duration::from_millis(10), max_frames_per_batch: 10,
328 enable_adaptive_batching: true,
329 min_batch_size: 200, }
331 }
332}
333
334impl ConnectionPool {
335 pub fn new(config: ConnectionPoolConfig) -> Self {
337 Self {
338 active_connections: Arc::new(RwLock::new(HashMap::new())),
339 config,
340 stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
341 cleanup_handle: None,
342 }
343 }
344
345 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
347 let connections = Arc::clone(&self.active_connections);
348 let stats = Arc::clone(&self.stats);
349 let config = self.config.clone();
350
351 let cleanup_handle = tokio::spawn(async move {
352 let mut interval = tokio::time::interval(config.cleanup_interval);
353
354 loop {
355 interval.tick().await;
356 Self::cleanup_expired_connections(&connections, &stats, &config).await;
357 }
358 });
359
360 self.cleanup_handle = Some(cleanup_handle);
361 info!(
362 "Connection pool started with max_connections={}",
363 self.config.max_connections
364 );
365 Ok(())
366 }
367
368 pub async fn get_connection(
370 &self,
371 peer_id: PeerId,
372 remote_address: SocketAddr,
373 endpoint: &QuinnEndpoint,
374 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
375 if let Some(connection) = self
377 .try_get_existing_connection(peer_id, remote_address)
378 .await
379 {
380 self.update_stats_hit().await;
381 return Ok(connection);
382 }
383
384 let connection = self
386 .create_new_connection(peer_id, remote_address, endpoint)
387 .await?;
388 self.update_stats_miss().await;
389 Ok(connection)
390 }
391
392 async fn try_get_existing_connection(
394 &self,
395 peer_id: PeerId,
396 remote_address: SocketAddr,
397 ) -> Option<Arc<QuinnConnection>> {
398 let mut connections = self.active_connections.write().unwrap();
399
400 if let Some(pooled) = connections.get_mut(&peer_id) {
401 if pooled.is_active && pooled.remote_address == remote_address {
402 if pooled.connection.close_reason().is_none() {
404 pooled.last_used = Instant::now();
405 pooled.use_count += 1;
406 debug!("Reusing pooled connection for peer {:?}", peer_id);
407 return Some(Arc::clone(&pooled.connection));
408 } else {
409 connections.remove(&peer_id);
411 }
412 }
413 }
414
415 None
416 }
417
418 async fn create_new_connection(
420 &self,
421 peer_id: PeerId,
422 remote_address: SocketAddr,
423 endpoint: &QuinnEndpoint,
424 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
425 {
427 let connections = self.active_connections.read().unwrap();
428 if connections.len() >= self.config.max_connections {
429 drop(connections);
431 self.evict_lru_connection().await;
432 }
433 }
434
435 let rustls_config = rustls::ClientConfig::builder()
437 .with_root_certificates(rustls::RootCertStore::empty())
438 .with_no_client_auth();
439
440 let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
441 .map_err(|e| format!("Failed to create QUIC client config: {e}"))?;
442
443 let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
444
445 let connecting = endpoint
446 .connect_with(client_config, remote_address, "ant-quic")
447 .map_err(|e| format!("Failed to initiate connection: {e}"))?;
448
449 let connection = connecting
451 .await
452 .map_err(|e| format!("Connection failed: {e}"))?;
453
454 let connection_arc = Arc::new(connection);
455
456 let pooled = PooledConnection {
457 connection: Arc::clone(&connection_arc),
458 peer_id,
459 remote_address,
460 created_at: Instant::now(),
461 last_used: Instant::now(),
462 use_count: 1,
463 is_active: true,
464 };
465
466 {
468 let mut connections = self.active_connections.write().unwrap();
469 connections.insert(peer_id, pooled);
470 }
471
472 {
474 let mut stats = self.stats.lock().unwrap();
475 stats.connections_created += 1;
476 stats.active_connections += 1;
477 }
478
479 info!("Created new pooled connection for peer {:?}", peer_id);
480 Ok(connection_arc)
481 }
482
483 async fn evict_lru_connection(&self) {
485 let mut connections = self.active_connections.write().unwrap();
486
487 if let Some((lru_peer_id, _)) = connections
488 .iter()
489 .min_by_key(|(_, pooled)| pooled.last_used)
490 .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
491 {
492 connections.remove(&lru_peer_id);
493 debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
494
495 let mut stats = self.stats.lock().unwrap();
497 stats.active_connections = stats.active_connections.saturating_sub(1);
498 }
499 }
500
501 async fn update_stats_hit(&self) {
503 let mut stats = self.stats.lock().unwrap();
504 stats.connections_reused += 1;
505 let total_requests = stats.connections_created + stats.connections_reused;
506 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
507 }
508
509 async fn update_stats_miss(&self) {
511 let stats = self.stats.lock().unwrap();
512 let total_requests = stats.connections_created + stats.connections_reused;
513 drop(stats);
514
515 let mut stats = self.stats.lock().unwrap();
516 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
517 }
518
519 async fn cleanup_expired_connections(
521 connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
522 stats: &Arc<Mutex<ConnectionPoolStats>>,
523 config: &ConnectionPoolConfig,
524 ) {
525 let now = Instant::now();
526 let mut to_remove = Vec::new();
527
528 {
530 let connections_read = connections.read().unwrap();
531 for (peer_id, pooled) in connections_read.iter() {
532 let idle_time = now.duration_since(pooled.last_used);
533 let age = now.duration_since(pooled.created_at);
534
535 if idle_time > config.max_idle_time || age > config.max_connection_age {
536 to_remove.push(*peer_id);
537 }
538
539 if pooled.connection.close_reason().is_some() {
541 to_remove.push(*peer_id);
542 }
543 }
544 }
545
546 if !to_remove.is_empty() {
548 let mut connections_write = connections.write().unwrap();
549 for peer_id in &to_remove {
550 connections_write.remove(peer_id);
551 }
552
553 let mut stats_guard = stats.lock().unwrap();
555 stats_guard.connections_expired += to_remove.len() as u64;
556 stats_guard.active_connections = connections_write.len();
557
558 debug!("Cleaned up {} expired connections", to_remove.len());
559 }
560 }
561
562 pub async fn get_stats(&self) -> ConnectionPoolStats {
564 self.stats.lock().unwrap().clone()
565 }
566
567 pub async fn shutdown(&mut self) {
569 if let Some(handle) = self.cleanup_handle.take() {
570 handle.abort();
571 }
572
573 {
575 let connections = self.active_connections.read().unwrap();
576 for (_, pooled) in connections.iter() {
577 pooled.connection.close(VarInt::from_u32(0), b"shutdown");
578 }
579 }
580
581 info!("Connection pool shutdown complete");
582 }
583}
584
585impl CandidateCache {
586 pub fn new(config: CandidateCacheConfig) -> Self {
588 Self {
589 cache: Arc::new(RwLock::new(HashMap::new())),
590 config,
591 stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
592 cleanup_handle: None,
593 }
594 }
595
596 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
598 let cache = Arc::clone(&self.cache);
599 let stats = Arc::clone(&self.stats);
600 let config = self.config.clone();
601
602 let cleanup_handle = tokio::spawn(async move {
603 let mut interval = tokio::time::interval(config.cleanup_interval);
604
605 loop {
606 interval.tick().await;
607 Self::cleanup_expired_entries(&cache, &stats, &config).await;
608 }
609 });
610
611 self.cleanup_handle = Some(cleanup_handle);
612 info!(
613 "Candidate cache started with max_size={}",
614 self.config.max_cache_size
615 );
616 Ok(())
617 }
618
619 pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
621 let (is_valid, candidates) = {
622 let cache = self.cache.read().unwrap();
623
624 if let Some(cached_set) = cache.get(&peer_id) {
625 let now = Instant::now();
626
627 if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
629 (true, Some(cached_set.candidates.clone()))
630 } else {
631 (false, None)
632 }
633 } else {
634 (false, None)
635 }
636 };
637
638 if is_valid {
639 self.update_access_stats(peer_id, true).await;
641
642 if let Some(ref candidates) = candidates {
643 debug!(
644 "Cache hit for peer {:?}, {} candidates",
645 peer_id,
646 candidates.len()
647 );
648 }
649 return candidates;
650 }
651
652 self.update_access_stats(peer_id, false).await;
654 debug!("Cache miss for peer {:?}", peer_id);
655 None
656 }
657
658 pub async fn cache_candidates(
660 &self,
661 peer_id: PeerId,
662 candidates: Vec<CandidateAddress>,
663 ttl: Option<Duration>,
664 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
665 let ttl = ttl.unwrap_or(self.config.default_ttl);
666
667 {
669 let cache = self.cache.read().unwrap();
670 if cache.len() >= self.config.max_cache_size {
671 drop(cache);
672 self.evict_lru_entry().await;
673 }
674 }
675
676 let candidate_count = candidates.len();
677 let cached_set = CachedCandidateSet {
678 candidates,
679 cached_at: Instant::now(),
680 ttl,
681 access_count: 0,
682 last_accessed: Instant::now(),
683 validation_results: HashMap::new(),
684 };
685
686 {
688 let mut cache = self.cache.write().unwrap();
689 cache.insert(peer_id, cached_set);
690 }
691
692 {
694 let mut stats = self.stats.lock().unwrap();
695 stats.current_size += 1;
696 }
697
698 debug!(
699 "Cached {} candidates for peer {:?} with TTL {:?}",
700 candidate_count, peer_id, ttl
701 );
702 Ok(())
703 }
704
705 pub async fn cache_validation_result(
707 &self,
708 peer_id: PeerId,
709 address: SocketAddr,
710 is_valid: bool,
711 rtt: Option<Duration>,
712 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
713 if !self.config.enable_validation_cache {
714 return Ok(());
715 }
716
717 let mut cache = self.cache.write().unwrap();
718
719 if let Some(cached_set) = cache.get_mut(&peer_id) {
720 let validation_entry = ValidationCacheEntry {
721 is_valid,
722 rtt,
723 cached_at: Instant::now(),
724 ttl: self.config.validation_ttl,
725 };
726
727 cached_set
728 .validation_results
729 .insert(address, validation_entry);
730 debug!(
731 "Cached validation result for {}:{} -> {}",
732 peer_id.0[0], address, is_valid
733 );
734 }
735
736 Ok(())
737 }
738
739 pub async fn get_validation_result(
741 &self,
742 peer_id: PeerId,
743 address: SocketAddr,
744 ) -> Option<(bool, Option<Duration>)> {
745 if !self.config.enable_validation_cache {
746 return None;
747 }
748
749 let cache = self.cache.read().unwrap();
750
751 if let Some(cached_set) = cache.get(&peer_id) {
752 if let Some(validation_entry) = cached_set.validation_results.get(&address) {
753 let now = Instant::now();
754
755 if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
757 return Some((validation_entry.is_valid, validation_entry.rtt));
758 }
759 }
760 }
761
762 None
763 }
764
765 async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
767 {
769 let mut stats = self.stats.lock().unwrap();
770 if hit {
771 stats.cache_hits += 1;
772 } else {
773 stats.cache_misses += 1;
774 }
775
776 let total_accesses = stats.cache_hits + stats.cache_misses;
777 stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
778 }
779
780 if hit {
782 let mut cache = self.cache.write().unwrap();
783 if let Some(cached_set) = cache.get_mut(&peer_id) {
784 cached_set.access_count += 1;
785 cached_set.last_accessed = Instant::now();
786 }
787 }
788 }
789
790 async fn evict_lru_entry(&self) {
792 let mut cache = self.cache.write().unwrap();
793
794 if let Some((lru_peer_id, _)) = cache
795 .iter()
796 .min_by_key(|(_, cached_set)| cached_set.last_accessed)
797 .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
798 {
799 cache.remove(&lru_peer_id);
800 debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
801
802 let mut stats = self.stats.lock().unwrap();
804 stats.current_size = stats.current_size.saturating_sub(1);
805 }
806 }
807
808 async fn cleanup_expired_entries(
810 cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
811 stats: &Arc<Mutex<CandidateCacheStats>>,
812 _config: &CandidateCacheConfig,
813 ) {
814 let now = Instant::now();
815 let mut to_remove = Vec::new();
816
817 {
819 let cache_read = cache.read().unwrap();
820 for (peer_id, cached_set) in cache_read.iter() {
821 let age = now.duration_since(cached_set.cached_at);
822 if age > cached_set.ttl {
823 to_remove.push(*peer_id);
824 }
825 }
826 }
827
828 if !to_remove.is_empty() {
830 let mut cache_write = cache.write().unwrap();
831 for peer_id in &to_remove {
832 cache_write.remove(peer_id);
833 }
834
835 let mut stats_guard = stats.lock().unwrap();
837 stats_guard.entries_expired += to_remove.len() as u64;
838 stats_guard.current_size = cache_write.len();
839
840 debug!("Cleaned up {} expired cache entries", to_remove.len());
841 }
842
843 {
845 let mut cache_write = cache.write().unwrap();
846 for cached_set in cache_write.values_mut() {
847 cached_set.validation_results.retain(|_, validation_entry| {
848 now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
849 });
850 }
851 }
852 }
853
854 pub async fn get_stats(&self) -> CandidateCacheStats {
856 self.stats.lock().unwrap().clone()
857 }
858
859 pub async fn shutdown(&mut self) {
861 if let Some(handle) = self.cleanup_handle.take() {
862 handle.abort();
863 }
864
865 {
867 let mut cache = self.cache.write().unwrap();
868 cache.clear();
869 }
870
871 info!("Candidate cache shutdown complete");
872 }
873}
874
875impl SessionCleanupCoordinator {
876 pub fn new(config: SessionCleanupConfig) -> Self {
878 Self {
879 active_sessions: Arc::new(RwLock::new(HashMap::new())),
880 config,
881 stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
882 cleanup_handle: None,
883 }
884 }
885
886 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
888 let sessions = Arc::clone(&self.active_sessions);
889 let stats = Arc::clone(&self.stats);
890 let config = self.config.clone();
891
892 let cleanup_handle = tokio::spawn(async move {
893 let mut interval = tokio::time::interval(config.cleanup_interval);
894
895 loop {
896 interval.tick().await;
897 Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
898 }
899 });
900
901 self.cleanup_handle = Some(cleanup_handle);
902 info!("Session cleanup coordinator started");
903 Ok(())
904 }
905
906 pub async fn register_session(
908 &self,
909 peer_id: PeerId,
910 memory_usage: usize,
911 priority: CleanupPriority,
912 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
913 let session_state = SessionState {
914 peer_id,
915 created_at: Instant::now(),
916 last_activity: Instant::now(),
917 memory_usage,
918 is_active: true,
919 cleanup_priority: priority,
920 };
921
922 {
923 let mut sessions = self.active_sessions.write().unwrap();
924 sessions.insert(peer_id, session_state);
925 }
926
927 {
929 let mut stats = self.stats.lock().unwrap();
930 stats.active_sessions += 1;
931 }
932
933 debug!(
934 "Registered session for peer {:?} with {} bytes",
935 peer_id, memory_usage
936 );
937 Ok(())
938 }
939
940 pub async fn update_session_activity(&self, peer_id: PeerId) {
942 let mut sessions = self.active_sessions.write().unwrap();
943 if let Some(session) = sessions.get_mut(&peer_id) {
944 session.last_activity = Instant::now();
945 }
946 }
947
948 async fn cleanup_expired_sessions(
950 sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
951 stats: &Arc<Mutex<SessionCleanupStats>>,
952 config: &SessionCleanupConfig,
953 ) {
954 let now = Instant::now();
955 let mut to_remove = Vec::new();
956 let mut memory_freed = 0u64;
957
958 let memory_pressure = Self::check_memory_pressure(config);
960
961 {
963 let sessions_read = sessions.read().unwrap();
964 for (peer_id, session) in sessions_read.iter() {
965 let idle_time = now.duration_since(session.last_activity);
966 let age = now.duration_since(session.created_at);
967
968 let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
969 match session.cleanup_priority {
971 CleanupPriority::High => idle_time > Duration::from_secs(30),
972 CleanupPriority::Normal => idle_time > Duration::from_secs(60),
973 CleanupPriority::Low => idle_time > config.max_idle_time / 2,
974 }
975 } else {
976 idle_time > config.max_idle_time || age > config.max_session_age
978 };
979
980 if should_cleanup {
981 to_remove.push(*peer_id);
982 memory_freed += session.memory_usage as u64;
983 }
984 }
985 }
986
987 if !to_remove.is_empty() {
989 let mut sessions_write = sessions.write().unwrap();
990 for peer_id in &to_remove {
991 sessions_write.remove(peer_id);
992 }
993
994 let mut stats_guard = stats.lock().unwrap();
996 stats_guard.sessions_cleaned += to_remove.len() as u64;
997 stats_guard.memory_freed += memory_freed;
998 stats_guard.active_sessions = sessions_write.len();
999
1000 if memory_pressure {
1001 info!(
1002 "Aggressive cleanup: removed {} sessions, freed {} bytes",
1003 to_remove.len(),
1004 memory_freed
1005 );
1006 } else {
1007 debug!(
1008 "Regular cleanup: removed {} sessions, freed {} bytes",
1009 to_remove.len(),
1010 memory_freed
1011 );
1012 }
1013 }
1014 }
1015
1016 fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
1018 let _ = config.memory_pressure_threshold;
1023 false
1024 }
1025
1026 pub async fn get_stats(&self) -> SessionCleanupStats {
1028 self.stats.lock().unwrap().clone()
1029 }
1030
1031 pub async fn shutdown(&mut self) {
1033 if let Some(handle) = self.cleanup_handle.take() {
1034 handle.abort();
1035 }
1036
1037 {
1039 let mut sessions = self.active_sessions.write().unwrap();
1040 sessions.clear();
1041 }
1042
1043 info!("Session cleanup coordinator shutdown complete");
1044 }
1045}
1046
1047impl FrameBatchingCoordinator {
1048 pub fn new(config: FrameBatchingConfig) -> Self {
1050 Self {
1051 pending_frames: Arc::new(Mutex::new(HashMap::new())),
1052 config,
1053 stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1054 flush_handle: None,
1055 }
1056 }
1057
1058 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1060 let pending_frames = Arc::clone(&self.pending_frames);
1061 let stats = Arc::clone(&self.stats);
1062 let config = self.config.clone();
1063
1064 let flush_handle = tokio::spawn(async move {
1065 let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1066
1067 loop {
1068 interval.tick().await;
1069 Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1070 }
1071 });
1072
1073 self.flush_handle = Some(flush_handle);
1074 info!("Frame batching coordinator started");
1075 Ok(())
1076 }
1077
1078 pub async fn add_frame(
1080 &self,
1081 destination: SocketAddr,
1082 frame_type: u8,
1083 payload: Vec<u8>,
1084 priority: FramePriority,
1085 ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1086 let frame = BatchedFrame {
1087 frame_type,
1088 size: payload.len(),
1089 payload,
1090 priority,
1091 created_at: Instant::now(),
1092 };
1093
1094 let mut pending = self.pending_frames.lock().unwrap();
1095
1096 let (should_flush, frames_count, total_size) = {
1098 let batch_set = pending
1099 .entry(destination)
1100 .or_insert_with(|| BatchedFrameSet {
1101 frames: Vec::new(),
1102 total_size: 0,
1103 created_at: Instant::now(),
1104 destination,
1105 priority: BatchPriority::Normal,
1106 });
1107
1108 batch_set.frames.push(frame);
1109 batch_set.total_size += batch_set.frames.last().unwrap().size;
1110
1111 if priority == FramePriority::Urgent {
1113 batch_set.priority = BatchPriority::High;
1114 }
1115
1116 let should_flush = self.should_flush_batch(batch_set);
1118 (should_flush, batch_set.frames.len(), batch_set.total_size)
1119 };
1120
1121 if should_flush {
1122 if let Some(batch_set) = pending.remove(&destination) {
1124 let batch_data = self.serialize_batch(&batch_set);
1125
1126 {
1128 let mut stats = self.stats.lock().unwrap();
1129 stats.batches_sent += 1;
1130 stats.frames_batched += frames_count as u64;
1131 stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64
1132 + total_size as f64)
1133 / stats.batches_sent as f64;
1134 }
1135
1136 debug!(
1137 "Flushed batch to {} with {} frames ({} bytes)",
1138 destination, frames_count, total_size
1139 );
1140 return Ok(Some(batch_data));
1141 }
1142 }
1143
1144 Ok(None)
1145 }
1146
1147 fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1149 let now = Instant::now();
1150 let age = now.duration_since(batch_set.created_at);
1151
1152 batch_set.total_size >= self.config.max_batch_size
1154 || batch_set.frames.len() >= self.config.max_frames_per_batch
1155 || age >= self.config.max_batch_delay
1156 || batch_set.priority == BatchPriority::High
1157 }
1158
1159 fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1161 let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1162
1163 for frame in &batch_set.frames {
1164 data.push(frame.frame_type);
1166 data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1167 data.extend_from_slice(&frame.payload);
1169 }
1170
1171 data
1172 }
1173
1174 async fn flush_expired_batches(
1176 pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1177 stats: &Arc<Mutex<FrameBatchingStats>>,
1178 config: &FrameBatchingConfig,
1179 ) {
1180 let now = Instant::now();
1181 let mut to_flush = Vec::new();
1182
1183 {
1185 let pending = pending_frames.lock().unwrap();
1186 for (destination, batch_set) in pending.iter() {
1187 let age = now.duration_since(batch_set.created_at);
1188 if age >= config.max_batch_delay {
1189 to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1190 }
1191 }
1192 }
1193
1194 if !to_flush.is_empty() {
1196 let mut pending = pending_frames.lock().unwrap();
1197 let flush_count = to_flush.len();
1198 for (destination, frame_count, total_size) in to_flush {
1199 pending.remove(&destination);
1200
1201 let mut stats_guard = stats.lock().unwrap();
1203 stats_guard.batches_sent += 1;
1204 stats_guard.frames_batched += frame_count as u64;
1205 stats_guard.avg_batch_size = (stats_guard.avg_batch_size
1206 * (stats_guard.batches_sent - 1) as f64
1207 + total_size as f64)
1208 / stats_guard.batches_sent as f64;
1209 }
1210
1211 debug!("Flushed {} expired batches", flush_count);
1212 }
1213 }
1214
1215 pub async fn get_stats(&self) -> FrameBatchingStats {
1217 self.stats.lock().unwrap().clone()
1218 }
1219
1220 pub async fn shutdown(&mut self) {
1222 if let Some(handle) = self.flush_handle.take() {
1223 handle.abort();
1224 }
1225
1226 {
1228 let mut pending = self.pending_frames.lock().unwrap();
1229 pending.clear();
1230 }
1231
1232 info!("Frame batching coordinator shutdown complete");
1233 }
1234}
1235
/// Owns the four optimization components and starts and stops them together.
#[derive(Debug)]
pub struct MemoryOptimizationManager {
    /// Pool of reusable connections.
    connection_pool: ConnectionPool,
    /// Cache of candidate addresses and validation results.
    candidate_cache: CandidateCache,
    /// Periodic cleanup of registered sessions.
    session_cleanup: SessionCleanupCoordinator,
    /// Per-destination frame batching.
    frame_batching: FrameBatchingCoordinator,
    /// `true` between a successful `start` and the next `shutdown`.
    is_running: bool,
}
1245
1246impl MemoryOptimizationManager {
1247 pub fn new() -> Self {
1249 Self {
1250 connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1251 candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1252 session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1253 frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1254 is_running: false,
1255 }
1256 }
1257
1258 pub fn with_configs(
1260 pool_config: ConnectionPoolConfig,
1261 cache_config: CandidateCacheConfig,
1262 cleanup_config: SessionCleanupConfig,
1263 batching_config: FrameBatchingConfig,
1264 ) -> Self {
1265 Self {
1266 connection_pool: ConnectionPool::new(pool_config),
1267 candidate_cache: CandidateCache::new(cache_config),
1268 session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1269 frame_batching: FrameBatchingCoordinator::new(batching_config),
1270 is_running: false,
1271 }
1272 }
1273
1274 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1276 if self.is_running {
1277 return Ok(());
1278 }
1279
1280 self.connection_pool.start().await?;
1281 self.candidate_cache.start().await?;
1282 self.session_cleanup.start().await?;
1283 self.frame_batching.start().await?;
1284
1285 self.is_running = true;
1286 info!("Memory optimization manager started");
1287 Ok(())
1288 }
1289
1290 pub fn connection_pool(&self) -> &ConnectionPool {
1292 &self.connection_pool
1293 }
1294
1295 pub fn candidate_cache(&self) -> &CandidateCache {
1297 &self.candidate_cache
1298 }
1299
1300 pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1302 &self.session_cleanup
1303 }
1304
1305 pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1307 &self.frame_batching
1308 }
1309
1310 pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1312 MemoryOptimizationStats {
1313 connection_pool: self.connection_pool.get_stats().await,
1314 candidate_cache: self.candidate_cache.get_stats().await,
1315 session_cleanup: self.session_cleanup.get_stats().await,
1316 frame_batching: self.frame_batching.get_stats().await,
1317 }
1318 }
1319
1320 pub async fn shutdown(&mut self) {
1322 if !self.is_running {
1323 return;
1324 }
1325
1326 self.connection_pool.shutdown().await;
1327 self.candidate_cache.shutdown().await;
1328 self.session_cleanup.shutdown().await;
1329 self.frame_batching.shutdown().await;
1330
1331 self.is_running = false;
1332 info!("Memory optimization manager shutdown complete");
1333 }
1334}
1335
/// Statistics snapshot of all four components, as returned by
/// `MemoryOptimizationManager::get_comprehensive_stats`.
#[derive(Debug, Clone)]
pub struct MemoryOptimizationStats {
    /// Snapshot of the connection pool counters.
    pub connection_pool: ConnectionPoolStats,
    /// Snapshot of the candidate cache counters.
    pub candidate_cache: CandidateCacheStats,
    /// Snapshot of the session cleanup counters.
    pub session_cleanup: SessionCleanupStats,
    /// Snapshot of the frame batching counters.
    pub frame_batching: FrameBatchingStats,
}
1344
1345impl Default for MemoryOptimizationManager {
1346 fn default() -> Self {
1347 Self::new()
1348 }
1349}