1use std::{
10 collections::HashMap,
11 net::SocketAddr,
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18use crate::{Endpoint as QuinnEndpoint, HighLevelConnection as QuinnConnection};
19
20use crate::{
21 VarInt,
22 nat_traversal_api::{CandidateAddress, PeerId},
23};
24
25#[derive(Debug)]
27pub struct ConnectionPool {
28 active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
30 config: ConnectionPoolConfig,
32 stats: Arc<Mutex<ConnectionPoolStats>>,
34 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
36}
37
/// Limits and timers for [`ConnectionPool`].
#[derive(Clone, Debug)]
pub struct ConnectionPoolConfig {
    /// Pool size at which opening a new connection first evicts the least recently used one.
    pub max_connections: usize,
    /// A connection unused for longer than this is removed by the cleanup sweep.
    pub max_idle_time: Duration,
    /// Period between two cleanup sweeps.
    pub cleanup_interval: Duration,
    /// Flag intended to toggle reuse of pooled connections.
    pub enable_reuse: bool,
    /// A connection older than this is removed by the cleanup sweep, even if recently used.
    pub max_connection_age: Duration,
}
52
53#[derive(Debug)]
55struct PooledConnection {
56 connection: Arc<QuinnConnection>,
57 peer_id: PeerId,
58 remote_address: SocketAddr,
59 created_at: Instant,
60 last_used: Instant,
61 use_count: u64,
62 is_active: bool,
63}
64
/// Counters describing how a [`ConnectionPool`] has been used.
#[derive(Clone, Debug, Default)]
pub struct ConnectionPoolStats {
    /// Number of connections the pool has established.
    pub connections_created: u64,
    /// Number of requests served from an already pooled connection.
    pub connections_reused: u64,
    /// Number of connections removed by the cleanup sweep.
    pub connections_expired: u64,
    /// Number of connections currently tracked by the pool.
    pub active_connections: usize,
    /// `connections_reused / (connections_created + connections_reused)`.
    pub hit_rate: f64,
    /// Average connection age.
    /// NOTE(review): no code visible in this file writes this field — confirm.
    pub avg_connection_age: Duration,
}
81
82#[derive(Debug)]
84pub struct CandidateCache {
85 cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
87 config: CandidateCacheConfig,
89 stats: Arc<Mutex<CandidateCacheStats>>,
91 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
93}
94
/// Limits and timers for [`CandidateCache`].
#[derive(Clone, Debug)]
pub struct CandidateCacheConfig {
    /// TTL applied to a candidate set when the caller does not supply one.
    pub default_ttl: Duration,
    /// Entry count at which caching a new set first evicts the least recently accessed one.
    pub max_cache_size: usize,
    /// Period between two cleanup sweeps.
    pub cleanup_interval: Duration,
    /// When `false`, validation results are neither stored nor returned.
    pub enable_validation_cache: bool,
    /// TTL applied to every cached validation result.
    pub validation_ttl: Duration,
}
109
110#[derive(Debug, Clone)]
112struct CachedCandidateSet {
113 candidates: Vec<CandidateAddress>,
114 cached_at: Instant,
115 ttl: Duration,
116 access_count: u64,
117 last_accessed: Instant,
118 validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
119}
120
/// Cached outcome of validating a single candidate address.
#[derive(Clone, Debug)]
struct ValidationCacheEntry {
    /// Validation verdict as reported by the caller.
    is_valid: bool,
    /// Round-trip time reported alongside the verdict, if any.
    rtt: Option<Duration>,
    /// When the result was stored.
    cached_at: Instant,
    /// How long the result remains usable after `cached_at`.
    ttl: Duration,
}
129
/// Counters describing how a [`CandidateCache`] has been used.
#[derive(Clone, Debug, Default)]
pub struct CandidateCacheStats {
    /// Lookups answered from a fresh cached set.
    pub cache_hits: u64,
    /// Lookups that found no entry or only a stale one.
    pub cache_misses: u64,
    /// Entries removed by the cleanup sweep because their TTL elapsed.
    pub entries_expired: u64,
    /// Number of entries currently tracked.
    pub current_size: usize,
    /// `cache_hits / (cache_hits + cache_misses)`.
    pub hit_rate: f64,
    /// Average entry age.
    /// NOTE(review): no code visible in this file writes this field — confirm.
    pub avg_entry_age: Duration,
}
146
147#[derive(Debug)]
149pub struct SessionCleanupCoordinator {
150 active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
152 config: SessionCleanupConfig,
154 stats: Arc<Mutex<SessionCleanupStats>>,
156 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
158}
159
/// Thresholds and timers for [`SessionCleanupCoordinator`].
#[derive(Clone, Debug)]
pub struct SessionCleanupConfig {
    /// A session without activity for longer than this is removed by the cleanup sweep.
    pub max_idle_time: Duration,
    /// A session older than this is removed by the cleanup sweep.
    pub max_session_age: Duration,
    /// Period between two cleanup sweeps.
    pub cleanup_interval: Duration,
    /// Enables the priority-based idle thresholds when memory pressure is detected.
    pub enable_aggressive_cleanup: bool,
    /// Memory-pressure threshold.
    /// NOTE(review): the visible pressure check reads this value but never acts on it;
    /// its unit is not established by the code — confirm.
    pub memory_pressure_threshold: usize,
}
174
175#[derive(Debug)]
177struct SessionState {
178 peer_id: PeerId,
179 created_at: Instant,
180 last_activity: Instant,
181 memory_usage: usize,
182 is_active: bool,
183 cleanup_priority: CleanupPriority,
184}
185
/// How eagerly a session is reclaimed during aggressive cleanup.
///
/// Variants are declared in ascending order, so the derived ordering is
/// `Low < Normal < High`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CleanupPriority {
    /// Reclaimed only after half of the configured maximum idle time.
    Low,
    /// Reclaimed after 60 seconds of inactivity.
    Normal,
    /// Reclaimed after 30 seconds of inactivity.
    High,
}
193
/// Counters describing the work done by a [`SessionCleanupCoordinator`].
#[derive(Clone, Debug, Default)]
pub struct SessionCleanupStats {
    /// Number of sessions removed by the cleanup sweep.
    pub sessions_cleaned: u64,
    /// Sum of the `memory_usage` values of all removed sessions.
    pub memory_freed: u64,
    /// Number of sessions currently tracked.
    pub active_sessions: usize,
    /// Average session lifetime.
    /// NOTE(review): no code visible in this file writes this field — confirm.
    pub avg_session_lifetime: Duration,
}
206
207#[derive(Debug)]
209pub struct FrameBatchingCoordinator {
210 pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
212 config: FrameBatchingConfig,
214 stats: Arc<Mutex<FrameBatchingStats>>,
216 flush_handle: Option<tokio::task::JoinHandle<()>>,
218}
219
/// Flush thresholds for [`FrameBatchingCoordinator`].
#[derive(Clone, Debug)]
pub struct FrameBatchingConfig {
    /// Accumulated payload size (bytes) at which a batch is flushed.
    pub max_batch_size: usize,
    /// Batch age at which a batch is flushed; the flush task ticks at a quarter of this.
    pub max_batch_delay: Duration,
    /// Frame count at which a batch is flushed.
    pub max_frames_per_batch: usize,
    /// Flag intended to enable adaptive batching.
    /// NOTE(review): not read by any code visible in this file — confirm.
    pub enable_adaptive_batching: bool,
    /// Intended minimum batch size.
    /// NOTE(review): not read by any code visible in this file — confirm.
    pub min_batch_size: usize,
}
234
235#[derive(Debug)]
237struct BatchedFrameSet {
238 frames: Vec<BatchedFrame>,
239 total_size: usize,
240 created_at: Instant,
241 destination: SocketAddr,
242 priority: BatchPriority,
243}
244
245#[derive(Debug)]
247struct BatchedFrame {
248 frame_type: u8,
249 payload: Vec<u8>,
250 size: usize,
251 priority: FramePriority,
252 created_at: Instant,
253}
254
/// Urgency of a whole batch.
///
/// Variants are declared in ascending order, so the derived ordering is
/// `Low < Normal < High`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BatchPriority {
    /// Lowest urgency.
    Low,
    /// Urgency assigned to a freshly opened batch.
    Normal,
    /// Assigned once an urgent frame is queued; forces an immediate flush.
    High,
}
262
/// Priority a caller assigns to an individual frame.
///
/// Variants are declared in ascending order, so the derived ordering is
/// `Background < Normal < Urgent`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum FramePriority {
    /// Lowest priority.
    Background,
    /// Default priority.
    Normal,
    /// Raises the containing batch to high priority, which flushes it immediately.
    Urgent,
}
270
/// Counters describing the work done by a [`FrameBatchingCoordinator`].
#[derive(Clone, Debug, Default)]
pub struct FrameBatchingStats {
    /// Total number of frames contained in flushed batches.
    pub frames_batched: u64,
    /// Number of batches flushed.
    pub batches_sent: u64,
    /// Running mean of the payload size (bytes) of flushed batches.
    pub avg_batch_size: f64,
    /// Bytes saved by batching.
    /// NOTE(review): no code visible in this file writes this field — confirm.
    pub bytes_saved: u64,
    /// Batching efficiency.
    /// NOTE(review): no code visible in this file writes this field — confirm.
    pub batching_efficiency: f64,
}
285
286impl Default for ConnectionPoolConfig {
287 fn default() -> Self {
288 Self {
289 max_connections: 1000,
290 max_idle_time: Duration::from_secs(300), cleanup_interval: Duration::from_secs(60), enable_reuse: true,
293 max_connection_age: Duration::from_secs(3600), }
295 }
296}
297
298impl Default for CandidateCacheConfig {
299 fn default() -> Self {
300 Self {
301 default_ttl: Duration::from_secs(300), max_cache_size: 10000,
303 cleanup_interval: Duration::from_secs(60), enable_validation_cache: true,
305 validation_ttl: Duration::from_secs(60), }
307 }
308}
309
310impl Default for SessionCleanupConfig {
311 fn default() -> Self {
312 Self {
313 max_idle_time: Duration::from_secs(600), max_session_age: Duration::from_secs(3600), cleanup_interval: Duration::from_secs(120), enable_aggressive_cleanup: true,
317 memory_pressure_threshold: 512, }
319 }
320}
321
322impl Default for FrameBatchingConfig {
323 fn default() -> Self {
324 Self {
325 max_batch_size: 1200, max_batch_delay: Duration::from_millis(10), max_frames_per_batch: 10,
328 enable_adaptive_batching: true,
329 min_batch_size: 200, }
331 }
332}
333
334impl ConnectionPool {
335 pub fn new(config: ConnectionPoolConfig) -> Self {
337 let pool = Self {
338 active_connections: Arc::new(RwLock::new(HashMap::new())),
339 config,
340 stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
341 cleanup_handle: None,
342 };
343
344 pool
345 }
346
347 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
349 let connections = Arc::clone(&self.active_connections);
350 let stats = Arc::clone(&self.stats);
351 let config = self.config.clone();
352
353 let cleanup_handle = tokio::spawn(async move {
354 let mut interval = tokio::time::interval(config.cleanup_interval);
355
356 loop {
357 interval.tick().await;
358 Self::cleanup_expired_connections(&connections, &stats, &config).await;
359 }
360 });
361
362 self.cleanup_handle = Some(cleanup_handle);
363 info!(
364 "Connection pool started with max_connections={}",
365 self.config.max_connections
366 );
367 Ok(())
368 }
369
370 pub async fn get_connection(
372 &self,
373 peer_id: PeerId,
374 remote_address: SocketAddr,
375 endpoint: &QuinnEndpoint,
376 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
377 if let Some(connection) = self
379 .try_get_existing_connection(peer_id, remote_address)
380 .await
381 {
382 self.update_stats_hit().await;
383 return Ok(connection);
384 }
385
386 let connection = self
388 .create_new_connection(peer_id, remote_address, endpoint)
389 .await?;
390 self.update_stats_miss().await;
391 Ok(connection)
392 }
393
394 async fn try_get_existing_connection(
396 &self,
397 peer_id: PeerId,
398 remote_address: SocketAddr,
399 ) -> Option<Arc<QuinnConnection>> {
400 let mut connections = self.active_connections.write().unwrap();
401
402 if let Some(pooled) = connections.get_mut(&peer_id) {
403 if pooled.is_active && pooled.remote_address == remote_address {
404 if pooled.connection.close_reason().is_none() {
406 pooled.last_used = Instant::now();
407 pooled.use_count += 1;
408 debug!("Reusing pooled connection for peer {:?}", peer_id);
409 return Some(Arc::clone(&pooled.connection));
410 } else {
411 connections.remove(&peer_id);
413 }
414 }
415 }
416
417 None
418 }
419
420 async fn create_new_connection(
422 &self,
423 peer_id: PeerId,
424 remote_address: SocketAddr,
425 endpoint: &QuinnEndpoint,
426 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
427 {
429 let connections = self.active_connections.read().unwrap();
430 if connections.len() >= self.config.max_connections {
431 drop(connections);
433 self.evict_lru_connection().await;
434 }
435 }
436
437 let rustls_config = rustls::ClientConfig::builder()
439 .with_root_certificates(rustls::RootCertStore::empty())
440 .with_no_client_auth();
441
442 let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
443 .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;
444
445 let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
446
447 let connecting = endpoint
448 .connect_with(client_config, remote_address, "ant-quic")
449 .map_err(|e| format!("Failed to initiate connection: {}", e))?;
450
451 let connection = connecting
453 .await
454 .map_err(|e| format!("Connection failed: {}", e))?;
455
456 let connection_arc = Arc::new(connection);
457
458 let pooled = PooledConnection {
459 connection: Arc::clone(&connection_arc),
460 peer_id,
461 remote_address,
462 created_at: Instant::now(),
463 last_used: Instant::now(),
464 use_count: 1,
465 is_active: true,
466 };
467
468 {
470 let mut connections = self.active_connections.write().unwrap();
471 connections.insert(peer_id, pooled);
472 }
473
474 {
476 let mut stats = self.stats.lock().unwrap();
477 stats.connections_created += 1;
478 stats.active_connections += 1;
479 }
480
481 info!("Created new pooled connection for peer {:?}", peer_id);
482 Ok(connection_arc)
483 }
484
485 async fn evict_lru_connection(&self) {
487 let mut connections = self.active_connections.write().unwrap();
488
489 if let Some((lru_peer_id, _)) = connections
490 .iter()
491 .min_by_key(|(_, pooled)| pooled.last_used)
492 .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
493 {
494 connections.remove(&lru_peer_id);
495 debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
496
497 let mut stats = self.stats.lock().unwrap();
499 stats.active_connections = stats.active_connections.saturating_sub(1);
500 }
501 }
502
503 async fn update_stats_hit(&self) {
505 let mut stats = self.stats.lock().unwrap();
506 stats.connections_reused += 1;
507 let total_requests = stats.connections_created + stats.connections_reused;
508 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
509 }
510
511 async fn update_stats_miss(&self) {
513 let stats = self.stats.lock().unwrap();
514 let total_requests = stats.connections_created + stats.connections_reused;
515 drop(stats);
516
517 let mut stats = self.stats.lock().unwrap();
518 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
519 }
520
521 async fn cleanup_expired_connections(
523 connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
524 stats: &Arc<Mutex<ConnectionPoolStats>>,
525 config: &ConnectionPoolConfig,
526 ) {
527 let now = Instant::now();
528 let mut to_remove = Vec::new();
529
530 {
532 let connections_read = connections.read().unwrap();
533 for (peer_id, pooled) in connections_read.iter() {
534 let idle_time = now.duration_since(pooled.last_used);
535 let age = now.duration_since(pooled.created_at);
536
537 if idle_time > config.max_idle_time || age > config.max_connection_age {
538 to_remove.push(*peer_id);
539 }
540
541 if pooled.connection.close_reason().is_some() {
543 to_remove.push(*peer_id);
544 }
545 }
546 }
547
548 if !to_remove.is_empty() {
550 let mut connections_write = connections.write().unwrap();
551 for peer_id in &to_remove {
552 connections_write.remove(peer_id);
553 }
554
555 let mut stats_guard = stats.lock().unwrap();
557 stats_guard.connections_expired += to_remove.len() as u64;
558 stats_guard.active_connections = connections_write.len();
559
560 debug!("Cleaned up {} expired connections", to_remove.len());
561 }
562 }
563
564 pub async fn get_stats(&self) -> ConnectionPoolStats {
566 self.stats.lock().unwrap().clone()
567 }
568
569 pub async fn shutdown(&mut self) {
571 if let Some(handle) = self.cleanup_handle.take() {
572 handle.abort();
573 }
574
575 {
577 let connections = self.active_connections.read().unwrap();
578 for (_, pooled) in connections.iter() {
579 pooled.connection.close(VarInt::from_u32(0), b"shutdown");
580 }
581 }
582
583 info!("Connection pool shutdown complete");
584 }
585}
586
587impl CandidateCache {
588 pub fn new(config: CandidateCacheConfig) -> Self {
590 let cache = Self {
591 cache: Arc::new(RwLock::new(HashMap::new())),
592 config,
593 stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
594 cleanup_handle: None,
595 };
596
597 cache
598 }
599
600 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
602 let cache = Arc::clone(&self.cache);
603 let stats = Arc::clone(&self.stats);
604 let config = self.config.clone();
605
606 let cleanup_handle = tokio::spawn(async move {
607 let mut interval = tokio::time::interval(config.cleanup_interval);
608
609 loop {
610 interval.tick().await;
611 Self::cleanup_expired_entries(&cache, &stats, &config).await;
612 }
613 });
614
615 self.cleanup_handle = Some(cleanup_handle);
616 info!(
617 "Candidate cache started with max_size={}",
618 self.config.max_cache_size
619 );
620 Ok(())
621 }
622
623 pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
625 let (is_valid, candidates) = {
626 let cache = self.cache.read().unwrap();
627
628 if let Some(cached_set) = cache.get(&peer_id) {
629 let now = Instant::now();
630
631 if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
633 (true, Some(cached_set.candidates.clone()))
634 } else {
635 (false, None)
636 }
637 } else {
638 (false, None)
639 }
640 };
641
642 if is_valid {
643 self.update_access_stats(peer_id, true).await;
645
646 if let Some(ref candidates) = candidates {
647 debug!(
648 "Cache hit for peer {:?}, {} candidates",
649 peer_id,
650 candidates.len()
651 );
652 }
653 return candidates;
654 }
655
656 self.update_access_stats(peer_id, false).await;
658 debug!("Cache miss for peer {:?}", peer_id);
659 None
660 }
661
662 pub async fn cache_candidates(
664 &self,
665 peer_id: PeerId,
666 candidates: Vec<CandidateAddress>,
667 ttl: Option<Duration>,
668 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
669 let ttl = ttl.unwrap_or(self.config.default_ttl);
670
671 {
673 let cache = self.cache.read().unwrap();
674 if cache.len() >= self.config.max_cache_size {
675 drop(cache);
676 self.evict_lru_entry().await;
677 }
678 }
679
680 let candidate_count = candidates.len();
681 let cached_set = CachedCandidateSet {
682 candidates,
683 cached_at: Instant::now(),
684 ttl,
685 access_count: 0,
686 last_accessed: Instant::now(),
687 validation_results: HashMap::new(),
688 };
689
690 {
692 let mut cache = self.cache.write().unwrap();
693 cache.insert(peer_id, cached_set);
694 }
695
696 {
698 let mut stats = self.stats.lock().unwrap();
699 stats.current_size += 1;
700 }
701
702 debug!(
703 "Cached {} candidates for peer {:?} with TTL {:?}",
704 candidate_count, peer_id, ttl
705 );
706 Ok(())
707 }
708
709 pub async fn cache_validation_result(
711 &self,
712 peer_id: PeerId,
713 address: SocketAddr,
714 is_valid: bool,
715 rtt: Option<Duration>,
716 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
717 if !self.config.enable_validation_cache {
718 return Ok(());
719 }
720
721 let mut cache = self.cache.write().unwrap();
722
723 if let Some(cached_set) = cache.get_mut(&peer_id) {
724 let validation_entry = ValidationCacheEntry {
725 is_valid,
726 rtt,
727 cached_at: Instant::now(),
728 ttl: self.config.validation_ttl,
729 };
730
731 cached_set
732 .validation_results
733 .insert(address, validation_entry);
734 debug!(
735 "Cached validation result for {}:{} -> {}",
736 peer_id.0[0], address, is_valid
737 );
738 }
739
740 Ok(())
741 }
742
743 pub async fn get_validation_result(
745 &self,
746 peer_id: PeerId,
747 address: SocketAddr,
748 ) -> Option<(bool, Option<Duration>)> {
749 if !self.config.enable_validation_cache {
750 return None;
751 }
752
753 let cache = self.cache.read().unwrap();
754
755 if let Some(cached_set) = cache.get(&peer_id) {
756 if let Some(validation_entry) = cached_set.validation_results.get(&address) {
757 let now = Instant::now();
758
759 if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
761 return Some((validation_entry.is_valid, validation_entry.rtt));
762 }
763 }
764 }
765
766 None
767 }
768
769 async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
771 {
773 let mut stats = self.stats.lock().unwrap();
774 if hit {
775 stats.cache_hits += 1;
776 } else {
777 stats.cache_misses += 1;
778 }
779
780 let total_accesses = stats.cache_hits + stats.cache_misses;
781 stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
782 }
783
784 if hit {
786 let mut cache = self.cache.write().unwrap();
787 if let Some(cached_set) = cache.get_mut(&peer_id) {
788 cached_set.access_count += 1;
789 cached_set.last_accessed = Instant::now();
790 }
791 }
792 }
793
794 async fn evict_lru_entry(&self) {
796 let mut cache = self.cache.write().unwrap();
797
798 if let Some((lru_peer_id, _)) = cache
799 .iter()
800 .min_by_key(|(_, cached_set)| cached_set.last_accessed)
801 .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
802 {
803 cache.remove(&lru_peer_id);
804 debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
805
806 let mut stats = self.stats.lock().unwrap();
808 stats.current_size = stats.current_size.saturating_sub(1);
809 }
810 }
811
812 async fn cleanup_expired_entries(
814 cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
815 stats: &Arc<Mutex<CandidateCacheStats>>,
816 _config: &CandidateCacheConfig,
817 ) {
818 let now = Instant::now();
819 let mut to_remove = Vec::new();
820
821 {
823 let cache_read = cache.read().unwrap();
824 for (peer_id, cached_set) in cache_read.iter() {
825 let age = now.duration_since(cached_set.cached_at);
826 if age > cached_set.ttl {
827 to_remove.push(*peer_id);
828 }
829 }
830 }
831
832 if !to_remove.is_empty() {
834 let mut cache_write = cache.write().unwrap();
835 for peer_id in &to_remove {
836 cache_write.remove(peer_id);
837 }
838
839 let mut stats_guard = stats.lock().unwrap();
841 stats_guard.entries_expired += to_remove.len() as u64;
842 stats_guard.current_size = cache_write.len();
843
844 debug!("Cleaned up {} expired cache entries", to_remove.len());
845 }
846
847 {
849 let mut cache_write = cache.write().unwrap();
850 for cached_set in cache_write.values_mut() {
851 cached_set.validation_results.retain(|_, validation_entry| {
852 now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
853 });
854 }
855 }
856 }
857
858 pub async fn get_stats(&self) -> CandidateCacheStats {
860 self.stats.lock().unwrap().clone()
861 }
862
863 pub async fn shutdown(&mut self) {
865 if let Some(handle) = self.cleanup_handle.take() {
866 handle.abort();
867 }
868
869 {
871 let mut cache = self.cache.write().unwrap();
872 cache.clear();
873 }
874
875 info!("Candidate cache shutdown complete");
876 }
877}
878
879impl SessionCleanupCoordinator {
880 pub fn new(config: SessionCleanupConfig) -> Self {
882 Self {
883 active_sessions: Arc::new(RwLock::new(HashMap::new())),
884 config,
885 stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
886 cleanup_handle: None,
887 }
888 }
889
890 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
892 let sessions = Arc::clone(&self.active_sessions);
893 let stats = Arc::clone(&self.stats);
894 let config = self.config.clone();
895
896 let cleanup_handle = tokio::spawn(async move {
897 let mut interval = tokio::time::interval(config.cleanup_interval);
898
899 loop {
900 interval.tick().await;
901 Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
902 }
903 });
904
905 self.cleanup_handle = Some(cleanup_handle);
906 info!("Session cleanup coordinator started");
907 Ok(())
908 }
909
910 pub async fn register_session(
912 &self,
913 peer_id: PeerId,
914 memory_usage: usize,
915 priority: CleanupPriority,
916 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
917 let session_state = SessionState {
918 peer_id,
919 created_at: Instant::now(),
920 last_activity: Instant::now(),
921 memory_usage,
922 is_active: true,
923 cleanup_priority: priority,
924 };
925
926 {
927 let mut sessions = self.active_sessions.write().unwrap();
928 sessions.insert(peer_id, session_state);
929 }
930
931 {
933 let mut stats = self.stats.lock().unwrap();
934 stats.active_sessions += 1;
935 }
936
937 debug!(
938 "Registered session for peer {:?} with {} bytes",
939 peer_id, memory_usage
940 );
941 Ok(())
942 }
943
944 pub async fn update_session_activity(&self, peer_id: PeerId) {
946 let mut sessions = self.active_sessions.write().unwrap();
947 if let Some(session) = sessions.get_mut(&peer_id) {
948 session.last_activity = Instant::now();
949 }
950 }
951
952 async fn cleanup_expired_sessions(
954 sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
955 stats: &Arc<Mutex<SessionCleanupStats>>,
956 config: &SessionCleanupConfig,
957 ) {
958 let now = Instant::now();
959 let mut to_remove = Vec::new();
960 let mut memory_freed = 0u64;
961
962 let memory_pressure = Self::check_memory_pressure(config);
964
965 {
967 let sessions_read = sessions.read().unwrap();
968 for (peer_id, session) in sessions_read.iter() {
969 let idle_time = now.duration_since(session.last_activity);
970 let age = now.duration_since(session.created_at);
971
972 let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
973 match session.cleanup_priority {
975 CleanupPriority::High => idle_time > Duration::from_secs(30),
976 CleanupPriority::Normal => idle_time > Duration::from_secs(60),
977 CleanupPriority::Low => idle_time > config.max_idle_time / 2,
978 }
979 } else {
980 idle_time > config.max_idle_time || age > config.max_session_age
982 };
983
984 if should_cleanup {
985 to_remove.push(*peer_id);
986 memory_freed += session.memory_usage as u64;
987 }
988 }
989 }
990
991 if !to_remove.is_empty() {
993 let mut sessions_write = sessions.write().unwrap();
994 for peer_id in &to_remove {
995 sessions_write.remove(peer_id);
996 }
997
998 let mut stats_guard = stats.lock().unwrap();
1000 stats_guard.sessions_cleaned += to_remove.len() as u64;
1001 stats_guard.memory_freed += memory_freed;
1002 stats_guard.active_sessions = sessions_write.len();
1003
1004 if memory_pressure {
1005 info!(
1006 "Aggressive cleanup: removed {} sessions, freed {} bytes",
1007 to_remove.len(),
1008 memory_freed
1009 );
1010 } else {
1011 debug!(
1012 "Regular cleanup: removed {} sessions, freed {} bytes",
1013 to_remove.len(),
1014 memory_freed
1015 );
1016 }
1017 }
1018 }
1019
1020 fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
1022 let _ = config.memory_pressure_threshold;
1027 false
1028 }
1029
1030 pub async fn get_stats(&self) -> SessionCleanupStats {
1032 self.stats.lock().unwrap().clone()
1033 }
1034
1035 pub async fn shutdown(&mut self) {
1037 if let Some(handle) = self.cleanup_handle.take() {
1038 handle.abort();
1039 }
1040
1041 {
1043 let mut sessions = self.active_sessions.write().unwrap();
1044 sessions.clear();
1045 }
1046
1047 info!("Session cleanup coordinator shutdown complete");
1048 }
1049}
1050
1051impl FrameBatchingCoordinator {
1052 pub fn new(config: FrameBatchingConfig) -> Self {
1054 Self {
1055 pending_frames: Arc::new(Mutex::new(HashMap::new())),
1056 config,
1057 stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1058 flush_handle: None,
1059 }
1060 }
1061
1062 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1064 let pending_frames = Arc::clone(&self.pending_frames);
1065 let stats = Arc::clone(&self.stats);
1066 let config = self.config.clone();
1067
1068 let flush_handle = tokio::spawn(async move {
1069 let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1070
1071 loop {
1072 interval.tick().await;
1073 Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1074 }
1075 });
1076
1077 self.flush_handle = Some(flush_handle);
1078 info!("Frame batching coordinator started");
1079 Ok(())
1080 }
1081
1082 pub async fn add_frame(
1084 &self,
1085 destination: SocketAddr,
1086 frame_type: u8,
1087 payload: Vec<u8>,
1088 priority: FramePriority,
1089 ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1090 let frame = BatchedFrame {
1091 frame_type,
1092 size: payload.len(),
1093 payload,
1094 priority,
1095 created_at: Instant::now(),
1096 };
1097
1098 let mut pending = self.pending_frames.lock().unwrap();
1099
1100 let (should_flush, frames_count, total_size) = {
1102 let batch_set = pending
1103 .entry(destination)
1104 .or_insert_with(|| BatchedFrameSet {
1105 frames: Vec::new(),
1106 total_size: 0,
1107 created_at: Instant::now(),
1108 destination,
1109 priority: BatchPriority::Normal,
1110 });
1111
1112 batch_set.frames.push(frame);
1113 batch_set.total_size += batch_set.frames.last().unwrap().size;
1114
1115 if priority == FramePriority::Urgent {
1117 batch_set.priority = BatchPriority::High;
1118 }
1119
1120 let should_flush = self.should_flush_batch(batch_set);
1122 (should_flush, batch_set.frames.len(), batch_set.total_size)
1123 };
1124
1125 if should_flush {
1126 if let Some(batch_set) = pending.remove(&destination) {
1128 let batch_data = self.serialize_batch(&batch_set);
1129
1130 {
1132 let mut stats = self.stats.lock().unwrap();
1133 stats.batches_sent += 1;
1134 stats.frames_batched += frames_count as u64;
1135 stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64
1136 + total_size as f64)
1137 / stats.batches_sent as f64;
1138 }
1139
1140 debug!(
1141 "Flushed batch to {} with {} frames ({} bytes)",
1142 destination, frames_count, total_size
1143 );
1144 return Ok(Some(batch_data));
1145 }
1146 }
1147
1148 Ok(None)
1149 }
1150
1151 fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1153 let now = Instant::now();
1154 let age = now.duration_since(batch_set.created_at);
1155
1156 batch_set.total_size >= self.config.max_batch_size
1158 || batch_set.frames.len() >= self.config.max_frames_per_batch
1159 || age >= self.config.max_batch_delay
1160 || batch_set.priority == BatchPriority::High
1161 }
1162
1163 fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1165 let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1166
1167 for frame in &batch_set.frames {
1168 data.push(frame.frame_type);
1170 data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1171 data.extend_from_slice(&frame.payload);
1173 }
1174
1175 data
1176 }
1177
1178 async fn flush_expired_batches(
1180 pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1181 stats: &Arc<Mutex<FrameBatchingStats>>,
1182 config: &FrameBatchingConfig,
1183 ) {
1184 let now = Instant::now();
1185 let mut to_flush = Vec::new();
1186
1187 {
1189 let pending = pending_frames.lock().unwrap();
1190 for (destination, batch_set) in pending.iter() {
1191 let age = now.duration_since(batch_set.created_at);
1192 if age >= config.max_batch_delay {
1193 to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1194 }
1195 }
1196 }
1197
1198 if !to_flush.is_empty() {
1200 let mut pending = pending_frames.lock().unwrap();
1201 let flush_count = to_flush.len();
1202 for (destination, frame_count, total_size) in to_flush {
1203 pending.remove(&destination);
1204
1205 let mut stats_guard = stats.lock().unwrap();
1207 stats_guard.batches_sent += 1;
1208 stats_guard.frames_batched += frame_count as u64;
1209 stats_guard.avg_batch_size = (stats_guard.avg_batch_size
1210 * (stats_guard.batches_sent - 1) as f64
1211 + total_size as f64)
1212 / stats_guard.batches_sent as f64;
1213 }
1214
1215 debug!("Flushed {} expired batches", flush_count);
1216 }
1217 }
1218
1219 pub async fn get_stats(&self) -> FrameBatchingStats {
1221 self.stats.lock().unwrap().clone()
1222 }
1223
1224 pub async fn shutdown(&mut self) {
1226 if let Some(handle) = self.flush_handle.take() {
1227 handle.abort();
1228 }
1229
1230 {
1232 let mut pending = self.pending_frames.lock().unwrap();
1233 pending.clear();
1234 }
1235
1236 info!("Frame batching coordinator shutdown complete");
1237 }
1238}
1239
1240#[derive(Debug)]
1242pub struct MemoryOptimizationManager {
1243 connection_pool: ConnectionPool,
1244 candidate_cache: CandidateCache,
1245 session_cleanup: SessionCleanupCoordinator,
1246 frame_batching: FrameBatchingCoordinator,
1247 is_running: bool,
1248}
1249
1250impl MemoryOptimizationManager {
1251 pub fn new() -> Self {
1253 Self {
1254 connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1255 candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1256 session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1257 frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1258 is_running: false,
1259 }
1260 }
1261
1262 pub fn with_configs(
1264 pool_config: ConnectionPoolConfig,
1265 cache_config: CandidateCacheConfig,
1266 cleanup_config: SessionCleanupConfig,
1267 batching_config: FrameBatchingConfig,
1268 ) -> Self {
1269 Self {
1270 connection_pool: ConnectionPool::new(pool_config),
1271 candidate_cache: CandidateCache::new(cache_config),
1272 session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1273 frame_batching: FrameBatchingCoordinator::new(batching_config),
1274 is_running: false,
1275 }
1276 }
1277
1278 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280 if self.is_running {
1281 return Ok(());
1282 }
1283
1284 self.connection_pool.start().await?;
1285 self.candidate_cache.start().await?;
1286 self.session_cleanup.start().await?;
1287 self.frame_batching.start().await?;
1288
1289 self.is_running = true;
1290 info!("Memory optimization manager started");
1291 Ok(())
1292 }
1293
1294 pub fn connection_pool(&self) -> &ConnectionPool {
1296 &self.connection_pool
1297 }
1298
1299 pub fn candidate_cache(&self) -> &CandidateCache {
1301 &self.candidate_cache
1302 }
1303
1304 pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1306 &self.session_cleanup
1307 }
1308
1309 pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1311 &self.frame_batching
1312 }
1313
1314 pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1316 MemoryOptimizationStats {
1317 connection_pool: self.connection_pool.get_stats().await,
1318 candidate_cache: self.candidate_cache.get_stats().await,
1319 session_cleanup: self.session_cleanup.get_stats().await,
1320 frame_batching: self.frame_batching.get_stats().await,
1321 }
1322 }
1323
1324 pub async fn shutdown(&mut self) {
1326 if !self.is_running {
1327 return;
1328 }
1329
1330 self.connection_pool.shutdown().await;
1331 self.candidate_cache.shutdown().await;
1332 self.session_cleanup.shutdown().await;
1333 self.frame_batching.shutdown().await;
1334
1335 self.is_running = false;
1336 info!("Memory optimization manager shutdown complete");
1337 }
1338}
1339
1340#[derive(Debug, Clone)]
1342pub struct MemoryOptimizationStats {
1343 pub connection_pool: ConnectionPoolStats,
1344 pub candidate_cache: CandidateCacheStats,
1345 pub session_cleanup: SessionCleanupStats,
1346 pub frame_batching: FrameBatchingStats,
1347}
1348
1349impl Default for MemoryOptimizationManager {
1350 fn default() -> Self {
1351 Self::new()
1352 }
1353}