use std::{
    collections::{HashMap, VecDeque},
    net::{IpAddr, SocketAddr},
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use tokio::time::{sleep, timeout};
use tracing::{debug, info, warn};

use crate::{
    nat_traversal_api::{CandidateAddress, PeerId},
    candidate_discovery::NetworkInterface,
    connection::nat_traversal::{CandidateSource, CandidateState},
};

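/// Coordinates candidate discovery across multiple network interfaces, tracking
/// per-interface tasks and aggregate discovery statistics.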
#[derive(Debug)]
pub struct ParallelDiscoveryCoordinator {
    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
    config: ParallelDiscoveryConfig,
    stats: Arc<Mutex<ParallelDiscoveryStats>>,
    coordination_handle: Option<tokio::task::JoinHandle<()>>,
}

#[derive(Debug, Clone)]
pub struct ParallelDiscoveryConfig {
    pub max_concurrent_tasks: usize,
    pub interface_timeout: Duration,
    pub enable_prioritization: bool,
    pub preferred_interface_types: Vec<InterfaceType>,
    pub enable_adaptive_parallelism: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterfaceType {
    Ethernet,
    WiFi,
    Cellular,
    Loopback,
    VPN,
    Unknown,
}

#[derive(Debug)]
struct DiscoveryTask {
    interface_name: String,
    interface_type: InterfaceType,
    started_at: Instant,
    status: TaskStatus,
    discovered_candidates: Vec<CandidateAddress>,
    priority: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Timeout,
}

#[derive(Debug, Default, Clone)]
pub struct ParallelDiscoveryStats {
    pub tasks_started: u64,
    pub tasks_completed: u64,
    pub tasks_failed: u64,
    pub avg_discovery_time: Duration,
    pub total_candidates: u64,
    pub parallelism_efficiency: f64,
}

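/// Derives per-operation timeouts from observed RTT, packet loss, and congestion,
/// so NAT traversal steps wait longer on poor networks and are clamped tighter on good ones.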
#[derive(Debug)]
pub struct AdaptiveTimeoutManager {
    network_conditions: Arc<RwLock<NetworkConditions>>,
    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
}

#[derive(Debug, Clone)]
pub struct NetworkConditions {
    rtt_samples: VecDeque<Duration>,
    packet_loss_rate: f64,
    bandwidth_estimate: u64,
    quality_score: f64,
    congestion_level: f64,
    last_measurement: Instant,
}

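/// NAT traversal operations that receive individually tuned adaptive timeouts.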
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    CandidateDiscovery,
    PathValidation,
    CoordinationRequest,
    HolePunching,
    ConnectionEstablishment,
}

#[derive(Debug, Clone)]
struct AdaptiveTimeoutConfig {
    base_timeout: Duration,
    min_timeout: Duration,
    max_timeout: Duration,
    rtt_multiplier: f64,
    quality_factor: f64,
    congestion_factor: f64,
}

#[derive(Debug, Default, Clone)]
pub struct AdaptiveTimeoutStats {
    pub adjustments_made: u64,
    pub avg_timeouts: HashMap<OperationType, Duration>,
    pub timeout_effectiveness: f64,
    pub condition_accuracy: f64,
}

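/// Caps how many candidate validations run at once and how much probe traffic they
/// generate, based on a rolling estimate of the currently available bandwidth.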
#[derive(Debug)]
pub struct BandwidthAwareValidator {
    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
    config: BandwidthValidationConfig,
    stats: Arc<Mutex<BandwidthValidationStats>>,
}

#[derive(Debug, Clone)]
pub struct BandwidthValidationConfig {
    pub max_concurrent_validations: usize,
    pub bandwidth_threshold: u64,
    pub enable_adaptive_validation: bool,
    pub validation_packet_size: usize,
    pub max_validation_rate: f64,
}

#[derive(Debug)]
struct BandwidthMonitor {
    bandwidth_samples: VecDeque<BandwidthSample>,
    current_bandwidth: u64,
    utilization: f64,
    last_measurement: Instant,
}

#[derive(Debug, Clone)]
struct BandwidthSample {
    timestamp: Instant,
    bytes_transferred: u64,
    duration: Duration,
    bandwidth: u64,
}

#[derive(Debug)]
struct ValidationSession {
    target_address: SocketAddr,
    started_at: Instant,
    packets_sent: u32,
    packets_received: u32,
    total_bytes: u64,
    rtt_samples: Vec<Duration>,
    bandwidth_usage: u64,
    priority: ValidationPriority,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ValidationPriority {
    Low,
    Normal,
    High,
    Critical,
}

#[derive(Debug, Default, Clone)]
pub struct BandwidthValidationStats {
    pub validations_started: u64,
    pub validations_completed: u64,
    pub total_bandwidth_used: u64,
    pub avg_validation_time: Duration,
    pub bandwidth_efficiency: f64,
}

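/// Ties connection migration decisions to a simple congestion view of the path:
/// migrations are refused while congestion is high, and the new path starts with a
/// scaled-down congestion window.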
#[derive(Debug)]
pub struct CongestionControlIntegrator {
    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
    congestion_state: Arc<Mutex<CongestionState>>,
    config: CongestionIntegrationConfig,
    stats: Arc<Mutex<CongestionIntegrationStats>>,
}

#[derive(Debug, Clone)]
pub struct CongestionIntegrationConfig {
    pub enable_congestion_awareness: bool,
    pub congestion_threshold: f64,
    pub max_migrations_per_second: f64,
    pub enable_bandwidth_estimation: bool,
    pub cwnd_scaling_factor: f64,
}

#[derive(Debug)]
struct MigrationSession {
    peer_id: PeerId,
    old_path: SocketAddr,
    new_path: SocketAddr,
    started_at: Instant,
    migration_state: MigrationState,
    congestion_window: u32,
    rtt_estimate: Duration,
    bandwidth_estimate: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationState {
    Initiated,
    PathValidating,
    CongestionProbing,
    Migrating,
    Completed,
    Failed,
}

#[derive(Debug)]
struct CongestionState {
    congestion_window: u32,
    ssthresh: u32,
    rtt_measurements: VecDeque<Duration>,
    congestion_events: VecDeque<CongestionEvent>,
    congestion_level: f64,
}

#[derive(Debug, Clone)]
struct CongestionEvent {
    timestamp: Instant,
    event_type: CongestionEventType,
    severity: f64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CongestionEventType {
    PacketLoss,
    Timeout,
    ECNMark,
    RTTIncrease,
}

#[derive(Debug, Default, Clone)]
pub struct CongestionIntegrationStats {
    pub migrations_attempted: u64,
    pub migrations_successful: u64,
    pub avg_migration_time: Duration,
    pub congestion_avoided_migrations: u64,
    pub bandwidth_utilization_efficiency: f64,
}

impl Default for ParallelDiscoveryConfig {
    fn default() -> Self {
        Self {
            max_concurrent_tasks: 8,
            interface_timeout: Duration::from_secs(5),
            enable_prioritization: true,
            preferred_interface_types: vec![
                InterfaceType::Ethernet,
                InterfaceType::WiFi,
                InterfaceType::Cellular,
            ],
            enable_adaptive_parallelism: true,
        }
    }
}

impl Default for BandwidthValidationConfig {
    fn default() -> Self {
        Self {
            max_concurrent_validations: 16,
            bandwidth_threshold: 1_000_000,
            enable_adaptive_validation: true,
            validation_packet_size: 64,
            max_validation_rate: 100.0,
        }
    }
}

impl Default for CongestionIntegrationConfig {
    fn default() -> Self {
        Self {
            enable_congestion_awareness: true,
            congestion_threshold: 0.7,
            max_migrations_per_second: 10.0,
            enable_bandwidth_estimation: true,
            cwnd_scaling_factor: 0.8,
        }
    }
}

impl ParallelDiscoveryCoordinator {
    pub fn new(config: ParallelDiscoveryConfig) -> Self {
        Self {
            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
            coordination_handle: None,
        }
    }

    pub async fn start_parallel_discovery(
        &mut self,
        interfaces: Vec<NetworkInterface>,
        peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        info!(
            "Starting parallel discovery across {} interfaces for peer {:?}",
            interfaces.len(),
            peer_id
        );

        let prioritized_interfaces = if self.config.enable_prioritization {
            self.prioritize_interfaces(interfaces)
        } else {
            interfaces
        };

        let max_tasks = if self.config.enable_adaptive_parallelism {
            self.calculate_adaptive_parallelism().await
        } else {
            self.config.max_concurrent_tasks
        };

        let tasks_to_start = prioritized_interfaces
            .into_iter()
            .take(max_tasks)
            .collect::<Vec<_>>();

        for interface in tasks_to_start {
            self.start_interface_discovery(interface, peer_id).await?;
        }

        self.start_coordination_task().await?;

        Ok(())
    }

    fn prioritize_interfaces(&self, mut interfaces: Vec<NetworkInterface>) -> Vec<NetworkInterface> {
        interfaces.sort_by_key(|interface| {
            let interface_type = self.classify_interface_type(&interface.name);
            let type_priority = self
                .config
                .preferred_interface_types
                .iter()
                .position(|&t| t == interface_type)
                .unwrap_or(999);

            (type_priority, interface.addresses.len())
        });

        interfaces
    }

    fn classify_interface_type(&self, name: &str) -> InterfaceType {
        let name_lower = name.to_lowercase();

        if name_lower.contains("eth") || name_lower.contains("en") {
            InterfaceType::Ethernet
        } else if name_lower.contains("wlan") || name_lower.contains("wifi") || name_lower.contains("wl") {
            InterfaceType::WiFi
        } else if name_lower.contains("cell") || name_lower.contains("wwan") || name_lower.contains("ppp") {
            InterfaceType::Cellular
        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
            InterfaceType::Loopback
        } else if name_lower.contains("vpn") || name_lower.contains("tun") || name_lower.contains("tap") {
            InterfaceType::VPN
        } else {
            InterfaceType::Unknown
        }
    }

    async fn calculate_adaptive_parallelism(&self) -> usize {
        let base_parallelism = self.config.max_concurrent_tasks;
        // Scale the configured maximum by a fixed load factor.
        let system_load_factor = 0.8;
        ((base_parallelism as f64) * system_load_factor) as usize
    }

    async fn start_interface_discovery(
        &self,
        interface: NetworkInterface,
        _peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_type = self.classify_interface_type(&interface.name);
        let priority = self.calculate_interface_priority(interface_type);

        let task = DiscoveryTask {
            interface_name: interface.name.clone(),
            interface_type,
            started_at: Instant::now(),
            status: TaskStatus::Pending,
            discovered_candidates: Vec::new(),
            priority,
        };

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.insert(interface.name.clone(), task);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.tasks_started += 1;
        }

        self.perform_interface_discovery(interface).await?;

        Ok(())
    }

    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
        match interface_type {
            InterfaceType::Ethernet => 100,
            InterfaceType::WiFi => 80,
            InterfaceType::Cellular => 60,
            InterfaceType::VPN => 40,
            InterfaceType::Loopback => 20,
            InterfaceType::Unknown => 10,
        }
    }

    async fn perform_interface_discovery(
        &self,
        interface: NetworkInterface,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_name = interface.name.clone();

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            if let Some(task) = discoveries.get_mut(&interface_name) {
                task.status = TaskStatus::Running;
            }
        }

        let discovery_result = timeout(
            self.config.interface_timeout,
            self.discover_candidates_for_interface(interface),
        )
        .await;

        match discovery_result {
            Ok(Ok(candidates)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Completed;
                        task.discovered_candidates = candidates;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_completed += 1;
                }

                debug!("Interface discovery completed for {}", interface_name);
            }
            Ok(Err(_)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Failed;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery failed for {}", interface_name);
            }
            Err(_) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Timeout;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery timeout for {}", interface_name);
            }
        }

        Ok(())
    }

    async fn discover_candidates_for_interface(
        &self,
        interface: NetworkInterface,
    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
        let mut candidates = Vec::new();

        for address in &interface.addresses {
            if self.is_valid_candidate_address(address) {
                let candidate = CandidateAddress {
                    address: *address,
                    priority: self.calculate_candidate_priority(address, &interface),
                    source: CandidateSource::Local,
                    state: CandidateState::New,
                };

                candidates.push(candidate);
            }
        }

        sleep(Duration::from_millis(100)).await;

        Ok(candidates)
    }

    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => {
                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
            }
            IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
        }
    }

    fn calculate_candidate_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
        let mut priority = 1000u32;

        if address.is_ipv4() {
            priority += 100;
        }

        if !self.is_private_address(address) {
            priority += 200;
        }

        let interface_type = self.classify_interface_type(&interface.name);
        priority += self.calculate_interface_priority(interface_type);

        priority
    }

    fn is_private_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => ipv4.is_private(),
            IpAddr::V6(ipv6) => {
                // Unique local addresses (fc00::/7)
                let segments = ipv6.segments();
                (segments[0] & 0xfe00) == 0xfc00
            }
        }
    }

    async fn start_coordination_task(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let discoveries = Arc::clone(&self.active_discoveries);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let coordination_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                interval.tick().await;
                Self::coordinate_discoveries(&discoveries, &stats, &config).await;

                let all_complete = {
                    let discoveries_read = discoveries.read().unwrap();
                    discoveries_read.values().all(|task| {
                        matches!(
                            task.status,
                            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout
                        )
                    })
                };

                if all_complete {
                    break;
                }
            }
        });

        self.coordination_handle = Some(coordination_handle);
        Ok(())
    }

    async fn coordinate_discoveries(
        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
        _config: &ParallelDiscoveryConfig,
    ) {
        let mut total_candidates = 0u64;
        let mut completed_tasks = 0u64;
        let mut total_discovery_time = Duration::ZERO;

        {
            let discoveries_read = discoveries.read().unwrap();
            for task in discoveries_read.values() {
                if task.status == TaskStatus::Completed {
                    total_candidates += task.discovered_candidates.len() as u64;
                    completed_tasks += 1;
                    total_discovery_time += task.started_at.elapsed();
                }
            }
        }

        {
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.total_candidates = total_candidates;
            stats_guard.tasks_completed = completed_tasks;

            if completed_tasks > 0 {
                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
                stats_guard.parallelism_efficiency =
                    completed_tasks as f64 / stats_guard.tasks_started as f64;
            }
        }
    }

    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
        let mut all_candidates = Vec::new();

        let discoveries = self.active_discoveries.read().unwrap();
        for task in discoveries.values() {
            if task.status == TaskStatus::Completed {
                all_candidates.extend(task.discovered_candidates.clone());
            }
        }

        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));

        all_candidates
    }

    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
        self.stats.lock().unwrap().clone()
    }

    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.coordination_handle.take() {
            handle.abort();
        }

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.clear();
        }

        info!("Parallel discovery coordinator shutdown complete");
    }
}

impl AdaptiveTimeoutManager {
    pub fn new() -> Self {
        let mut timeout_configs = HashMap::new();

        timeout_configs.insert(OperationType::CandidateDiscovery, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(5),
            min_timeout: Duration::from_millis(500),
            max_timeout: Duration::from_secs(30),
            rtt_multiplier: 4.0,
            quality_factor: 0.5,
            congestion_factor: 0.3,
        });

        timeout_configs.insert(OperationType::PathValidation, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(3),
            min_timeout: Duration::from_millis(200),
            max_timeout: Duration::from_secs(15),
            rtt_multiplier: 3.0,
            quality_factor: 0.4,
            congestion_factor: 0.4,
        });

        timeout_configs.insert(OperationType::CoordinationRequest, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(10),
            min_timeout: Duration::from_secs(1),
            max_timeout: Duration::from_secs(60),
            rtt_multiplier: 5.0,
            quality_factor: 0.6,
            congestion_factor: 0.2,
        });

        timeout_configs.insert(OperationType::HolePunching, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(2),
            min_timeout: Duration::from_millis(100),
            max_timeout: Duration::from_secs(10),
            rtt_multiplier: 2.0,
            quality_factor: 0.3,
            congestion_factor: 0.5,
        });

        timeout_configs.insert(OperationType::ConnectionEstablishment, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(15),
            min_timeout: Duration::from_secs(2),
            max_timeout: Duration::from_secs(120),
            rtt_multiplier: 6.0,
            quality_factor: 0.7,
            congestion_factor: 0.1,
        });

        Self {
            network_conditions: Arc::new(RwLock::new(NetworkConditions {
                rtt_samples: VecDeque::new(),
                packet_loss_rate: 0.0,
                // Optimistic starting estimates; refined as measurements arrive.
                bandwidth_estimate: 1_000_000,
                quality_score: 0.8,
                congestion_level: 0.2,
                last_measurement: Instant::now(),
            })),
            timeout_configs,
            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
            monitoring_handle: None,
        }
    }

    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let network_conditions = Arc::clone(&self.network_conditions);
        let stats = Arc::clone(&self.stats);

        let monitoring_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(1));

            loop {
                interval.tick().await;
                Self::update_network_conditions(&network_conditions, &stats).await;
            }
        });

        self.monitoring_handle = Some(monitoring_handle);
        info!("Adaptive timeout manager started");
        Ok(())
    }

    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
        let config = self
            .timeout_configs
            .get(&operation)
            .cloned()
            .unwrap_or_else(|| AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(5),
                min_timeout: Duration::from_millis(500),
                max_timeout: Duration::from_secs(30),
                rtt_multiplier: 4.0,
                quality_factor: 0.5,
                congestion_factor: 0.3,
            });

        let conditions = self.network_conditions.read().unwrap();

        let rtt_based_timeout =
            if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
                Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
            } else {
                config.base_timeout
            };

        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;
        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;

        let adjusted_timeout = Duration::from_millis(
            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment) as u64,
        );

        let final_timeout = adjusted_timeout.max(config.min_timeout).min(config.max_timeout);

        {
            let mut stats = self.stats.lock().unwrap();
            stats.adjustments_made += 1;
            stats.avg_timeouts.insert(operation, final_timeout);
        }

        debug!(
            "Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
            operation, final_timeout, conditions.quality_score, conditions.congestion_level
        );

        final_timeout
    }

    pub async fn record_measurement(
        &self,
        rtt: Duration,
        packet_loss: bool,
        bandwidth: Option<u64>,
    ) {
        let mut conditions = self.network_conditions.write().unwrap();

        conditions.rtt_samples.push_back(rtt);
        if conditions.rtt_samples.len() > 50 {
            conditions.rtt_samples.pop_front();
        }

        // Exponentially weighted moving average of packet loss.
        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;

        if let Some(bw) = bandwidth {
            conditions.bandwidth_estimate =
                (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
        }

        // Quality blends RTT (normalized against 1s) with loss; congestion blends loss with RTT variance.
        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
        let loss_quality = 1.0 - conditions.packet_loss_rate;
        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;

        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);

        conditions.last_measurement = Instant::now();
    }

    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
        if samples.is_empty() {
            return None;
        }

        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
        Some(Duration::from_millis(total_ms / samples.len() as u64))
    }

    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
        if samples.len() < 2 {
            return 0.0;
        }

        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
        // Guard against sub-millisecond averages, which would otherwise divide by zero below.
        if avg <= 0.0 {
            return 0.0;
        }

        let variance: f64 = samples
            .iter()
            .map(|d| {
                let diff = d.as_millis() as f64 - avg;
                diff * diff
            })
            .sum::<f64>()
            / samples.len() as f64;

        // Coefficient of variation, capped at 1.0.
        (variance.sqrt() / avg).min(1.0)
    }

    async fn update_network_conditions(
        network_conditions: &Arc<RwLock<NetworkConditions>>,
        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
    ) {
        let mut conditions = network_conditions.write().unwrap();

        // Age out old samples and decay estimates between measurements.
        while conditions.rtt_samples.len() > 100 {
            conditions.rtt_samples.pop_front();
        }

        conditions.packet_loss_rate *= 0.99;

        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
            conditions.quality_score *= 0.95;
        }
    }

    pub async fn get_network_conditions(&self) -> NetworkConditions {
        self.network_conditions.read().unwrap().clone()
    }

    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
        self.stats.lock().unwrap().clone()
    }

    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.monitoring_handle.take() {
            handle.abort();
        }

        info!("Adaptive timeout manager shutdown complete");
    }
}

impl BandwidthAwareValidator {
    pub fn new(config: BandwidthValidationConfig) -> Self {
        Self {
            active_validations: Arc::new(RwLock::new(HashMap::new())),
            bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
                bandwidth_samples: VecDeque::new(),
                // Assume roughly 1 MB/s until real samples are collected.
                current_bandwidth: 1_000_000,
                utilization: 0.0,
                last_measurement: Instant::now(),
            })),
            config,
            stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
        }
    }

    pub async fn start_validation(
        &self,
        target_address: SocketAddr,
        priority: ValidationPriority,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if !self.can_start_validation().await {
            return Err("Bandwidth limit reached, cannot start validation".into());
        }

        let session = ValidationSession {
            target_address,
            started_at: Instant::now(),
            packets_sent: 0,
            packets_received: 0,
            total_bytes: 0,
            rtt_samples: Vec::new(),
            bandwidth_usage: 0,
            priority,
        };

        {
            let mut validations = self.active_validations.write().unwrap();
            validations.insert(target_address, session);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.validations_started += 1;
        }

        debug!("Started bandwidth-aware validation for {}", target_address);
        Ok(())
    }

    async fn can_start_validation(&self) -> bool {
        let validations = self.active_validations.read().unwrap();
        let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();

        if validations.len() >= self.config.max_concurrent_validations {
            return false;
        }

        if self.config.enable_adaptive_validation {
            let current_usage: u64 = validations
                .values()
                .map(|session| session.bandwidth_usage)
                .sum();

            let available_bandwidth = bandwidth_monitor.current_bandwidth;
            let utilization = current_usage as f64 / available_bandwidth as f64;

            // Refuse new probes once estimated utilization exceeds 80%.
            if utilization > 0.8 {
                return false;
            }
        }

        true
    }

    pub async fn record_packet_sent(
        &self,
        target_address: SocketAddr,
        packet_size: usize,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        {
            // Scope the write guard so it is released before the await below.
            let mut validations = self.active_validations.write().unwrap();
            if let Some(session) = validations.get_mut(&target_address) {
                session.packets_sent += 1;
                session.total_bytes += packet_size as u64;
                session.bandwidth_usage += packet_size as u64;
            }
        }

        self.update_bandwidth_usage(packet_size as u64).await;

        Ok(())
    }

    pub async fn record_packet_received(
        &self,
        target_address: SocketAddr,
        rtt: Duration,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut validations = self.active_validations.write().unwrap();

        if let Some(session) = validations.get_mut(&target_address) {
            session.packets_received += 1;
            session.rtt_samples.push(rtt);
        }

        Ok(())
    }

    async fn update_bandwidth_usage(&self, bytes_used: u64) {
        let mut monitor = self.bandwidth_monitor.lock().unwrap();

        let now = Instant::now();
        let elapsed = now.duration_since(monitor.last_measurement);
        let sample = BandwidthSample {
            timestamp: now,
            bytes_transferred: bytes_used,
            duration: elapsed,
            bandwidth: if elapsed.as_secs() > 0 {
                bytes_used / elapsed.as_secs()
            } else {
                0
            },
        };

        monitor.bandwidth_samples.push_back(sample);
        if monitor.bandwidth_samples.len() > 100 {
            monitor.bandwidth_samples.pop_front();
        }

        if !monitor.bandwidth_samples.is_empty() {
            let total_bytes: u64 = monitor
                .bandwidth_samples
                .iter()
                .map(|s| s.bytes_transferred)
                .sum();
            let total_time: Duration = monitor.bandwidth_samples.iter().map(|s| s.duration).sum();

            if total_time.as_secs() > 0 {
                monitor.current_bandwidth = total_bytes / total_time.as_secs();
            }
        }

        monitor.last_measurement = now;
    }

    pub async fn complete_validation(
        &self,
        target_address: SocketAddr,
        success: bool,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let session = {
            let mut validations = self.active_validations.write().unwrap();
            validations.remove(&target_address)
        };

        if let Some(session) = session {
            let duration = session.started_at.elapsed();

            {
                let mut stats = self.stats.lock().unwrap();
                if success {
                    stats.validations_completed += 1;
                }
                stats.total_bandwidth_used += session.bandwidth_usage;
                stats.avg_validation_time = if stats.validations_completed > 0 {
                    Duration::from_millis(
                        (stats.avg_validation_time.as_millis() as u64
                            * (stats.validations_completed - 1)
                            + duration.as_millis() as u64)
                            / stats.validations_completed,
                    )
                } else {
                    duration
                };

                if stats.total_bandwidth_used > 0 {
                    // Successful validations per kilobyte of probe traffic.
                    stats.bandwidth_efficiency = stats.validations_completed as f64
                        / stats.total_bandwidth_used as f64
                        * 1000.0;
                }
            }

            debug!(
                "Completed validation for {} in {:?} (success: {})",
                target_address, duration, success
            );
        }

        Ok(())
    }

    pub async fn get_stats(&self) -> BandwidthValidationStats {
        self.stats.lock().unwrap().clone()
    }
}

impl CongestionControlIntegrator {
    pub fn new(config: CongestionIntegrationConfig) -> Self {
        Self {
            active_migrations: Arc::new(RwLock::new(HashMap::new())),
            congestion_state: Arc::new(Mutex::new(CongestionState {
                // Conventional initial window and slow-start threshold.
                congestion_window: 10,
                ssthresh: 65535,
                rtt_measurements: VecDeque::new(),
                congestion_events: VecDeque::new(),
                congestion_level: 0.0,
            })),
            config,
            stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
        }
    }

    pub async fn start_migration(
        &self,
        peer_id: PeerId,
        old_path: SocketAddr,
        new_path: SocketAddr,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if self.config.enable_congestion_awareness {
            let congestion_state = self.congestion_state.lock().unwrap();
            if congestion_state.congestion_level > self.config.congestion_threshold {
                // Count migrations declined because the path is congested.
                self.stats.lock().unwrap().congestion_avoided_migrations += 1;
                return Err("Migration delayed due to high congestion".into());
            }
        }

        let session = MigrationSession {
            peer_id,
            old_path,
            new_path,
            started_at: Instant::now(),
            migration_state: MigrationState::Initiated,
            // Start the new path with a scaled-down copy of the current window.
            congestion_window: {
                let state = self.congestion_state.lock().unwrap();
                (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
            },
            rtt_estimate: Duration::from_millis(100),
            bandwidth_estimate: 1_000_000,
        };

        {
            let mut migrations = self.active_migrations.write().unwrap();
            migrations.insert(peer_id, session);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.migrations_attempted += 1;
        }

        info!(
            "Started congestion-aware migration for peer {:?}: {} -> {}",
            peer_id, old_path, new_path
        );
        Ok(())
    }

    pub async fn update_migration_state(
        &self,
        peer_id: PeerId,
        new_state: MigrationState,
        rtt: Option<Duration>,
        bandwidth: Option<u64>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut migrations = self.active_migrations.write().unwrap();

        if let Some(session) = migrations.get_mut(&peer_id) {
            session.migration_state = new_state;

            if let Some(rtt) = rtt {
                session.rtt_estimate = rtt;

                let mut congestion_state = self.congestion_state.lock().unwrap();
                congestion_state.rtt_measurements.push_back(rtt);
                if congestion_state.rtt_measurements.len() > 50 {
                    congestion_state.rtt_measurements.pop_front();
                }
            }

            if let Some(bw) = bandwidth {
                session.bandwidth_estimate = bw;
            }

            if matches!(new_state, MigrationState::Completed) {
                let duration = session.started_at.elapsed();

                let mut stats = self.stats.lock().unwrap();
                stats.migrations_successful += 1;
                stats.avg_migration_time = if stats.migrations_successful > 0 {
                    Duration::from_millis(
                        (stats.avg_migration_time.as_millis() as u64
                            * (stats.migrations_successful - 1)
                            + duration.as_millis() as u64)
                            / stats.migrations_successful,
                    )
                } else {
                    duration
                };

                debug!("Migration completed for peer {:?} in {:?}", peer_id, duration);
            }
        }

        Ok(())
    }

    pub async fn record_congestion_event(
        &self,
        event_type: CongestionEventType,
        severity: f64,
    ) {
        let event = CongestionEvent {
            timestamp: Instant::now(),
            event_type,
            severity,
        };

        let mut congestion_state = self.congestion_state.lock().unwrap();
        congestion_state.congestion_events.push_back(event);

        if congestion_state.congestion_events.len() > 100 {
            congestion_state.congestion_events.pop_front();
        }

        // Congestion level is the mean severity of events seen in the last 10 seconds.
        let recent_events: Vec<_> = congestion_state
            .congestion_events
            .iter()
            .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
            .collect();

        if !recent_events.is_empty() {
            let avg_severity: f64 =
                recent_events.iter().map(|e| e.severity).sum::<f64>() / recent_events.len() as f64;

            congestion_state.congestion_level = avg_severity;
        }

        // Shrink the congestion window in proportion to how serious the event is.
        match event_type {
            CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
                congestion_state.ssthresh = congestion_state.congestion_window / 2;
                congestion_state.congestion_window = congestion_state.ssthresh;
            }
            CongestionEventType::ECNMark => {
                congestion_state.congestion_window =
                    (congestion_state.congestion_window as f64 * 0.8) as u32;
            }
            CongestionEventType::RTTIncrease => {
                congestion_state.congestion_window =
                    (congestion_state.congestion_window as f64 * 0.95) as u32;
            }
        }

        debug!(
            "Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})",
            event_type, severity, congestion_state.congestion_window
        );
    }

    pub async fn get_stats(&self) -> CongestionIntegrationStats {
        self.stats.lock().unwrap().clone()
    }
}

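/// Bundles the discovery coordinator, adaptive timeout manager, bandwidth-aware
/// validator, and congestion integrator behind a single start/shutdown lifecycle.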
#[derive(Debug)]
pub struct NetworkEfficiencyManager {
    parallel_discovery: ParallelDiscoveryCoordinator,
    adaptive_timeout: AdaptiveTimeoutManager,
    bandwidth_validator: BandwidthAwareValidator,
    congestion_integrator: CongestionControlIntegrator,
    is_running: bool,
}

impl NetworkEfficiencyManager {
    pub fn new() -> Self {
        Self {
            parallel_discovery: ParallelDiscoveryCoordinator::new(ParallelDiscoveryConfig::default()),
            adaptive_timeout: AdaptiveTimeoutManager::new(),
            bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
            congestion_integrator: CongestionControlIntegrator::new(CongestionIntegrationConfig::default()),
            is_running: false,
        }
    }

    pub fn with_configs(
        discovery_config: ParallelDiscoveryConfig,
        validation_config: BandwidthValidationConfig,
        congestion_config: CongestionIntegrationConfig,
    ) -> Self {
        Self {
            parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
            adaptive_timeout: AdaptiveTimeoutManager::new(),
            bandwidth_validator: BandwidthAwareValidator::new(validation_config),
            congestion_integrator: CongestionControlIntegrator::new(congestion_config),
            is_running: false,
        }
    }

    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if self.is_running {
            return Ok(());
        }

        self.adaptive_timeout.start().await?;

        self.is_running = true;
        info!("Network efficiency manager started");
        Ok(())
    }

    pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
        &mut self.parallel_discovery
    }

    pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
        &self.adaptive_timeout
    }

    pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
        &self.bandwidth_validator
    }

    pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
        &self.congestion_integrator
    }

    pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
        NetworkEfficiencyStats {
            parallel_discovery: self.parallel_discovery.get_stats().await,
            adaptive_timeout: self.adaptive_timeout.get_stats().await,
            bandwidth_validation: self.bandwidth_validator.get_stats().await,
            congestion_integration: self.congestion_integrator.get_stats().await,
        }
    }

    pub async fn shutdown(&mut self) {
        if !self.is_running {
            return;
        }

        self.parallel_discovery.shutdown().await;
        self.adaptive_timeout.shutdown().await;

        self.is_running = false;
        info!("Network efficiency manager shutdown complete");
    }
}

#[derive(Debug, Clone)]
pub struct NetworkEfficiencyStats {
    pub parallel_discovery: ParallelDiscoveryStats,
    pub adaptive_timeout: AdaptiveTimeoutStats,
    pub bandwidth_validation: BandwidthValidationStats,
    pub congestion_integration: CongestionIntegrationStats,
}

impl Default for NetworkEfficiencyManager {
    fn default() -> Self {
        Self::new()
    }
}
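
// A minimal usage sketch, not part of the public API: it assumes only the types
// defined in this module plus a tokio runtime provided by the caller, and shows the
// intended call order (start the manager, feed measurements into the adaptive
// timeout manager, derive a timeout, read stats, shut down).
#[allow(dead_code)]
async fn example_network_efficiency_usage() {
    let mut manager = NetworkEfficiencyManager::new();
    manager
        .start()
        .await
        .expect("starting the efficiency manager should not fail");

    // Feed a synthetic RTT/loss/bandwidth observation.
    manager
        .adaptive_timeout()
        .record_measurement(Duration::from_millis(40), false, Some(5_000_000))
        .await;

    // Ask for an adaptive timeout before validating a path.
    let path_timeout = manager
        .adaptive_timeout()
        .calculate_timeout(OperationType::PathValidation)
        .await;
    debug!("would wait up to {:?} for path validation", path_timeout);

    let stats = manager.get_comprehensive_stats().await;
    debug!(
        "parallelism efficiency: {:.2}",
        stats.parallel_discovery.parallelism_efficiency
    );

    manager.shutdown().await;
}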