1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 sync::{Arc, Mutex, RwLock},
13 time::{Duration, Instant},
14};
15
16use tokio::time::timeout;
17
18use tracing::{debug, info, warn};
19
20#[cfg(feature = "production-ready")]
21use tokio::time::sleep;
22
23use crate::{
24 nat_traversal_api::{CandidateAddress, PeerId},
25 candidate_discovery::NetworkInterface,
26 connection::nat_traversal::{CandidateSource, CandidateState},
27};
28
/// Coordinates NAT-traversal candidate discovery across multiple network
/// interfaces, tracking one task per interface plus aggregate statistics.
#[derive(Debug)]
pub struct ParallelDiscoveryCoordinator {
    /// Per-interface discovery tasks, keyed by interface name.
    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
    /// Tuning knobs for parallelism, per-interface timeout and prioritization.
    config: ParallelDiscoveryConfig,
    /// Aggregate counters updated as tasks start and finish.
    stats: Arc<Mutex<ParallelDiscoveryStats>>,
    /// Handle of the background coordination loop; `None` until started.
    coordination_handle: Option<tokio::task::JoinHandle<()>>,
}
41
/// Configuration for [`ParallelDiscoveryCoordinator`].
#[derive(Debug, Clone)]
pub struct ParallelDiscoveryConfig {
    /// Upper bound on the number of interfaces probed in one run.
    pub max_concurrent_tasks: usize,
    /// Time budget for discovering candidates on a single interface.
    pub interface_timeout: Duration,
    /// When true, interfaces are ordered by `preferred_interface_types`.
    pub enable_prioritization: bool,
    /// Interface types in descending order of preference.
    pub preferred_interface_types: Vec<InterfaceType>,
    /// When true, the task limit is scaled down by a load factor.
    pub enable_adaptive_parallelism: bool,
}
56
/// Coarse classification of a network interface, inferred from its name
/// (see `classify_interface_type`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterfaceType {
    Ethernet,
    WiFi,
    Cellular,
    Loopback,
    VPN,
    Unknown,
}
67
/// State of one per-interface discovery (in flight or finished).
#[derive(Debug)]
struct DiscoveryTask {
    /// Name of the interface being probed (also the map key).
    interface_name: String,
    /// Classified type of the interface.
    interface_type: InterfaceType,
    /// When the task was registered; used for discovery-time statistics.
    started_at: Instant,
    /// Lifecycle state (Pending -> Running -> terminal).
    status: TaskStatus,
    /// Candidates produced once the task reaches `Completed`.
    discovered_candidates: Vec<CandidateAddress>,
    /// Interface-type priority score (higher = preferred).
    priority: u32,
}
78
/// Lifecycle of a `DiscoveryTask`; `Completed`, `Failed` and `Timeout`
/// are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Timeout,
}
88
/// Aggregate counters for a discovery run.
#[derive(Debug, Default, Clone)]
pub struct ParallelDiscoveryStats {
    /// Tasks registered so far.
    pub tasks_started: u64,
    /// Tasks that reached `Completed`.
    pub tasks_completed: u64,
    /// Tasks that failed or timed out.
    pub tasks_failed: u64,
    /// Mean elapsed time over completed tasks.
    pub avg_discovery_time: Duration,
    /// Total candidates produced by completed tasks.
    pub total_candidates: u64,
    /// Ratio of completed to started tasks.
    pub parallelism_efficiency: f64,
}
105
/// Computes per-operation timeouts that adapt to measured RTT, loss and
/// congestion, with a background task decaying stale measurements.
#[derive(Debug)]
pub struct AdaptiveTimeoutManager {
    /// Shared model of current network conditions.
    network_conditions: Arc<RwLock<NetworkConditions>>,
    /// Static timeout profile per operation type.
    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
    /// Counters describing the adjustments made so far.
    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
    /// Handle of the once-per-second decay task; `None` until `start`.
    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
}
118
/// Rolling model of network health fed by `record_measurement`.
#[derive(Debug, Clone)]
pub struct NetworkConditions {
    /// Sliding window of recent RTT samples.
    rtt_samples: VecDeque<Duration>,
    /// EWMA loss rate in [0, 1].
    packet_loss_rate: f64,
    /// EWMA bandwidth estimate (units as reported by callers — TODO confirm).
    bandwidth_estimate: u64,
    /// Combined RTT/loss quality score in [0, 1]; higher is better.
    quality_score: f64,
    /// Loss + RTT-jitter congestion score, capped at 1.0.
    congestion_level: f64,
    /// Instant of the most recent measurement; used to decay stale state.
    last_measurement: Instant,
}
135
/// NAT-traversal operations that get independently tuned timeouts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    CandidateDiscovery,
    PathValidation,
    CoordinationRequest,
    HolePunching,
    ConnectionEstablishment,
}
145
/// Timeout profile for one `OperationType`.
#[derive(Debug, Clone)]
struct AdaptiveTimeoutConfig {
    /// Fallback timeout used when no RTT samples exist yet.
    base_timeout: Duration,
    /// Lower clamp for the computed timeout.
    min_timeout: Duration,
    /// Upper clamp for the computed timeout.
    max_timeout: Duration,
    /// Average RTT is multiplied by this to get the raw timeout.
    rtt_multiplier: f64,
    /// Weight of (1 - quality_score) in the timeout inflation.
    quality_factor: f64,
    /// Weight of congestion_level in the timeout inflation.
    congestion_factor: f64,
}
162
/// Statistics produced by `AdaptiveTimeoutManager`.
#[derive(Debug, Default, Clone)]
pub struct AdaptiveTimeoutStats {
    /// Number of timeouts computed so far.
    pub adjustments_made: u64,
    /// Most recently computed timeout per operation.
    pub avg_timeouts: HashMap<OperationType, Duration>,
    /// Effectiveness metric; not updated in the visible code — TODO confirm.
    pub timeout_effectiveness: f64,
    /// Accuracy metric; not updated in the visible code — TODO confirm.
    pub condition_accuracy: f64,
}
175
/// Runs path-validation sessions while capping concurrency and estimated
/// bandwidth utilization.
#[derive(Debug)]
pub struct BandwidthAwareValidator {
    /// In-flight validation sessions keyed by target address.
    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
    /// Rolling bandwidth estimator fed by sent-packet accounting.
    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
    /// Concurrency and bandwidth limits.
    config: BandwidthValidationConfig,
    /// Aggregate validation statistics.
    stats: Arc<Mutex<BandwidthValidationStats>>,
}
188
/// Configuration for [`BandwidthAwareValidator`].
#[derive(Debug, Clone)]
pub struct BandwidthValidationConfig {
    /// Cap on simultaneously active validation sessions.
    pub max_concurrent_validations: usize,
    /// Bandwidth threshold; not read in the visible code — TODO confirm use.
    pub bandwidth_threshold: u64,
    /// When true, admission also checks estimated bandwidth utilization.
    pub enable_adaptive_validation: bool,
    /// Size of a validation probe packet in bytes.
    pub validation_packet_size: usize,
    /// Maximum validation rate; not read in the visible code — TODO confirm.
    pub max_validation_rate: f64,
}
203
/// Sliding-window bandwidth estimator.
#[derive(Debug)]
struct BandwidthMonitor {
    /// Recent samples (window capped at 100 in `update_bandwidth_usage`).
    bandwidth_samples: VecDeque<BandwidthSample>,
    /// Current estimate: total bytes / total seconds over the window.
    current_bandwidth: u64,
    /// Utilization fraction; not updated in the visible code — TODO confirm.
    utilization: f64,
    /// Instant of the previous sample, used to compute sample durations.
    last_measurement: Instant,
}
216
/// One bandwidth observation.
#[derive(Debug, Clone)]
struct BandwidthSample {
    /// When the sample was taken.
    timestamp: Instant,
    /// Bytes accounted in this sample.
    bytes_transferred: u64,
    /// Time since the previous sample.
    duration: Duration,
    /// Bytes per second for this sample (0 when the interval is < 1 s).
    bandwidth: u64,
}
225
/// Book-keeping for one in-flight path validation.
#[derive(Debug)]
struct ValidationSession {
    /// Address being validated (also the map key).
    target_address: SocketAddr,
    /// Session start; used for duration statistics.
    started_at: Instant,
    /// Probe packets sent so far.
    packets_sent: u32,
    /// Response packets received so far.
    packets_received: u32,
    /// Total probe bytes sent.
    total_bytes: u64,
    /// RTT samples from received responses.
    rtt_samples: Vec<Duration>,
    /// Bytes charged against the bandwidth budget.
    bandwidth_usage: u64,
    /// Caller-assigned priority of this session.
    priority: ValidationPriority,
}
238
/// Relative importance of a validation session (ordered: Low < Critical).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ValidationPriority {
    Low,
    Normal,
    High,
    Critical,
}
247
/// Aggregate statistics for bandwidth-aware validation.
#[derive(Debug, Default, Clone)]
pub struct BandwidthValidationStats {
    /// Sessions opened.
    pub validations_started: u64,
    /// Sessions completed successfully.
    pub validations_completed: u64,
    /// Total probe bytes across all closed sessions.
    pub total_bandwidth_used: u64,
    /// Incremental mean session duration.
    pub avg_validation_time: Duration,
    /// Successful validations per 1000 bytes of probe traffic.
    pub bandwidth_efficiency: f64,
}
262
/// Gates connection migrations on congestion state and tracks per-peer
/// migration sessions.
#[derive(Debug)]
pub struct CongestionControlIntegrator {
    /// In-flight migrations keyed by peer.
    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
    /// Shared congestion-control state (cwnd, ssthresh, recent events).
    congestion_state: Arc<Mutex<CongestionState>>,
    /// Thresholds and scaling factors.
    config: CongestionIntegrationConfig,
    /// Aggregate migration statistics.
    stats: Arc<Mutex<CongestionIntegrationStats>>,
}
275
/// Configuration for [`CongestionControlIntegrator`].
#[derive(Debug, Clone)]
pub struct CongestionIntegrationConfig {
    /// When true, migrations are rejected while congestion is high.
    pub enable_congestion_awareness: bool,
    /// Congestion level above which migrations are refused.
    pub congestion_threshold: f64,
    /// Migration rate cap; not read in the visible code — TODO confirm.
    pub max_migrations_per_second: f64,
    /// Bandwidth-estimation toggle; not read in the visible code — TODO confirm.
    pub enable_bandwidth_estimation: bool,
    /// Factor applied to the current cwnd when seeding a migration session.
    pub cwnd_scaling_factor: f64,
}
290
/// Book-keeping for one path migration of a peer connection.
#[derive(Debug)]
struct MigrationSession {
    /// Peer whose connection is migrating (also the map key).
    peer_id: PeerId,
    /// Path being abandoned.
    old_path: SocketAddr,
    /// Path being adopted.
    new_path: SocketAddr,
    /// Session start; used for duration statistics.
    started_at: Instant,
    /// Current step of the migration state machine.
    migration_state: MigrationState,
    /// Congestion window seeded from the shared state, scaled down.
    congestion_window: u32,
    /// Latest RTT estimate for the new path.
    rtt_estimate: Duration,
    /// Latest bandwidth estimate for the new path.
    bandwidth_estimate: u64,
}
303
/// Steps of a path migration; `Completed` and `Failed` are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationState {
    Initiated,
    PathValidating,
    CongestionProbing,
    Migrating,
    Completed,
    Failed,
}
314
/// Shared congestion-control state consulted before migrating.
#[derive(Debug)]
struct CongestionState {
    /// Current congestion window (halved on loss/timeout events).
    congestion_window: u32,
    /// Slow-start threshold.
    ssthresh: u32,
    /// Sliding window of RTT measurements (capped at 50).
    rtt_measurements: VecDeque<Duration>,
    /// Recent congestion events (capped at 100).
    congestion_events: VecDeque<CongestionEvent>,
    /// Mean severity of events from the last 10 s, in [0, 1].
    congestion_level: f64,
}
329
/// One observed congestion signal.
#[derive(Debug, Clone)]
struct CongestionEvent {
    /// When the event was recorded.
    timestamp: Instant,
    /// Kind of signal observed.
    event_type: CongestionEventType,
    /// Caller-supplied severity weight.
    severity: f64,
}
337
/// Kinds of congestion signals fed into the integrator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CongestionEventType {
    PacketLoss,
    Timeout,
    ECNMark,
    RTTIncrease,
}
346
/// Aggregate statistics for congestion-aware migrations.
#[derive(Debug, Default, Clone)]
pub struct CongestionIntegrationStats {
    /// Migrations started.
    pub migrations_attempted: u64,
    /// Migrations that reached `Completed`.
    pub migrations_successful: u64,
    /// Incremental mean migration duration.
    pub avg_migration_time: Duration,
    /// Migrations refused due to congestion; not updated in the visible
    /// code — TODO confirm.
    pub congestion_avoided_migrations: u64,
    /// Efficiency metric; not updated in the visible code — TODO confirm.
    pub bandwidth_utilization_efficiency: f64,
}
361
362impl Default for ParallelDiscoveryConfig {
363 fn default() -> Self {
364 Self {
365 max_concurrent_tasks: 8,
366 interface_timeout: Duration::from_secs(5),
367 enable_prioritization: true,
368 preferred_interface_types: vec![
369 InterfaceType::Ethernet,
370 InterfaceType::WiFi,
371 InterfaceType::Cellular,
372 ],
373 enable_adaptive_parallelism: true,
374 }
375 }
376}
377
378impl Default for BandwidthValidationConfig {
379 fn default() -> Self {
380 Self {
381 max_concurrent_validations: 16,
382 bandwidth_threshold: 1_000_000, enable_adaptive_validation: true,
384 validation_packet_size: 64,
385 max_validation_rate: 100.0, }
387 }
388}
389
390impl Default for CongestionIntegrationConfig {
391 fn default() -> Self {
392 Self {
393 enable_congestion_awareness: true,
394 congestion_threshold: 0.7, max_migrations_per_second: 10.0,
396 enable_bandwidth_estimation: true,
397 cwnd_scaling_factor: 0.8,
398 }
399 }
400}
401
impl ParallelDiscoveryCoordinator {
    /// Creates a coordinator with empty task/stat state; nothing runs until
    /// `start_parallel_discovery` is called.
    pub fn new(config: ParallelDiscoveryConfig) -> Self {
        Self {
            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
            config,
            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
            coordination_handle: None,
        }
    }

    /// Starts candidate discovery on up to `max_tasks` interfaces, then
    /// spawns the background coordination loop.
    ///
    /// NOTE(review): each interface is awaited in turn inside the `for` loop
    /// below, so despite the name the per-interface discoveries run
    /// sequentially (only their count is bounded) — confirm whether they
    /// should be spawned concurrently.
    pub async fn start_parallel_discovery(
        &mut self,
        interfaces: Vec<NetworkInterface>,
        peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        info!("Starting parallel discovery across {} interfaces for peer {:?}",
              interfaces.len(), peer_id);

        // Optionally reorder so preferred interface types come first.
        let prioritized_interfaces = if self.config.enable_prioritization {
            self.prioritize_interfaces(interfaces)
        } else {
            interfaces
        };

        // Either a load-scaled limit or the configured static limit.
        let max_tasks = if self.config.enable_adaptive_parallelism {
            self.calculate_adaptive_parallelism().await
        } else {
            self.config.max_concurrent_tasks
        };

        let tasks_to_start = prioritized_interfaces.into_iter()
            .take(max_tasks)
            .collect::<Vec<_>>();

        for interface in tasks_to_start {
            self.start_interface_discovery(interface, peer_id).await?;
        }

        self.start_coordination_task().await?;

        Ok(())
    }

    /// Sorts interfaces ascending by (preference-list position, address
    /// count): types earlier in `preferred_interface_types` sort first and
    /// unlisted types (position 999) sort last.
    ///
    /// NOTE(review): the secondary key puts interfaces with FEWER addresses
    /// first — confirm that is intended.
    fn prioritize_interfaces(&self, mut interfaces: Vec<NetworkInterface>) -> Vec<NetworkInterface> {
        interfaces.sort_by_key(|interface| {
            let interface_type = self.classify_interface_type(&interface.name);
            let type_priority = self.config.preferred_interface_types
                .iter()
                .position(|&t| t == interface_type)
                .unwrap_or(999);

            (type_priority, interface.addresses.len())
        });

        interfaces
    }

    /// Heuristically maps an interface name to an `InterfaceType` via
    /// case-insensitive substring checks; the first matching rule wins
    /// (e.g. any name containing "en" classifies as Ethernet before the
    /// later rules are consulted).
    fn classify_interface_type(&self, name: &str) -> InterfaceType {
        let name_lower = name.to_lowercase();

        if name_lower.contains("eth") || name_lower.contains("en") {
            InterfaceType::Ethernet
        } else if name_lower.contains("wlan") || name_lower.contains("wifi") || name_lower.contains("wl") {
            InterfaceType::WiFi
        } else if name_lower.contains("cell") || name_lower.contains("wwan") || name_lower.contains("ppp") {
            InterfaceType::Cellular
        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
            InterfaceType::Loopback
        } else if name_lower.contains("vpn") || name_lower.contains("tun") || name_lower.contains("tap") {
            InterfaceType::VPN
        } else {
            InterfaceType::Unknown
        }
    }

    /// Scales the configured task limit by a fixed load factor.
    async fn calculate_adaptive_parallelism(&self) -> usize {
        let base_parallelism = self.config.max_concurrent_tasks;
        // Constant 0.8 — presumably a placeholder for a real system-load
        // measurement; TODO confirm.
        let system_load_factor = 0.8;
        ((base_parallelism as f64) * system_load_factor) as usize
    }

    /// Registers a pending `DiscoveryTask` for `interface`, bumps the
    /// started counter, then runs discovery to completion (awaited inline).
    async fn start_interface_discovery(
        &self,
        interface: NetworkInterface,
        _peer_id: PeerId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_type = self.classify_interface_type(&interface.name);
        let priority = self.calculate_interface_priority(interface_type);

        let task = DiscoveryTask {
            interface_name: interface.name.clone(),
            interface_type,
            started_at: Instant::now(),
            status: TaskStatus::Pending,
            discovered_candidates: Vec::new(),
            priority,
        };

        // Scope the write guard so it is dropped before the await below.
        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.insert(interface.name.clone(), task);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.tasks_started += 1;
        }

        self.perform_interface_discovery(interface).await?;

        Ok(())
    }

    /// Static priority score per interface type (higher = preferred).
    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
        match interface_type {
            InterfaceType::Ethernet => 100,
            InterfaceType::WiFi => 80,
            InterfaceType::Cellular => 60,
            InterfaceType::VPN => 40,
            InterfaceType::Loopback => 20,
            InterfaceType::Unknown => 10,
        }
    }

    /// Runs candidate discovery for one interface under the configured
    /// timeout and records the outcome (Completed / Failed / Timeout) in
    /// both the task table and the aggregate stats.
    async fn perform_interface_discovery(
        &self,
        interface: NetworkInterface,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let interface_name = interface.name.clone();

        // Mark the task as running before starting the timed discovery.
        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            if let Some(task) = discoveries.get_mut(&interface_name) {
                task.status = TaskStatus::Running;
            }
        }

        let discovery_result = timeout(
            self.config.interface_timeout,
            self.discover_candidates_for_interface(interface),
        ).await;

        match discovery_result {
            // Finished in time and succeeded.
            Ok(Ok(candidates)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Completed;
                        task.discovered_candidates = candidates;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_completed += 1;
                }

                debug!("Interface discovery completed for {}", interface_name);
            }
            // Finished in time but returned an error.
            Ok(Err(_)) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Failed;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery failed for {}", interface_name);
            }
            // The timeout elapsed first; timeouts count as failures in stats.
            Err(_) => {
                {
                    let mut discoveries = self.active_discoveries.write().unwrap();
                    if let Some(task) = discoveries.get_mut(&interface_name) {
                        task.status = TaskStatus::Timeout;
                    }
                }

                {
                    let mut stats = self.stats.lock().unwrap();
                    stats.tasks_failed += 1;
                }

                warn!("Interface discovery timeout for {}", interface_name);
            }
        }

        Ok(())
    }

    /// Builds local (`CandidateSource::Local`) candidates from the
    /// interface's usable addresses.
    async fn discover_candidates_for_interface(
        &self,
        interface: NetworkInterface,
    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
        let mut candidates = Vec::new();

        for address in &interface.addresses {
            if self.is_valid_candidate_address(&address) {
                let candidate = CandidateAddress {
                    address: *address,
                    priority: self.calculate_candidate_priority(&address, &interface),
                    source: CandidateSource::Local,
                    state: CandidateState::New,
                };

                candidates.push(candidate);
            }
        }

        // Simulated discovery latency, only under the "production-ready"
        // feature (matching the cfg'd `sleep` import at the top of the file).
        #[cfg(feature = "production-ready")]
        sleep(Duration::from_millis(100)).await;

        Ok(candidates)
    }

    /// Filters out addresses that cannot serve as candidates: IPv4
    /// loopback/link-local/broadcast and IPv6 loopback/unspecified.
    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => {
                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
            }
            IpAddr::V6(ipv6) => {
                !ipv6.is_loopback() && !ipv6.is_unspecified()
            }
        }
    }

    /// Scores a candidate: base 1000, +100 for IPv4, +200 for a public
    /// address, plus the interface-type score.
    fn calculate_candidate_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
        let mut priority = 1000u32;

        if address.is_ipv4() {
            priority += 100;
        }

        if !self.is_private_address(address) {
            priority += 200;
        }

        let interface_type = self.classify_interface_type(&interface.name);
        priority += self.calculate_interface_priority(interface_type);

        priority
    }

    /// True for RFC 1918 IPv4 ranges and IPv6 unique-local (fc00::/7)
    /// addresses.
    fn is_private_address(&self, address: &SocketAddr) -> bool {
        match address.ip() {
            IpAddr::V4(ipv4) => ipv4.is_private(),
            IpAddr::V6(ipv6) => {
                let segments = ipv6.segments();
                // fc00::/7: compare the top 7 bits of the first segment.
                (segments[0] & 0xfe00) == 0xfc00
            }
        }
    }

    /// Spawns a loop that re-aggregates statistics every 500 ms and exits
    /// once every task has reached a terminal status.
    async fn start_coordination_task(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let discoveries = Arc::clone(&self.active_discoveries);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();

        let coordination_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                interval.tick().await;
                Self::coordinate_discoveries(&discoveries, &stats, &config).await;

                // `all` on an empty map is true, so the loop also exits if
                // the task table was cleared (e.g. by `shutdown`).
                let all_complete = {
                    let discoveries_read = discoveries.read().unwrap();
                    discoveries_read.values().all(|task| {
                        matches!(task.status, TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout)
                    })
                };

                if all_complete {
                    break;
                }
            }
        });

        self.coordination_handle = Some(coordination_handle);
        Ok(())
    }

    /// Recomputes aggregate stats from the current task table: candidate
    /// totals, completed-task count, mean discovery time, and the ratio of
    /// completed to started tasks.
    async fn coordinate_discoveries(
        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
        _config: &ParallelDiscoveryConfig,
    ) {
        let mut total_candidates = 0u64;
        let mut completed_tasks = 0u64;
        let mut total_discovery_time = Duration::ZERO;

        {
            let discoveries_read = discoveries.read().unwrap();
            for task in discoveries_read.values() {
                if task.status == TaskStatus::Completed {
                    total_candidates += task.discovered_candidates.len() as u64;
                    completed_tasks += 1;
                    // NOTE(review): `elapsed()` keeps growing after the task
                    // completed, so the average drifts upward on every pass —
                    // confirm a completion timestamp isn't wanted instead.
                    total_discovery_time += task.started_at.elapsed();
                }
            }
        }

        {
            let mut stats_guard = stats.lock().unwrap();
            stats_guard.total_candidates = total_candidates;
            stats_guard.tasks_completed = completed_tasks;

            if completed_tasks > 0 {
                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
                stats_guard.parallelism_efficiency = completed_tasks as f64 / stats_guard.tasks_started as f64;
            }
        }
    }

    /// Returns every candidate from completed tasks, sorted by descending
    /// priority.
    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
        let mut all_candidates = Vec::new();

        let discoveries = self.active_discoveries.read().unwrap();
        for task in discoveries.values() {
            if task.status == TaskStatus::Completed {
                all_candidates.extend(task.discovered_candidates.clone());
            }
        }

        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));

        all_candidates
    }

    /// Snapshot of the aggregate statistics.
    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
        self.stats.lock().unwrap().clone()
    }

    /// Aborts the coordination loop (if running) and clears the task table.
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.coordination_handle.take() {
            handle.abort();
        }

        {
            let mut discoveries = self.active_discoveries.write().unwrap();
            discoveries.clear();
        }

        info!("Parallel discovery coordinator shutdown complete");
    }
}
803
impl AdaptiveTimeoutManager {
    /// Builds a manager with per-operation timeout profiles and optimistic
    /// initial network conditions; monitoring starts only on `start`.
    pub fn new() -> Self {
        let mut timeout_configs = HashMap::new();

        timeout_configs.insert(OperationType::CandidateDiscovery, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(5),
            min_timeout: Duration::from_millis(500),
            max_timeout: Duration::from_secs(30),
            rtt_multiplier: 4.0,
            quality_factor: 0.5,
            congestion_factor: 0.3,
        });

        timeout_configs.insert(OperationType::PathValidation, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(3),
            min_timeout: Duration::from_millis(200),
            max_timeout: Duration::from_secs(15),
            rtt_multiplier: 3.0,
            quality_factor: 0.4,
            congestion_factor: 0.4,
        });

        timeout_configs.insert(OperationType::CoordinationRequest, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(10),
            min_timeout: Duration::from_secs(1),
            max_timeout: Duration::from_secs(60),
            rtt_multiplier: 5.0,
            quality_factor: 0.6,
            congestion_factor: 0.2,
        });

        timeout_configs.insert(OperationType::HolePunching, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(2),
            min_timeout: Duration::from_millis(100),
            max_timeout: Duration::from_secs(10),
            rtt_multiplier: 2.0,
            quality_factor: 0.3,
            congestion_factor: 0.5,
        });

        timeout_configs.insert(OperationType::ConnectionEstablishment, AdaptiveTimeoutConfig {
            base_timeout: Duration::from_secs(15),
            min_timeout: Duration::from_secs(2),
            max_timeout: Duration::from_secs(120),
            rtt_multiplier: 6.0,
            quality_factor: 0.7,
            congestion_factor: 0.1,
        });

        Self {
            network_conditions: Arc::new(RwLock::new(NetworkConditions {
                rtt_samples: VecDeque::new(),
                packet_loss_rate: 0.0,
                // Optimistic starting point until measurements arrive:
                // ~1 Mbps, good quality, low congestion.
                bandwidth_estimate: 1_000_000,
                quality_score: 0.8,
                congestion_level: 0.2,
                last_measurement: Instant::now(),
            })),
            timeout_configs,
            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
            monitoring_handle: None,
        }
    }

    /// Spawns a once-per-second background task that decays stale
    /// network-condition state.
    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let network_conditions = Arc::clone(&self.network_conditions);
        let stats = Arc::clone(&self.stats);

        let monitoring_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(1));

            loop {
                interval.tick().await;
                Self::update_network_conditions(&network_conditions, &stats).await;
            }
        });

        self.monitoring_handle = Some(monitoring_handle);
        info!("Adaptive timeout manager started");
        Ok(())
    }

    /// Derives a timeout for `operation`: average RTT scaled by the
    /// profile's multiplier (or the base timeout when no samples exist),
    /// inflated by quality and congestion adjustments, clamped to the
    /// profile's [min, max] range. Unknown operations fall back to a
    /// generic 5 s profile.
    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
        let config = self.timeout_configs.get(&operation)
            .cloned()
            .unwrap_or_else(|| AdaptiveTimeoutConfig {
                base_timeout: Duration::from_secs(5),
                min_timeout: Duration::from_millis(500),
                max_timeout: Duration::from_secs(30),
                rtt_multiplier: 4.0,
                quality_factor: 0.5,
                congestion_factor: 0.3,
            });

        let conditions = self.network_conditions.read().unwrap();

        let rtt_based_timeout = if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
            Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
        } else {
            config.base_timeout
        };

        // Lower quality (score below 1.0) lengthens the timeout.
        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;

        // Higher congestion also lengthens the timeout.
        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;

        let adjusted_timeout = Duration::from_millis(
            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment) as u64
        );

        // Clamp into the profile's configured bounds.
        let final_timeout = adjusted_timeout
            .max(config.min_timeout)
            .min(config.max_timeout);

        {
            let mut stats = self.stats.lock().unwrap();
            stats.adjustments_made += 1;
            stats.avg_timeouts.insert(operation, final_timeout);
        }

        debug!("Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
               operation, final_timeout, conditions.quality_score, conditions.congestion_level);

        final_timeout
    }

    /// Feeds one RTT/loss/bandwidth observation into the condition model
    /// using exponential moving averages.
    pub async fn record_measurement(
        &self,
        rtt: Duration,
        packet_loss: bool,
        bandwidth: Option<u64>,
    ) {
        let mut conditions = self.network_conditions.write().unwrap();

        // Sliding window of at most 50 RTT samples.
        conditions.rtt_samples.push_back(rtt);
        if conditions.rtt_samples.len() > 50 {
            conditions.rtt_samples.pop_front();
        }

        // EWMA of loss with weight 0.1 on the newest observation.
        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;

        // EWMA of bandwidth with weight 0.2 on the newest observation.
        if let Some(bw) = bandwidth {
            conditions.bandwidth_estimate = (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
        }

        // Quality: mean of an RTT score (1 s RTT maps to 0) and a loss score.
        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
        let loss_quality = 1.0 - conditions.packet_loss_rate;
        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;

        // Congestion: loss rate plus RTT jitter, capped at 1.0.
        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);

        conditions.last_measurement = Instant::now();
    }

    /// Arithmetic mean of the RTT window; `None` when it is empty.
    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
        if samples.is_empty() {
            return None;
        }

        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
        Some(Duration::from_millis(total_ms / samples.len() as u64))
    }

    /// Coefficient of variation of the RTT window (std-dev / mean), capped
    /// at 1.0; returns 0.0 with fewer than two samples.
    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
        if samples.len() < 2 {
            return 0.0;
        }

        // `unwrap` is safe: len >= 2 guarantees a Some average.
        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
        let variance: f64 = samples.iter()
            .map(|d| {
                let diff = d.as_millis() as f64 - avg;
                diff * diff
            })
            .sum::<f64>() / samples.len() as f64;

        (variance.sqrt() / avg).min(1.0)
    }

    /// Periodic decay pass: trims the RTT window, bleeds off the loss rate,
    /// and fades the quality score when no measurement arrived for 10 s.
    async fn update_network_conditions(
        network_conditions: &Arc<RwLock<NetworkConditions>>,
        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
    ) {
        let mut conditions = network_conditions.write().unwrap();

        while conditions.rtt_samples.len() > 100 {
            conditions.rtt_samples.pop_front();
        }

        conditions.packet_loss_rate *= 0.99;

        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
            conditions.quality_score *= 0.95;
        }
    }

    /// Snapshot of the current condition model.
    pub async fn get_network_conditions(&self) -> NetworkConditions {
        self.network_conditions.read().unwrap().clone()
    }

    /// Snapshot of timeout-adjustment statistics.
    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
        self.stats.lock().unwrap().clone()
    }

    /// Aborts the background monitoring task, if any.
    pub async fn shutdown(&mut self) {
        if let Some(handle) = self.monitoring_handle.take() {
            handle.abort();
        }

        info!("Adaptive timeout manager shutdown complete");
    }
}
1051
impl BandwidthAwareValidator {
    /// Creates a validator with no active sessions and a ~1 Mbps initial
    /// bandwidth estimate.
    pub fn new(config: BandwidthValidationConfig) -> Self {
        Self {
            active_validations: Arc::new(RwLock::new(HashMap::new())),
            bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
                bandwidth_samples: VecDeque::new(),
                // Assume ~1 Mbps until real samples arrive.
                current_bandwidth: 1_000_000,
                utilization: 0.0,
                last_measurement: Instant::now(),
            })),
            config,
            stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
        }
    }

    /// Opens a validation session for `target_address`.
    ///
    /// # Errors
    /// Fails when `can_start_validation` rejects the request (concurrency
    /// cap reached or estimated utilization too high).
    pub async fn start_validation(
        &self,
        target_address: SocketAddr,
        priority: ValidationPriority,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if !self.can_start_validation().await {
            return Err("Bandwidth limit reached, cannot start validation".into());
        }

        let session = ValidationSession {
            target_address,
            started_at: Instant::now(),
            packets_sent: 0,
            packets_received: 0,
            total_bytes: 0,
            rtt_samples: Vec::new(),
            bandwidth_usage: 0,
            priority,
        };

        {
            let mut validations = self.active_validations.write().unwrap();
            validations.insert(target_address, session);
        }

        {
            let mut stats = self.stats.lock().unwrap();
            stats.validations_started += 1;
        }

        debug!("Started bandwidth-aware validation for {}", target_address);
        Ok(())
    }

    /// Admission check: rejects when the concurrent-session cap is hit or,
    /// in adaptive mode, when the sessions' summed bandwidth usage exceeds
    /// 80% of the current bandwidth estimate.
    async fn can_start_validation(&self) -> bool {
        let validations = self.active_validations.read().unwrap();
        let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();

        if validations.len() >= self.config.max_concurrent_validations {
            return false;
        }

        if self.config.enable_adaptive_validation {
            let current_usage: u64 = validations.values()
                .map(|session| session.bandwidth_usage)
                .sum();

            let available_bandwidth = bandwidth_monitor.current_bandwidth;
            let utilization = current_usage as f64 / available_bandwidth as f64;

            if utilization > 0.8 {
                return false;
            }
        }

        true
    }

    /// Accounts an outgoing probe packet against its session and the global
    /// bandwidth monitor. Unknown addresses are silently ignored.
    ///
    /// NOTE(review): the `active_validations` write guard is still held
    /// across the `await` below; this only stays well-behaved because
    /// `update_bandwidth_usage` has no await points today — confirm and
    /// consider dropping the guard first.
    pub async fn record_packet_sent(
        &self,
        target_address: SocketAddr,
        packet_size: usize,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut validations = self.active_validations.write().unwrap();

        if let Some(session) = validations.get_mut(&target_address) {
            session.packets_sent += 1;
            session.total_bytes += packet_size as u64;
            session.bandwidth_usage += packet_size as u64;
        }

        self.update_bandwidth_usage(packet_size as u64).await;

        Ok(())
    }

    /// Records a response packet and its RTT sample for the session.
    pub async fn record_packet_received(
        &self,
        target_address: SocketAddr,
        rtt: Duration,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut validations = self.active_validations.write().unwrap();

        if let Some(session) = validations.get_mut(&target_address) {
            session.packets_received += 1;
            session.rtt_samples.push(rtt);
        }

        Ok(())
    }

    /// Appends a bandwidth sample and recomputes `current_bandwidth` as
    /// total bytes / total whole seconds over the sample window.
    ///
    /// NOTE(review): `as_secs()` truncates, so samples arriving within one
    /// second of the previous measurement record 0 bandwidth and short
    /// windows never update the estimate — confirm whether sub-second
    /// resolution is wanted.
    async fn update_bandwidth_usage(&self, bytes_used: u64) {
        let mut monitor = self.bandwidth_monitor.lock().unwrap();

        let now = Instant::now();
        let sample = BandwidthSample {
            timestamp: now,
            bytes_transferred: bytes_used,
            duration: now.duration_since(monitor.last_measurement),
            bandwidth: if monitor.last_measurement.elapsed().as_secs() > 0 {
                bytes_used / monitor.last_measurement.elapsed().as_secs()
            } else {
                0
            },
        };

        monitor.bandwidth_samples.push_back(sample);
        if monitor.bandwidth_samples.len() > 100 {
            monitor.bandwidth_samples.pop_front();
        }

        if !monitor.bandwidth_samples.is_empty() {
            let total_bytes: u64 = monitor.bandwidth_samples.iter()
                .map(|s| s.bytes_transferred)
                .sum();
            let total_time: Duration = monitor.bandwidth_samples.iter()
                .map(|s| s.duration)
                .sum();

            if total_time.as_secs() > 0 {
                monitor.current_bandwidth = total_bytes / total_time.as_secs();
            }
        }

        monitor.last_measurement = now;
    }

    /// Closes a session and folds its duration and bandwidth usage into the
    /// stats. Only successful validations bump `validations_completed`.
    ///
    /// NOTE(review): on a failed validation the incremental mean below still
    /// re-averages `duration` against the old completed count — confirm
    /// failed runs should affect `avg_validation_time`.
    pub async fn complete_validation(
        &self,
        target_address: SocketAddr,
        success: bool,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let session = {
            let mut validations = self.active_validations.write().unwrap();
            validations.remove(&target_address)
        };

        if let Some(session) = session {
            let duration = session.started_at.elapsed();

            {
                let mut stats = self.stats.lock().unwrap();
                if success {
                    stats.validations_completed += 1;
                }
                stats.total_bandwidth_used += session.bandwidth_usage;
                // Incremental mean over completed validations.
                stats.avg_validation_time = if stats.validations_completed > 0 {
                    Duration::from_millis(
                        (stats.avg_validation_time.as_millis() as u64 * (stats.validations_completed - 1) +
                         duration.as_millis() as u64) / stats.validations_completed
                    )
                } else {
                    duration
                };

                if stats.total_bandwidth_used > 0 {
                    // Successful validations per 1000 bytes of probe traffic.
                    stats.bandwidth_efficiency = stats.validations_completed as f64 /
                        stats.total_bandwidth_used as f64 * 1000.0;
                }
            }

            debug!("Completed validation for {} in {:?} (success: {})",
                   target_address, duration, success);
        }

        Ok(())
    }

    /// Snapshot of validation statistics.
    pub async fn get_stats(&self) -> BandwidthValidationStats {
        self.stats.lock().unwrap().clone()
    }
}
1255
1256impl CongestionControlIntegrator {
1257 pub fn new(config: CongestionIntegrationConfig) -> Self {
1259 Self {
1260 active_migrations: Arc::new(RwLock::new(HashMap::new())),
1261 congestion_state: Arc::new(Mutex::new(CongestionState {
1262 congestion_window: 10, ssthresh: 65535,
1264 rtt_measurements: VecDeque::new(),
1265 congestion_events: VecDeque::new(),
1266 congestion_level: 0.0,
1267 })),
1268 config,
1269 stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
1270 }
1271 }
1272
1273 pub async fn start_migration(
1275 &self,
1276 peer_id: PeerId,
1277 old_path: SocketAddr,
1278 new_path: SocketAddr,
1279 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280 if self.config.enable_congestion_awareness {
1282 let congestion_state = self.congestion_state.lock().unwrap();
1283 if congestion_state.congestion_level > self.config.congestion_threshold {
1284 return Err("Migration delayed due to high congestion".into());
1285 }
1286 }
1287
1288 let session = MigrationSession {
1289 peer_id,
1290 old_path,
1291 new_path,
1292 started_at: Instant::now(),
1293 migration_state: MigrationState::Initiated,
1294 congestion_window: {
1295 let state = self.congestion_state.lock().unwrap();
1296 (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
1297 },
1298 rtt_estimate: Duration::from_millis(100), bandwidth_estimate: 1_000_000, };
1301
1302 {
1304 let mut migrations = self.active_migrations.write().unwrap();
1305 migrations.insert(peer_id, session);
1306 }
1307
1308 {
1310 let mut stats = self.stats.lock().unwrap();
1311 stats.migrations_attempted += 1;
1312 }
1313
1314 info!("Started congestion-aware migration for peer {:?}: {} -> {}",
1315 peer_id, old_path, new_path);
1316 Ok(())
1317 }
1318
1319 pub async fn update_migration_state(
1321 &self,
1322 peer_id: PeerId,
1323 new_state: MigrationState,
1324 rtt: Option<Duration>,
1325 bandwidth: Option<u64>,
1326 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1327 let mut migrations = self.active_migrations.write().unwrap();
1328
1329 if let Some(session) = migrations.get_mut(&peer_id) {
1330 session.migration_state = new_state;
1331
1332 if let Some(rtt) = rtt {
1333 session.rtt_estimate = rtt;
1334
1335 let mut congestion_state = self.congestion_state.lock().unwrap();
1337 congestion_state.rtt_measurements.push_back(rtt);
1338 if congestion_state.rtt_measurements.len() > 50 {
1339 congestion_state.rtt_measurements.pop_front();
1340 }
1341 }
1342
1343 if let Some(bw) = bandwidth {
1344 session.bandwidth_estimate = bw;
1345 }
1346
1347 if matches!(new_state, MigrationState::Completed) {
1349 let duration = session.started_at.elapsed();
1350
1351 let mut stats = self.stats.lock().unwrap();
1353 stats.migrations_successful += 1;
1354 stats.avg_migration_time = if stats.migrations_successful > 0 {
1355 Duration::from_millis(
1356 (stats.avg_migration_time.as_millis() as u64 * (stats.migrations_successful - 1) +
1357 duration.as_millis() as u64) / stats.migrations_successful
1358 )
1359 } else {
1360 duration
1361 };
1362
1363 debug!("Migration completed for peer {:?} in {:?}", peer_id, duration);
1364 }
1365 }
1366
1367 Ok(())
1368 }
1369
1370 pub async fn record_congestion_event(
1372 &self,
1373 event_type: CongestionEventType,
1374 severity: f64,
1375 ) {
1376 let event = CongestionEvent {
1377 timestamp: Instant::now(),
1378 event_type,
1379 severity,
1380 };
1381
1382 let mut congestion_state = self.congestion_state.lock().unwrap();
1383 congestion_state.congestion_events.push_back(event);
1384
1385 if congestion_state.congestion_events.len() > 100 {
1387 congestion_state.congestion_events.pop_front();
1388 }
1389
1390 let recent_events: Vec<_> = congestion_state.congestion_events.iter()
1392 .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
1393 .collect();
1394
1395 if !recent_events.is_empty() {
1396 let avg_severity: f64 = recent_events.iter()
1397 .map(|e| e.severity)
1398 .sum::<f64>() / recent_events.len() as f64;
1399
1400 congestion_state.congestion_level = avg_severity;
1401 }
1402
1403 match event_type {
1405 CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
1406 congestion_state.ssthresh = congestion_state.congestion_window / 2;
1407 congestion_state.congestion_window = congestion_state.ssthresh;
1408 }
1409 CongestionEventType::ECNMark => {
1410 congestion_state.congestion_window =
1411 (congestion_state.congestion_window as f64 * 0.8) as u32;
1412 }
1413 CongestionEventType::RTTIncrease => {
1414 congestion_state.congestion_window =
1416 (congestion_state.congestion_window as f64 * 0.95) as u32;
1417 }
1418 }
1419
1420 debug!("Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})",
1421 event_type, severity, congestion_state.congestion_window);
1422 }
1423
1424 pub async fn get_stats(&self) -> CongestionIntegrationStats {
1426 self.stats.lock().unwrap().clone()
1427 }
1428}
1429
/// Facade that bundles the four network-efficiency subsystems behind a
/// single start/shutdown lifecycle.
#[derive(Debug)]
pub struct NetworkEfficiencyManager {
    // Coordinates candidate discovery tasks across network interfaces.
    parallel_discovery: ParallelDiscoveryCoordinator,
    // Adjusts operation timeouts based on observed network conditions.
    adaptive_timeout: AdaptiveTimeoutManager,
    // Validates candidates with bandwidth awareness.
    bandwidth_validator: BandwidthAwareValidator,
    // Couples path-migration decisions to congestion-control state.
    congestion_integrator: CongestionControlIntegrator,
    // Lifecycle flag; makes `start`/`shutdown` idempotent.
    is_running: bool,
}
1439
1440impl NetworkEfficiencyManager {
1441 pub fn new() -> Self {
1443 Self {
1444 parallel_discovery: ParallelDiscoveryCoordinator::new(ParallelDiscoveryConfig::default()),
1445 adaptive_timeout: AdaptiveTimeoutManager::new(),
1446 bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
1447 congestion_integrator: CongestionControlIntegrator::new(CongestionIntegrationConfig::default()),
1448 is_running: false,
1449 }
1450 }
1451
1452 pub fn with_configs(
1454 discovery_config: ParallelDiscoveryConfig,
1455 validation_config: BandwidthValidationConfig,
1456 congestion_config: CongestionIntegrationConfig,
1457 ) -> Self {
1458 Self {
1459 parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
1460 adaptive_timeout: AdaptiveTimeoutManager::new(),
1461 bandwidth_validator: BandwidthAwareValidator::new(validation_config),
1462 congestion_integrator: CongestionControlIntegrator::new(congestion_config),
1463 is_running: false,
1464 }
1465 }
1466
1467 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1469 if self.is_running {
1470 return Ok(());
1471 }
1472
1473 self.adaptive_timeout.start().await?;
1474
1475 self.is_running = true;
1476 info!("Network efficiency manager started");
1477 Ok(())
1478 }
1479
1480 pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
1482 &mut self.parallel_discovery
1483 }
1484
1485 pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
1487 &self.adaptive_timeout
1488 }
1489
1490 pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
1492 &self.bandwidth_validator
1493 }
1494
1495 pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
1497 &self.congestion_integrator
1498 }
1499
1500 pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
1502 NetworkEfficiencyStats {
1503 parallel_discovery: self.parallel_discovery.get_stats().await,
1504 adaptive_timeout: self.adaptive_timeout.get_stats().await,
1505 bandwidth_validation: self.bandwidth_validator.get_stats().await,
1506 congestion_integration: self.congestion_integrator.get_stats().await,
1507 }
1508 }
1509
1510 pub async fn shutdown(&mut self) {
1512 if !self.is_running {
1513 return;
1514 }
1515
1516 self.parallel_discovery.shutdown().await;
1517 self.adaptive_timeout.shutdown().await;
1518
1519 self.is_running = false;
1520 info!("Network efficiency manager shutdown complete");
1521 }
1522}
1523
/// Aggregated snapshot of statistics from all network-efficiency subsystems,
/// as returned by `NetworkEfficiencyManager::get_comprehensive_stats`.
#[derive(Debug, Clone)]
pub struct NetworkEfficiencyStats {
    /// Statistics from the parallel discovery coordinator.
    pub parallel_discovery: ParallelDiscoveryStats,
    /// Statistics from the adaptive timeout manager.
    pub adaptive_timeout: AdaptiveTimeoutStats,
    /// Statistics from the bandwidth-aware validator.
    pub bandwidth_validation: BandwidthValidationStats,
    /// Statistics from the congestion-control integrator.
    pub congestion_integration: CongestionIntegrationStats,
}
1532
1533impl Default for NetworkEfficiencyManager {
1534 fn default() -> Self {
1535 Self::new()
1536 }
1537}