ant_quic/optimization/
network.rs

1//! Network efficiency optimization components for ant-quic
2//!
3//! This module provides network-aware optimizations including:
4//! - Parallel candidate discovery across interfaces
5//! - Adaptive timeout adjustment based on network conditions
6//! - Bandwidth-aware QUIC path validation strategies
7//! - Congestion control integration during QUIC connection migration
8
9use std::{
10    collections::{HashMap, VecDeque},
11    net::{IpAddr, SocketAddr},
12    sync::{Arc, Mutex, RwLock},
13    time::{Duration, Instant},
14};
15
16use tokio::time::timeout;
17
18use tracing::{debug, info, warn};
19
20#[cfg(feature = "production-ready")]
21use tokio::time::sleep;
22
23use crate::{
24    nat_traversal_api::{CandidateAddress, PeerId},
25    candidate_discovery::NetworkInterface,
26    connection::nat_traversal::{CandidateSource, CandidateState},
27};
28
29/// Parallel candidate discovery coordinator
30#[derive(Debug)]
31pub struct ParallelDiscoveryCoordinator {
32    /// Active discovery tasks by interface
33    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
34    /// Discovery configuration
35    config: ParallelDiscoveryConfig,
36    /// Discovery statistics
37    stats: Arc<Mutex<ParallelDiscoveryStats>>,
38    /// Task coordination handle
39    coordination_handle: Option<tokio::task::JoinHandle<()>>,
40}
41
42/// Configuration for parallel discovery
43#[derive(Debug, Clone)]
44pub struct ParallelDiscoveryConfig {
45    /// Maximum concurrent discovery tasks
46    pub max_concurrent_tasks: usize,
47    /// Timeout for individual interface discovery
48    pub interface_timeout: Duration,
49    /// Enable interface prioritization
50    pub enable_prioritization: bool,
51    /// Preferred interface types
52    pub preferred_interface_types: Vec<InterfaceType>,
53    /// Enable adaptive parallelism based on system resources
54    pub enable_adaptive_parallelism: bool,
55}
56
57/// Network interface type for prioritization
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum InterfaceType {
60    Ethernet,
61    WiFi,
62    Cellular,
63    Loopback,
64    VPN,
65    Unknown,
66}
67
68/// Individual discovery task state
69#[derive(Debug)]
70struct DiscoveryTask {
71    interface_name: String,
72    interface_type: InterfaceType,
73    started_at: Instant,
74    status: TaskStatus,
75    discovered_candidates: Vec<CandidateAddress>,
76    priority: u32,
77}
78
79/// Status of a discovery task
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81enum TaskStatus {
82    Pending,
83    Running,
84    Completed,
85    Failed,
86    Timeout,
87}
88
89/// Statistics for parallel discovery
90#[derive(Debug, Default, Clone)]
91pub struct ParallelDiscoveryStats {
92    /// Total discovery tasks started
93    pub tasks_started: u64,
94    /// Total discovery tasks completed
95    pub tasks_completed: u64,
96    /// Total discovery tasks failed
97    pub tasks_failed: u64,
98    /// Average discovery time per interface
99    pub avg_discovery_time: Duration,
100    /// Total candidates discovered
101    pub total_candidates: u64,
102    /// Parallelism efficiency (0.0 - 1.0)
103    pub parallelism_efficiency: f64,
104}
105
106/// Adaptive timeout manager for network condition awareness
107#[derive(Debug)]
108pub struct AdaptiveTimeoutManager {
109    /// Network condition measurements
110    network_conditions: Arc<RwLock<NetworkConditions>>,
111    /// Timeout configurations by operation type
112    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
113    /// Timeout statistics
114    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
115    /// Monitoring task handle
116    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
117}
118
119/// Network conditions measurement
120#[derive(Debug, Clone)]
121pub struct NetworkConditions {
122    /// Recent RTT measurements
123    rtt_samples: VecDeque<Duration>,
124    /// Packet loss rate (0.0 - 1.0)
125    packet_loss_rate: f64,
126    /// Bandwidth estimate (bytes/sec)
127    bandwidth_estimate: u64,
128    /// Network quality score (0.0 - 1.0)
129    quality_score: f64,
130    /// Congestion level (0.0 - 1.0)
131    congestion_level: f64,
132    /// Last measurement time
133    last_measurement: Instant,
134}
135
136/// Operation types for adaptive timeouts
137#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
138pub enum OperationType {
139    CandidateDiscovery,
140    PathValidation,
141    CoordinationRequest,
142    HolePunching,
143    ConnectionEstablishment,
144}
145
146/// Adaptive timeout configuration
147#[derive(Debug, Clone)]
148struct AdaptiveTimeoutConfig {
149    /// Base timeout value
150    base_timeout: Duration,
151    /// Minimum timeout
152    min_timeout: Duration,
153    /// Maximum timeout
154    max_timeout: Duration,
155    /// RTT multiplier for timeout calculation
156    rtt_multiplier: f64,
157    /// Quality adjustment factor
158    quality_factor: f64,
159    /// Congestion adjustment factor
160    congestion_factor: f64,
161}
162
163/// Statistics for adaptive timeouts
164#[derive(Debug, Default, Clone)]
165pub struct AdaptiveTimeoutStats {
166    /// Total timeout adjustments made
167    pub adjustments_made: u64,
168    /// Average timeout value by operation
169    pub avg_timeouts: HashMap<OperationType, Duration>,
170    /// Timeout effectiveness (success rate)
171    pub timeout_effectiveness: f64,
172    /// Network condition accuracy
173    pub condition_accuracy: f64,
174}
175
176/// Bandwidth-aware path validation coordinator
177#[derive(Debug)]
178pub struct BandwidthAwareValidator {
179    /// Active validation sessions
180    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
181    /// Bandwidth monitoring
182    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
183    /// Validation configuration
184    config: BandwidthValidationConfig,
185    /// Validation statistics
186    stats: Arc<Mutex<BandwidthValidationStats>>,
187}
188
189/// Configuration for bandwidth-aware validation
190#[derive(Debug, Clone)]
191pub struct BandwidthValidationConfig {
192    /// Maximum concurrent validations
193    pub max_concurrent_validations: usize,
194    /// Bandwidth threshold for validation throttling (bytes/sec)
195    pub bandwidth_threshold: u64,
196    /// Enable adaptive validation based on bandwidth
197    pub enable_adaptive_validation: bool,
198    /// Validation packet size
199    pub validation_packet_size: usize,
200    /// Maximum validation rate (packets/sec)
201    pub max_validation_rate: f64,
202}
203
204/// Bandwidth monitoring state
205#[derive(Debug)]
206struct BandwidthMonitor {
207    /// Recent bandwidth measurements
208    bandwidth_samples: VecDeque<BandwidthSample>,
209    /// Current bandwidth estimate
210    current_bandwidth: u64,
211    /// Bandwidth utilization (0.0 - 1.0)
212    utilization: f64,
213    /// Last measurement time
214    last_measurement: Instant,
215}
216
217/// Individual bandwidth measurement
218#[derive(Debug, Clone)]
219struct BandwidthSample {
220    timestamp: Instant,
221    bytes_transferred: u64,
222    duration: Duration,
223    bandwidth: u64,
224}
225
226/// Path validation session
227#[derive(Debug)]
228struct ValidationSession {
229    target_address: SocketAddr,
230    started_at: Instant,
231    packets_sent: u32,
232    packets_received: u32,
233    total_bytes: u64,
234    rtt_samples: Vec<Duration>,
235    bandwidth_usage: u64,
236    priority: ValidationPriority,
237}
238
239/// Priority for path validation
240#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
241pub enum ValidationPriority {
242    Low,
243    Normal,
244    High,
245    Critical,
246}
247
248/// Statistics for bandwidth-aware validation
249#[derive(Debug, Default, Clone)]
250pub struct BandwidthValidationStats {
251    /// Total validations started
252    pub validations_started: u64,
253    /// Total validations completed
254    pub validations_completed: u64,
255    /// Total bandwidth used for validation
256    pub total_bandwidth_used: u64,
257    /// Average validation time
258    pub avg_validation_time: Duration,
259    /// Bandwidth efficiency (successful validations / bandwidth used)
260    pub bandwidth_efficiency: f64,
261}
262
263/// Congestion control integration for connection migration
264#[derive(Debug)]
265pub struct CongestionControlIntegrator {
266    /// Active connection migrations
267    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
268    /// Congestion control state
269    congestion_state: Arc<Mutex<CongestionState>>,
270    /// Integration configuration
271    config: CongestionIntegrationConfig,
272    /// Integration statistics
273    stats: Arc<Mutex<CongestionIntegrationStats>>,
274}
275
276/// Configuration for congestion control integration
277#[derive(Debug, Clone)]
278pub struct CongestionIntegrationConfig {
279    /// Enable congestion-aware migration
280    pub enable_congestion_awareness: bool,
281    /// Congestion threshold for migration decisions
282    pub congestion_threshold: f64,
283    /// Migration rate limiting
284    pub max_migrations_per_second: f64,
285    /// Enable bandwidth estimation during migration
286    pub enable_bandwidth_estimation: bool,
287    /// Congestion window scaling factor
288    pub cwnd_scaling_factor: f64,
289}
290
291/// Connection migration session
292#[derive(Debug)]
293struct MigrationSession {
294    peer_id: PeerId,
295    old_path: SocketAddr,
296    new_path: SocketAddr,
297    started_at: Instant,
298    migration_state: MigrationState,
299    congestion_window: u32,
300    rtt_estimate: Duration,
301    bandwidth_estimate: u64,
302}
303
304/// State of connection migration
305#[derive(Debug, Clone, Copy, PartialEq, Eq)]
306pub enum MigrationState {
307    Initiated,
308    PathValidating,
309    CongestionProbing,
310    Migrating,
311    Completed,
312    Failed,
313}
314
315/// Congestion control state
316#[derive(Debug)]
317struct CongestionState {
318    /// Current congestion window
319    congestion_window: u32,
320    /// Slow start threshold
321    ssthresh: u32,
322    /// RTT measurements
323    rtt_measurements: VecDeque<Duration>,
324    /// Congestion events
325    congestion_events: VecDeque<CongestionEvent>,
326    /// Current congestion level
327    congestion_level: f64,
328}
329
330/// Congestion event for tracking
331#[derive(Debug, Clone)]
332struct CongestionEvent {
333    timestamp: Instant,
334    event_type: CongestionEventType,
335    severity: f64,
336}
337
338/// Types of congestion events
339#[derive(Debug, Clone, Copy, PartialEq, Eq)]
340pub enum CongestionEventType {
341    PacketLoss,
342    Timeout,
343    ECNMark,
344    RTTIncrease,
345}
346
347/// Statistics for congestion control integration
348#[derive(Debug, Default, Clone)]
349pub struct CongestionIntegrationStats {
350    /// Total migrations attempted
351    pub migrations_attempted: u64,
352    /// Total migrations successful
353    pub migrations_successful: u64,
354    /// Average migration time
355    pub avg_migration_time: Duration,
356    /// Congestion-avoided migrations
357    pub congestion_avoided_migrations: u64,
358    /// Bandwidth utilization efficiency
359    pub bandwidth_utilization_efficiency: f64,
360}
361
362impl Default for ParallelDiscoveryConfig {
363    fn default() -> Self {
364        Self {
365            max_concurrent_tasks: 8,
366            interface_timeout: Duration::from_secs(5),
367            enable_prioritization: true,
368            preferred_interface_types: vec![
369                InterfaceType::Ethernet,
370                InterfaceType::WiFi,
371                InterfaceType::Cellular,
372            ],
373            enable_adaptive_parallelism: true,
374        }
375    }
376}
377
378impl Default for BandwidthValidationConfig {
379    fn default() -> Self {
380        Self {
381            max_concurrent_validations: 16,
382            bandwidth_threshold: 1_000_000, // 1 MB/s
383            enable_adaptive_validation: true,
384            validation_packet_size: 64,
385            max_validation_rate: 100.0, // 100 packets/sec
386        }
387    }
388}
389
390impl Default for CongestionIntegrationConfig {
391    fn default() -> Self {
392        Self {
393            enable_congestion_awareness: true,
394            congestion_threshold: 0.7, // 70% congestion level
395            max_migrations_per_second: 10.0,
396            enable_bandwidth_estimation: true,
397            cwnd_scaling_factor: 0.8,
398        }
399    }
400}
401
402impl ParallelDiscoveryCoordinator {
403    /// Create a new parallel discovery coordinator
404    pub fn new(config: ParallelDiscoveryConfig) -> Self {
405        Self {
406            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
407            config,
408            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
409            coordination_handle: None,
410        }
411    }
412
413    /// Start parallel discovery across multiple interfaces
414    pub async fn start_parallel_discovery(
415        &mut self,
416        interfaces: Vec<NetworkInterface>,
417        peer_id: PeerId,
418    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
419        info!("Starting parallel discovery across {} interfaces for peer {:?}", 
420              interfaces.len(), peer_id);
421
422        // Prioritize interfaces if enabled
423        let prioritized_interfaces = if self.config.enable_prioritization {
424            self.prioritize_interfaces(interfaces)
425        } else {
426            interfaces
427        };
428
429        // Limit concurrent tasks based on configuration and system resources
430        let max_tasks = if self.config.enable_adaptive_parallelism {
431            self.calculate_adaptive_parallelism().await
432        } else {
433            self.config.max_concurrent_tasks
434        };
435
436        let tasks_to_start = prioritized_interfaces.into_iter()
437            .take(max_tasks)
438            .collect::<Vec<_>>();
439
440        // Start discovery tasks
441        for interface in tasks_to_start {
442            self.start_interface_discovery(interface, peer_id).await?;
443        }
444
445        // Start coordination task
446        self.start_coordination_task().await?;
447
448        Ok(())
449    }
450
451    /// Prioritize interfaces based on type and characteristics
452    fn prioritize_interfaces(&self, mut interfaces: Vec<NetworkInterface>) -> Vec<NetworkInterface> {
453        interfaces.sort_by_key(|interface| {
454            let interface_type = self.classify_interface_type(&interface.name);
455            let type_priority = self.config.preferred_interface_types
456                .iter()
457                .position(|&t| t == interface_type)
458                .unwrap_or(999);
459            
460            // Lower number = higher priority
461            (type_priority, interface.addresses.len())
462        });
463
464        interfaces
465    }
466
467    /// Classify interface type from name
468    fn classify_interface_type(&self, name: &str) -> InterfaceType {
469        let name_lower = name.to_lowercase();
470        
471        if name_lower.contains("eth") || name_lower.contains("en") {
472            InterfaceType::Ethernet
473        } else if name_lower.contains("wlan") || name_lower.contains("wifi") || name_lower.contains("wl") {
474            InterfaceType::WiFi
475        } else if name_lower.contains("cell") || name_lower.contains("wwan") || name_lower.contains("ppp") {
476            InterfaceType::Cellular
477        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
478            InterfaceType::Loopback
479        } else if name_lower.contains("vpn") || name_lower.contains("tun") || name_lower.contains("tap") {
480            InterfaceType::VPN
481        } else {
482            InterfaceType::Unknown
483        }
484    }
485
486    /// Calculate adaptive parallelism based on system resources
487    async fn calculate_adaptive_parallelism(&self) -> usize {
488        // Simplified adaptive calculation
489        // In production, this would consider:
490        // - CPU cores
491        // - Memory availability
492        // - Network bandwidth
493        // - Current system load
494        
495        let base_parallelism = self.config.max_concurrent_tasks;
496        let system_load_factor = 0.8; // Assume 80% system capacity
497        
498        ((base_parallelism as f64) * system_load_factor) as usize
499    }
500
501    /// Start discovery for a specific interface
502    async fn start_interface_discovery(
503        &self,
504        interface: NetworkInterface,
505        _peer_id: PeerId,
506    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
507        let interface_type = self.classify_interface_type(&interface.name);
508        let priority = self.calculate_interface_priority(interface_type);
509
510        let task = DiscoveryTask {
511            interface_name: interface.name.clone(),
512            interface_type,
513            started_at: Instant::now(),
514            status: TaskStatus::Pending,
515            discovered_candidates: Vec::new(),
516            priority,
517        };
518
519        // Add to active discoveries
520        {
521            let mut discoveries = self.active_discoveries.write().unwrap();
522            discoveries.insert(interface.name.clone(), task);
523        }
524
525        // Update stats
526        {
527            let mut stats = self.stats.lock().unwrap();
528            stats.tasks_started += 1;
529        }
530
531        // Start actual discovery (simplified)
532        self.perform_interface_discovery(interface).await?;
533
534        Ok(())
535    }
536
537    /// Calculate priority for interface type
538    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
539        match interface_type {
540            InterfaceType::Ethernet => 100,
541            InterfaceType::WiFi => 80,
542            InterfaceType::Cellular => 60,
543            InterfaceType::VPN => 40,
544            InterfaceType::Loopback => 20,
545            InterfaceType::Unknown => 10,
546        }
547    }
548
549    /// Perform discovery for a specific interface
550    async fn perform_interface_discovery(
551        &self,
552        interface: NetworkInterface,
553    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
554        let interface_name = interface.name.clone();
555        
556        // Update task status to running
557        {
558            let mut discoveries = self.active_discoveries.write().unwrap();
559            if let Some(task) = discoveries.get_mut(&interface_name) {
560                task.status = TaskStatus::Running;
561            }
562        }
563
564        // Perform discovery with timeout
565        let discovery_result = timeout(
566            self.config.interface_timeout,
567            self.discover_candidates_for_interface(interface),
568        ).await;
569
570        match discovery_result {
571            Ok(Ok(candidates)) => {
572                // Discovery successful
573                {
574                    let mut discoveries = self.active_discoveries.write().unwrap();
575                    if let Some(task) = discoveries.get_mut(&interface_name) {
576                        task.status = TaskStatus::Completed;
577                        task.discovered_candidates = candidates;
578                    }
579                }
580
581                // Update stats
582                {
583                    let mut stats = self.stats.lock().unwrap();
584                    stats.tasks_completed += 1;
585                }
586
587                debug!("Interface discovery completed for {}", interface_name);
588            }
589            Ok(Err(_)) => {
590                // Discovery failed
591                {
592                    let mut discoveries = self.active_discoveries.write().unwrap();
593                    if let Some(task) = discoveries.get_mut(&interface_name) {
594                        task.status = TaskStatus::Failed;
595                    }
596                }
597
598                // Update stats
599                {
600                    let mut stats = self.stats.lock().unwrap();
601                    stats.tasks_failed += 1;
602                }
603
604                warn!("Interface discovery failed for {}", interface_name);
605            }
606            Err(_) => {
607                // Discovery timeout
608                {
609                    let mut discoveries = self.active_discoveries.write().unwrap();
610                    if let Some(task) = discoveries.get_mut(&interface_name) {
611                        task.status = TaskStatus::Timeout;
612                    }
613                }
614
615                // Update stats
616                {
617                    let mut stats = self.stats.lock().unwrap();
618                    stats.tasks_failed += 1;
619                }
620
621                warn!("Interface discovery timeout for {}", interface_name);
622            }
623        }
624
625        Ok(())
626    }
627
628    /// Discover candidates for a specific interface
629    async fn discover_candidates_for_interface(
630        &self,
631        interface: NetworkInterface,
632    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
633        let mut candidates = Vec::new();
634
635        for address in &interface.addresses {
636            // Skip loopback and link-local addresses for P2P
637            if self.is_valid_candidate_address(&address) {
638                let candidate = CandidateAddress {
639                    address: *address,
640                    priority: self.calculate_candidate_priority(&address, &interface),
641                    source: CandidateSource::Local,
642                    state: CandidateState::New,
643                };
644
645                candidates.push(candidate);
646            }
647        }
648
649        // Simulate some discovery time
650        #[cfg(feature = "production-ready")]
651        sleep(Duration::from_millis(100)).await;
652
653        Ok(candidates)
654    }
655
656    /// Check if address is valid for P2P candidate
657    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
658        match address.ip() {
659            IpAddr::V4(ipv4) => {
660                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
661            }
662            IpAddr::V6(ipv6) => {
663                !ipv6.is_loopback() && !ipv6.is_unspecified()
664            }
665        }
666    }
667
668    /// Calculate priority for a candidate address
669    fn calculate_candidate_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
670        let mut priority = 1000u32;
671
672        // Prefer IPv4 over IPv6 for simplicity
673        if address.is_ipv4() {
674            priority += 100;
675        }
676
677        // Prefer non-private addresses
678        if !self.is_private_address(address) {
679            priority += 200;
680        }
681
682        // Add interface-specific priority
683        let interface_type = self.classify_interface_type(&interface.name);
684        priority += self.calculate_interface_priority(interface_type);
685
686        priority
687    }
688
689    /// Check if address is in private range
690    fn is_private_address(&self, address: &SocketAddr) -> bool {
691        match address.ip() {
692            IpAddr::V4(ipv4) => ipv4.is_private(),
693            IpAddr::V6(ipv6) => {
694                // Check for unique local addresses (fc00::/7)
695                let segments = ipv6.segments();
696                (segments[0] & 0xfe00) == 0xfc00
697            }
698        }
699    }
700
701    /// Start coordination task for managing parallel discoveries
702    async fn start_coordination_task(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
703        let discoveries = Arc::clone(&self.active_discoveries);
704        let stats = Arc::clone(&self.stats);
705        let config = self.config.clone();
706
707        let coordination_handle = tokio::spawn(async move {
708            let mut interval = tokio::time::interval(Duration::from_millis(500));
709            
710            loop {
711                interval.tick().await;
712                Self::coordinate_discoveries(&discoveries, &stats, &config).await;
713                
714                // Check if all discoveries are complete
715                let all_complete = {
716                    let discoveries_read = discoveries.read().unwrap();
717                    discoveries_read.values().all(|task| {
718                        matches!(task.status, TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout)
719                    })
720                };
721
722                if all_complete {
723                    break;
724                }
725            }
726        });
727
728        self.coordination_handle = Some(coordination_handle);
729        Ok(())
730    }
731
732    /// Coordinate parallel discoveries
733    async fn coordinate_discoveries(
734        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
735        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
736        _config: &ParallelDiscoveryConfig,
737    ) {
738        let mut total_candidates = 0u64;
739        let mut completed_tasks = 0u64;
740        let mut total_discovery_time = Duration::ZERO;
741
742        {
743            let discoveries_read = discoveries.read().unwrap();
744            for task in discoveries_read.values() {
745                if task.status == TaskStatus::Completed {
746                    total_candidates += task.discovered_candidates.len() as u64;
747                    completed_tasks += 1;
748                    total_discovery_time += task.started_at.elapsed();
749                }
750            }
751        }
752
753        // Update stats
754        {
755            let mut stats_guard = stats.lock().unwrap();
756            stats_guard.total_candidates = total_candidates;
757            stats_guard.tasks_completed = completed_tasks;
758            
759            if completed_tasks > 0 {
760                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
761                stats_guard.parallelism_efficiency = completed_tasks as f64 / stats_guard.tasks_started as f64;
762            }
763        }
764    }
765
766    /// Get all discovered candidates from parallel discovery
767    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
768        let mut all_candidates = Vec::new();
769
770        let discoveries = self.active_discoveries.read().unwrap();
771        for task in discoveries.values() {
772            if task.status == TaskStatus::Completed {
773                all_candidates.extend(task.discovered_candidates.clone());
774            }
775        }
776
777        // Sort by priority (highest first)
778        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));
779
780        all_candidates
781    }
782
783    /// Get parallel discovery statistics
784    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
785        self.stats.lock().unwrap().clone()
786    }
787
788    /// Shutdown parallel discovery coordinator
789    pub async fn shutdown(&mut self) {
790        if let Some(handle) = self.coordination_handle.take() {
791            handle.abort();
792        }
793
794        // Clear active discoveries
795        {
796            let mut discoveries = self.active_discoveries.write().unwrap();
797            discoveries.clear();
798        }
799
800        info!("Parallel discovery coordinator shutdown complete");
801    }
802}
803
804impl AdaptiveTimeoutManager {
805    /// Create a new adaptive timeout manager
806    pub fn new() -> Self {
807        let mut timeout_configs = HashMap::new();
808        
809        // Initialize default timeout configurations for each operation type
810        timeout_configs.insert(OperationType::CandidateDiscovery, AdaptiveTimeoutConfig {
811            base_timeout: Duration::from_secs(5),
812            min_timeout: Duration::from_millis(500),
813            max_timeout: Duration::from_secs(30),
814            rtt_multiplier: 4.0,
815            quality_factor: 0.5,
816            congestion_factor: 0.3,
817        });
818
819        timeout_configs.insert(OperationType::PathValidation, AdaptiveTimeoutConfig {
820            base_timeout: Duration::from_secs(3),
821            min_timeout: Duration::from_millis(200),
822            max_timeout: Duration::from_secs(15),
823            rtt_multiplier: 3.0,
824            quality_factor: 0.4,
825            congestion_factor: 0.4,
826        });
827
828        timeout_configs.insert(OperationType::CoordinationRequest, AdaptiveTimeoutConfig {
829            base_timeout: Duration::from_secs(10),
830            min_timeout: Duration::from_secs(1),
831            max_timeout: Duration::from_secs(60),
832            rtt_multiplier: 5.0,
833            quality_factor: 0.6,
834            congestion_factor: 0.2,
835        });
836
837        timeout_configs.insert(OperationType::HolePunching, AdaptiveTimeoutConfig {
838            base_timeout: Duration::from_secs(2),
839            min_timeout: Duration::from_millis(100),
840            max_timeout: Duration::from_secs(10),
841            rtt_multiplier: 2.0,
842            quality_factor: 0.3,
843            congestion_factor: 0.5,
844        });
845
846        timeout_configs.insert(OperationType::ConnectionEstablishment, AdaptiveTimeoutConfig {
847            base_timeout: Duration::from_secs(15),
848            min_timeout: Duration::from_secs(2),
849            max_timeout: Duration::from_secs(120),
850            rtt_multiplier: 6.0,
851            quality_factor: 0.7,
852            congestion_factor: 0.1,
853        });
854
855        Self {
856            network_conditions: Arc::new(RwLock::new(NetworkConditions {
857                rtt_samples: VecDeque::new(),
858                packet_loss_rate: 0.0,
859                bandwidth_estimate: 1_000_000, // 1 MB/s default
860                quality_score: 0.8, // Good quality default
861                congestion_level: 0.2, // Low congestion default
862                last_measurement: Instant::now(),
863            })),
864            timeout_configs,
865            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
866            monitoring_handle: None,
867        }
868    }
869
870    /// Start the adaptive timeout manager with network monitoring
871    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
872        let network_conditions = Arc::clone(&self.network_conditions);
873        let stats = Arc::clone(&self.stats);
874
875        let monitoring_handle = tokio::spawn(async move {
876            let mut interval = tokio::time::interval(Duration::from_secs(1));
877            
878            loop {
879                interval.tick().await;
880                Self::update_network_conditions(&network_conditions, &stats).await;
881            }
882        });
883
884        self.monitoring_handle = Some(monitoring_handle);
885        info!("Adaptive timeout manager started");
886        Ok(())
887    }
888
889    /// Calculate adaptive timeout for an operation
890    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
891        let config = self.timeout_configs.get(&operation)
892            .cloned()
893            .unwrap_or_else(|| AdaptiveTimeoutConfig {
894                base_timeout: Duration::from_secs(5),
895                min_timeout: Duration::from_millis(500),
896                max_timeout: Duration::from_secs(30),
897                rtt_multiplier: 4.0,
898                quality_factor: 0.5,
899                congestion_factor: 0.3,
900            });
901
902        let conditions = self.network_conditions.read().unwrap();
903        
904        // Calculate base timeout from RTT if available
905        let rtt_based_timeout = if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
906            Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
907        } else {
908            config.base_timeout
909        };
910
911        // Adjust for network quality
912        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;
913        
914        // Adjust for congestion
915        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;
916
917        // Calculate final timeout
918        let adjusted_timeout = Duration::from_millis(
919            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment) as u64
920        );
921
922        // Clamp to min/max bounds
923        let final_timeout = adjusted_timeout
924            .max(config.min_timeout)
925            .min(config.max_timeout);
926
927        // Update stats
928        {
929            let mut stats = self.stats.lock().unwrap();
930            stats.adjustments_made += 1;
931            stats.avg_timeouts.insert(operation, final_timeout);
932        }
933
934        debug!("Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
935               operation, final_timeout, conditions.quality_score, conditions.congestion_level);
936
937        final_timeout
938    }
939
940    /// Record network measurement for adaptive timeout calculation
941    pub async fn record_measurement(
942        &self,
943        rtt: Duration,
944        packet_loss: bool,
945        bandwidth: Option<u64>,
946    ) {
947        let mut conditions = self.network_conditions.write().unwrap();
948        
949        // Add RTT sample
950        conditions.rtt_samples.push_back(rtt);
951        if conditions.rtt_samples.len() > 50 {
952            conditions.rtt_samples.pop_front();
953        }
954
955        // Update packet loss rate (exponential moving average)
956        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
957        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;
958
959        // Update bandwidth estimate if provided
960        if let Some(bw) = bandwidth {
961            conditions.bandwidth_estimate = (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
962        }
963
964        // Update quality score based on RTT and packet loss
965        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
966        let loss_quality = 1.0 - conditions.packet_loss_rate;
967        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;
968
969        // Update congestion level based on RTT variance and packet loss
970        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
971        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);
972
973        conditions.last_measurement = Instant::now();
974    }
975
976    /// Calculate average RTT from samples
977    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
978        if samples.is_empty() {
979            return None;
980        }
981
982        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
983        Some(Duration::from_millis(total_ms / samples.len() as u64))
984    }
985
986    /// Calculate RTT variance for congestion detection
987    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
988        if samples.len() < 2 {
989            return 0.0;
990        }
991
992        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
993        let variance: f64 = samples.iter()
994            .map(|d| {
995                let diff = d.as_millis() as f64 - avg;
996                diff * diff
997            })
998            .sum::<f64>() / samples.len() as f64;
999
1000        (variance.sqrt() / avg).min(1.0)
1001    }
1002
1003    /// Update network conditions periodically
1004    async fn update_network_conditions(
1005        network_conditions: &Arc<RwLock<NetworkConditions>>,
1006        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
1007    ) {
1008        // Periodic network condition updates
1009        // In production, this would:
1010        // - Probe network conditions
1011        // - Update bandwidth estimates
1012        // - Detect congestion patterns
1013        // - Adjust quality scores
1014
1015        let mut conditions = network_conditions.write().unwrap();
1016        
1017        // Age out old RTT samples (keep last 100 samples)
1018        while conditions.rtt_samples.len() > 100 {
1019            conditions.rtt_samples.pop_front();
1020        }
1021
1022        // Decay packet loss rate over time
1023        conditions.packet_loss_rate *= 0.99;
1024        
1025        // Update quality score based on recent measurements
1026        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
1027            // No recent measurements, assume degraded quality
1028            conditions.quality_score *= 0.95;
1029        }
1030    }
1031
1032    /// Get current network conditions
1033    pub async fn get_network_conditions(&self) -> NetworkConditions {
1034        self.network_conditions.read().unwrap().clone()
1035    }
1036
1037    /// Get adaptive timeout statistics
1038    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
1039        self.stats.lock().unwrap().clone()
1040    }
1041
1042    /// Shutdown the adaptive timeout manager
1043    pub async fn shutdown(&mut self) {
1044        if let Some(handle) = self.monitoring_handle.take() {
1045            handle.abort();
1046        }
1047
1048        info!("Adaptive timeout manager shutdown complete");
1049    }
1050}
1051
1052impl BandwidthAwareValidator {
1053    /// Create a new bandwidth-aware validator
1054    pub fn new(config: BandwidthValidationConfig) -> Self {
1055        Self {
1056            active_validations: Arc::new(RwLock::new(HashMap::new())),
1057            bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
1058                bandwidth_samples: VecDeque::new(),
1059                current_bandwidth: 1_000_000, // 1 MB/s default
1060                utilization: 0.0,
1061                last_measurement: Instant::now(),
1062            })),
1063            config,
1064            stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
1065        }
1066    }
1067
1068    /// Start path validation with bandwidth awareness
1069    pub async fn start_validation(
1070        &self,
1071        target_address: SocketAddr,
1072        priority: ValidationPriority,
1073    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1074        // Check if we can start new validation based on bandwidth constraints
1075        if !self.can_start_validation().await {
1076            return Err("Bandwidth limit reached, cannot start validation".into());
1077        }
1078
1079        let session = ValidationSession {
1080            target_address,
1081            started_at: Instant::now(),
1082            packets_sent: 0,
1083            packets_received: 0,
1084            total_bytes: 0,
1085            rtt_samples: Vec::new(),
1086            bandwidth_usage: 0,
1087            priority,
1088        };
1089
1090        // Add to active validations
1091        {
1092            let mut validations = self.active_validations.write().unwrap();
1093            validations.insert(target_address, session);
1094        }
1095
1096        // Update stats
1097        {
1098            let mut stats = self.stats.lock().unwrap();
1099            stats.validations_started += 1;
1100        }
1101
1102        debug!("Started bandwidth-aware validation for {}", target_address);
1103        Ok(())
1104    }
1105
1106    /// Check if new validation can be started based on bandwidth constraints
1107    async fn can_start_validation(&self) -> bool {
1108        let validations = self.active_validations.read().unwrap();
1109        let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();
1110
1111        // Check concurrent validation limit
1112        if validations.len() >= self.config.max_concurrent_validations {
1113            return false;
1114        }
1115
1116        // Check bandwidth utilization if adaptive validation is enabled
1117        if self.config.enable_adaptive_validation {
1118            let current_usage: u64 = validations.values()
1119                .map(|session| session.bandwidth_usage)
1120                .sum();
1121
1122            let available_bandwidth = bandwidth_monitor.current_bandwidth;
1123            let utilization = current_usage as f64 / available_bandwidth as f64;
1124
1125            if utilization > 0.8 { // 80% utilization threshold
1126                return false;
1127            }
1128        }
1129
1130        true
1131    }
1132
1133    /// Record validation packet transmission
1134    pub async fn record_packet_sent(
1135        &self,
1136        target_address: SocketAddr,
1137        packet_size: usize,
1138    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1139        let mut validations = self.active_validations.write().unwrap();
1140        
1141        if let Some(session) = validations.get_mut(&target_address) {
1142            session.packets_sent += 1;
1143            session.total_bytes += packet_size as u64;
1144            session.bandwidth_usage += packet_size as u64;
1145        }
1146
1147        // Update bandwidth monitoring
1148        self.update_bandwidth_usage(packet_size as u64).await;
1149
1150        Ok(())
1151    }
1152
1153    /// Record validation packet reception
1154    pub async fn record_packet_received(
1155        &self,
1156        target_address: SocketAddr,
1157        rtt: Duration,
1158    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1159        let mut validations = self.active_validations.write().unwrap();
1160        
1161        if let Some(session) = validations.get_mut(&target_address) {
1162            session.packets_received += 1;
1163            session.rtt_samples.push(rtt);
1164        }
1165
1166        Ok(())
1167    }
1168
1169    /// Update bandwidth usage monitoring
1170    async fn update_bandwidth_usage(&self, bytes_used: u64) {
1171        let mut monitor = self.bandwidth_monitor.lock().unwrap();
1172        
1173        let now = Instant::now();
1174        let sample = BandwidthSample {
1175            timestamp: now,
1176            bytes_transferred: bytes_used,
1177            duration: now.duration_since(monitor.last_measurement),
1178            bandwidth: if monitor.last_measurement.elapsed().as_secs() > 0 {
1179                bytes_used / monitor.last_measurement.elapsed().as_secs()
1180            } else {
1181                0
1182            },
1183        };
1184
1185        monitor.bandwidth_samples.push_back(sample);
1186        if monitor.bandwidth_samples.len() > 100 {
1187            monitor.bandwidth_samples.pop_front();
1188        }
1189
1190        // Update current bandwidth estimate
1191        if !monitor.bandwidth_samples.is_empty() {
1192            let total_bytes: u64 = monitor.bandwidth_samples.iter()
1193                .map(|s| s.bytes_transferred)
1194                .sum();
1195            let total_time: Duration = monitor.bandwidth_samples.iter()
1196                .map(|s| s.duration)
1197                .sum();
1198            
1199            if total_time.as_secs() > 0 {
1200                monitor.current_bandwidth = total_bytes / total_time.as_secs();
1201            }
1202        }
1203
1204        monitor.last_measurement = now;
1205    }
1206
1207    /// Complete validation session
1208    pub async fn complete_validation(
1209        &self,
1210        target_address: SocketAddr,
1211        success: bool,
1212    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1213        let session = {
1214            let mut validations = self.active_validations.write().unwrap();
1215            validations.remove(&target_address)
1216        };
1217
1218        if let Some(session) = session {
1219            let duration = session.started_at.elapsed();
1220            
1221            // Update stats
1222            {
1223                let mut stats = self.stats.lock().unwrap();
1224                if success {
1225                    stats.validations_completed += 1;
1226                }
1227                stats.total_bandwidth_used += session.bandwidth_usage;
1228                stats.avg_validation_time = if stats.validations_completed > 0 {
1229                    Duration::from_millis(
1230                        (stats.avg_validation_time.as_millis() as u64 * (stats.validations_completed - 1) + 
1231                         duration.as_millis() as u64) / stats.validations_completed
1232                    )
1233                } else {
1234                    duration
1235                };
1236                
1237                if stats.total_bandwidth_used > 0 {
1238                    stats.bandwidth_efficiency = stats.validations_completed as f64 / 
1239                                                stats.total_bandwidth_used as f64 * 1000.0; // per KB
1240                }
1241            }
1242
1243            debug!("Completed validation for {} in {:?} (success: {})", 
1244                   target_address, duration, success);
1245        }
1246
1247        Ok(())
1248    }
1249
1250    /// Get bandwidth validation statistics
1251    pub async fn get_stats(&self) -> BandwidthValidationStats {
1252        self.stats.lock().unwrap().clone()
1253    }
1254}
1255
1256impl CongestionControlIntegrator {
1257    /// Create a new congestion control integrator
1258    pub fn new(config: CongestionIntegrationConfig) -> Self {
1259        Self {
1260            active_migrations: Arc::new(RwLock::new(HashMap::new())),
1261            congestion_state: Arc::new(Mutex::new(CongestionState {
1262                congestion_window: 10, // Initial cwnd
1263                ssthresh: 65535,
1264                rtt_measurements: VecDeque::new(),
1265                congestion_events: VecDeque::new(),
1266                congestion_level: 0.0,
1267            })),
1268            config,
1269            stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
1270        }
1271    }
1272
1273    /// Start connection migration with congestion awareness
1274    pub async fn start_migration(
1275        &self,
1276        peer_id: PeerId,
1277        old_path: SocketAddr,
1278        new_path: SocketAddr,
1279    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280        // Check if migration should be delayed due to congestion
1281        if self.config.enable_congestion_awareness {
1282            let congestion_state = self.congestion_state.lock().unwrap();
1283            if congestion_state.congestion_level > self.config.congestion_threshold {
1284                return Err("Migration delayed due to high congestion".into());
1285            }
1286        }
1287
1288        let session = MigrationSession {
1289            peer_id,
1290            old_path,
1291            new_path,
1292            started_at: Instant::now(),
1293            migration_state: MigrationState::Initiated,
1294            congestion_window: {
1295                let state = self.congestion_state.lock().unwrap();
1296                (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
1297            },
1298            rtt_estimate: Duration::from_millis(100), // Default RTT
1299            bandwidth_estimate: 1_000_000, // 1 MB/s default
1300        };
1301
1302        // Add to active migrations
1303        {
1304            let mut migrations = self.active_migrations.write().unwrap();
1305            migrations.insert(peer_id, session);
1306        }
1307
1308        // Update stats
1309        {
1310            let mut stats = self.stats.lock().unwrap();
1311            stats.migrations_attempted += 1;
1312        }
1313
1314        info!("Started congestion-aware migration for peer {:?}: {} -> {}", 
1315              peer_id, old_path, new_path);
1316        Ok(())
1317    }
1318
1319    /// Update migration state based on congestion feedback
1320    pub async fn update_migration_state(
1321        &self,
1322        peer_id: PeerId,
1323        new_state: MigrationState,
1324        rtt: Option<Duration>,
1325        bandwidth: Option<u64>,
1326    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1327        let mut migrations = self.active_migrations.write().unwrap();
1328        
1329        if let Some(session) = migrations.get_mut(&peer_id) {
1330            session.migration_state = new_state;
1331            
1332            if let Some(rtt) = rtt {
1333                session.rtt_estimate = rtt;
1334                
1335                // Update global congestion state
1336                let mut congestion_state = self.congestion_state.lock().unwrap();
1337                congestion_state.rtt_measurements.push_back(rtt);
1338                if congestion_state.rtt_measurements.len() > 50 {
1339                    congestion_state.rtt_measurements.pop_front();
1340                }
1341            }
1342            
1343            if let Some(bw) = bandwidth {
1344                session.bandwidth_estimate = bw;
1345            }
1346
1347            // Check if migration completed
1348            if matches!(new_state, MigrationState::Completed) {
1349                let duration = session.started_at.elapsed();
1350                
1351                // Update stats
1352                let mut stats = self.stats.lock().unwrap();
1353                stats.migrations_successful += 1;
1354                stats.avg_migration_time = if stats.migrations_successful > 0 {
1355                    Duration::from_millis(
1356                        (stats.avg_migration_time.as_millis() as u64 * (stats.migrations_successful - 1) + 
1357                         duration.as_millis() as u64) / stats.migrations_successful
1358                    )
1359                } else {
1360                    duration
1361                };
1362
1363                debug!("Migration completed for peer {:?} in {:?}", peer_id, duration);
1364            }
1365        }
1366
1367        Ok(())
1368    }
1369
1370    /// Record congestion event
1371    pub async fn record_congestion_event(
1372        &self,
1373        event_type: CongestionEventType,
1374        severity: f64,
1375    ) {
1376        let event = CongestionEvent {
1377            timestamp: Instant::now(),
1378            event_type,
1379            severity,
1380        };
1381
1382        let mut congestion_state = self.congestion_state.lock().unwrap();
1383        congestion_state.congestion_events.push_back(event);
1384        
1385        // Keep only recent events
1386        if congestion_state.congestion_events.len() > 100 {
1387            congestion_state.congestion_events.pop_front();
1388        }
1389
1390        // Update congestion level based on recent events
1391        let recent_events: Vec<_> = congestion_state.congestion_events.iter()
1392            .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
1393            .collect();
1394
1395        if !recent_events.is_empty() {
1396            let avg_severity: f64 = recent_events.iter()
1397                .map(|e| e.severity)
1398                .sum::<f64>() / recent_events.len() as f64;
1399            
1400            congestion_state.congestion_level = avg_severity;
1401        }
1402
1403        // Adjust congestion window based on event
1404        match event_type {
1405            CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
1406                congestion_state.ssthresh = congestion_state.congestion_window / 2;
1407                congestion_state.congestion_window = congestion_state.ssthresh;
1408            }
1409            CongestionEventType::ECNMark => {
1410                congestion_state.congestion_window = 
1411                    (congestion_state.congestion_window as f64 * 0.8) as u32;
1412            }
1413            CongestionEventType::RTTIncrease => {
1414                // Gradual reduction for RTT increase
1415                congestion_state.congestion_window = 
1416                    (congestion_state.congestion_window as f64 * 0.95) as u32;
1417            }
1418        }
1419
1420        debug!("Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})", 
1421               event_type, severity, congestion_state.congestion_window);
1422    }
1423
1424    /// Get congestion control integration statistics
1425    pub async fn get_stats(&self) -> CongestionIntegrationStats {
1426        self.stats.lock().unwrap().clone()
1427    }
1428}
1429
1430/// Network efficiency optimization manager that coordinates all network optimization components
1431#[derive(Debug)]
1432pub struct NetworkEfficiencyManager {
1433    parallel_discovery: ParallelDiscoveryCoordinator,
1434    adaptive_timeout: AdaptiveTimeoutManager,
1435    bandwidth_validator: BandwidthAwareValidator,
1436    congestion_integrator: CongestionControlIntegrator,
1437    is_running: bool,
1438}
1439
1440impl NetworkEfficiencyManager {
1441    /// Create a new network efficiency manager with default configurations
1442    pub fn new() -> Self {
1443        Self {
1444            parallel_discovery: ParallelDiscoveryCoordinator::new(ParallelDiscoveryConfig::default()),
1445            adaptive_timeout: AdaptiveTimeoutManager::new(),
1446            bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
1447            congestion_integrator: CongestionControlIntegrator::new(CongestionIntegrationConfig::default()),
1448            is_running: false,
1449        }
1450    }
1451
1452    /// Create a new network efficiency manager with custom configurations
1453    pub fn with_configs(
1454        discovery_config: ParallelDiscoveryConfig,
1455        validation_config: BandwidthValidationConfig,
1456        congestion_config: CongestionIntegrationConfig,
1457    ) -> Self {
1458        Self {
1459            parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
1460            adaptive_timeout: AdaptiveTimeoutManager::new(),
1461            bandwidth_validator: BandwidthAwareValidator::new(validation_config),
1462            congestion_integrator: CongestionControlIntegrator::new(congestion_config),
1463            is_running: false,
1464        }
1465    }
1466
1467    /// Start all network efficiency components
1468    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1469        if self.is_running {
1470            return Ok(());
1471        }
1472
1473        self.adaptive_timeout.start().await?;
1474
1475        self.is_running = true;
1476        info!("Network efficiency manager started");
1477        Ok(())
1478    }
1479
1480    /// Get parallel discovery coordinator reference
1481    pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
1482        &mut self.parallel_discovery
1483    }
1484
1485    /// Get adaptive timeout manager reference
1486    pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
1487        &self.adaptive_timeout
1488    }
1489
1490    /// Get bandwidth validator reference
1491    pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
1492        &self.bandwidth_validator
1493    }
1494
1495    /// Get congestion integrator reference
1496    pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
1497        &self.congestion_integrator
1498    }
1499
1500    /// Get comprehensive network efficiency statistics
1501    pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
1502        NetworkEfficiencyStats {
1503            parallel_discovery: self.parallel_discovery.get_stats().await,
1504            adaptive_timeout: self.adaptive_timeout.get_stats().await,
1505            bandwidth_validation: self.bandwidth_validator.get_stats().await,
1506            congestion_integration: self.congestion_integrator.get_stats().await,
1507        }
1508    }
1509
1510    /// Shutdown all network efficiency components
1511    pub async fn shutdown(&mut self) {
1512        if !self.is_running {
1513            return;
1514        }
1515
1516        self.parallel_discovery.shutdown().await;
1517        self.adaptive_timeout.shutdown().await;
1518
1519        self.is_running = false;
1520        info!("Network efficiency manager shutdown complete");
1521    }
1522}
1523
1524/// Comprehensive network efficiency statistics
1525#[derive(Debug, Clone)]
1526pub struct NetworkEfficiencyStats {
1527    pub parallel_discovery: ParallelDiscoveryStats,
1528    pub adaptive_timeout: AdaptiveTimeoutStats,
1529    pub bandwidth_validation: BandwidthValidationStats,
1530    pub congestion_integration: CongestionIntegrationStats,
1531}
1532
1533impl Default for NetworkEfficiencyManager {
1534    fn default() -> Self {
1535        Self::new()
1536    }
1537}