1use std::{
10 collections::HashMap,
11 net::SocketAddr,
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, info};
17
18use crate::{HighLevelConnection as QuinnConnection, Endpoint as QuinnEndpoint};
19
20use crate::{
21 nat_traversal_api::{CandidateAddress, PeerId},
22 VarInt,
23};
24
25#[derive(Debug)]
27pub struct ConnectionPool {
28 active_connections: Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
30 config: ConnectionPoolConfig,
32 stats: Arc<Mutex<ConnectionPoolStats>>,
34 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
36}
37
38#[derive(Debug, Clone)]
40pub struct ConnectionPoolConfig {
41 pub max_connections: usize,
43 pub max_idle_time: Duration,
45 pub cleanup_interval: Duration,
47 pub enable_reuse: bool,
49 pub max_connection_age: Duration,
51}
52
53#[derive(Debug)]
55struct PooledConnection {
56 connection: Arc<QuinnConnection>,
57 peer_id: PeerId,
58 remote_address: SocketAddr,
59 created_at: Instant,
60 last_used: Instant,
61 use_count: u64,
62 is_active: bool,
63}
64
65#[derive(Debug, Default, Clone)]
67pub struct ConnectionPoolStats {
68 pub connections_created: u64,
70 pub connections_reused: u64,
72 pub connections_expired: u64,
74 pub active_connections: usize,
76 pub hit_rate: f64,
78 pub avg_connection_age: Duration,
80}
81
82#[derive(Debug)]
84pub struct CandidateCache {
85 cache: Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
87 config: CandidateCacheConfig,
89 stats: Arc<Mutex<CandidateCacheStats>>,
91 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
93}
94
95#[derive(Debug, Clone)]
97pub struct CandidateCacheConfig {
98 pub default_ttl: Duration,
100 pub max_cache_size: usize,
102 pub cleanup_interval: Duration,
104 pub enable_validation_cache: bool,
106 pub validation_ttl: Duration,
108}
109
110#[derive(Debug, Clone)]
112struct CachedCandidateSet {
113 candidates: Vec<CandidateAddress>,
114 cached_at: Instant,
115 ttl: Duration,
116 access_count: u64,
117 last_accessed: Instant,
118 validation_results: HashMap<SocketAddr, ValidationCacheEntry>,
119}
120
121#[derive(Debug, Clone)]
123struct ValidationCacheEntry {
124 is_valid: bool,
125 rtt: Option<Duration>,
126 cached_at: Instant,
127 ttl: Duration,
128}
129
130#[derive(Debug, Default, Clone)]
132pub struct CandidateCacheStats {
133 pub cache_hits: u64,
135 pub cache_misses: u64,
137 pub entries_expired: u64,
139 pub current_size: usize,
141 pub hit_rate: f64,
143 pub avg_entry_age: Duration,
145}
146
147#[derive(Debug)]
149pub struct SessionCleanupCoordinator {
150 active_sessions: Arc<RwLock<HashMap<PeerId, SessionState>>>,
152 config: SessionCleanupConfig,
154 stats: Arc<Mutex<SessionCleanupStats>>,
156 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
158}
159
160#[derive(Debug, Clone)]
162pub struct SessionCleanupConfig {
163 pub max_idle_time: Duration,
165 pub max_session_age: Duration,
167 pub cleanup_interval: Duration,
169 pub enable_aggressive_cleanup: bool,
171 pub memory_pressure_threshold: usize,
173}
174
175#[derive(Debug)]
177struct SessionState {
178 peer_id: PeerId,
179 created_at: Instant,
180 last_activity: Instant,
181 memory_usage: usize,
182 is_active: bool,
183 cleanup_priority: CleanupPriority,
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
188pub enum CleanupPriority {
189 Low, Normal, High, }
193
194#[derive(Debug, Default, Clone)]
196pub struct SessionCleanupStats {
197 pub sessions_cleaned: u64,
199 pub memory_freed: u64,
201 pub active_sessions: usize,
203 pub avg_session_lifetime: Duration,
205}
206
207#[derive(Debug)]
209pub struct FrameBatchingCoordinator {
210 pending_frames: Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
212 config: FrameBatchingConfig,
214 stats: Arc<Mutex<FrameBatchingStats>>,
216 flush_handle: Option<tokio::task::JoinHandle<()>>,
218}
219
220#[derive(Debug, Clone)]
222pub struct FrameBatchingConfig {
223 pub max_batch_size: usize,
225 pub max_batch_delay: Duration,
227 pub max_frames_per_batch: usize,
229 pub enable_adaptive_batching: bool,
231 pub min_batch_size: usize,
233}
234
235#[derive(Debug)]
237struct BatchedFrameSet {
238 frames: Vec<BatchedFrame>,
239 total_size: usize,
240 created_at: Instant,
241 destination: SocketAddr,
242 priority: BatchPriority,
243}
244
245#[derive(Debug)]
247struct BatchedFrame {
248 frame_type: u8,
249 payload: Vec<u8>,
250 size: usize,
251 priority: FramePriority,
252 created_at: Instant,
253}
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
257enum BatchPriority {
258 Low, Normal, High, }
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
265pub enum FramePriority {
266 Background, Normal, Urgent, }
270
271#[derive(Debug, Default, Clone)]
273pub struct FrameBatchingStats {
274 pub frames_batched: u64,
276 pub batches_sent: u64,
278 pub avg_batch_size: f64,
280 pub bytes_saved: u64,
282 pub batching_efficiency: f64,
284}
285
286impl Default for ConnectionPoolConfig {
287 fn default() -> Self {
288 Self {
289 max_connections: 1000,
290 max_idle_time: Duration::from_secs(300), cleanup_interval: Duration::from_secs(60), enable_reuse: true,
293 max_connection_age: Duration::from_secs(3600), }
295 }
296}
297
298impl Default for CandidateCacheConfig {
299 fn default() -> Self {
300 Self {
301 default_ttl: Duration::from_secs(300), max_cache_size: 10000,
303 cleanup_interval: Duration::from_secs(60), enable_validation_cache: true,
305 validation_ttl: Duration::from_secs(60), }
307 }
308}
309
310impl Default for SessionCleanupConfig {
311 fn default() -> Self {
312 Self {
313 max_idle_time: Duration::from_secs(600), max_session_age: Duration::from_secs(3600), cleanup_interval: Duration::from_secs(120), enable_aggressive_cleanup: true,
317 memory_pressure_threshold: 512, }
319 }
320}
321
322impl Default for FrameBatchingConfig {
323 fn default() -> Self {
324 Self {
325 max_batch_size: 1200, max_batch_delay: Duration::from_millis(10), max_frames_per_batch: 10,
328 enable_adaptive_batching: true,
329 min_batch_size: 200, }
331 }
332}
333
334impl ConnectionPool {
335 pub fn new(config: ConnectionPoolConfig) -> Self {
337 let pool = Self {
338 active_connections: Arc::new(RwLock::new(HashMap::new())),
339 config,
340 stats: Arc::new(Mutex::new(ConnectionPoolStats::default())),
341 cleanup_handle: None,
342 };
343
344 pool
345 }
346
347 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
349 let connections = Arc::clone(&self.active_connections);
350 let stats = Arc::clone(&self.stats);
351 let config = self.config.clone();
352
353 let cleanup_handle = tokio::spawn(async move {
354 let mut interval = tokio::time::interval(config.cleanup_interval);
355
356 loop {
357 interval.tick().await;
358 Self::cleanup_expired_connections(&connections, &stats, &config).await;
359 }
360 });
361
362 self.cleanup_handle = Some(cleanup_handle);
363 info!("Connection pool started with max_connections={}", self.config.max_connections);
364 Ok(())
365 }
366
367 pub async fn get_connection(
369 &self,
370 peer_id: PeerId,
371 remote_address: SocketAddr,
372 endpoint: &QuinnEndpoint,
373 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
374 if let Some(connection) = self.try_get_existing_connection(peer_id, remote_address).await {
376 self.update_stats_hit().await;
377 return Ok(connection);
378 }
379
380 let connection = self.create_new_connection(peer_id, remote_address, endpoint).await?;
382 self.update_stats_miss().await;
383 Ok(connection)
384 }
385
386 async fn try_get_existing_connection(
388 &self,
389 peer_id: PeerId,
390 remote_address: SocketAddr,
391 ) -> Option<Arc<QuinnConnection>> {
392 let mut connections = self.active_connections.write().unwrap();
393
394 if let Some(pooled) = connections.get_mut(&peer_id) {
395 if pooled.is_active && pooled.remote_address == remote_address {
396 if pooled.connection.close_reason().is_none() {
398 pooled.last_used = Instant::now();
399 pooled.use_count += 1;
400 debug!("Reusing pooled connection for peer {:?}", peer_id);
401 return Some(Arc::clone(&pooled.connection));
402 } else {
403 connections.remove(&peer_id);
405 }
406 }
407 }
408
409 None
410 }
411
412 async fn create_new_connection(
414 &self,
415 peer_id: PeerId,
416 remote_address: SocketAddr,
417 endpoint: &QuinnEndpoint,
418 ) -> Result<Arc<QuinnConnection>, Box<dyn std::error::Error + Send + Sync>> {
419 {
421 let connections = self.active_connections.read().unwrap();
422 if connections.len() >= self.config.max_connections {
423 drop(connections);
425 self.evict_lru_connection().await;
426 }
427 }
428
429 let rustls_config = rustls::ClientConfig::builder()
431 .with_root_certificates(rustls::RootCertStore::empty())
432 .with_no_client_auth();
433
434 let client_crypto = crate::crypto::rustls::QuicClientConfig::try_from(rustls_config)
435 .map_err(|e| format!("Failed to create QUIC client config: {}", e))?;
436
437 let client_config = crate::ClientConfig::new(Arc::new(client_crypto));
438
439 let connecting = endpoint.connect_with(client_config, remote_address, "ant-quic")
440 .map_err(|e| format!("Failed to initiate connection: {}", e))?;
441
442 let connection = connecting.await
444 .map_err(|e| format!("Connection failed: {}", e))?;
445
446 let connection_arc = Arc::new(connection);
447
448 let pooled = PooledConnection {
449 connection: Arc::clone(&connection_arc),
450 peer_id,
451 remote_address,
452 created_at: Instant::now(),
453 last_used: Instant::now(),
454 use_count: 1,
455 is_active: true,
456 };
457
458 {
460 let mut connections = self.active_connections.write().unwrap();
461 connections.insert(peer_id, pooled);
462 }
463
464 {
466 let mut stats = self.stats.lock().unwrap();
467 stats.connections_created += 1;
468 stats.active_connections += 1;
469 }
470
471 info!("Created new pooled connection for peer {:?}", peer_id);
472 Ok(connection_arc)
473 }
474
475 async fn evict_lru_connection(&self) {
477 let mut connections = self.active_connections.write().unwrap();
478
479 if let Some((lru_peer_id, _)) = connections
480 .iter()
481 .min_by_key(|(_, pooled)| pooled.last_used)
482 .map(|(peer_id, pooled)| (*peer_id, pooled.last_used))
483 {
484 connections.remove(&lru_peer_id);
485 debug!("Evicted LRU connection for peer {:?}", lru_peer_id);
486
487 let mut stats = self.stats.lock().unwrap();
489 stats.active_connections = stats.active_connections.saturating_sub(1);
490 }
491 }
492
493 async fn update_stats_hit(&self) {
495 let mut stats = self.stats.lock().unwrap();
496 stats.connections_reused += 1;
497 let total_requests = stats.connections_created + stats.connections_reused;
498 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
499 }
500
501 async fn update_stats_miss(&self) {
503 let stats = self.stats.lock().unwrap();
504 let total_requests = stats.connections_created + stats.connections_reused;
505 drop(stats);
506
507 let mut stats = self.stats.lock().unwrap();
508 stats.hit_rate = stats.connections_reused as f64 / total_requests as f64;
509 }
510
511 async fn cleanup_expired_connections(
513 connections: &Arc<RwLock<HashMap<PeerId, PooledConnection>>>,
514 stats: &Arc<Mutex<ConnectionPoolStats>>,
515 config: &ConnectionPoolConfig,
516 ) {
517 let now = Instant::now();
518 let mut to_remove = Vec::new();
519
520 {
522 let connections_read = connections.read().unwrap();
523 for (peer_id, pooled) in connections_read.iter() {
524 let idle_time = now.duration_since(pooled.last_used);
525 let age = now.duration_since(pooled.created_at);
526
527 if idle_time > config.max_idle_time || age > config.max_connection_age {
528 to_remove.push(*peer_id);
529 }
530
531 if pooled.connection.close_reason().is_some() {
533 to_remove.push(*peer_id);
534 }
535 }
536 }
537
538 if !to_remove.is_empty() {
540 let mut connections_write = connections.write().unwrap();
541 for peer_id in &to_remove {
542 connections_write.remove(peer_id);
543 }
544
545 let mut stats_guard = stats.lock().unwrap();
547 stats_guard.connections_expired += to_remove.len() as u64;
548 stats_guard.active_connections = connections_write.len();
549
550 debug!("Cleaned up {} expired connections", to_remove.len());
551 }
552 }
553
554 pub async fn get_stats(&self) -> ConnectionPoolStats {
556 self.stats.lock().unwrap().clone()
557 }
558
559 pub async fn shutdown(&mut self) {
561 if let Some(handle) = self.cleanup_handle.take() {
562 handle.abort();
563 }
564
565 {
567 let connections = self.active_connections.read().unwrap();
568 for (_, pooled) in connections.iter() {
569 pooled.connection.close(VarInt::from_u32(0), b"shutdown");
570 }
571 }
572
573 info!("Connection pool shutdown complete");
574 }
575}
576
577impl CandidateCache {
578 pub fn new(config: CandidateCacheConfig) -> Self {
580 let cache = Self {
581 cache: Arc::new(RwLock::new(HashMap::new())),
582 config,
583 stats: Arc::new(Mutex::new(CandidateCacheStats::default())),
584 cleanup_handle: None,
585 };
586
587 cache
588 }
589
590 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
592 let cache = Arc::clone(&self.cache);
593 let stats = Arc::clone(&self.stats);
594 let config = self.config.clone();
595
596 let cleanup_handle = tokio::spawn(async move {
597 let mut interval = tokio::time::interval(config.cleanup_interval);
598
599 loop {
600 interval.tick().await;
601 Self::cleanup_expired_entries(&cache, &stats, &config).await;
602 }
603 });
604
605 self.cleanup_handle = Some(cleanup_handle);
606 info!("Candidate cache started with max_size={}", self.config.max_cache_size);
607 Ok(())
608 }
609
610 pub async fn get_candidates(&self, peer_id: PeerId) -> Option<Vec<CandidateAddress>> {
612 let (is_valid, candidates) = {
613 let cache = self.cache.read().unwrap();
614
615 if let Some(cached_set) = cache.get(&peer_id) {
616 let now = Instant::now();
617
618 if now.duration_since(cached_set.cached_at) <= cached_set.ttl {
620 (true, Some(cached_set.candidates.clone()))
621 } else {
622 (false, None)
623 }
624 } else {
625 (false, None)
626 }
627 };
628
629 if is_valid {
630 self.update_access_stats(peer_id, true).await;
632
633 if let Some(ref candidates) = candidates {
634 debug!("Cache hit for peer {:?}, {} candidates", peer_id, candidates.len());
635 }
636 return candidates;
637 }
638
639 self.update_access_stats(peer_id, false).await;
641 debug!("Cache miss for peer {:?}", peer_id);
642 None
643 }
644
645 pub async fn cache_candidates(
647 &self,
648 peer_id: PeerId,
649 candidates: Vec<CandidateAddress>,
650 ttl: Option<Duration>,
651 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
652 let ttl = ttl.unwrap_or(self.config.default_ttl);
653
654 {
656 let cache = self.cache.read().unwrap();
657 if cache.len() >= self.config.max_cache_size {
658 drop(cache);
659 self.evict_lru_entry().await;
660 }
661 }
662
663 let candidate_count = candidates.len();
664 let cached_set = CachedCandidateSet {
665 candidates,
666 cached_at: Instant::now(),
667 ttl,
668 access_count: 0,
669 last_accessed: Instant::now(),
670 validation_results: HashMap::new(),
671 };
672
673 {
675 let mut cache = self.cache.write().unwrap();
676 cache.insert(peer_id, cached_set);
677 }
678
679 {
681 let mut stats = self.stats.lock().unwrap();
682 stats.current_size += 1;
683 }
684
685 debug!("Cached {} candidates for peer {:?} with TTL {:?}",
686 candidate_count, peer_id, ttl);
687 Ok(())
688 }
689
690 pub async fn cache_validation_result(
692 &self,
693 peer_id: PeerId,
694 address: SocketAddr,
695 is_valid: bool,
696 rtt: Option<Duration>,
697 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
698 if !self.config.enable_validation_cache {
699 return Ok(());
700 }
701
702 let mut cache = self.cache.write().unwrap();
703
704 if let Some(cached_set) = cache.get_mut(&peer_id) {
705 let validation_entry = ValidationCacheEntry {
706 is_valid,
707 rtt,
708 cached_at: Instant::now(),
709 ttl: self.config.validation_ttl,
710 };
711
712 cached_set.validation_results.insert(address, validation_entry);
713 debug!("Cached validation result for {}:{} -> {}", peer_id.0[0], address, is_valid);
714 }
715
716 Ok(())
717 }
718
719 pub async fn get_validation_result(
721 &self,
722 peer_id: PeerId,
723 address: SocketAddr,
724 ) -> Option<(bool, Option<Duration>)> {
725 if !self.config.enable_validation_cache {
726 return None;
727 }
728
729 let cache = self.cache.read().unwrap();
730
731 if let Some(cached_set) = cache.get(&peer_id) {
732 if let Some(validation_entry) = cached_set.validation_results.get(&address) {
733 let now = Instant::now();
734
735 if now.duration_since(validation_entry.cached_at) <= validation_entry.ttl {
737 return Some((validation_entry.is_valid, validation_entry.rtt));
738 }
739 }
740 }
741
742 None
743 }
744
745 async fn update_access_stats(&self, peer_id: PeerId, hit: bool) {
747 {
749 let mut stats = self.stats.lock().unwrap();
750 if hit {
751 stats.cache_hits += 1;
752 } else {
753 stats.cache_misses += 1;
754 }
755
756 let total_accesses = stats.cache_hits + stats.cache_misses;
757 stats.hit_rate = stats.cache_hits as f64 / total_accesses as f64;
758 }
759
760 if hit {
762 let mut cache = self.cache.write().unwrap();
763 if let Some(cached_set) = cache.get_mut(&peer_id) {
764 cached_set.access_count += 1;
765 cached_set.last_accessed = Instant::now();
766 }
767 }
768 }
769
770 async fn evict_lru_entry(&self) {
772 let mut cache = self.cache.write().unwrap();
773
774 if let Some((lru_peer_id, _)) = cache
775 .iter()
776 .min_by_key(|(_, cached_set)| cached_set.last_accessed)
777 .map(|(peer_id, cached_set)| (*peer_id, cached_set.last_accessed))
778 {
779 cache.remove(&lru_peer_id);
780 debug!("Evicted LRU cache entry for peer {:?}", lru_peer_id);
781
782 let mut stats = self.stats.lock().unwrap();
784 stats.current_size = stats.current_size.saturating_sub(1);
785 }
786 }
787
788 async fn cleanup_expired_entries(
790 cache: &Arc<RwLock<HashMap<PeerId, CachedCandidateSet>>>,
791 stats: &Arc<Mutex<CandidateCacheStats>>,
792 _config: &CandidateCacheConfig,
793 ) {
794 let now = Instant::now();
795 let mut to_remove = Vec::new();
796
797 {
799 let cache_read = cache.read().unwrap();
800 for (peer_id, cached_set) in cache_read.iter() {
801 let age = now.duration_since(cached_set.cached_at);
802 if age > cached_set.ttl {
803 to_remove.push(*peer_id);
804 }
805 }
806 }
807
808 if !to_remove.is_empty() {
810 let mut cache_write = cache.write().unwrap();
811 for peer_id in &to_remove {
812 cache_write.remove(peer_id);
813 }
814
815 let mut stats_guard = stats.lock().unwrap();
817 stats_guard.entries_expired += to_remove.len() as u64;
818 stats_guard.current_size = cache_write.len();
819
820 debug!("Cleaned up {} expired cache entries", to_remove.len());
821 }
822
823 {
825 let mut cache_write = cache.write().unwrap();
826 for cached_set in cache_write.values_mut() {
827 cached_set.validation_results.retain(|_, validation_entry| {
828 now.duration_since(validation_entry.cached_at) <= validation_entry.ttl
829 });
830 }
831 }
832 }
833
834 pub async fn get_stats(&self) -> CandidateCacheStats {
836 self.stats.lock().unwrap().clone()
837 }
838
839 pub async fn shutdown(&mut self) {
841 if let Some(handle) = self.cleanup_handle.take() {
842 handle.abort();
843 }
844
845 {
847 let mut cache = self.cache.write().unwrap();
848 cache.clear();
849 }
850
851 info!("Candidate cache shutdown complete");
852 }
853}
854
855impl SessionCleanupCoordinator {
856 pub fn new(config: SessionCleanupConfig) -> Self {
858 Self {
859 active_sessions: Arc::new(RwLock::new(HashMap::new())),
860 config,
861 stats: Arc::new(Mutex::new(SessionCleanupStats::default())),
862 cleanup_handle: None,
863 }
864 }
865
866 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
868 let sessions = Arc::clone(&self.active_sessions);
869 let stats = Arc::clone(&self.stats);
870 let config = self.config.clone();
871
872 let cleanup_handle = tokio::spawn(async move {
873 let mut interval = tokio::time::interval(config.cleanup_interval);
874
875 loop {
876 interval.tick().await;
877 Self::cleanup_expired_sessions(&sessions, &stats, &config).await;
878 }
879 });
880
881 self.cleanup_handle = Some(cleanup_handle);
882 info!("Session cleanup coordinator started");
883 Ok(())
884 }
885
886 pub async fn register_session(
888 &self,
889 peer_id: PeerId,
890 memory_usage: usize,
891 priority: CleanupPriority,
892 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
893 let session_state = SessionState {
894 peer_id,
895 created_at: Instant::now(),
896 last_activity: Instant::now(),
897 memory_usage,
898 is_active: true,
899 cleanup_priority: priority,
900 };
901
902 {
903 let mut sessions = self.active_sessions.write().unwrap();
904 sessions.insert(peer_id, session_state);
905 }
906
907 {
909 let mut stats = self.stats.lock().unwrap();
910 stats.active_sessions += 1;
911 }
912
913 debug!("Registered session for peer {:?} with {} bytes", peer_id, memory_usage);
914 Ok(())
915 }
916
917 pub async fn update_session_activity(&self, peer_id: PeerId) {
919 let mut sessions = self.active_sessions.write().unwrap();
920 if let Some(session) = sessions.get_mut(&peer_id) {
921 session.last_activity = Instant::now();
922 }
923 }
924
925 async fn cleanup_expired_sessions(
927 sessions: &Arc<RwLock<HashMap<PeerId, SessionState>>>,
928 stats: &Arc<Mutex<SessionCleanupStats>>,
929 config: &SessionCleanupConfig,
930 ) {
931 let now = Instant::now();
932 let mut to_remove = Vec::new();
933 let mut memory_freed = 0u64;
934
935 let memory_pressure = Self::check_memory_pressure(config);
937
938 {
940 let sessions_read = sessions.read().unwrap();
941 for (peer_id, session) in sessions_read.iter() {
942 let idle_time = now.duration_since(session.last_activity);
943 let age = now.duration_since(session.created_at);
944
945 let should_cleanup = if memory_pressure && config.enable_aggressive_cleanup {
946 match session.cleanup_priority {
948 CleanupPriority::High => idle_time > Duration::from_secs(30),
949 CleanupPriority::Normal => idle_time > Duration::from_secs(60),
950 CleanupPriority::Low => idle_time > config.max_idle_time / 2,
951 }
952 } else {
953 idle_time > config.max_idle_time || age > config.max_session_age
955 };
956
957 if should_cleanup {
958 to_remove.push(*peer_id);
959 memory_freed += session.memory_usage as u64;
960 }
961 }
962 }
963
964 if !to_remove.is_empty() {
966 let mut sessions_write = sessions.write().unwrap();
967 for peer_id in &to_remove {
968 sessions_write.remove(peer_id);
969 }
970
971 let mut stats_guard = stats.lock().unwrap();
973 stats_guard.sessions_cleaned += to_remove.len() as u64;
974 stats_guard.memory_freed += memory_freed;
975 stats_guard.active_sessions = sessions_write.len();
976
977 if memory_pressure {
978 info!("Aggressive cleanup: removed {} sessions, freed {} bytes",
979 to_remove.len(), memory_freed);
980 } else {
981 debug!("Regular cleanup: removed {} sessions, freed {} bytes",
982 to_remove.len(), memory_freed);
983 }
984 }
985 }
986
987 fn check_memory_pressure(config: &SessionCleanupConfig) -> bool {
989 let _ = config.memory_pressure_threshold;
994 false
995 }
996
997 pub async fn get_stats(&self) -> SessionCleanupStats {
999 self.stats.lock().unwrap().clone()
1000 }
1001
1002 pub async fn shutdown(&mut self) {
1004 if let Some(handle) = self.cleanup_handle.take() {
1005 handle.abort();
1006 }
1007
1008 {
1010 let mut sessions = self.active_sessions.write().unwrap();
1011 sessions.clear();
1012 }
1013
1014 info!("Session cleanup coordinator shutdown complete");
1015 }
1016}
1017
1018impl FrameBatchingCoordinator {
1019 pub fn new(config: FrameBatchingConfig) -> Self {
1021 Self {
1022 pending_frames: Arc::new(Mutex::new(HashMap::new())),
1023 config,
1024 stats: Arc::new(Mutex::new(FrameBatchingStats::default())),
1025 flush_handle: None,
1026 }
1027 }
1028
1029 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1031 let pending_frames = Arc::clone(&self.pending_frames);
1032 let stats = Arc::clone(&self.stats);
1033 let config = self.config.clone();
1034
1035 let flush_handle = tokio::spawn(async move {
1036 let mut interval = tokio::time::interval(config.max_batch_delay / 4);
1037
1038 loop {
1039 interval.tick().await;
1040 Self::flush_expired_batches(&pending_frames, &stats, &config).await;
1041 }
1042 });
1043
1044 self.flush_handle = Some(flush_handle);
1045 info!("Frame batching coordinator started");
1046 Ok(())
1047 }
1048
1049 pub async fn add_frame(
1051 &self,
1052 destination: SocketAddr,
1053 frame_type: u8,
1054 payload: Vec<u8>,
1055 priority: FramePriority,
1056 ) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
1057 let frame = BatchedFrame {
1058 frame_type,
1059 size: payload.len(),
1060 payload,
1061 priority,
1062 created_at: Instant::now(),
1063 };
1064
1065 let mut pending = self.pending_frames.lock().unwrap();
1066
1067 let (should_flush, frames_count, total_size) = {
1069 let batch_set = pending.entry(destination).or_insert_with(|| {
1070 BatchedFrameSet {
1071 frames: Vec::new(),
1072 total_size: 0,
1073 created_at: Instant::now(),
1074 destination,
1075 priority: BatchPriority::Normal,
1076 }
1077 });
1078
1079 batch_set.frames.push(frame);
1080 batch_set.total_size += batch_set.frames.last().unwrap().size;
1081
1082 if priority == FramePriority::Urgent {
1084 batch_set.priority = BatchPriority::High;
1085 }
1086
1087 let should_flush = self.should_flush_batch(batch_set);
1089 (should_flush, batch_set.frames.len(), batch_set.total_size)
1090 };
1091
1092 if should_flush {
1093 if let Some(batch_set) = pending.remove(&destination) {
1095 let batch_data = self.serialize_batch(&batch_set);
1096
1097 {
1099 let mut stats = self.stats.lock().unwrap();
1100 stats.batches_sent += 1;
1101 stats.frames_batched += frames_count as u64;
1102 stats.avg_batch_size = (stats.avg_batch_size * (stats.batches_sent - 1) as f64 +
1103 total_size as f64) / stats.batches_sent as f64;
1104 }
1105
1106 debug!("Flushed batch to {} with {} frames ({} bytes)",
1107 destination, frames_count, total_size);
1108 return Ok(Some(batch_data));
1109 }
1110 }
1111
1112 Ok(None)
1113 }
1114
1115 fn should_flush_batch(&self, batch_set: &BatchedFrameSet) -> bool {
1117 let now = Instant::now();
1118 let age = now.duration_since(batch_set.created_at);
1119
1120 batch_set.total_size >= self.config.max_batch_size ||
1122 batch_set.frames.len() >= self.config.max_frames_per_batch ||
1123 age >= self.config.max_batch_delay ||
1124 batch_set.priority == BatchPriority::High
1125 }
1126
1127 fn serialize_batch(&self, batch_set: &BatchedFrameSet) -> Vec<u8> {
1129 let mut data = Vec::with_capacity(batch_set.total_size + batch_set.frames.len() * 4);
1130
1131 for frame in &batch_set.frames {
1132 data.push(frame.frame_type);
1134 data.extend_from_slice(&(frame.payload.len() as u16).to_be_bytes());
1135 data.extend_from_slice(&frame.payload);
1137 }
1138
1139 data
1140 }
1141
1142 async fn flush_expired_batches(
1144 pending_frames: &Arc<Mutex<HashMap<SocketAddr, BatchedFrameSet>>>,
1145 stats: &Arc<Mutex<FrameBatchingStats>>,
1146 config: &FrameBatchingConfig,
1147 ) {
1148 let now = Instant::now();
1149 let mut to_flush = Vec::new();
1150
1151 {
1153 let pending = pending_frames.lock().unwrap();
1154 for (destination, batch_set) in pending.iter() {
1155 let age = now.duration_since(batch_set.created_at);
1156 if age >= config.max_batch_delay {
1157 to_flush.push((*destination, batch_set.frames.len(), batch_set.total_size));
1158 }
1159 }
1160 }
1161
1162 if !to_flush.is_empty() {
1164 let mut pending = pending_frames.lock().unwrap();
1165 let flush_count = to_flush.len();
1166 for (destination, frame_count, total_size) in to_flush {
1167 pending.remove(&destination);
1168
1169 let mut stats_guard = stats.lock().unwrap();
1171 stats_guard.batches_sent += 1;
1172 stats_guard.frames_batched += frame_count as u64;
1173 stats_guard.avg_batch_size = (stats_guard.avg_batch_size * (stats_guard.batches_sent - 1) as f64 +
1174 total_size as f64) / stats_guard.batches_sent as f64;
1175 }
1176
1177 debug!("Flushed {} expired batches", flush_count);
1178 }
1179 }
1180
1181 pub async fn get_stats(&self) -> FrameBatchingStats {
1183 self.stats.lock().unwrap().clone()
1184 }
1185
1186 pub async fn shutdown(&mut self) {
1188 if let Some(handle) = self.flush_handle.take() {
1189 handle.abort();
1190 }
1191
1192 {
1194 let mut pending = self.pending_frames.lock().unwrap();
1195 pending.clear();
1196 }
1197
1198 info!("Frame batching coordinator shutdown complete");
1199 }
1200}
1201
1202#[derive(Debug)]
1204pub struct MemoryOptimizationManager {
1205 connection_pool: ConnectionPool,
1206 candidate_cache: CandidateCache,
1207 session_cleanup: SessionCleanupCoordinator,
1208 frame_batching: FrameBatchingCoordinator,
1209 is_running: bool,
1210}
1211
1212impl MemoryOptimizationManager {
1213 pub fn new() -> Self {
1215 Self {
1216 connection_pool: ConnectionPool::new(ConnectionPoolConfig::default()),
1217 candidate_cache: CandidateCache::new(CandidateCacheConfig::default()),
1218 session_cleanup: SessionCleanupCoordinator::new(SessionCleanupConfig::default()),
1219 frame_batching: FrameBatchingCoordinator::new(FrameBatchingConfig::default()),
1220 is_running: false,
1221 }
1222 }
1223
1224 pub fn with_configs(
1226 pool_config: ConnectionPoolConfig,
1227 cache_config: CandidateCacheConfig,
1228 cleanup_config: SessionCleanupConfig,
1229 batching_config: FrameBatchingConfig,
1230 ) -> Self {
1231 Self {
1232 connection_pool: ConnectionPool::new(pool_config),
1233 candidate_cache: CandidateCache::new(cache_config),
1234 session_cleanup: SessionCleanupCoordinator::new(cleanup_config),
1235 frame_batching: FrameBatchingCoordinator::new(batching_config),
1236 is_running: false,
1237 }
1238 }
1239
1240 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1242 if self.is_running {
1243 return Ok(());
1244 }
1245
1246 self.connection_pool.start().await?;
1247 self.candidate_cache.start().await?;
1248 self.session_cleanup.start().await?;
1249 self.frame_batching.start().await?;
1250
1251 self.is_running = true;
1252 info!("Memory optimization manager started");
1253 Ok(())
1254 }
1255
1256 pub fn connection_pool(&self) -> &ConnectionPool {
1258 &self.connection_pool
1259 }
1260
1261 pub fn candidate_cache(&self) -> &CandidateCache {
1263 &self.candidate_cache
1264 }
1265
1266 pub fn session_cleanup(&self) -> &SessionCleanupCoordinator {
1268 &self.session_cleanup
1269 }
1270
1271 pub fn frame_batching(&self) -> &FrameBatchingCoordinator {
1273 &self.frame_batching
1274 }
1275
1276 pub async fn get_comprehensive_stats(&self) -> MemoryOptimizationStats {
1278 MemoryOptimizationStats {
1279 connection_pool: self.connection_pool.get_stats().await,
1280 candidate_cache: self.candidate_cache.get_stats().await,
1281 session_cleanup: self.session_cleanup.get_stats().await,
1282 frame_batching: self.frame_batching.get_stats().await,
1283 }
1284 }
1285
1286 pub async fn shutdown(&mut self) {
1288 if !self.is_running {
1289 return;
1290 }
1291
1292 self.connection_pool.shutdown().await;
1293 self.candidate_cache.shutdown().await;
1294 self.session_cleanup.shutdown().await;
1295 self.frame_batching.shutdown().await;
1296
1297 self.is_running = false;
1298 info!("Memory optimization manager shutdown complete");
1299 }
1300}
1301
1302#[derive(Debug, Clone)]
1304pub struct MemoryOptimizationStats {
1305 pub connection_pool: ConnectionPoolStats,
1306 pub candidate_cache: CandidateCacheStats,
1307 pub session_cleanup: SessionCleanupStats,
1308 pub frame_batching: FrameBatchingStats,
1309}
1310
1311impl Default for MemoryOptimizationManager {
1312 fn default() -> Self {
1313 Self::new()
1314 }
1315}