ant_quic/optimization/
network.rs

1//! Network efficiency optimization components for ant-quic
2//!
3//! This module provides network-aware optimizations including:
4//! - Parallel candidate discovery across interfaces
5//! - Adaptive timeout adjustment based on network conditions
6//! - Bandwidth-aware QUIC path validation strategies
7//! - Congestion control integration during QUIC connection migration
8
9use std::{
10    collections::{HashMap, VecDeque},
11    net::{IpAddr, SocketAddr},
12    sync::{Arc, Mutex, RwLock},
13    time::{Duration, Instant},
14};
15
16use tokio::time::timeout;
17
18use tracing::{debug, info, warn};
19
20use tokio::time::sleep;
21
22use crate::{
23    nat_traversal_api::{CandidateAddress, PeerId},
24    candidate_discovery::NetworkInterface,
25    connection::nat_traversal::{CandidateSource, CandidateState},
26};
27
28/// Parallel candidate discovery coordinator
29#[derive(Debug)]
30pub struct ParallelDiscoveryCoordinator {
31    /// Active discovery tasks by interface
32    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
33    /// Discovery configuration
34    config: ParallelDiscoveryConfig,
35    /// Discovery statistics
36    stats: Arc<Mutex<ParallelDiscoveryStats>>,
37    /// Task coordination handle
38    coordination_handle: Option<tokio::task::JoinHandle<()>>,
39}
40
41/// Configuration for parallel discovery
42#[derive(Debug, Clone)]
43pub struct ParallelDiscoveryConfig {
44    /// Maximum concurrent discovery tasks
45    pub max_concurrent_tasks: usize,
46    /// Timeout for individual interface discovery
47    pub interface_timeout: Duration,
48    /// Enable interface prioritization
49    pub enable_prioritization: bool,
50    /// Preferred interface types
51    pub preferred_interface_types: Vec<InterfaceType>,
52    /// Enable adaptive parallelism based on system resources
53    pub enable_adaptive_parallelism: bool,
54}
55
56/// Network interface type for prioritization
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum InterfaceType {
59    Ethernet,
60    WiFi,
61    Cellular,
62    Loopback,
63    VPN,
64    Unknown,
65}
66
67/// Individual discovery task state
68#[derive(Debug)]
69struct DiscoveryTask {
70    interface_name: String,
71    interface_type: InterfaceType,
72    started_at: Instant,
73    status: TaskStatus,
74    discovered_candidates: Vec<CandidateAddress>,
75    priority: u32,
76}
77
78/// Status of a discovery task
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80enum TaskStatus {
81    Pending,
82    Running,
83    Completed,
84    Failed,
85    Timeout,
86}
87
88/// Statistics for parallel discovery
89#[derive(Debug, Default, Clone)]
90pub struct ParallelDiscoveryStats {
91    /// Total discovery tasks started
92    pub tasks_started: u64,
93    /// Total discovery tasks completed
94    pub tasks_completed: u64,
95    /// Total discovery tasks failed
96    pub tasks_failed: u64,
97    /// Average discovery time per interface
98    pub avg_discovery_time: Duration,
99    /// Total candidates discovered
100    pub total_candidates: u64,
101    /// Parallelism efficiency (0.0 - 1.0)
102    pub parallelism_efficiency: f64,
103}
104
105/// Adaptive timeout manager for network condition awareness
106#[derive(Debug)]
107pub struct AdaptiveTimeoutManager {
108    /// Network condition measurements
109    network_conditions: Arc<RwLock<NetworkConditions>>,
110    /// Timeout configurations by operation type
111    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
112    /// Timeout statistics
113    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
114    /// Monitoring task handle
115    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
116}
117
118/// Network conditions measurement
119#[derive(Debug, Clone)]
120pub struct NetworkConditions {
121    /// Recent RTT measurements
122    rtt_samples: VecDeque<Duration>,
123    /// Packet loss rate (0.0 - 1.0)
124    packet_loss_rate: f64,
125    /// Bandwidth estimate (bytes/sec)
126    bandwidth_estimate: u64,
127    /// Network quality score (0.0 - 1.0)
128    quality_score: f64,
129    /// Congestion level (0.0 - 1.0)
130    congestion_level: f64,
131    /// Last measurement time
132    last_measurement: Instant,
133}
134
135/// Operation types for adaptive timeouts
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
137pub enum OperationType {
138    CandidateDiscovery,
139    PathValidation,
140    CoordinationRequest,
141    HolePunching,
142    ConnectionEstablishment,
143}
144
145/// Adaptive timeout configuration
146#[derive(Debug, Clone)]
147struct AdaptiveTimeoutConfig {
148    /// Base timeout value
149    base_timeout: Duration,
150    /// Minimum timeout
151    min_timeout: Duration,
152    /// Maximum timeout
153    max_timeout: Duration,
154    /// RTT multiplier for timeout calculation
155    rtt_multiplier: f64,
156    /// Quality adjustment factor
157    quality_factor: f64,
158    /// Congestion adjustment factor
159    congestion_factor: f64,
160}
161
162/// Statistics for adaptive timeouts
163#[derive(Debug, Default, Clone)]
164pub struct AdaptiveTimeoutStats {
165    /// Total timeout adjustments made
166    pub adjustments_made: u64,
167    /// Average timeout value by operation
168    pub avg_timeouts: HashMap<OperationType, Duration>,
169    /// Timeout effectiveness (success rate)
170    pub timeout_effectiveness: f64,
171    /// Network condition accuracy
172    pub condition_accuracy: f64,
173}
174
175/// Bandwidth-aware path validation coordinator
176#[derive(Debug)]
177pub struct BandwidthAwareValidator {
178    /// Active validation sessions
179    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
180    /// Bandwidth monitoring
181    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
182    /// Validation configuration
183    config: BandwidthValidationConfig,
184    /// Validation statistics
185    stats: Arc<Mutex<BandwidthValidationStats>>,
186}
187
188/// Configuration for bandwidth-aware validation
189#[derive(Debug, Clone)]
190pub struct BandwidthValidationConfig {
191    /// Maximum concurrent validations
192    pub max_concurrent_validations: usize,
193    /// Bandwidth threshold for validation throttling (bytes/sec)
194    pub bandwidth_threshold: u64,
195    /// Enable adaptive validation based on bandwidth
196    pub enable_adaptive_validation: bool,
197    /// Validation packet size
198    pub validation_packet_size: usize,
199    /// Maximum validation rate (packets/sec)
200    pub max_validation_rate: f64,
201}
202
203/// Bandwidth monitoring state
204#[derive(Debug)]
205struct BandwidthMonitor {
206    /// Recent bandwidth measurements
207    bandwidth_samples: VecDeque<BandwidthSample>,
208    /// Current bandwidth estimate
209    current_bandwidth: u64,
210    /// Bandwidth utilization (0.0 - 1.0)
211    utilization: f64,
212    /// Last measurement time
213    last_measurement: Instant,
214}
215
216/// Individual bandwidth measurement
217#[derive(Debug, Clone)]
218struct BandwidthSample {
219    timestamp: Instant,
220    bytes_transferred: u64,
221    duration: Duration,
222    bandwidth: u64,
223}
224
225/// Path validation session
226#[derive(Debug)]
227struct ValidationSession {
228    target_address: SocketAddr,
229    started_at: Instant,
230    packets_sent: u32,
231    packets_received: u32,
232    total_bytes: u64,
233    rtt_samples: Vec<Duration>,
234    bandwidth_usage: u64,
235    priority: ValidationPriority,
236}
237
238/// Priority for path validation
239#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
240pub enum ValidationPriority {
241    Low,
242    Normal,
243    High,
244    Critical,
245}
246
247/// Statistics for bandwidth-aware validation
248#[derive(Debug, Default, Clone)]
249pub struct BandwidthValidationStats {
250    /// Total validations started
251    pub validations_started: u64,
252    /// Total validations completed
253    pub validations_completed: u64,
254    /// Total bandwidth used for validation
255    pub total_bandwidth_used: u64,
256    /// Average validation time
257    pub avg_validation_time: Duration,
258    /// Bandwidth efficiency (successful validations / bandwidth used)
259    pub bandwidth_efficiency: f64,
260}
261
262/// Congestion control integration for connection migration
263#[derive(Debug)]
264pub struct CongestionControlIntegrator {
265    /// Active connection migrations
266    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
267    /// Congestion control state
268    congestion_state: Arc<Mutex<CongestionState>>,
269    /// Integration configuration
270    config: CongestionIntegrationConfig,
271    /// Integration statistics
272    stats: Arc<Mutex<CongestionIntegrationStats>>,
273}
274
275/// Configuration for congestion control integration
276#[derive(Debug, Clone)]
277pub struct CongestionIntegrationConfig {
278    /// Enable congestion-aware migration
279    pub enable_congestion_awareness: bool,
280    /// Congestion threshold for migration decisions
281    pub congestion_threshold: f64,
282    /// Migration rate limiting
283    pub max_migrations_per_second: f64,
284    /// Enable bandwidth estimation during migration
285    pub enable_bandwidth_estimation: bool,
286    /// Congestion window scaling factor
287    pub cwnd_scaling_factor: f64,
288}
289
290/// Connection migration session
291#[derive(Debug)]
292struct MigrationSession {
293    peer_id: PeerId,
294    old_path: SocketAddr,
295    new_path: SocketAddr,
296    started_at: Instant,
297    migration_state: MigrationState,
298    congestion_window: u32,
299    rtt_estimate: Duration,
300    bandwidth_estimate: u64,
301}
302
303/// State of connection migration
304#[derive(Debug, Clone, Copy, PartialEq, Eq)]
305pub enum MigrationState {
306    Initiated,
307    PathValidating,
308    CongestionProbing,
309    Migrating,
310    Completed,
311    Failed,
312}
313
314/// Congestion control state
315#[derive(Debug)]
316struct CongestionState {
317    /// Current congestion window
318    congestion_window: u32,
319    /// Slow start threshold
320    ssthresh: u32,
321    /// RTT measurements
322    rtt_measurements: VecDeque<Duration>,
323    /// Congestion events
324    congestion_events: VecDeque<CongestionEvent>,
325    /// Current congestion level
326    congestion_level: f64,
327}
328
329/// Congestion event for tracking
330#[derive(Debug, Clone)]
331struct CongestionEvent {
332    timestamp: Instant,
333    event_type: CongestionEventType,
334    severity: f64,
335}
336
337/// Types of congestion events
338#[derive(Debug, Clone, Copy, PartialEq, Eq)]
339pub enum CongestionEventType {
340    PacketLoss,
341    Timeout,
342    ECNMark,
343    RTTIncrease,
344}
345
346/// Statistics for congestion control integration
347#[derive(Debug, Default, Clone)]
348pub struct CongestionIntegrationStats {
349    /// Total migrations attempted
350    pub migrations_attempted: u64,
351    /// Total migrations successful
352    pub migrations_successful: u64,
353    /// Average migration time
354    pub avg_migration_time: Duration,
355    /// Congestion-avoided migrations
356    pub congestion_avoided_migrations: u64,
357    /// Bandwidth utilization efficiency
358    pub bandwidth_utilization_efficiency: f64,
359}
360
361impl Default for ParallelDiscoveryConfig {
362    fn default() -> Self {
363        Self {
364            max_concurrent_tasks: 8,
365            interface_timeout: Duration::from_secs(5),
366            enable_prioritization: true,
367            preferred_interface_types: vec![
368                InterfaceType::Ethernet,
369                InterfaceType::WiFi,
370                InterfaceType::Cellular,
371            ],
372            enable_adaptive_parallelism: true,
373        }
374    }
375}
376
377impl Default for BandwidthValidationConfig {
378    fn default() -> Self {
379        Self {
380            max_concurrent_validations: 16,
381            bandwidth_threshold: 1_000_000, // 1 MB/s
382            enable_adaptive_validation: true,
383            validation_packet_size: 64,
384            max_validation_rate: 100.0, // 100 packets/sec
385        }
386    }
387}
388
389impl Default for CongestionIntegrationConfig {
390    fn default() -> Self {
391        Self {
392            enable_congestion_awareness: true,
393            congestion_threshold: 0.7, // 70% congestion level
394            max_migrations_per_second: 10.0,
395            enable_bandwidth_estimation: true,
396            cwnd_scaling_factor: 0.8,
397        }
398    }
399}
400
401impl ParallelDiscoveryCoordinator {
402    /// Create a new parallel discovery coordinator
403    pub fn new(config: ParallelDiscoveryConfig) -> Self {
404        Self {
405            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
406            config,
407            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
408            coordination_handle: None,
409        }
410    }
411
412    /// Start parallel discovery across multiple interfaces
413    pub async fn start_parallel_discovery(
414        &mut self,
415        interfaces: Vec<NetworkInterface>,
416        peer_id: PeerId,
417    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
418        info!("Starting parallel discovery across {} interfaces for peer {:?}", 
419              interfaces.len(), peer_id);
420
421        // Prioritize interfaces if enabled
422        let prioritized_interfaces = if self.config.enable_prioritization {
423            self.prioritize_interfaces(interfaces)
424        } else {
425            interfaces
426        };
427
428        // Limit concurrent tasks based on configuration and system resources
429        let max_tasks = if self.config.enable_adaptive_parallelism {
430            self.calculate_adaptive_parallelism().await
431        } else {
432            self.config.max_concurrent_tasks
433        };
434
435        let tasks_to_start = prioritized_interfaces.into_iter()
436            .take(max_tasks)
437            .collect::<Vec<_>>();
438
439        // Start discovery tasks
440        for interface in tasks_to_start {
441            self.start_interface_discovery(interface, peer_id).await?;
442        }
443
444        // Start coordination task
445        self.start_coordination_task().await?;
446
447        Ok(())
448    }
449
450    /// Prioritize interfaces based on type and characteristics
451    fn prioritize_interfaces(&self, mut interfaces: Vec<NetworkInterface>) -> Vec<NetworkInterface> {
452        interfaces.sort_by_key(|interface| {
453            let interface_type = self.classify_interface_type(&interface.name);
454            let type_priority = self.config.preferred_interface_types
455                .iter()
456                .position(|&t| t == interface_type)
457                .unwrap_or(999);
458            
459            // Lower number = higher priority
460            (type_priority, interface.addresses.len())
461        });
462
463        interfaces
464    }
465
466    /// Classify interface type from name
467    fn classify_interface_type(&self, name: &str) -> InterfaceType {
468        let name_lower = name.to_lowercase();
469        
470        if name_lower.contains("eth") || name_lower.contains("en") {
471            InterfaceType::Ethernet
472        } else if name_lower.contains("wlan") || name_lower.contains("wifi") || name_lower.contains("wl") {
473            InterfaceType::WiFi
474        } else if name_lower.contains("cell") || name_lower.contains("wwan") || name_lower.contains("ppp") {
475            InterfaceType::Cellular
476        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
477            InterfaceType::Loopback
478        } else if name_lower.contains("vpn") || name_lower.contains("tun") || name_lower.contains("tap") {
479            InterfaceType::VPN
480        } else {
481            InterfaceType::Unknown
482        }
483    }
484
485    /// Calculate adaptive parallelism based on system resources
486    async fn calculate_adaptive_parallelism(&self) -> usize {
487        // Simplified adaptive calculation
488        // In production, this would consider:
489        // - CPU cores
490        // - Memory availability
491        // - Network bandwidth
492        // - Current system load
493        
494        let base_parallelism = self.config.max_concurrent_tasks;
495        let system_load_factor = 0.8; // Assume 80% system capacity
496        
497        ((base_parallelism as f64) * system_load_factor) as usize
498    }
499
500    /// Start discovery for a specific interface
501    async fn start_interface_discovery(
502        &self,
503        interface: NetworkInterface,
504        _peer_id: PeerId,
505    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
506        let interface_type = self.classify_interface_type(&interface.name);
507        let priority = self.calculate_interface_priority(interface_type);
508
509        let task = DiscoveryTask {
510            interface_name: interface.name.clone(),
511            interface_type,
512            started_at: Instant::now(),
513            status: TaskStatus::Pending,
514            discovered_candidates: Vec::new(),
515            priority,
516        };
517
518        // Add to active discoveries
519        {
520            let mut discoveries = self.active_discoveries.write().unwrap();
521            discoveries.insert(interface.name.clone(), task);
522        }
523
524        // Update stats
525        {
526            let mut stats = self.stats.lock().unwrap();
527            stats.tasks_started += 1;
528        }
529
530        // Start actual discovery (simplified)
531        self.perform_interface_discovery(interface).await?;
532
533        Ok(())
534    }
535
536    /// Calculate priority for interface type
537    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
538        match interface_type {
539            InterfaceType::Ethernet => 100,
540            InterfaceType::WiFi => 80,
541            InterfaceType::Cellular => 60,
542            InterfaceType::VPN => 40,
543            InterfaceType::Loopback => 20,
544            InterfaceType::Unknown => 10,
545        }
546    }
547
548    /// Perform discovery for a specific interface
549    async fn perform_interface_discovery(
550        &self,
551        interface: NetworkInterface,
552    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
553        let interface_name = interface.name.clone();
554        
555        // Update task status to running
556        {
557            let mut discoveries = self.active_discoveries.write().unwrap();
558            if let Some(task) = discoveries.get_mut(&interface_name) {
559                task.status = TaskStatus::Running;
560            }
561        }
562
563        // Perform discovery with timeout
564        let discovery_result = timeout(
565            self.config.interface_timeout,
566            self.discover_candidates_for_interface(interface),
567        ).await;
568
569        match discovery_result {
570            Ok(Ok(candidates)) => {
571                // Discovery successful
572                {
573                    let mut discoveries = self.active_discoveries.write().unwrap();
574                    if let Some(task) = discoveries.get_mut(&interface_name) {
575                        task.status = TaskStatus::Completed;
576                        task.discovered_candidates = candidates;
577                    }
578                }
579
580                // Update stats
581                {
582                    let mut stats = self.stats.lock().unwrap();
583                    stats.tasks_completed += 1;
584                }
585
586                debug!("Interface discovery completed for {}", interface_name);
587            }
588            Ok(Err(_)) => {
589                // Discovery failed
590                {
591                    let mut discoveries = self.active_discoveries.write().unwrap();
592                    if let Some(task) = discoveries.get_mut(&interface_name) {
593                        task.status = TaskStatus::Failed;
594                    }
595                }
596
597                // Update stats
598                {
599                    let mut stats = self.stats.lock().unwrap();
600                    stats.tasks_failed += 1;
601                }
602
603                warn!("Interface discovery failed for {}", interface_name);
604            }
605            Err(_) => {
606                // Discovery timeout
607                {
608                    let mut discoveries = self.active_discoveries.write().unwrap();
609                    if let Some(task) = discoveries.get_mut(&interface_name) {
610                        task.status = TaskStatus::Timeout;
611                    }
612                }
613
614                // Update stats
615                {
616                    let mut stats = self.stats.lock().unwrap();
617                    stats.tasks_failed += 1;
618                }
619
620                warn!("Interface discovery timeout for {}", interface_name);
621            }
622        }
623
624        Ok(())
625    }
626
627    /// Discover candidates for a specific interface
628    async fn discover_candidates_for_interface(
629        &self,
630        interface: NetworkInterface,
631    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
632        let mut candidates = Vec::new();
633
634        for address in &interface.addresses {
635            // Skip loopback and link-local addresses for P2P
636            if self.is_valid_candidate_address(&address) {
637                let candidate = CandidateAddress {
638                    address: *address,
639                    priority: self.calculate_candidate_priority(&address, &interface),
640                    source: CandidateSource::Local,
641                    state: CandidateState::New,
642                };
643
644                candidates.push(candidate);
645            }
646        }
647
648        // Simulate some discovery time
649        sleep(Duration::from_millis(100)).await;
650
651        Ok(candidates)
652    }
653
654    /// Check if address is valid for P2P candidate
655    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
656        match address.ip() {
657            IpAddr::V4(ipv4) => {
658                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
659            }
660            IpAddr::V6(ipv6) => {
661                !ipv6.is_loopback() && !ipv6.is_unspecified()
662            }
663        }
664    }
665
666    /// Calculate priority for a candidate address
667    fn calculate_candidate_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
668        let mut priority = 1000u32;
669
670        // Prefer IPv4 over IPv6 for simplicity
671        if address.is_ipv4() {
672            priority += 100;
673        }
674
675        // Prefer non-private addresses
676        if !self.is_private_address(address) {
677            priority += 200;
678        }
679
680        // Add interface-specific priority
681        let interface_type = self.classify_interface_type(&interface.name);
682        priority += self.calculate_interface_priority(interface_type);
683
684        priority
685    }
686
687    /// Check if address is in private range
688    fn is_private_address(&self, address: &SocketAddr) -> bool {
689        match address.ip() {
690            IpAddr::V4(ipv4) => ipv4.is_private(),
691            IpAddr::V6(ipv6) => {
692                // Check for unique local addresses (fc00::/7)
693                let segments = ipv6.segments();
694                (segments[0] & 0xfe00) == 0xfc00
695            }
696        }
697    }
698
699    /// Start coordination task for managing parallel discoveries
700    async fn start_coordination_task(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
701        let discoveries = Arc::clone(&self.active_discoveries);
702        let stats = Arc::clone(&self.stats);
703        let config = self.config.clone();
704
705        let coordination_handle = tokio::spawn(async move {
706            let mut interval = tokio::time::interval(Duration::from_millis(500));
707            
708            loop {
709                interval.tick().await;
710                Self::coordinate_discoveries(&discoveries, &stats, &config).await;
711                
712                // Check if all discoveries are complete
713                let all_complete = {
714                    let discoveries_read = discoveries.read().unwrap();
715                    discoveries_read.values().all(|task| {
716                        matches!(task.status, TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout)
717                    })
718                };
719
720                if all_complete {
721                    break;
722                }
723            }
724        });
725
726        self.coordination_handle = Some(coordination_handle);
727        Ok(())
728    }
729
730    /// Coordinate parallel discoveries
731    async fn coordinate_discoveries(
732        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
733        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
734        _config: &ParallelDiscoveryConfig,
735    ) {
736        let mut total_candidates = 0u64;
737        let mut completed_tasks = 0u64;
738        let mut total_discovery_time = Duration::ZERO;
739
740        {
741            let discoveries_read = discoveries.read().unwrap();
742            for task in discoveries_read.values() {
743                if task.status == TaskStatus::Completed {
744                    total_candidates += task.discovered_candidates.len() as u64;
745                    completed_tasks += 1;
746                    total_discovery_time += task.started_at.elapsed();
747                }
748            }
749        }
750
751        // Update stats
752        {
753            let mut stats_guard = stats.lock().unwrap();
754            stats_guard.total_candidates = total_candidates;
755            stats_guard.tasks_completed = completed_tasks;
756            
757            if completed_tasks > 0 {
758                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
759                stats_guard.parallelism_efficiency = completed_tasks as f64 / stats_guard.tasks_started as f64;
760            }
761        }
762    }
763
764    /// Get all discovered candidates from parallel discovery
765    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
766        let mut all_candidates = Vec::new();
767
768        let discoveries = self.active_discoveries.read().unwrap();
769        for task in discoveries.values() {
770            if task.status == TaskStatus::Completed {
771                all_candidates.extend(task.discovered_candidates.clone());
772            }
773        }
774
775        // Sort by priority (highest first)
776        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));
777
778        all_candidates
779    }
780
781    /// Get parallel discovery statistics
782    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
783        self.stats.lock().unwrap().clone()
784    }
785
786    /// Shutdown parallel discovery coordinator
787    pub async fn shutdown(&mut self) {
788        if let Some(handle) = self.coordination_handle.take() {
789            handle.abort();
790        }
791
792        // Clear active discoveries
793        {
794            let mut discoveries = self.active_discoveries.write().unwrap();
795            discoveries.clear();
796        }
797
798        info!("Parallel discovery coordinator shutdown complete");
799    }
800}
801
802impl AdaptiveTimeoutManager {
803    /// Create a new adaptive timeout manager
804    pub fn new() -> Self {
805        let mut timeout_configs = HashMap::new();
806        
807        // Initialize default timeout configurations for each operation type
808        timeout_configs.insert(OperationType::CandidateDiscovery, AdaptiveTimeoutConfig {
809            base_timeout: Duration::from_secs(5),
810            min_timeout: Duration::from_millis(500),
811            max_timeout: Duration::from_secs(30),
812            rtt_multiplier: 4.0,
813            quality_factor: 0.5,
814            congestion_factor: 0.3,
815        });
816
817        timeout_configs.insert(OperationType::PathValidation, AdaptiveTimeoutConfig {
818            base_timeout: Duration::from_secs(3),
819            min_timeout: Duration::from_millis(200),
820            max_timeout: Duration::from_secs(15),
821            rtt_multiplier: 3.0,
822            quality_factor: 0.4,
823            congestion_factor: 0.4,
824        });
825
826        timeout_configs.insert(OperationType::CoordinationRequest, AdaptiveTimeoutConfig {
827            base_timeout: Duration::from_secs(10),
828            min_timeout: Duration::from_secs(1),
829            max_timeout: Duration::from_secs(60),
830            rtt_multiplier: 5.0,
831            quality_factor: 0.6,
832            congestion_factor: 0.2,
833        });
834
835        timeout_configs.insert(OperationType::HolePunching, AdaptiveTimeoutConfig {
836            base_timeout: Duration::from_secs(2),
837            min_timeout: Duration::from_millis(100),
838            max_timeout: Duration::from_secs(10),
839            rtt_multiplier: 2.0,
840            quality_factor: 0.3,
841            congestion_factor: 0.5,
842        });
843
844        timeout_configs.insert(OperationType::ConnectionEstablishment, AdaptiveTimeoutConfig {
845            base_timeout: Duration::from_secs(15),
846            min_timeout: Duration::from_secs(2),
847            max_timeout: Duration::from_secs(120),
848            rtt_multiplier: 6.0,
849            quality_factor: 0.7,
850            congestion_factor: 0.1,
851        });
852
853        Self {
854            network_conditions: Arc::new(RwLock::new(NetworkConditions {
855                rtt_samples: VecDeque::new(),
856                packet_loss_rate: 0.0,
857                bandwidth_estimate: 1_000_000, // 1 MB/s default
858                quality_score: 0.8, // Good quality default
859                congestion_level: 0.2, // Low congestion default
860                last_measurement: Instant::now(),
861            })),
862            timeout_configs,
863            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
864            monitoring_handle: None,
865        }
866    }
867
868    /// Start the adaptive timeout manager with network monitoring
869    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
870        let network_conditions = Arc::clone(&self.network_conditions);
871        let stats = Arc::clone(&self.stats);
872
873        let monitoring_handle = tokio::spawn(async move {
874            let mut interval = tokio::time::interval(Duration::from_secs(1));
875            
876            loop {
877                interval.tick().await;
878                Self::update_network_conditions(&network_conditions, &stats).await;
879            }
880        });
881
882        self.monitoring_handle = Some(monitoring_handle);
883        info!("Adaptive timeout manager started");
884        Ok(())
885    }
886
887    /// Calculate adaptive timeout for an operation
888    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
889        let config = self.timeout_configs.get(&operation)
890            .cloned()
891            .unwrap_or_else(|| AdaptiveTimeoutConfig {
892                base_timeout: Duration::from_secs(5),
893                min_timeout: Duration::from_millis(500),
894                max_timeout: Duration::from_secs(30),
895                rtt_multiplier: 4.0,
896                quality_factor: 0.5,
897                congestion_factor: 0.3,
898            });
899
900        let conditions = self.network_conditions.read().unwrap();
901        
902        // Calculate base timeout from RTT if available
903        let rtt_based_timeout = if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
904            Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
905        } else {
906            config.base_timeout
907        };
908
909        // Adjust for network quality
910        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;
911        
912        // Adjust for congestion
913        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;
914
915        // Calculate final timeout
916        let adjusted_timeout = Duration::from_millis(
917            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment) as u64
918        );
919
920        // Clamp to min/max bounds
921        let final_timeout = adjusted_timeout
922            .max(config.min_timeout)
923            .min(config.max_timeout);
924
925        // Update stats
926        {
927            let mut stats = self.stats.lock().unwrap();
928            stats.adjustments_made += 1;
929            stats.avg_timeouts.insert(operation, final_timeout);
930        }
931
932        debug!("Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
933               operation, final_timeout, conditions.quality_score, conditions.congestion_level);
934
935        final_timeout
936    }
937
938    /// Record network measurement for adaptive timeout calculation
939    pub async fn record_measurement(
940        &self,
941        rtt: Duration,
942        packet_loss: bool,
943        bandwidth: Option<u64>,
944    ) {
945        let mut conditions = self.network_conditions.write().unwrap();
946        
947        // Add RTT sample
948        conditions.rtt_samples.push_back(rtt);
949        if conditions.rtt_samples.len() > 50 {
950            conditions.rtt_samples.pop_front();
951        }
952
953        // Update packet loss rate (exponential moving average)
954        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
955        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;
956
957        // Update bandwidth estimate if provided
958        if let Some(bw) = bandwidth {
959            conditions.bandwidth_estimate = (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
960        }
961
962        // Update quality score based on RTT and packet loss
963        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
964        let loss_quality = 1.0 - conditions.packet_loss_rate;
965        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;
966
967        // Update congestion level based on RTT variance and packet loss
968        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
969        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);
970
971        conditions.last_measurement = Instant::now();
972    }
973
974    /// Calculate average RTT from samples
975    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
976        if samples.is_empty() {
977            return None;
978        }
979
980        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
981        Some(Duration::from_millis(total_ms / samples.len() as u64))
982    }
983
984    /// Calculate RTT variance for congestion detection
985    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
986        if samples.len() < 2 {
987            return 0.0;
988        }
989
990        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
991        let variance: f64 = samples.iter()
992            .map(|d| {
993                let diff = d.as_millis() as f64 - avg;
994                diff * diff
995            })
996            .sum::<f64>() / samples.len() as f64;
997
998        (variance.sqrt() / avg).min(1.0)
999    }
1000
1001    /// Update network conditions periodically
1002    async fn update_network_conditions(
1003        network_conditions: &Arc<RwLock<NetworkConditions>>,
1004        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
1005    ) {
1006        // Periodic network condition updates
1007        // In production, this would:
1008        // - Probe network conditions
1009        // - Update bandwidth estimates
1010        // - Detect congestion patterns
1011        // - Adjust quality scores
1012
1013        let mut conditions = network_conditions.write().unwrap();
1014        
1015        // Age out old RTT samples (keep last 100 samples)
1016        while conditions.rtt_samples.len() > 100 {
1017            conditions.rtt_samples.pop_front();
1018        }
1019
1020        // Decay packet loss rate over time
1021        conditions.packet_loss_rate *= 0.99;
1022        
1023        // Update quality score based on recent measurements
1024        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
1025            // No recent measurements, assume degraded quality
1026            conditions.quality_score *= 0.95;
1027        }
1028    }
1029
1030    /// Get current network conditions
1031    pub async fn get_network_conditions(&self) -> NetworkConditions {
1032        self.network_conditions.read().unwrap().clone()
1033    }
1034
1035    /// Get adaptive timeout statistics
1036    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
1037        self.stats.lock().unwrap().clone()
1038    }
1039
1040    /// Shutdown the adaptive timeout manager
1041    pub async fn shutdown(&mut self) {
1042        if let Some(handle) = self.monitoring_handle.take() {
1043            handle.abort();
1044        }
1045
1046        info!("Adaptive timeout manager shutdown complete");
1047    }
1048}
1049
1050impl BandwidthAwareValidator {
1051    /// Create a new bandwidth-aware validator
1052    pub fn new(config: BandwidthValidationConfig) -> Self {
1053        Self {
1054            active_validations: Arc::new(RwLock::new(HashMap::new())),
1055            bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
1056                bandwidth_samples: VecDeque::new(),
1057                current_bandwidth: 1_000_000, // 1 MB/s default
1058                utilization: 0.0,
1059                last_measurement: Instant::now(),
1060            })),
1061            config,
1062            stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
1063        }
1064    }
1065
1066    /// Start path validation with bandwidth awareness
1067    pub async fn start_validation(
1068        &self,
1069        target_address: SocketAddr,
1070        priority: ValidationPriority,
1071    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1072        // Check if we can start new validation based on bandwidth constraints
1073        if !self.can_start_validation().await {
1074            return Err("Bandwidth limit reached, cannot start validation".into());
1075        }
1076
1077        let session = ValidationSession {
1078            target_address,
1079            started_at: Instant::now(),
1080            packets_sent: 0,
1081            packets_received: 0,
1082            total_bytes: 0,
1083            rtt_samples: Vec::new(),
1084            bandwidth_usage: 0,
1085            priority,
1086        };
1087
1088        // Add to active validations
1089        {
1090            let mut validations = self.active_validations.write().unwrap();
1091            validations.insert(target_address, session);
1092        }
1093
1094        // Update stats
1095        {
1096            let mut stats = self.stats.lock().unwrap();
1097            stats.validations_started += 1;
1098        }
1099
1100        debug!("Started bandwidth-aware validation for {}", target_address);
1101        Ok(())
1102    }
1103
1104    /// Check if new validation can be started based on bandwidth constraints
1105    async fn can_start_validation(&self) -> bool {
1106        let validations = self.active_validations.read().unwrap();
1107        let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();
1108
1109        // Check concurrent validation limit
1110        if validations.len() >= self.config.max_concurrent_validations {
1111            return false;
1112        }
1113
1114        // Check bandwidth utilization if adaptive validation is enabled
1115        if self.config.enable_adaptive_validation {
1116            let current_usage: u64 = validations.values()
1117                .map(|session| session.bandwidth_usage)
1118                .sum();
1119
1120            let available_bandwidth = bandwidth_monitor.current_bandwidth;
1121            let utilization = current_usage as f64 / available_bandwidth as f64;
1122
1123            if utilization > 0.8 { // 80% utilization threshold
1124                return false;
1125            }
1126        }
1127
1128        true
1129    }
1130
1131    /// Record validation packet transmission
1132    pub async fn record_packet_sent(
1133        &self,
1134        target_address: SocketAddr,
1135        packet_size: usize,
1136    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1137        let mut validations = self.active_validations.write().unwrap();
1138        
1139        if let Some(session) = validations.get_mut(&target_address) {
1140            session.packets_sent += 1;
1141            session.total_bytes += packet_size as u64;
1142            session.bandwidth_usage += packet_size as u64;
1143        }
1144
1145        // Update bandwidth monitoring
1146        self.update_bandwidth_usage(packet_size as u64).await;
1147
1148        Ok(())
1149    }
1150
1151    /// Record validation packet reception
1152    pub async fn record_packet_received(
1153        &self,
1154        target_address: SocketAddr,
1155        rtt: Duration,
1156    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1157        let mut validations = self.active_validations.write().unwrap();
1158        
1159        if let Some(session) = validations.get_mut(&target_address) {
1160            session.packets_received += 1;
1161            session.rtt_samples.push(rtt);
1162        }
1163
1164        Ok(())
1165    }
1166
1167    /// Update bandwidth usage monitoring
1168    async fn update_bandwidth_usage(&self, bytes_used: u64) {
1169        let mut monitor = self.bandwidth_monitor.lock().unwrap();
1170        
1171        let now = Instant::now();
1172        let sample = BandwidthSample {
1173            timestamp: now,
1174            bytes_transferred: bytes_used,
1175            duration: now.duration_since(monitor.last_measurement),
1176            bandwidth: if monitor.last_measurement.elapsed().as_secs() > 0 {
1177                bytes_used / monitor.last_measurement.elapsed().as_secs()
1178            } else {
1179                0
1180            },
1181        };
1182
1183        monitor.bandwidth_samples.push_back(sample);
1184        if monitor.bandwidth_samples.len() > 100 {
1185            monitor.bandwidth_samples.pop_front();
1186        }
1187
1188        // Update current bandwidth estimate
1189        if !monitor.bandwidth_samples.is_empty() {
1190            let total_bytes: u64 = monitor.bandwidth_samples.iter()
1191                .map(|s| s.bytes_transferred)
1192                .sum();
1193            let total_time: Duration = monitor.bandwidth_samples.iter()
1194                .map(|s| s.duration)
1195                .sum();
1196            
1197            if total_time.as_secs() > 0 {
1198                monitor.current_bandwidth = total_bytes / total_time.as_secs();
1199            }
1200        }
1201
1202        monitor.last_measurement = now;
1203    }
1204
1205    /// Complete validation session
1206    pub async fn complete_validation(
1207        &self,
1208        target_address: SocketAddr,
1209        success: bool,
1210    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1211        let session = {
1212            let mut validations = self.active_validations.write().unwrap();
1213            validations.remove(&target_address)
1214        };
1215
1216        if let Some(session) = session {
1217            let duration = session.started_at.elapsed();
1218            
1219            // Update stats
1220            {
1221                let mut stats = self.stats.lock().unwrap();
1222                if success {
1223                    stats.validations_completed += 1;
1224                }
1225                stats.total_bandwidth_used += session.bandwidth_usage;
1226                stats.avg_validation_time = if stats.validations_completed > 0 {
1227                    Duration::from_millis(
1228                        (stats.avg_validation_time.as_millis() as u64 * (stats.validations_completed - 1) + 
1229                         duration.as_millis() as u64) / stats.validations_completed
1230                    )
1231                } else {
1232                    duration
1233                };
1234                
1235                if stats.total_bandwidth_used > 0 {
1236                    stats.bandwidth_efficiency = stats.validations_completed as f64 / 
1237                                                stats.total_bandwidth_used as f64 * 1000.0; // per KB
1238                }
1239            }
1240
1241            debug!("Completed validation for {} in {:?} (success: {})", 
1242                   target_address, duration, success);
1243        }
1244
1245        Ok(())
1246    }
1247
1248    /// Get bandwidth validation statistics
1249    pub async fn get_stats(&self) -> BandwidthValidationStats {
1250        self.stats.lock().unwrap().clone()
1251    }
1252}
1253
1254impl CongestionControlIntegrator {
1255    /// Create a new congestion control integrator
1256    pub fn new(config: CongestionIntegrationConfig) -> Self {
1257        Self {
1258            active_migrations: Arc::new(RwLock::new(HashMap::new())),
1259            congestion_state: Arc::new(Mutex::new(CongestionState {
1260                congestion_window: 10, // Initial cwnd
1261                ssthresh: 65535,
1262                rtt_measurements: VecDeque::new(),
1263                congestion_events: VecDeque::new(),
1264                congestion_level: 0.0,
1265            })),
1266            config,
1267            stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
1268        }
1269    }
1270
1271    /// Start connection migration with congestion awareness
1272    pub async fn start_migration(
1273        &self,
1274        peer_id: PeerId,
1275        old_path: SocketAddr,
1276        new_path: SocketAddr,
1277    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1278        // Check if migration should be delayed due to congestion
1279        if self.config.enable_congestion_awareness {
1280            let congestion_state = self.congestion_state.lock().unwrap();
1281            if congestion_state.congestion_level > self.config.congestion_threshold {
1282                return Err("Migration delayed due to high congestion".into());
1283            }
1284        }
1285
1286        let session = MigrationSession {
1287            peer_id,
1288            old_path,
1289            new_path,
1290            started_at: Instant::now(),
1291            migration_state: MigrationState::Initiated,
1292            congestion_window: {
1293                let state = self.congestion_state.lock().unwrap();
1294                (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
1295            },
1296            rtt_estimate: Duration::from_millis(100), // Default RTT
1297            bandwidth_estimate: 1_000_000, // 1 MB/s default
1298        };
1299
1300        // Add to active migrations
1301        {
1302            let mut migrations = self.active_migrations.write().unwrap();
1303            migrations.insert(peer_id, session);
1304        }
1305
1306        // Update stats
1307        {
1308            let mut stats = self.stats.lock().unwrap();
1309            stats.migrations_attempted += 1;
1310        }
1311
1312        info!("Started congestion-aware migration for peer {:?}: {} -> {}", 
1313              peer_id, old_path, new_path);
1314        Ok(())
1315    }
1316
1317    /// Update migration state based on congestion feedback
1318    pub async fn update_migration_state(
1319        &self,
1320        peer_id: PeerId,
1321        new_state: MigrationState,
1322        rtt: Option<Duration>,
1323        bandwidth: Option<u64>,
1324    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1325        let mut migrations = self.active_migrations.write().unwrap();
1326        
1327        if let Some(session) = migrations.get_mut(&peer_id) {
1328            session.migration_state = new_state;
1329            
1330            if let Some(rtt) = rtt {
1331                session.rtt_estimate = rtt;
1332                
1333                // Update global congestion state
1334                let mut congestion_state = self.congestion_state.lock().unwrap();
1335                congestion_state.rtt_measurements.push_back(rtt);
1336                if congestion_state.rtt_measurements.len() > 50 {
1337                    congestion_state.rtt_measurements.pop_front();
1338                }
1339            }
1340            
1341            if let Some(bw) = bandwidth {
1342                session.bandwidth_estimate = bw;
1343            }
1344
1345            // Check if migration completed
1346            if matches!(new_state, MigrationState::Completed) {
1347                let duration = session.started_at.elapsed();
1348                
1349                // Update stats
1350                let mut stats = self.stats.lock().unwrap();
1351                stats.migrations_successful += 1;
1352                stats.avg_migration_time = if stats.migrations_successful > 0 {
1353                    Duration::from_millis(
1354                        (stats.avg_migration_time.as_millis() as u64 * (stats.migrations_successful - 1) + 
1355                         duration.as_millis() as u64) / stats.migrations_successful
1356                    )
1357                } else {
1358                    duration
1359                };
1360
1361                debug!("Migration completed for peer {:?} in {:?}", peer_id, duration);
1362            }
1363        }
1364
1365        Ok(())
1366    }
1367
1368    /// Record congestion event
1369    pub async fn record_congestion_event(
1370        &self,
1371        event_type: CongestionEventType,
1372        severity: f64,
1373    ) {
1374        let event = CongestionEvent {
1375            timestamp: Instant::now(),
1376            event_type,
1377            severity,
1378        };
1379
1380        let mut congestion_state = self.congestion_state.lock().unwrap();
1381        congestion_state.congestion_events.push_back(event);
1382        
1383        // Keep only recent events
1384        if congestion_state.congestion_events.len() > 100 {
1385            congestion_state.congestion_events.pop_front();
1386        }
1387
1388        // Update congestion level based on recent events
1389        let recent_events: Vec<_> = congestion_state.congestion_events.iter()
1390            .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
1391            .collect();
1392
1393        if !recent_events.is_empty() {
1394            let avg_severity: f64 = recent_events.iter()
1395                .map(|e| e.severity)
1396                .sum::<f64>() / recent_events.len() as f64;
1397            
1398            congestion_state.congestion_level = avg_severity;
1399        }
1400
1401        // Adjust congestion window based on event
1402        match event_type {
1403            CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
1404                congestion_state.ssthresh = congestion_state.congestion_window / 2;
1405                congestion_state.congestion_window = congestion_state.ssthresh;
1406            }
1407            CongestionEventType::ECNMark => {
1408                congestion_state.congestion_window = 
1409                    (congestion_state.congestion_window as f64 * 0.8) as u32;
1410            }
1411            CongestionEventType::RTTIncrease => {
1412                // Gradual reduction for RTT increase
1413                congestion_state.congestion_window = 
1414                    (congestion_state.congestion_window as f64 * 0.95) as u32;
1415            }
1416        }
1417
1418        debug!("Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})", 
1419               event_type, severity, congestion_state.congestion_window);
1420    }
1421
1422    /// Get congestion control integration statistics
1423    pub async fn get_stats(&self) -> CongestionIntegrationStats {
1424        self.stats.lock().unwrap().clone()
1425    }
1426}
1427
1428/// Network efficiency optimization manager that coordinates all network optimization components
1429#[derive(Debug)]
1430pub struct NetworkEfficiencyManager {
1431    parallel_discovery: ParallelDiscoveryCoordinator,
1432    adaptive_timeout: AdaptiveTimeoutManager,
1433    bandwidth_validator: BandwidthAwareValidator,
1434    congestion_integrator: CongestionControlIntegrator,
1435    is_running: bool,
1436}
1437
1438impl NetworkEfficiencyManager {
1439    /// Create a new network efficiency manager with default configurations
1440    pub fn new() -> Self {
1441        Self {
1442            parallel_discovery: ParallelDiscoveryCoordinator::new(ParallelDiscoveryConfig::default()),
1443            adaptive_timeout: AdaptiveTimeoutManager::new(),
1444            bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
1445            congestion_integrator: CongestionControlIntegrator::new(CongestionIntegrationConfig::default()),
1446            is_running: false,
1447        }
1448    }
1449
1450    /// Create a new network efficiency manager with custom configurations
1451    pub fn with_configs(
1452        discovery_config: ParallelDiscoveryConfig,
1453        validation_config: BandwidthValidationConfig,
1454        congestion_config: CongestionIntegrationConfig,
1455    ) -> Self {
1456        Self {
1457            parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
1458            adaptive_timeout: AdaptiveTimeoutManager::new(),
1459            bandwidth_validator: BandwidthAwareValidator::new(validation_config),
1460            congestion_integrator: CongestionControlIntegrator::new(congestion_config),
1461            is_running: false,
1462        }
1463    }
1464
1465    /// Start all network efficiency components
1466    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1467        if self.is_running {
1468            return Ok(());
1469        }
1470
1471        self.adaptive_timeout.start().await?;
1472
1473        self.is_running = true;
1474        info!("Network efficiency manager started");
1475        Ok(())
1476    }
1477
1478    /// Get parallel discovery coordinator reference
1479    pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
1480        &mut self.parallel_discovery
1481    }
1482
1483    /// Get adaptive timeout manager reference
1484    pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
1485        &self.adaptive_timeout
1486    }
1487
1488    /// Get bandwidth validator reference
1489    pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
1490        &self.bandwidth_validator
1491    }
1492
1493    /// Get congestion integrator reference
1494    pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
1495        &self.congestion_integrator
1496    }
1497
1498    /// Get comprehensive network efficiency statistics
1499    pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
1500        NetworkEfficiencyStats {
1501            parallel_discovery: self.parallel_discovery.get_stats().await,
1502            adaptive_timeout: self.adaptive_timeout.get_stats().await,
1503            bandwidth_validation: self.bandwidth_validator.get_stats().await,
1504            congestion_integration: self.congestion_integrator.get_stats().await,
1505        }
1506    }
1507
1508    /// Shutdown all network efficiency components
1509    pub async fn shutdown(&mut self) {
1510        if !self.is_running {
1511            return;
1512        }
1513
1514        self.parallel_discovery.shutdown().await;
1515        self.adaptive_timeout.shutdown().await;
1516
1517        self.is_running = false;
1518        info!("Network efficiency manager shutdown complete");
1519    }
1520}
1521
1522/// Comprehensive network efficiency statistics
1523#[derive(Debug, Clone)]
1524pub struct NetworkEfficiencyStats {
1525    pub parallel_discovery: ParallelDiscoveryStats,
1526    pub adaptive_timeout: AdaptiveTimeoutStats,
1527    pub bandwidth_validation: BandwidthValidationStats,
1528    pub congestion_integration: CongestionIntegrationStats,
1529}
1530
1531impl Default for NetworkEfficiencyManager {
1532    fn default() -> Self {
1533        Self::new()
1534    }
1535}