1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tokio::time::timeout;
17
18use tracing::{debug, info, warn};
19
20use tokio::time::sleep;
21
22use crate::{
23 candidate_discovery::NetworkInterface,
24 connection::nat_traversal::{CandidateSource, CandidateState},
25 nat_traversal_api::{CandidateAddress, PeerId},
26};
27
/// Coordinates candidate discovery across multiple network interfaces
/// and aggregates per-interface results and statistics.
#[derive(Debug)]
pub struct ParallelDiscoveryCoordinator {
    /// Per-interface discovery tasks, keyed by interface name.
    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
    /// Tuning knobs: task caps, timeouts, prioritization flags.
    config: ParallelDiscoveryConfig,
    /// Aggregated counters for the current discovery run.
    stats: Arc<Mutex<ParallelDiscoveryStats>>,
    /// Handle of the spawned background coordination loop, if started.
    coordination_handle: Option<tokio::task::JoinHandle<()>>,
}
40
/// Configuration for [`ParallelDiscoveryCoordinator`].
#[derive(Debug, Clone)]
pub struct ParallelDiscoveryConfig {
    /// Upper bound on simultaneously attempted interface discoveries.
    pub max_concurrent_tasks: usize,
    /// Deadline applied to each per-interface discovery attempt.
    pub interface_timeout: Duration,
    /// When true, interfaces are reordered by `preferred_interface_types`.
    pub enable_prioritization: bool,
    /// Interface types in descending order of preference.
    pub preferred_interface_types: Vec<InterfaceType>,
    /// When true, the task cap is scaled by a system-load heuristic.
    pub enable_adaptive_parallelism: bool,
}
55
/// Coarse classification of a network interface, inferred from its name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterfaceType {
    /// Wired interface (e.g. `eth0`, `en0`).
    Ethernet,
    /// Wireless LAN interface (e.g. `wlan0`).
    WiFi,
    /// Mobile data interface (e.g. `wwan0`, `ppp0`).
    Cellular,
    /// Loopback interface (`lo`).
    Loopback,
    /// Tunnel/VPN interface (e.g. `tun0`, `tap0`).
    VPN,
    /// Name did not match any known pattern.
    Unknown,
}
66
/// State of a single per-interface discovery attempt.
#[derive(Debug)]
struct DiscoveryTask {
    /// OS name of the interface being probed.
    interface_name: String,
    /// Classification derived from the interface name.
    interface_type: InterfaceType,
    /// When the task was created; used for duration statistics.
    started_at: Instant,
    /// Lifecycle state of the task.
    status: TaskStatus,
    /// Candidates produced once the task completes.
    discovered_candidates: Vec<CandidateAddress>,
    /// Interface-type priority score (higher is preferred).
    priority: u32,
}
77
/// Lifecycle state of a [`DiscoveryTask`]. `Completed`, `Failed`, and
/// `Timeout` are the terminal states the coordination loop waits for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskStatus {
    /// Created but not yet running.
    Pending,
    /// Discovery currently in progress.
    Running,
    /// Finished successfully; candidates are available.
    Completed,
    /// Discovery returned an error.
    Failed,
    /// Discovery exceeded `interface_timeout`.
    Timeout,
}
87
/// Aggregate counters describing a parallel discovery run.
#[derive(Debug, Default, Clone)]
pub struct ParallelDiscoveryStats {
    /// Per-interface tasks started.
    pub tasks_started: u64,
    /// Tasks that finished successfully.
    pub tasks_completed: u64,
    /// Tasks that failed or timed out.
    pub tasks_failed: u64,
    /// Mean wall-clock time of completed tasks.
    pub avg_discovery_time: Duration,
    /// Total candidates found across completed tasks.
    pub total_candidates: u64,
    /// Ratio of completed to started tasks (0.0–1.0).
    pub parallelism_efficiency: f64,
}
104
/// Computes per-operation timeouts that adapt to measured network
/// conditions (RTT, loss, estimated congestion).
#[derive(Debug)]
pub struct AdaptiveTimeoutManager {
    /// Shared snapshot of the most recent network measurements.
    network_conditions: Arc<RwLock<NetworkConditions>>,
    /// Static timeout envelope (base/min/max and scaling factors) per operation.
    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
    /// Counters about timeout adjustments made.
    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
    /// Handle of the background condition-decay loop, if started.
    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
}
117
/// Rolling view of measured network quality.
///
/// NOTE(review): the struct is `pub` but every field is private, so the
/// snapshot returned by `get_network_conditions` is only inspectable via
/// `Debug` from outside this module — confirm whether accessors or `pub`
/// fields were intended.
#[derive(Debug, Clone)]
pub struct NetworkConditions {
    /// Recent RTT samples (bounded ring; oldest dropped first).
    rtt_samples: VecDeque<Duration>,
    /// EWMA packet-loss rate in [0.0, 1.0].
    packet_loss_rate: f64,
    /// EWMA bandwidth estimate in bytes/sec.
    bandwidth_estimate: u64,
    /// Composite link-quality score in [0.0, 1.0] (higher is better).
    quality_score: f64,
    /// Estimated congestion in [0.0, 1.0] (higher is worse).
    congestion_level: f64,
    /// Timestamp of the most recent measurement.
    last_measurement: Instant,
}
134
/// NAT-traversal operation categories, each with its own adaptive
/// timeout envelope.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// Gathering local/reflexive candidate addresses.
    CandidateDiscovery,
    /// Probing whether a candidate path is reachable.
    PathValidation,
    /// Round trip to a coordination/rendezvous server.
    CoordinationRequest,
    /// Simultaneous-open hole-punch attempt.
    HolePunching,
    /// Full end-to-end connection establishment.
    ConnectionEstablishment,
}
144
/// Per-operation timeout envelope and scaling factors used by
/// `AdaptiveTimeoutManager::calculate_timeout`.
#[derive(Debug, Clone)]
struct AdaptiveTimeoutConfig {
    /// Fallback timeout when no RTT samples exist yet.
    base_timeout: Duration,
    /// Lower clamp applied to the computed timeout.
    min_timeout: Duration,
    /// Upper clamp applied to the computed timeout.
    max_timeout: Duration,
    /// RTT-to-timeout multiplier (timeout starts at avg RTT × this).
    rtt_multiplier: f64,
    /// How strongly poor link quality inflates the timeout.
    quality_factor: f64,
    /// How strongly congestion inflates the timeout.
    congestion_factor: f64,
}
161
/// Counters about adaptive-timeout behavior.
#[derive(Debug, Default, Clone)]
pub struct AdaptiveTimeoutStats {
    /// Number of timeout computations performed.
    pub adjustments_made: u64,
    /// Most recently computed timeout per operation type.
    pub avg_timeouts: HashMap<OperationType, Duration>,
    // NOTE(review): the two fields below are never written in this
    // chunk — confirm they are populated elsewhere or remove them.
    /// Fraction of timeouts that proved well-sized.
    pub timeout_effectiveness: f64,
    /// Accuracy of the condition-estimation model.
    pub condition_accuracy: f64,
}
174
/// Runs candidate path validations while keeping probe traffic within
/// the available-bandwidth budget.
#[derive(Debug)]
pub struct BandwidthAwareValidator {
    /// In-flight validation sessions, keyed by target address.
    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
    /// Rolling bandwidth estimator fed by sent-packet accounting.
    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
    /// Concurrency and rate limits.
    config: BandwidthValidationConfig,
    /// Aggregate validation statistics.
    stats: Arc<Mutex<BandwidthValidationStats>>,
}
187
/// Limits applied by [`BandwidthAwareValidator`].
#[derive(Debug, Clone)]
pub struct BandwidthValidationConfig {
    /// Hard cap on simultaneously active validation sessions.
    pub max_concurrent_validations: usize,
    /// Bandwidth budget in bytes/sec.
    pub bandwidth_threshold: u64,
    /// When true, admission also checks current bandwidth utilization.
    pub enable_adaptive_validation: bool,
    /// Size of each validation probe packet, in bytes.
    pub validation_packet_size: usize,
    /// Maximum probe packets per second.
    pub max_validation_rate: f64,
}
202
/// Rolling estimator of outbound bandwidth, fed by per-packet samples.
#[derive(Debug)]
struct BandwidthMonitor {
    /// Bounded window of recent transfer samples (max 100).
    bandwidth_samples: VecDeque<BandwidthSample>,
    /// Current bandwidth estimate in bytes/sec.
    current_bandwidth: u64,
    // NOTE(review): never updated in this chunk — confirm it is used.
    /// Fraction of `current_bandwidth` in use.
    utilization: f64,
    /// Timestamp of the most recent sample.
    last_measurement: Instant,
}
215
/// One bandwidth measurement point.
#[derive(Debug, Clone)]
struct BandwidthSample {
    /// When the sample was taken.
    timestamp: Instant,
    /// Bytes transferred since the previous sample.
    bytes_transferred: u64,
    /// Elapsed time since the previous sample.
    duration: Duration,
    /// Instantaneous bandwidth for this sample, bytes/sec.
    bandwidth: u64,
}
224
/// Book-keeping for a single in-flight path validation.
#[derive(Debug)]
struct ValidationSession {
    /// Address under validation.
    target_address: SocketAddr,
    /// Session start time; used for duration statistics.
    started_at: Instant,
    /// Probe packets sent so far.
    packets_sent: u32,
    /// Probe responses received so far.
    packets_received: u32,
    /// Total probe bytes sent.
    total_bytes: u64,
    /// RTTs observed for this session's probes.
    rtt_samples: Vec<Duration>,
    /// Bytes charged against the bandwidth budget.
    bandwidth_usage: u64,
    /// Scheduling priority of this session.
    priority: ValidationPriority,
}
237
/// Relative importance of a validation session. The `Ord` derive makes
/// `Low < Normal < High < Critical` (declaration order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ValidationPriority {
    Low,
    Normal,
    High,
    Critical,
}
246
/// Aggregate statistics for bandwidth-aware validation.
#[derive(Debug, Default, Clone)]
pub struct BandwidthValidationStats {
    /// Sessions admitted.
    pub validations_started: u64,
    /// Sessions that completed successfully.
    pub validations_completed: u64,
    /// Total probe bytes sent across all sessions.
    pub total_bandwidth_used: u64,
    /// Mean duration of successful validations.
    pub avg_validation_time: Duration,
    /// Successful validations per KB of probe traffic.
    pub bandwidth_efficiency: f64,
}
261
/// Gates connection migrations on the current congestion picture and
/// tracks migration progress per peer.
#[derive(Debug)]
pub struct CongestionControlIntegrator {
    /// In-flight migrations, keyed by peer.
    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
    /// Shared congestion-control state (cwnd, RTTs, recent events).
    congestion_state: Arc<Mutex<CongestionState>>,
    /// Thresholds and scaling factors.
    config: CongestionIntegrationConfig,
    /// Aggregate migration statistics.
    stats: Arc<Mutex<CongestionIntegrationStats>>,
}
274
/// Configuration for [`CongestionControlIntegrator`].
#[derive(Debug, Clone)]
pub struct CongestionIntegrationConfig {
    /// When true, migrations are refused above `congestion_threshold`.
    pub enable_congestion_awareness: bool,
    /// Congestion level (0.0–1.0) above which migrations are delayed.
    pub congestion_threshold: f64,
    /// Rate limit on migration attempts.
    pub max_migrations_per_second: f64,
    /// When true, bandwidth estimates are tracked per migration.
    pub enable_bandwidth_estimation: bool,
    /// Factor applied to cwnd when seeding a new path's window.
    pub cwnd_scaling_factor: f64,
}
289
/// Book-keeping for one in-flight path migration.
#[derive(Debug)]
struct MigrationSession {
    /// Peer whose connection is migrating.
    peer_id: PeerId,
    /// Path being abandoned.
    old_path: SocketAddr,
    /// Path being adopted.
    new_path: SocketAddr,
    /// Start time; used for duration statistics.
    started_at: Instant,
    /// Current phase of the migration.
    migration_state: MigrationState,
    /// Congestion window seeded for the new path (scaled-down cwnd).
    congestion_window: u32,
    /// Latest RTT estimate for the new path.
    rtt_estimate: Duration,
    /// Latest bandwidth estimate for the new path, bytes/sec.
    bandwidth_estimate: u64,
}
302
/// Phases of a connection migration; `Completed` and `Failed` are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationState {
    /// Migration requested and admitted.
    Initiated,
    /// New path reachability is being checked.
    PathValidating,
    /// New path capacity is being probed.
    CongestionProbing,
    /// Traffic is being shifted to the new path.
    Migrating,
    /// Migration finished successfully.
    Completed,
    /// Migration aborted.
    Failed,
}
313
/// Shared congestion-control state consulted before migrations.
#[derive(Debug)]
struct CongestionState {
    /// Current congestion window, in packets.
    congestion_window: u32,
    /// Slow-start threshold.
    ssthresh: u32,
    /// Bounded window of recent RTT measurements (max 50).
    rtt_measurements: VecDeque<Duration>,
    /// Bounded log of recent congestion events (max 100).
    congestion_events: VecDeque<CongestionEvent>,
    /// Estimated congestion level in [0.0, 1.0].
    congestion_level: f64,
}
328
/// One observed congestion signal.
#[derive(Debug, Clone)]
struct CongestionEvent {
    /// When the event was observed.
    timestamp: Instant,
    /// Kind of signal observed.
    event_type: CongestionEventType,
    /// Severity weight for this event.
    severity: f64,
}
336
/// Kinds of congestion signals that can be recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CongestionEventType {
    /// A packet was lost.
    PacketLoss,
    /// A retransmission timeout fired.
    Timeout,
    /// An ECN congestion mark was received.
    ECNMark,
    /// RTT rose noticeably above its baseline.
    RTTIncrease,
}
345
/// Aggregate statistics for congestion-aware migration.
#[derive(Debug, Default, Clone)]
pub struct CongestionIntegrationStats {
    /// Migrations admitted (attempted).
    pub migrations_attempted: u64,
    /// Migrations that reached `Completed`.
    pub migrations_successful: u64,
    /// Mean duration of successful migrations.
    pub avg_migration_time: Duration,
    /// Migrations refused because congestion was too high.
    pub congestion_avoided_migrations: u64,
    /// Efficiency of bandwidth use during migrations.
    pub bandwidth_utilization_efficiency: f64,
}
360
361impl Default for ParallelDiscoveryConfig {
362 fn default() -> Self {
363 Self {
364 max_concurrent_tasks: 8,
365 interface_timeout: Duration::from_secs(5),
366 enable_prioritization: true,
367 preferred_interface_types: vec![
368 InterfaceType::Ethernet,
369 InterfaceType::WiFi,
370 InterfaceType::Cellular,
371 ],
372 enable_adaptive_parallelism: true,
373 }
374 }
375}
376
377impl Default for BandwidthValidationConfig {
378 fn default() -> Self {
379 Self {
380 max_concurrent_validations: 16,
381 bandwidth_threshold: 1_000_000, enable_adaptive_validation: true,
383 validation_packet_size: 64,
384 max_validation_rate: 100.0, }
386 }
387}
388
389impl Default for CongestionIntegrationConfig {
390 fn default() -> Self {
391 Self {
392 enable_congestion_awareness: true,
393 congestion_threshold: 0.7, max_migrations_per_second: 10.0,
395 enable_bandwidth_estimation: true,
396 cwnd_scaling_factor: 0.8,
397 }
398 }
399}
400
impl ParallelDiscoveryCoordinator {
    /// Creates a coordinator with empty task and stats tables; no work
    /// runs until [`Self::start_parallel_discovery`] is called.
    pub fn new(config: ParallelDiscoveryConfig) -> Self {
        Self {
            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
            coordination_handle: None,
        }
    }

    /// Runs candidate discovery on up to `max_tasks` of the given
    /// interfaces, then spawns a background task that aggregates stats
    /// until every discovery reaches a terminal state.
    ///
    /// NOTE(review): each `start_interface_discovery` call is awaited to
    /// completion inside the loop, so interfaces are actually processed
    /// sequentially despite the "parallel" name — confirm whether a
    /// `tokio::spawn` per interface was intended.
    pub async fn start_parallel_discovery(
        &mut self,
        interfaces: Vec<NetworkInterface>,
        peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        info!(
            "Starting parallel discovery across {} interfaces for peer {:?}",
            interfaces.len(),
            peer_id
        );

        // Optionally reorder interfaces so preferred types come first.
        let prioritized_interfaces = if self.config.enable_prioritization {
            self.prioritize_interfaces(interfaces)
        } else {
            interfaces
        };

        // Cap the number of interfaces attempted, optionally scaled by a
        // (currently static) load heuristic.
        let max_tasks = if self.config.enable_adaptive_parallelism {
            self.calculate_adaptive_parallelism().await
        } else {
            self.config.max_concurrent_tasks
        };

        let tasks_to_start = prioritized_interfaces
            .into_iter()
            .take(max_tasks)
            .collect::<Vec<_>>();

        for interface in tasks_to_start {
            self.start_interface_discovery(interface, peer_id).await?;
        }

        self.start_coordination_task().await?;

        Ok(())
    }

    /// Stable-sorts interfaces by preference: position in
    /// `preferred_interface_types` first (unlisted types sink to 999),
    /// then by address count.
    ///
    /// NOTE(review): the secondary key sorts *ascending*, so interfaces
    /// with fewer addresses come first within a type — confirm intended.
    fn prioritize_interfaces(
        &self,
        mut interfaces: Vec<NetworkInterface>,
    ) -> Vec<NetworkInterface> {
        interfaces.sort_by_key(|interface| {
            let interface_type = self.classify_interface_type(&interface.name);
            let type_priority = self
                .config
                .preferred_interface_types
                .iter()
                .position(|&t| t == interface_type)
                .unwrap_or(999);

            (type_priority, interface.addresses.len())
        });

        interfaces
    }

    /// Classifies an interface by name using ordered substring
    /// heuristics (e.g. `eth`/`en` → Ethernet, `wlan` → WiFi).
    /// Earlier checks win; `"en"` is broad (matches macOS `en0` but also
    /// any name containing "en"), so order matters here.
    fn classify_interface_type(&self, name: &str) -> InterfaceType {
        let name_lower = name.to_lowercase();

        if name_lower.contains("eth") || name_lower.contains("en") {
            InterfaceType::Ethernet
        } else if name_lower.contains("wlan")
            || name_lower.contains("wifi")
            || name_lower.contains("wl")
        {
            InterfaceType::WiFi
        } else if name_lower.contains("cell")
            || name_lower.contains("wwan")
            || name_lower.contains("ppp")
        {
            InterfaceType::Cellular
        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
            InterfaceType::Loopback
        } else if name_lower.contains("vpn")
            || name_lower.contains("tun")
            || name_lower.contains("tap")
        {
            InterfaceType::VPN
        } else {
            InterfaceType::Unknown
        }
    }

    /// Scales the configured task cap by a system-load factor.
    /// The factor is currently a hard-coded 0.8 placeholder — presumably
    /// meant to be derived from live load metrics; TODO confirm.
    async fn calculate_adaptive_parallelism(&self) -> usize {
        let base_parallelism = self.config.max_concurrent_tasks;
        let system_load_factor = 0.8;
        ((base_parallelism as f64) * system_load_factor) as usize
    }

    /// Registers a `Pending` task for `interface`, bumps the started
    /// counter, and runs the discovery (inline, to completion).
    async fn start_interface_discovery(
        &self,
        interface: NetworkInterface,
        _peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_type = self.classify_interface_type(&interface.name);
        let priority = self.calculate_interface_priority(interface_type);

        let task = DiscoveryTask {
            interface_name: interface.name.clone(),
            interface_type,
            started_at: Instant::now(),
            status: TaskStatus::Pending,
            discovered_candidates: Vec::new(),
            priority,
        };

        // Guard scopes keep the std locks from being held across awaits.
        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.insert(interface.name.clone(), task);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.tasks_started += 1;
        }

        self.perform_interface_discovery(interface).await?;

        Ok(())
    }

    /// Maps an interface type to a fixed priority score (higher is
    /// better): wired > WiFi > cellular > VPN > loopback > unknown.
    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
        match interface_type {
            InterfaceType::Ethernet => 100,
            InterfaceType::WiFi => 80,
            InterfaceType::Cellular => 60,
            InterfaceType::VPN => 40,
            InterfaceType::Loopback => 20,
            InterfaceType::Unknown => 10,
        }
    }

    /// Marks the task `Running`, performs discovery under
    /// `interface_timeout`, then records the terminal status and
    /// counters for success / failure / timeout.
    async fn perform_interface_discovery(
        &self,
        interface: NetworkInterface,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_name = interface.name.clone();

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            if let Some(task) = discoveries.get_mut(&interface_name) {
                task.status = TaskStatus::Running;
            }
        }

        // Outer Result = deadline, inner Result = discovery outcome.
        let discovery_result = timeout(
            self.config.interface_timeout,
            self.discover_candidates_for_interface(interface),
        )
        .await;

        match discovery_result {
            Ok(Ok(candidates)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Completed;
                        task.discovered_candidates = candidates;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_completed += 1;
                }

                debug!("Interface discovery completed for {}", interface_name);
            }
            Ok(Err(_)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Failed;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery failed for {}", interface_name);
            }
            Err(_) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Timeout;
                    }
                }

                {
                    // Timeouts are folded into the generic failure counter.
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery timeout for {}", interface_name);
            }
        }

        Ok(())
    }

    /// Builds local candidates from the interface's usable addresses.
    /// The trailing 100 ms sleep looks like a placeholder for real
    /// discovery work (e.g. STUN queries) — TODO confirm.
    async fn discover_candidates_for_interface(
        &self,
        interface: NetworkInterface,
    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
        let mut candidates = Vec::new();

        for address in &interface.addresses {
            if self.is_valid_candidate_address(address) {
                let candidate = CandidateAddress {
                    address: *address,
                    priority: self.calculate_candidate_priority(address, &interface),
                    source: CandidateSource::Local,
                    state: CandidateState::New,
                };

                candidates.push(candidate);
            }
        }

        sleep(Duration::from_millis(100)).await;

        Ok(candidates)
    }

    /// Filters out addresses that cannot serve as candidates:
    /// IPv4 loopback/link-local/broadcast, IPv6 loopback/unspecified.
    /// Note: IPv4 unspecified (0.0.0.0) is not excluded here.
    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => {
                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
            }
            IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
        }
    }

    /// Scores a candidate: 1000 base, +100 for IPv4, +200 for a
    /// public (non-private) address, plus the interface-type bonus.
    fn calculate_candidate_priority(
        &self,
        address: &SocketAddr,
        interface: &NetworkInterface,
    ) -> u32 {
        let mut priority = 1000u32;

        if address.is_ipv4() {
            priority += 100;
        }

        if !self.is_private_address(address) {
            priority += 200;
        }

        let interface_type = self.classify_interface_type(&interface.name);
        priority += self.calculate_interface_priority(interface_type);

        priority
    }

    /// Returns true for RFC 1918 IPv4 addresses and IPv6 unique-local
    /// addresses (fc00::/7, via the top-bits mask on the first segment).
    fn is_private_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => ipv4.is_private(),
            IpAddr::V6(ipv6) => {
                let segments = ipv6.segments();
                (segments[0] & 0xfe00) == 0xfc00
            }
        }
    }

    /// Spawns a loop that re-aggregates stats every 500 ms and exits
    /// once every task is terminal. An empty task map counts as "all
    /// complete" (vacuous `all`), so the loop stops after one tick if
    /// no discoveries were started.
    async fn start_coordination_task(
        &mut self,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let discoveries = Arc::clone(&self.active_discoveries);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let coordination_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                interval.tick().await;
                Self::coordinate_discoveries(&discoveries, &stats, &config).await;

                let all_complete = {
                    let discoveries_read = discoveries.read().unwrap();
                    discoveries_read.values().all(|task| {
                        matches!(
                            task.status,
                            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout
                        )
                    })
                };

                if all_complete {
                    break;
                }
            }
        });

        self.coordination_handle = Some(coordination_handle);
        Ok(())
    }

    /// Recomputes aggregate stats from the task table, overwriting the
    /// incrementally maintained counters with the authoritative values.
    ///
    /// NOTE(review): `started_at.elapsed()` is measured at aggregation
    /// time, not at task completion, so `avg_discovery_time` includes
    /// time elapsed after a task finished — confirm acceptable.
    async fn coordinate_discoveries(
        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
        _config: &ParallelDiscoveryConfig,
    ) {
        let mut total_candidates = 0u64;
        let mut completed_tasks = 0u64;
        let mut total_discovery_time = Duration::ZERO;

        {
            let discoveries_read = discoveries.read().unwrap();
            for task in discoveries_read.values() {
                if task.status == TaskStatus::Completed {
                    total_candidates += task.discovered_candidates.len() as u64;
                    completed_tasks += 1;
                    total_discovery_time += task.started_at.elapsed();
                }
            }
        }

        {
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.total_candidates = total_candidates;
            stats_guard.tasks_completed = completed_tasks;

            if completed_tasks > 0 {
                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
                stats_guard.parallelism_efficiency =
                    completed_tasks as f64 / stats_guard.tasks_started as f64;
            }
        }
    }

    /// Collects candidates from all completed tasks, sorted by priority
    /// in descending order.
    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
        let mut all_candidates = Vec::new();

        let discoveries = self.active_discoveries.read().unwrap();
        for task in discoveries.values() {
            if task.status == TaskStatus::Completed {
                all_candidates.extend(task.discovered_candidates.clone());
            }
        }

        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));

        all_candidates
    }

    /// Returns a snapshot of the aggregate discovery statistics.
    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
        self.stats.lock().unwrap().clone()
    }

    /// Aborts the coordination loop and clears all task state.
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.coordination_handle.take() {
            handle.abort();
        }

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.clear();
        }

        info!("Parallel discovery coordinator shutdown complete");
    }
}
828
829impl Default for AdaptiveTimeoutManager {
830 fn default() -> Self {
831 Self::new()
832 }
833}
834
impl AdaptiveTimeoutManager {
    /// Builds a manager with hand-tuned per-operation timeout envelopes
    /// and optimistic initial conditions (1 MB/s bandwidth, quality 0.8,
    /// congestion 0.2). The monitoring loop starts only on [`Self::start`].
    pub fn new() -> Self {
        let mut timeout_configs = HashMap::new();

        timeout_configs.insert(
            OperationType::CandidateDiscovery,
            AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(5),
                min_timeout: Duration::from_millis(500),
                max_timeout: Duration::from_secs(30),
                rtt_multiplier: 4.0,
                quality_factor: 0.5,
                congestion_factor: 0.3,
            },
        );

        timeout_configs.insert(
            OperationType::PathValidation,
            AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(3),
                min_timeout: Duration::from_millis(200),
                max_timeout: Duration::from_secs(15),
                rtt_multiplier: 3.0,
                quality_factor: 0.4,
                congestion_factor: 0.4,
            },
        );

        timeout_configs.insert(
            OperationType::CoordinationRequest,
            AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(10),
                min_timeout: Duration::from_secs(1),
                max_timeout: Duration::from_secs(60),
                rtt_multiplier: 5.0,
                quality_factor: 0.6,
                congestion_factor: 0.2,
            },
        );

        timeout_configs.insert(
            OperationType::HolePunching,
            AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(2),
                min_timeout: Duration::from_millis(100),
                max_timeout: Duration::from_secs(10),
                rtt_multiplier: 2.0,
                quality_factor: 0.3,
                congestion_factor: 0.5,
            },
        );

        timeout_configs.insert(
            OperationType::ConnectionEstablishment,
            AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(15),
                min_timeout: Duration::from_secs(2),
                max_timeout: Duration::from_secs(120),
                rtt_multiplier: 6.0,
                quality_factor: 0.7,
                congestion_factor: 0.1,
            },
        );

        Self {
            network_conditions: Arc::new(RwLock::new(NetworkConditions {
                rtt_samples: VecDeque::new(),
                packet_loss_rate: 0.0,
                // Optimistic starting point until real samples arrive.
                bandwidth_estimate: 1_000_000, quality_score: 0.8, congestion_level: 0.2, last_measurement: Instant::now(),
            })),
            timeout_configs,
            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
            monitoring_handle: None,
        }
    }

    /// Spawns a 1 s-interval loop that decays stale network conditions.
    /// The loop never exits on its own; it runs until [`Self::shutdown`]
    /// aborts it.
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let network_conditions = Arc::clone(&self.network_conditions);
        let stats = Arc::clone(&self.stats);

        let monitoring_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(1));

            loop {
                interval.tick().await;
                Self::update_network_conditions(&network_conditions, &stats).await;
            }
        });

        self.monitoring_handle = Some(monitoring_handle);
        info!("Adaptive timeout manager started");
        Ok(())
    }

    /// Computes the timeout for `operation` from current conditions:
    ///
    /// `clamp(avg_rtt × rtt_multiplier (or base_timeout if no samples)
    ///        × (1 + (1 − quality) × quality_factor)
    ///        × (1 + congestion × congestion_factor),
    ///        min_timeout, max_timeout)`
    ///
    /// Also records the result in `stats.avg_timeouts` (which stores the
    /// latest value per operation, not a true average).
    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
        // Unknown operation types fall back to the CandidateDiscovery-like
        // envelope below.
        let config = self
            .timeout_configs
            .get(&operation)
            .cloned()
            .unwrap_or_else(|| AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(5),
                min_timeout: Duration::from_millis(500),
                max_timeout: Duration::from_secs(30),
                rtt_multiplier: 4.0,
                quality_factor: 0.5,
                congestion_factor: 0.3,
            });

        let conditions = self.network_conditions.read().unwrap();

        let rtt_based_timeout =
            if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
                Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
            } else {
                config.base_timeout
            };

        // Worse quality (score → 0) inflates the timeout up to 1+quality_factor.
        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;

        // Higher congestion inflates the timeout up to 1+congestion_factor.
        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;

        let adjusted_timeout = Duration::from_millis(
            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment)
                as u64,
        );

        // Clamp into the operation's [min, max] envelope.
        let final_timeout = adjusted_timeout
            .max(config.min_timeout)
            .min(config.max_timeout);

        {
            let mut stats = self.stats.lock().unwrap();
            stats.adjustments_made += 1;
            stats.avg_timeouts.insert(operation, final_timeout);
        }

        debug!(
            "Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
            operation, final_timeout, conditions.quality_score, conditions.congestion_level
        );

        final_timeout
    }

    /// Folds one measurement into the shared condition state:
    /// RTT window capped at 50 samples, loss rate as a 0.9/0.1 EWMA,
    /// bandwidth as a 0.8/0.2 EWMA, quality as the mean of RTT quality
    /// (1 s RTT → 0) and loss quality, congestion as loss + RTT
    /// variation (capped at 1.0).
    pub async fn record_measurement(
        &self,
        rtt: Duration,
        packet_loss: bool,
        bandwidth: Option<u64>,
    ) {
        let mut conditions = self.network_conditions.write().unwrap();

        conditions.rtt_samples.push_back(rtt);
        if conditions.rtt_samples.len() > 50 {
            conditions.rtt_samples.pop_front();
        }

        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;

        if let Some(bw) = bandwidth {
            conditions.bandwidth_estimate =
                (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
        }

        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
        let loss_quality = 1.0 - conditions.packet_loss_rate;
        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;

        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);

        conditions.last_measurement = Instant::now();
    }

    /// Arithmetic mean of the RTT window (millisecond resolution), or
    /// `None` when no samples exist.
    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
        if samples.is_empty() {
            return None;
        }

        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
        Some(Duration::from_millis(total_ms / samples.len() as u64))
    }

    /// Coefficient of variation (stddev / mean) of the RTT window,
    /// capped at 1.0; 0.0 with fewer than two samples.
    ///
    /// The `unwrap` is safe: `len >= 2` guarantees a mean exists.
    /// NOTE(review): if every sample is sub-millisecond the mean is 0
    /// and this divides by zero, yielding NaN which then propagates into
    /// `congestion_level` — confirm sub-ms RTTs cannot occur here.
    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
        if samples.len() < 2 {
            return 0.0;
        }

        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
        let variance: f64 = samples
            .iter()
            .map(|d| {
                let diff = d.as_millis() as f64 - avg;
                diff * diff
            })
            .sum::<f64>()
            / samples.len() as f64;

        (variance.sqrt() / avg).min(1.0)
    }

    /// Periodic maintenance: trims the RTT window to 100 samples, decays
    /// the loss rate by 1% per tick, and decays quality by 5% per tick
    /// once no measurement has arrived for 10 s.
    async fn update_network_conditions(
        network_conditions: &Arc<RwLock<NetworkConditions>>,
        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
    ) {
        let mut conditions = network_conditions.write().unwrap();

        while conditions.rtt_samples.len() > 100 {
            conditions.rtt_samples.pop_front();
        }

        conditions.packet_loss_rate *= 0.99;

        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
            conditions.quality_score *= 0.95;
        }
    }

    /// Returns a snapshot of the current network conditions.
    pub async fn get_network_conditions(&self) -> NetworkConditions {
        self.network_conditions.read().unwrap().clone()
    }

    /// Returns a snapshot of the adjustment statistics.
    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
        self.stats.lock().unwrap().clone()
    }

    /// Aborts the background monitoring loop.
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.monitoring_handle.take() {
            handle.abort();
        }

        info!("Adaptive timeout manager shutdown complete");
    }
}
1106
1107impl BandwidthAwareValidator {
1108 pub fn new(config: BandwidthValidationConfig) -> Self {
1110 Self {
1111 active_validations: Arc::new(RwLock::new(HashMap::new())),
1112 bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
1113 bandwidth_samples: VecDeque::new(),
1114 current_bandwidth: 1_000_000, utilization: 0.0,
1116 last_measurement: Instant::now(),
1117 })),
1118 config,
1119 stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
1120 }
1121 }
1122
1123 pub async fn start_validation(
1125 &self,
1126 target_address: SocketAddr,
1127 priority: ValidationPriority,
1128 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1129 if !self.can_start_validation().await {
1131 return Err("Bandwidth limit reached, cannot start validation".into());
1132 }
1133
1134 let session = ValidationSession {
1135 target_address,
1136 started_at: Instant::now(),
1137 packets_sent: 0,
1138 packets_received: 0,
1139 total_bytes: 0,
1140 rtt_samples: Vec::new(),
1141 bandwidth_usage: 0,
1142 priority,
1143 };
1144
1145 {
1147 let mut validations = self.active_validations.write().unwrap();
1148 validations.insert(target_address, session);
1149 }
1150
1151 {
1153 let mut stats = self.stats.lock().unwrap();
1154 stats.validations_started += 1;
1155 }
1156
1157 debug!("Started bandwidth-aware validation for {}", target_address);
1158 Ok(())
1159 }
1160
1161 async fn can_start_validation(&self) -> bool {
1163 let validations = self.active_validations.read().unwrap();
1164 let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();
1165
1166 if validations.len() >= self.config.max_concurrent_validations {
1168 return false;
1169 }
1170
1171 if self.config.enable_adaptive_validation {
1173 let current_usage: u64 = validations
1174 .values()
1175 .map(|session| session.bandwidth_usage)
1176 .sum();
1177
1178 let available_bandwidth = bandwidth_monitor.current_bandwidth;
1179 let utilization = current_usage as f64 / available_bandwidth as f64;
1180
1181 if utilization > 0.8 {
1182 return false;
1184 }
1185 }
1186
1187 true
1188 }
1189
1190 pub async fn record_packet_sent(
1192 &self,
1193 target_address: SocketAddr,
1194 packet_size: usize,
1195 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1196 let mut validations = self.active_validations.write().unwrap();
1197
1198 if let Some(session) = validations.get_mut(&target_address) {
1199 session.packets_sent += 1;
1200 session.total_bytes += packet_size as u64;
1201 session.bandwidth_usage += packet_size as u64;
1202 }
1203
1204 self.update_bandwidth_usage(packet_size as u64).await;
1206
1207 Ok(())
1208 }
1209
1210 pub async fn record_packet_received(
1212 &self,
1213 target_address: SocketAddr,
1214 rtt: Duration,
1215 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1216 let mut validations = self.active_validations.write().unwrap();
1217
1218 if let Some(session) = validations.get_mut(&target_address) {
1219 session.packets_received += 1;
1220 session.rtt_samples.push(rtt);
1221 }
1222
1223 Ok(())
1224 }
1225
1226 async fn update_bandwidth_usage(&self, bytes_used: u64) {
1228 let mut monitor = self.bandwidth_monitor.lock().unwrap();
1229
1230 let now = Instant::now();
1231 let sample = BandwidthSample {
1232 timestamp: now,
1233 bytes_transferred: bytes_used,
1234 duration: now.duration_since(monitor.last_measurement),
1235 bandwidth: if monitor.last_measurement.elapsed().as_secs() > 0 {
1236 bytes_used / monitor.last_measurement.elapsed().as_secs()
1237 } else {
1238 0
1239 },
1240 };
1241
1242 monitor.bandwidth_samples.push_back(sample);
1243 if monitor.bandwidth_samples.len() > 100 {
1244 monitor.bandwidth_samples.pop_front();
1245 }
1246
1247 if !monitor.bandwidth_samples.is_empty() {
1249 let total_bytes: u64 = monitor
1250 .bandwidth_samples
1251 .iter()
1252 .map(|s| s.bytes_transferred)
1253 .sum();
1254 let total_time: Duration = monitor.bandwidth_samples.iter().map(|s| s.duration).sum();
1255
1256 if total_time.as_secs() > 0 {
1257 monitor.current_bandwidth = total_bytes / total_time.as_secs();
1258 }
1259 }
1260
1261 monitor.last_measurement = now;
1262 }
1263
1264 pub async fn complete_validation(
1266 &self,
1267 target_address: SocketAddr,
1268 success: bool,
1269 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1270 let session = {
1271 let mut validations = self.active_validations.write().unwrap();
1272 validations.remove(&target_address)
1273 };
1274
1275 if let Some(session) = session {
1276 let duration = session.started_at.elapsed();
1277
1278 {
1280 let mut stats = self.stats.lock().unwrap();
1281 if success {
1282 stats.validations_completed += 1;
1283 }
1284 stats.total_bandwidth_used += session.bandwidth_usage;
1285 stats.avg_validation_time = if stats.validations_completed > 0 {
1286 Duration::from_millis(
1287 (stats.avg_validation_time.as_millis() as u64
1288 * (stats.validations_completed - 1)
1289 + duration.as_millis() as u64)
1290 / stats.validations_completed,
1291 )
1292 } else {
1293 duration
1294 };
1295
1296 if stats.total_bandwidth_used > 0 {
1297 stats.bandwidth_efficiency = stats.validations_completed as f64
1298 / stats.total_bandwidth_used as f64
1299 * 1000.0; }
1301 }
1302
1303 debug!(
1304 "Completed validation for {} in {:?} (success: {})",
1305 target_address, duration, success
1306 );
1307 }
1308
1309 Ok(())
1310 }
1311
1312 pub async fn get_stats(&self) -> BandwidthValidationStats {
1314 self.stats.lock().unwrap().clone()
1315 }
1316}
1317
1318impl CongestionControlIntegrator {
1319 pub fn new(config: CongestionIntegrationConfig) -> Self {
1321 Self {
1322 active_migrations: Arc::new(RwLock::new(HashMap::new())),
1323 congestion_state: Arc::new(Mutex::new(CongestionState {
1324 congestion_window: 10, ssthresh: 65535,
1326 rtt_measurements: VecDeque::new(),
1327 congestion_events: VecDeque::new(),
1328 congestion_level: 0.0,
1329 })),
1330 config,
1331 stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
1332 }
1333 }
1334
1335 pub async fn start_migration(
1337 &self,
1338 peer_id: PeerId,
1339 old_path: SocketAddr,
1340 new_path: SocketAddr,
1341 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1342 if self.config.enable_congestion_awareness {
1344 let congestion_state = self.congestion_state.lock().unwrap();
1345 if congestion_state.congestion_level > self.config.congestion_threshold {
1346 return Err("Migration delayed due to high congestion".into());
1347 }
1348 }
1349
1350 let session = MigrationSession {
1351 peer_id,
1352 old_path,
1353 new_path,
1354 started_at: Instant::now(),
1355 migration_state: MigrationState::Initiated,
1356 congestion_window: {
1357 let state = self.congestion_state.lock().unwrap();
1358 (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
1359 },
1360 rtt_estimate: Duration::from_millis(100), bandwidth_estimate: 1_000_000, };
1363
1364 {
1366 let mut migrations = self.active_migrations.write().unwrap();
1367 migrations.insert(peer_id, session);
1368 }
1369
1370 {
1372 let mut stats = self.stats.lock().unwrap();
1373 stats.migrations_attempted += 1;
1374 }
1375
1376 info!(
1377 "Started congestion-aware migration for peer {:?}: {} -> {}",
1378 peer_id, old_path, new_path
1379 );
1380 Ok(())
1381 }
1382
1383 pub async fn update_migration_state(
1385 &self,
1386 peer_id: PeerId,
1387 new_state: MigrationState,
1388 rtt: Option<Duration>,
1389 bandwidth: Option<u64>,
1390 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1391 let mut migrations = self.active_migrations.write().unwrap();
1392
1393 if let Some(session) = migrations.get_mut(&peer_id) {
1394 session.migration_state = new_state;
1395
1396 if let Some(rtt) = rtt {
1397 session.rtt_estimate = rtt;
1398
1399 let mut congestion_state = self.congestion_state.lock().unwrap();
1401 congestion_state.rtt_measurements.push_back(rtt);
1402 if congestion_state.rtt_measurements.len() > 50 {
1403 congestion_state.rtt_measurements.pop_front();
1404 }
1405 }
1406
1407 if let Some(bw) = bandwidth {
1408 session.bandwidth_estimate = bw;
1409 }
1410
1411 if matches!(new_state, MigrationState::Completed) {
1413 let duration = session.started_at.elapsed();
1414
1415 let mut stats = self.stats.lock().unwrap();
1417 stats.migrations_successful += 1;
1418 stats.avg_migration_time = if stats.migrations_successful > 0 {
1419 Duration::from_millis(
1420 (stats.avg_migration_time.as_millis() as u64
1421 * (stats.migrations_successful - 1)
1422 + duration.as_millis() as u64)
1423 / stats.migrations_successful,
1424 )
1425 } else {
1426 duration
1427 };
1428
1429 debug!(
1430 "Migration completed for peer {:?} in {:?}",
1431 peer_id, duration
1432 );
1433 }
1434 }
1435
1436 Ok(())
1437 }
1438
1439 pub async fn record_congestion_event(&self, event_type: CongestionEventType, severity: f64) {
1441 let event = CongestionEvent {
1442 timestamp: Instant::now(),
1443 event_type,
1444 severity,
1445 };
1446
1447 let mut congestion_state = self.congestion_state.lock().unwrap();
1448 congestion_state.congestion_events.push_back(event);
1449
1450 if congestion_state.congestion_events.len() > 100 {
1452 congestion_state.congestion_events.pop_front();
1453 }
1454
1455 let recent_events: Vec<_> = congestion_state
1457 .congestion_events
1458 .iter()
1459 .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
1460 .collect();
1461
1462 if !recent_events.is_empty() {
1463 let avg_severity: f64 =
1464 recent_events.iter().map(|e| e.severity).sum::<f64>() / recent_events.len() as f64;
1465
1466 congestion_state.congestion_level = avg_severity;
1467 }
1468
1469 match event_type {
1471 CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
1472 congestion_state.ssthresh = congestion_state.congestion_window / 2;
1473 congestion_state.congestion_window = congestion_state.ssthresh;
1474 }
1475 CongestionEventType::ECNMark => {
1476 congestion_state.congestion_window =
1477 (congestion_state.congestion_window as f64 * 0.8) as u32;
1478 }
1479 CongestionEventType::RTTIncrease => {
1480 congestion_state.congestion_window =
1482 (congestion_state.congestion_window as f64 * 0.95) as u32;
1483 }
1484 }
1485
1486 debug!(
1487 "Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})",
1488 event_type, severity, congestion_state.congestion_window
1489 );
1490 }
1491
1492 pub async fn get_stats(&self) -> CongestionIntegrationStats {
1494 self.stats.lock().unwrap().clone()
1495 }
1496}
1497
/// Facade that owns and wires together the network-efficiency subsystems
/// (parallel discovery, adaptive timeouts, bandwidth-aware validation, and
/// congestion-control integration).
#[derive(Debug)]
pub struct NetworkEfficiencyManager {
    /// Coordinates parallel candidate-discovery tasks.
    parallel_discovery: ParallelDiscoveryCoordinator,
    /// Adapts operation timeouts to observed network conditions.
    adaptive_timeout: AdaptiveTimeoutManager,
    /// Candidate validator — presumably applies bandwidth constraints
    /// (NOTE(review): behavior defined elsewhere in this file; confirm).
    bandwidth_validator: BandwidthAwareValidator,
    /// Couples path migration with congestion-control state.
    congestion_integrator: CongestionControlIntegrator,
    /// Guards `start()` / `shutdown()` against repeated invocation.
    is_running: bool,
}
1507
1508impl NetworkEfficiencyManager {
1509 pub fn new() -> Self {
1511 Self {
1512 parallel_discovery: ParallelDiscoveryCoordinator::new(
1513 ParallelDiscoveryConfig::default(),
1514 ),
1515 adaptive_timeout: AdaptiveTimeoutManager::new(),
1516 bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
1517 congestion_integrator: CongestionControlIntegrator::new(
1518 CongestionIntegrationConfig::default(),
1519 ),
1520 is_running: false,
1521 }
1522 }
1523
1524 pub fn with_configs(
1526 discovery_config: ParallelDiscoveryConfig,
1527 validation_config: BandwidthValidationConfig,
1528 congestion_config: CongestionIntegrationConfig,
1529 ) -> Self {
1530 Self {
1531 parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
1532 adaptive_timeout: AdaptiveTimeoutManager::new(),
1533 bandwidth_validator: BandwidthAwareValidator::new(validation_config),
1534 congestion_integrator: CongestionControlIntegrator::new(congestion_config),
1535 is_running: false,
1536 }
1537 }
1538
1539 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1541 if self.is_running {
1542 return Ok(());
1543 }
1544
1545 self.adaptive_timeout.start().await?;
1546
1547 self.is_running = true;
1548 info!("Network efficiency manager started");
1549 Ok(())
1550 }
1551
1552 pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
1554 &mut self.parallel_discovery
1555 }
1556
1557 pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
1559 &self.adaptive_timeout
1560 }
1561
1562 pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
1564 &self.bandwidth_validator
1565 }
1566
1567 pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
1569 &self.congestion_integrator
1570 }
1571
1572 pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
1574 NetworkEfficiencyStats {
1575 parallel_discovery: self.parallel_discovery.get_stats().await,
1576 adaptive_timeout: self.adaptive_timeout.get_stats().await,
1577 bandwidth_validation: self.bandwidth_validator.get_stats().await,
1578 congestion_integration: self.congestion_integrator.get_stats().await,
1579 }
1580 }
1581
1582 pub async fn shutdown(&mut self) {
1584 if !self.is_running {
1585 return;
1586 }
1587
1588 self.parallel_discovery.shutdown().await;
1589 self.adaptive_timeout.shutdown().await;
1590
1591 self.is_running = false;
1592 info!("Network efficiency manager shutdown complete");
1593 }
1594}
1595
/// Aggregated statistics across all network-efficiency subsystems, as
/// returned by `NetworkEfficiencyManager::get_comprehensive_stats`.
#[derive(Debug, Clone)]
pub struct NetworkEfficiencyStats {
    /// Statistics from the parallel discovery coordinator.
    pub parallel_discovery: ParallelDiscoveryStats,
    /// Statistics from the adaptive timeout manager.
    pub adaptive_timeout: AdaptiveTimeoutStats,
    /// Statistics from the bandwidth-aware validator.
    pub bandwidth_validation: BandwidthValidationStats,
    /// Statistics from the congestion-control integrator.
    pub congestion_integration: CongestionIntegrationStats,
}
1604
1605impl Default for NetworkEfficiencyManager {
1606 fn default() -> Self {
1607 Self::new()
1608 }
1609}