ant_quic/optimization/
network.rs

1//! Network efficiency optimization components for ant-quic
2//!
3//! This module provides network-aware optimizations including:
4//! - Parallel candidate discovery across interfaces
5//! - Adaptive timeout adjustment based on network conditions
6//! - Bandwidth-aware QUIC path validation strategies
7//! - Congestion control integration during QUIC connection migration
8
9use std::{
10    collections::{HashMap, VecDeque},
11    net::{IpAddr, SocketAddr},
12    sync::{Arc, Mutex, RwLock},
13    time::{Duration, Instant},
14};
15
16use tokio::time::timeout;
17
18use tracing::{debug, info, warn};
19
20use tokio::time::sleep;
21
22use crate::{
23    candidate_discovery::NetworkInterface,
24    connection::nat_traversal::{CandidateSource, CandidateState},
25    nat_traversal_api::{CandidateAddress, PeerId},
26};
27
28/// Parallel candidate discovery coordinator
29#[derive(Debug)]
30pub struct ParallelDiscoveryCoordinator {
31    /// Active discovery tasks by interface
32    active_discoveries: Arc<RwLock<HashMap<String, DiscoveryTask>>>,
33    /// Discovery configuration
34    config: ParallelDiscoveryConfig,
35    /// Discovery statistics
36    stats: Arc<Mutex<ParallelDiscoveryStats>>,
37    /// Task coordination handle
38    coordination_handle: Option<tokio::task::JoinHandle<()>>,
39}
40
41/// Configuration for parallel discovery
42#[derive(Debug, Clone)]
43pub struct ParallelDiscoveryConfig {
44    /// Maximum concurrent discovery tasks
45    pub max_concurrent_tasks: usize,
46    /// Timeout for individual interface discovery
47    pub interface_timeout: Duration,
48    /// Enable interface prioritization
49    pub enable_prioritization: bool,
50    /// Preferred interface types
51    pub preferred_interface_types: Vec<InterfaceType>,
52    /// Enable adaptive parallelism based on system resources
53    pub enable_adaptive_parallelism: bool,
54}
55
56/// Network interface type for prioritization
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum InterfaceType {
59    Ethernet,
60    WiFi,
61    Cellular,
62    Loopback,
63    VPN,
64    Unknown,
65}
66
67/// Individual discovery task state
68#[derive(Debug)]
69struct DiscoveryTask {
70    interface_name: String,
71    interface_type: InterfaceType,
72    started_at: Instant,
73    status: TaskStatus,
74    discovered_candidates: Vec<CandidateAddress>,
75    priority: u32,
76}
77
78/// Status of a discovery task
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80enum TaskStatus {
81    Pending,
82    Running,
83    Completed,
84    Failed,
85    Timeout,
86}
87
88/// Statistics for parallel discovery
89#[derive(Debug, Default, Clone)]
90pub struct ParallelDiscoveryStats {
91    /// Total discovery tasks started
92    pub tasks_started: u64,
93    /// Total discovery tasks completed
94    pub tasks_completed: u64,
95    /// Total discovery tasks failed
96    pub tasks_failed: u64,
97    /// Average discovery time per interface
98    pub avg_discovery_time: Duration,
99    /// Total candidates discovered
100    pub total_candidates: u64,
101    /// Parallelism efficiency (0.0 - 1.0)
102    pub parallelism_efficiency: f64,
103}
104
105/// Adaptive timeout manager for network condition awareness
106#[derive(Debug)]
107pub struct AdaptiveTimeoutManager {
108    /// Network condition measurements
109    network_conditions: Arc<RwLock<NetworkConditions>>,
110    /// Timeout configurations by operation type
111    timeout_configs: HashMap<OperationType, AdaptiveTimeoutConfig>,
112    /// Timeout statistics
113    stats: Arc<Mutex<AdaptiveTimeoutStats>>,
114    /// Monitoring task handle
115    monitoring_handle: Option<tokio::task::JoinHandle<()>>,
116}
117
118/// Network conditions measurement
119#[derive(Debug, Clone)]
120pub struct NetworkConditions {
121    /// Recent RTT measurements
122    rtt_samples: VecDeque<Duration>,
123    /// Packet loss rate (0.0 - 1.0)
124    packet_loss_rate: f64,
125    /// Bandwidth estimate (bytes/sec)
126    bandwidth_estimate: u64,
127    /// Network quality score (0.0 - 1.0)
128    quality_score: f64,
129    /// Congestion level (0.0 - 1.0)
130    congestion_level: f64,
131    /// Last measurement time
132    last_measurement: Instant,
133}
134
135/// Operation types for adaptive timeouts
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
137pub enum OperationType {
138    CandidateDiscovery,
139    PathValidation,
140    CoordinationRequest,
141    HolePunching,
142    ConnectionEstablishment,
143}
144
145/// Adaptive timeout configuration
146#[derive(Debug, Clone)]
147struct AdaptiveTimeoutConfig {
148    /// Base timeout value
149    base_timeout: Duration,
150    /// Minimum timeout
151    min_timeout: Duration,
152    /// Maximum timeout
153    max_timeout: Duration,
154    /// RTT multiplier for timeout calculation
155    rtt_multiplier: f64,
156    /// Quality adjustment factor
157    quality_factor: f64,
158    /// Congestion adjustment factor
159    congestion_factor: f64,
160}
161
162/// Statistics for adaptive timeouts
163#[derive(Debug, Default, Clone)]
164pub struct AdaptiveTimeoutStats {
165    /// Total timeout adjustments made
166    pub adjustments_made: u64,
167    /// Average timeout value by operation
168    pub avg_timeouts: HashMap<OperationType, Duration>,
169    /// Timeout effectiveness (success rate)
170    pub timeout_effectiveness: f64,
171    /// Network condition accuracy
172    pub condition_accuracy: f64,
173}
174
175/// Bandwidth-aware path validation coordinator
176#[derive(Debug)]
177pub struct BandwidthAwareValidator {
178    /// Active validation sessions
179    active_validations: Arc<RwLock<HashMap<SocketAddr, ValidationSession>>>,
180    /// Bandwidth monitoring
181    bandwidth_monitor: Arc<Mutex<BandwidthMonitor>>,
182    /// Validation configuration
183    config: BandwidthValidationConfig,
184    /// Validation statistics
185    stats: Arc<Mutex<BandwidthValidationStats>>,
186}
187
188/// Configuration for bandwidth-aware validation
189#[derive(Debug, Clone)]
190pub struct BandwidthValidationConfig {
191    /// Maximum concurrent validations
192    pub max_concurrent_validations: usize,
193    /// Bandwidth threshold for validation throttling (bytes/sec)
194    pub bandwidth_threshold: u64,
195    /// Enable adaptive validation based on bandwidth
196    pub enable_adaptive_validation: bool,
197    /// Validation packet size
198    pub validation_packet_size: usize,
199    /// Maximum validation rate (packets/sec)
200    pub max_validation_rate: f64,
201}
202
203/// Bandwidth monitoring state
204#[derive(Debug)]
205struct BandwidthMonitor {
206    /// Recent bandwidth measurements
207    bandwidth_samples: VecDeque<BandwidthSample>,
208    /// Current bandwidth estimate
209    current_bandwidth: u64,
210    /// Bandwidth utilization (0.0 - 1.0)
211    utilization: f64,
212    /// Last measurement time
213    last_measurement: Instant,
214}
215
216/// Individual bandwidth measurement
217#[derive(Debug, Clone)]
218struct BandwidthSample {
219    timestamp: Instant,
220    bytes_transferred: u64,
221    duration: Duration,
222    bandwidth: u64,
223}
224
225/// Path validation session
226#[derive(Debug)]
227struct ValidationSession {
228    target_address: SocketAddr,
229    started_at: Instant,
230    packets_sent: u32,
231    packets_received: u32,
232    total_bytes: u64,
233    rtt_samples: Vec<Duration>,
234    bandwidth_usage: u64,
235    priority: ValidationPriority,
236}
237
238/// Priority for path validation
239#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
240pub enum ValidationPriority {
241    Low,
242    Normal,
243    High,
244    Critical,
245}
246
247/// Statistics for bandwidth-aware validation
248#[derive(Debug, Default, Clone)]
249pub struct BandwidthValidationStats {
250    /// Total validations started
251    pub validations_started: u64,
252    /// Total validations completed
253    pub validations_completed: u64,
254    /// Total bandwidth used for validation
255    pub total_bandwidth_used: u64,
256    /// Average validation time
257    pub avg_validation_time: Duration,
258    /// Bandwidth efficiency (successful validations / bandwidth used)
259    pub bandwidth_efficiency: f64,
260}
261
262/// Congestion control integration for connection migration
263#[derive(Debug)]
264pub struct CongestionControlIntegrator {
265    /// Active connection migrations
266    active_migrations: Arc<RwLock<HashMap<PeerId, MigrationSession>>>,
267    /// Congestion control state
268    congestion_state: Arc<Mutex<CongestionState>>,
269    /// Integration configuration
270    config: CongestionIntegrationConfig,
271    /// Integration statistics
272    stats: Arc<Mutex<CongestionIntegrationStats>>,
273}
274
275/// Configuration for congestion control integration
276#[derive(Debug, Clone)]
277pub struct CongestionIntegrationConfig {
278    /// Enable congestion-aware migration
279    pub enable_congestion_awareness: bool,
280    /// Congestion threshold for migration decisions
281    pub congestion_threshold: f64,
282    /// Migration rate limiting
283    pub max_migrations_per_second: f64,
284    /// Enable bandwidth estimation during migration
285    pub enable_bandwidth_estimation: bool,
286    /// Congestion window scaling factor
287    pub cwnd_scaling_factor: f64,
288}
289
290/// Connection migration session
291#[derive(Debug)]
292struct MigrationSession {
293    peer_id: PeerId,
294    old_path: SocketAddr,
295    new_path: SocketAddr,
296    started_at: Instant,
297    migration_state: MigrationState,
298    congestion_window: u32,
299    rtt_estimate: Duration,
300    bandwidth_estimate: u64,
301}
302
303/// State of connection migration
304#[derive(Debug, Clone, Copy, PartialEq, Eq)]
305pub enum MigrationState {
306    Initiated,
307    PathValidating,
308    CongestionProbing,
309    Migrating,
310    Completed,
311    Failed,
312}
313
314/// Congestion control state
315#[derive(Debug)]
316struct CongestionState {
317    /// Current congestion window
318    congestion_window: u32,
319    /// Slow start threshold
320    ssthresh: u32,
321    /// RTT measurements
322    rtt_measurements: VecDeque<Duration>,
323    /// Congestion events
324    congestion_events: VecDeque<CongestionEvent>,
325    /// Current congestion level
326    congestion_level: f64,
327}
328
329/// Congestion event for tracking
330#[derive(Debug, Clone)]
331struct CongestionEvent {
332    timestamp: Instant,
333    event_type: CongestionEventType,
334    severity: f64,
335}
336
337/// Types of congestion events
338#[derive(Debug, Clone, Copy, PartialEq, Eq)]
339pub enum CongestionEventType {
340    PacketLoss,
341    Timeout,
342    ECNMark,
343    RTTIncrease,
344}
345
346/// Statistics for congestion control integration
347#[derive(Debug, Default, Clone)]
348pub struct CongestionIntegrationStats {
349    /// Total migrations attempted
350    pub migrations_attempted: u64,
351    /// Total migrations successful
352    pub migrations_successful: u64,
353    /// Average migration time
354    pub avg_migration_time: Duration,
355    /// Congestion-avoided migrations
356    pub congestion_avoided_migrations: u64,
357    /// Bandwidth utilization efficiency
358    pub bandwidth_utilization_efficiency: f64,
359}
360
361impl Default for ParallelDiscoveryConfig {
362    fn default() -> Self {
363        Self {
364            max_concurrent_tasks: 8,
365            interface_timeout: Duration::from_secs(5),
366            enable_prioritization: true,
367            preferred_interface_types: vec![
368                InterfaceType::Ethernet,
369                InterfaceType::WiFi,
370                InterfaceType::Cellular,
371            ],
372            enable_adaptive_parallelism: true,
373        }
374    }
375}
376
377impl Default for BandwidthValidationConfig {
378    fn default() -> Self {
379        Self {
380            max_concurrent_validations: 16,
381            bandwidth_threshold: 1_000_000, // 1 MB/s
382            enable_adaptive_validation: true,
383            validation_packet_size: 64,
384            max_validation_rate: 100.0, // 100 packets/sec
385        }
386    }
387}
388
389impl Default for CongestionIntegrationConfig {
390    fn default() -> Self {
391        Self {
392            enable_congestion_awareness: true,
393            congestion_threshold: 0.7, // 70% congestion level
394            max_migrations_per_second: 10.0,
395            enable_bandwidth_estimation: true,
396            cwnd_scaling_factor: 0.8,
397        }
398    }
399}
400
401impl ParallelDiscoveryCoordinator {
402    /// Create a new parallel discovery coordinator
403    pub fn new(config: ParallelDiscoveryConfig) -> Self {
404        Self {
405            active_discoveries: Arc::new(RwLock::new(HashMap::new())),
406            config,
407            stats: Arc::new(Mutex::new(ParallelDiscoveryStats::default())),
408            coordination_handle: None,
409        }
410    }
411
412    /// Start parallel discovery across multiple interfaces
413    pub async fn start_parallel_discovery(
414        &mut self,
415        interfaces: Vec<NetworkInterface>,
416        peer_id: PeerId,
417    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
418        info!(
419            "Starting parallel discovery across {} interfaces for peer {:?}",
420            interfaces.len(),
421            peer_id
422        );
423
424        // Prioritize interfaces if enabled
425        let prioritized_interfaces = if self.config.enable_prioritization {
426            self.prioritize_interfaces(interfaces)
427        } else {
428            interfaces
429        };
430
431        // Limit concurrent tasks based on configuration and system resources
432        let max_tasks = if self.config.enable_adaptive_parallelism {
433            self.calculate_adaptive_parallelism().await
434        } else {
435            self.config.max_concurrent_tasks
436        };
437
438        let tasks_to_start = prioritized_interfaces
439            .into_iter()
440            .take(max_tasks)
441            .collect::<Vec<_>>();
442
443        // Start discovery tasks
444        for interface in tasks_to_start {
445            self.start_interface_discovery(interface, peer_id).await?;
446        }
447
448        // Start coordination task
449        self.start_coordination_task().await?;
450
451        Ok(())
452    }
453
454    /// Prioritize interfaces based on type and characteristics
455    fn prioritize_interfaces(
456        &self,
457        mut interfaces: Vec<NetworkInterface>,
458    ) -> Vec<NetworkInterface> {
459        interfaces.sort_by_key(|interface| {
460            let interface_type = self.classify_interface_type(&interface.name);
461            let type_priority = self
462                .config
463                .preferred_interface_types
464                .iter()
465                .position(|&t| t == interface_type)
466                .unwrap_or(999);
467
468            // Lower number = higher priority
469            (type_priority, interface.addresses.len())
470        });
471
472        interfaces
473    }
474
475    /// Classify interface type from name
476    fn classify_interface_type(&self, name: &str) -> InterfaceType {
477        let name_lower = name.to_lowercase();
478
479        if name_lower.contains("eth") || name_lower.contains("en") {
480            InterfaceType::Ethernet
481        } else if name_lower.contains("wlan")
482            || name_lower.contains("wifi")
483            || name_lower.contains("wl")
484        {
485            InterfaceType::WiFi
486        } else if name_lower.contains("cell")
487            || name_lower.contains("wwan")
488            || name_lower.contains("ppp")
489        {
490            InterfaceType::Cellular
491        } else if name_lower.contains("lo") || name_lower.contains("loopback") {
492            InterfaceType::Loopback
493        } else if name_lower.contains("vpn")
494            || name_lower.contains("tun")
495            || name_lower.contains("tap")
496        {
497            InterfaceType::VPN
498        } else {
499            InterfaceType::Unknown
500        }
501    }
502
503    /// Calculate adaptive parallelism based on system resources
504    async fn calculate_adaptive_parallelism(&self) -> usize {
505        // Simplified adaptive calculation
506        // In production, this would consider:
507        // - CPU cores
508        // - Memory availability
509        // - Network bandwidth
510        // - Current system load
511
512        let base_parallelism = self.config.max_concurrent_tasks;
513        let system_load_factor = 0.8; // Assume 80% system capacity
514
515        ((base_parallelism as f64) * system_load_factor) as usize
516    }
517
518    /// Start discovery for a specific interface
519    async fn start_interface_discovery(
520        &self,
521        interface: NetworkInterface,
522        _peer_id: PeerId,
523    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
524        let interface_type = self.classify_interface_type(&interface.name);
525        let priority = self.calculate_interface_priority(interface_type);
526
527        let task = DiscoveryTask {
528            interface_name: interface.name.clone(),
529            interface_type,
530            started_at: Instant::now(),
531            status: TaskStatus::Pending,
532            discovered_candidates: Vec::new(),
533            priority,
534        };
535
536        // Add to active discoveries
537        {
538            let mut discoveries = self.active_discoveries.write().unwrap();
539            discoveries.insert(interface.name.clone(), task);
540        }
541
542        // Update stats
543        {
544            let mut stats = self.stats.lock().unwrap();
545            stats.tasks_started += 1;
546        }
547
548        // Start actual discovery (simplified)
549        self.perform_interface_discovery(interface).await?;
550
551        Ok(())
552    }
553
554    /// Calculate priority for interface type
555    fn calculate_interface_priority(&self, interface_type: InterfaceType) -> u32 {
556        match interface_type {
557            InterfaceType::Ethernet => 100,
558            InterfaceType::WiFi => 80,
559            InterfaceType::Cellular => 60,
560            InterfaceType::VPN => 40,
561            InterfaceType::Loopback => 20,
562            InterfaceType::Unknown => 10,
563        }
564    }
565
566    /// Perform discovery for a specific interface
567    async fn perform_interface_discovery(
568        &self,
569        interface: NetworkInterface,
570    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
571        let interface_name = interface.name.clone();
572
573        // Update task status to running
574        {
575            let mut discoveries = self.active_discoveries.write().unwrap();
576            if let Some(task) = discoveries.get_mut(&interface_name) {
577                task.status = TaskStatus::Running;
578            }
579        }
580
581        // Perform discovery with timeout
582        let discovery_result = timeout(
583            self.config.interface_timeout,
584            self.discover_candidates_for_interface(interface),
585        )
586        .await;
587
588        match discovery_result {
589            Ok(Ok(candidates)) => {
590                // Discovery successful
591                {
592                    let mut discoveries = self.active_discoveries.write().unwrap();
593                    if let Some(task) = discoveries.get_mut(&interface_name) {
594                        task.status = TaskStatus::Completed;
595                        task.discovered_candidates = candidates;
596                    }
597                }
598
599                // Update stats
600                {
601                    let mut stats = self.stats.lock().unwrap();
602                    stats.tasks_completed += 1;
603                }
604
605                debug!("Interface discovery completed for {}", interface_name);
606            }
607            Ok(Err(_)) => {
608                // Discovery failed
609                {
610                    let mut discoveries = self.active_discoveries.write().unwrap();
611                    if let Some(task) = discoveries.get_mut(&interface_name) {
612                        task.status = TaskStatus::Failed;
613                    }
614                }
615
616                // Update stats
617                {
618                    let mut stats = self.stats.lock().unwrap();
619                    stats.tasks_failed += 1;
620                }
621
622                warn!("Interface discovery failed for {}", interface_name);
623            }
624            Err(_) => {
625                // Discovery timeout
626                {
627                    let mut discoveries = self.active_discoveries.write().unwrap();
628                    if let Some(task) = discoveries.get_mut(&interface_name) {
629                        task.status = TaskStatus::Timeout;
630                    }
631                }
632
633                // Update stats
634                {
635                    let mut stats = self.stats.lock().unwrap();
636                    stats.tasks_failed += 1;
637                }
638
639                warn!("Interface discovery timeout for {}", interface_name);
640            }
641        }
642
643        Ok(())
644    }
645
646    /// Discover candidates for a specific interface
647    async fn discover_candidates_for_interface(
648        &self,
649        interface: NetworkInterface,
650    ) -> Result<Vec<CandidateAddress>, Box<dyn std::error::Error + Send + Sync>> {
651        let mut candidates = Vec::new();
652
653        for address in &interface.addresses {
654            // Skip loopback and link-local addresses for P2P
655            if self.is_valid_candidate_address(address) {
656                let candidate = CandidateAddress {
657                    address: *address,
658                    priority: self.calculate_candidate_priority(address, &interface),
659                    source: CandidateSource::Local,
660                    state: CandidateState::New,
661                };
662
663                candidates.push(candidate);
664            }
665        }
666
667        // Simulate some discovery time
668        sleep(Duration::from_millis(100)).await;
669
670        Ok(candidates)
671    }
672
673    /// Check if address is valid for P2P candidate
674    fn is_valid_candidate_address(&self, address: &SocketAddr) -> bool {
675        match address.ip() {
676            IpAddr::V4(ipv4) => {
677                !ipv4.is_loopback() && !ipv4.is_link_local() && !ipv4.is_broadcast()
678            }
679            IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
680        }
681    }
682
683    /// Calculate priority for a candidate address
684    fn calculate_candidate_priority(
685        &self,
686        address: &SocketAddr,
687        interface: &NetworkInterface,
688    ) -> u32 {
689        let mut priority = 1000u32;
690
691        // Prefer IPv4 over IPv6 for simplicity
692        if address.is_ipv4() {
693            priority += 100;
694        }
695
696        // Prefer non-private addresses
697        if !self.is_private_address(address) {
698            priority += 200;
699        }
700
701        // Add interface-specific priority
702        let interface_type = self.classify_interface_type(&interface.name);
703        priority += self.calculate_interface_priority(interface_type);
704
705        priority
706    }
707
708    /// Check if address is in private range
709    fn is_private_address(&self, address: &SocketAddr) -> bool {
710        match address.ip() {
711            IpAddr::V4(ipv4) => ipv4.is_private(),
712            IpAddr::V6(ipv6) => {
713                // Check for unique local addresses (fc00::/7)
714                let segments = ipv6.segments();
715                (segments[0] & 0xfe00) == 0xfc00
716            }
717        }
718    }
719
720    /// Start coordination task for managing parallel discoveries
721    async fn start_coordination_task(
722        &mut self,
723    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
724        let discoveries = Arc::clone(&self.active_discoveries);
725        let stats = Arc::clone(&self.stats);
726        let config = self.config.clone();
727
728        let coordination_handle = tokio::spawn(async move {
729            let mut interval = tokio::time::interval(Duration::from_millis(500));
730
731            loop {
732                interval.tick().await;
733                Self::coordinate_discoveries(&discoveries, &stats, &config).await;
734
735                // Check if all discoveries are complete
736                let all_complete = {
737                    let discoveries_read = discoveries.read().unwrap();
738                    discoveries_read.values().all(|task| {
739                        matches!(
740                            task.status,
741                            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Timeout
742                        )
743                    })
744                };
745
746                if all_complete {
747                    break;
748                }
749            }
750        });
751
752        self.coordination_handle = Some(coordination_handle);
753        Ok(())
754    }
755
756    /// Coordinate parallel discoveries
757    async fn coordinate_discoveries(
758        discoveries: &Arc<RwLock<HashMap<String, DiscoveryTask>>>,
759        stats: &Arc<Mutex<ParallelDiscoveryStats>>,
760        _config: &ParallelDiscoveryConfig,
761    ) {
762        let mut total_candidates = 0u64;
763        let mut completed_tasks = 0u64;
764        let mut total_discovery_time = Duration::ZERO;
765
766        {
767            let discoveries_read = discoveries.read().unwrap();
768            for task in discoveries_read.values() {
769                if task.status == TaskStatus::Completed {
770                    total_candidates += task.discovered_candidates.len() as u64;
771                    completed_tasks += 1;
772                    total_discovery_time += task.started_at.elapsed();
773                }
774            }
775        }
776
777        // Update stats
778        {
779            let mut stats_guard = stats.lock().unwrap();
780            stats_guard.total_candidates = total_candidates;
781            stats_guard.tasks_completed = completed_tasks;
782
783            if completed_tasks > 0 {
784                stats_guard.avg_discovery_time = total_discovery_time / completed_tasks as u32;
785                stats_guard.parallelism_efficiency =
786                    completed_tasks as f64 / stats_guard.tasks_started as f64;
787            }
788        }
789    }
790
791    /// Get all discovered candidates from parallel discovery
792    pub async fn get_all_candidates(&self) -> Vec<CandidateAddress> {
793        let mut all_candidates = Vec::new();
794
795        let discoveries = self.active_discoveries.read().unwrap();
796        for task in discoveries.values() {
797            if task.status == TaskStatus::Completed {
798                all_candidates.extend(task.discovered_candidates.clone());
799            }
800        }
801
802        // Sort by priority (highest first)
803        all_candidates.sort_by(|a, b| b.priority.cmp(&a.priority));
804
805        all_candidates
806    }
807
808    /// Get parallel discovery statistics
809    pub async fn get_stats(&self) -> ParallelDiscoveryStats {
810        self.stats.lock().unwrap().clone()
811    }
812
813    /// Shutdown parallel discovery coordinator
814    pub async fn shutdown(&mut self) {
815        if let Some(handle) = self.coordination_handle.take() {
816            handle.abort();
817        }
818
819        // Clear active discoveries
820        {
821            let mut discoveries = self.active_discoveries.write().unwrap();
822            discoveries.clear();
823        }
824
825        info!("Parallel discovery coordinator shutdown complete");
826    }
827}
828
829impl Default for AdaptiveTimeoutManager {
830    fn default() -> Self {
831        Self::new()
832    }
833}
834
835impl AdaptiveTimeoutManager {
836    /// Create a new adaptive timeout manager
837    pub fn new() -> Self {
838        let mut timeout_configs = HashMap::new();
839
840        // Initialize default timeout configurations for each operation type
841        timeout_configs.insert(
842            OperationType::CandidateDiscovery,
843            AdaptiveTimeoutConfig {
844                base_timeout: Duration::from_secs(5),
845                min_timeout: Duration::from_millis(500),
846                max_timeout: Duration::from_secs(30),
847                rtt_multiplier: 4.0,
848                quality_factor: 0.5,
849                congestion_factor: 0.3,
850            },
851        );
852
853        timeout_configs.insert(
854            OperationType::PathValidation,
855            AdaptiveTimeoutConfig {
856                base_timeout: Duration::from_secs(3),
857                min_timeout: Duration::from_millis(200),
858                max_timeout: Duration::from_secs(15),
859                rtt_multiplier: 3.0,
860                quality_factor: 0.4,
861                congestion_factor: 0.4,
862            },
863        );
864
865        timeout_configs.insert(
866            OperationType::CoordinationRequest,
867            AdaptiveTimeoutConfig {
868                base_timeout: Duration::from_secs(10),
869                min_timeout: Duration::from_secs(1),
870                max_timeout: Duration::from_secs(60),
871                rtt_multiplier: 5.0,
872                quality_factor: 0.6,
873                congestion_factor: 0.2,
874            },
875        );
876
877        timeout_configs.insert(
878            OperationType::HolePunching,
879            AdaptiveTimeoutConfig {
880                base_timeout: Duration::from_secs(2),
881                min_timeout: Duration::from_millis(100),
882                max_timeout: Duration::from_secs(10),
883                rtt_multiplier: 2.0,
884                quality_factor: 0.3,
885                congestion_factor: 0.5,
886            },
887        );
888
889        timeout_configs.insert(
890            OperationType::ConnectionEstablishment,
891            AdaptiveTimeoutConfig {
892                base_timeout: Duration::from_secs(15),
893                min_timeout: Duration::from_secs(2),
894                max_timeout: Duration::from_secs(120),
895                rtt_multiplier: 6.0,
896                quality_factor: 0.7,
897                congestion_factor: 0.1,
898            },
899        );
900
901        Self {
902            network_conditions: Arc::new(RwLock::new(NetworkConditions {
903                rtt_samples: VecDeque::new(),
904                packet_loss_rate: 0.0,
905                bandwidth_estimate: 1_000_000, // 1 MB/s default
906                quality_score: 0.8,            // Good quality default
907                congestion_level: 0.2,         // Low congestion default
908                last_measurement: Instant::now(),
909            })),
910            timeout_configs,
911            stats: Arc::new(Mutex::new(AdaptiveTimeoutStats::default())),
912            monitoring_handle: None,
913        }
914    }
915
916    /// Start the adaptive timeout manager with network monitoring
917    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
918        let network_conditions = Arc::clone(&self.network_conditions);
919        let stats = Arc::clone(&self.stats);
920
921        let monitoring_handle = tokio::spawn(async move {
922            let mut interval = tokio::time::interval(Duration::from_secs(1));
923
924            loop {
925                interval.tick().await;
926                Self::update_network_conditions(&network_conditions, &stats).await;
927            }
928        });
929
930        self.monitoring_handle = Some(monitoring_handle);
931        info!("Adaptive timeout manager started");
932        Ok(())
933    }
934
935    /// Calculate adaptive timeout for an operation
936    pub async fn calculate_timeout(&self, operation: OperationType) -> Duration {
937        let config = self
938            .timeout_configs
939            .get(&operation)
940            .cloned()
941            .unwrap_or_else(|| AdaptiveTimeoutConfig {
942                base_timeout: Duration::from_secs(5),
943                min_timeout: Duration::from_millis(500),
944                max_timeout: Duration::from_secs(30),
945                rtt_multiplier: 4.0,
946                quality_factor: 0.5,
947                congestion_factor: 0.3,
948            });
949
950        let conditions = self.network_conditions.read().unwrap();
951
952        // Calculate base timeout from RTT if available
953        let rtt_based_timeout =
954            if let Some(avg_rtt) = self.calculate_average_rtt(&conditions.rtt_samples) {
955                Duration::from_millis((avg_rtt.as_millis() as f64 * config.rtt_multiplier) as u64)
956            } else {
957                config.base_timeout
958            };
959
960        // Adjust for network quality
961        let quality_adjustment = 1.0 + (1.0 - conditions.quality_score) * config.quality_factor;
962
963        // Adjust for congestion
964        let congestion_adjustment = 1.0 + conditions.congestion_level * config.congestion_factor;
965
966        // Calculate final timeout
967        let adjusted_timeout = Duration::from_millis(
968            (rtt_based_timeout.as_millis() as f64 * quality_adjustment * congestion_adjustment)
969                as u64,
970        );
971
972        // Clamp to min/max bounds
973        let final_timeout = adjusted_timeout
974            .max(config.min_timeout)
975            .min(config.max_timeout);
976
977        // Update stats
978        {
979            let mut stats = self.stats.lock().unwrap();
980            stats.adjustments_made += 1;
981            stats.avg_timeouts.insert(operation, final_timeout);
982        }
983
984        debug!(
985            "Calculated adaptive timeout for {:?}: {:?} (quality: {:.2}, congestion: {:.2})",
986            operation, final_timeout, conditions.quality_score, conditions.congestion_level
987        );
988
989        final_timeout
990    }
991
992    /// Record network measurement for adaptive timeout calculation
993    pub async fn record_measurement(
994        &self,
995        rtt: Duration,
996        packet_loss: bool,
997        bandwidth: Option<u64>,
998    ) {
999        let mut conditions = self.network_conditions.write().unwrap();
1000
1001        // Add RTT sample
1002        conditions.rtt_samples.push_back(rtt);
1003        if conditions.rtt_samples.len() > 50 {
1004            conditions.rtt_samples.pop_front();
1005        }
1006
1007        // Update packet loss rate (exponential moving average)
1008        let loss_sample = if packet_loss { 1.0 } else { 0.0 };
1009        conditions.packet_loss_rate = conditions.packet_loss_rate * 0.9 + loss_sample * 0.1;
1010
1011        // Update bandwidth estimate if provided
1012        if let Some(bw) = bandwidth {
1013            conditions.bandwidth_estimate =
1014                (conditions.bandwidth_estimate as f64 * 0.8 + bw as f64 * 0.2) as u64;
1015        }
1016
1017        // Update quality score based on RTT and packet loss
1018        let rtt_quality = 1.0 - (rtt.as_millis() as f64 / 1000.0).min(1.0);
1019        let loss_quality = 1.0 - conditions.packet_loss_rate;
1020        conditions.quality_score = (rtt_quality + loss_quality) / 2.0;
1021
1022        // Update congestion level based on RTT variance and packet loss
1023        let rtt_variance = self.calculate_rtt_variance(&conditions.rtt_samples);
1024        conditions.congestion_level = (conditions.packet_loss_rate + rtt_variance).min(1.0);
1025
1026        conditions.last_measurement = Instant::now();
1027    }
1028
1029    /// Calculate average RTT from samples
1030    fn calculate_average_rtt(&self, samples: &VecDeque<Duration>) -> Option<Duration> {
1031        if samples.is_empty() {
1032            return None;
1033        }
1034
1035        let total_ms: u64 = samples.iter().map(|d| d.as_millis() as u64).sum();
1036        Some(Duration::from_millis(total_ms / samples.len() as u64))
1037    }
1038
1039    /// Calculate RTT variance for congestion detection
1040    fn calculate_rtt_variance(&self, samples: &VecDeque<Duration>) -> f64 {
1041        if samples.len() < 2 {
1042            return 0.0;
1043        }
1044
1045        let avg = self.calculate_average_rtt(samples).unwrap().as_millis() as f64;
1046        let variance: f64 = samples
1047            .iter()
1048            .map(|d| {
1049                let diff = d.as_millis() as f64 - avg;
1050                diff * diff
1051            })
1052            .sum::<f64>()
1053            / samples.len() as f64;
1054
1055        (variance.sqrt() / avg).min(1.0)
1056    }
1057
1058    /// Update network conditions periodically
1059    async fn update_network_conditions(
1060        network_conditions: &Arc<RwLock<NetworkConditions>>,
1061        _stats: &Arc<Mutex<AdaptiveTimeoutStats>>,
1062    ) {
1063        // Periodic network condition updates
1064        // In production, this would:
1065        // - Probe network conditions
1066        // - Update bandwidth estimates
1067        // - Detect congestion patterns
1068        // - Adjust quality scores
1069
1070        let mut conditions = network_conditions.write().unwrap();
1071
1072        // Age out old RTT samples (keep last 100 samples)
1073        while conditions.rtt_samples.len() > 100 {
1074            conditions.rtt_samples.pop_front();
1075        }
1076
1077        // Decay packet loss rate over time
1078        conditions.packet_loss_rate *= 0.99;
1079
1080        // Update quality score based on recent measurements
1081        if conditions.last_measurement.elapsed() > Duration::from_secs(10) {
1082            // No recent measurements, assume degraded quality
1083            conditions.quality_score *= 0.95;
1084        }
1085    }
1086
1087    /// Get current network conditions
1088    pub async fn get_network_conditions(&self) -> NetworkConditions {
1089        self.network_conditions.read().unwrap().clone()
1090    }
1091
1092    /// Get adaptive timeout statistics
1093    pub async fn get_stats(&self) -> AdaptiveTimeoutStats {
1094        self.stats.lock().unwrap().clone()
1095    }
1096
1097    /// Shutdown the adaptive timeout manager
1098    pub async fn shutdown(&mut self) {
1099        if let Some(handle) = self.monitoring_handle.take() {
1100            handle.abort();
1101        }
1102
1103        info!("Adaptive timeout manager shutdown complete");
1104    }
1105}
1106
1107impl BandwidthAwareValidator {
1108    /// Create a new bandwidth-aware validator
1109    pub fn new(config: BandwidthValidationConfig) -> Self {
1110        Self {
1111            active_validations: Arc::new(RwLock::new(HashMap::new())),
1112            bandwidth_monitor: Arc::new(Mutex::new(BandwidthMonitor {
1113                bandwidth_samples: VecDeque::new(),
1114                current_bandwidth: 1_000_000, // 1 MB/s default
1115                utilization: 0.0,
1116                last_measurement: Instant::now(),
1117            })),
1118            config,
1119            stats: Arc::new(Mutex::new(BandwidthValidationStats::default())),
1120        }
1121    }
1122
1123    /// Start path validation with bandwidth awareness
1124    pub async fn start_validation(
1125        &self,
1126        target_address: SocketAddr,
1127        priority: ValidationPriority,
1128    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1129        // Check if we can start new validation based on bandwidth constraints
1130        if !self.can_start_validation().await {
1131            return Err("Bandwidth limit reached, cannot start validation".into());
1132        }
1133
1134        let session = ValidationSession {
1135            target_address,
1136            started_at: Instant::now(),
1137            packets_sent: 0,
1138            packets_received: 0,
1139            total_bytes: 0,
1140            rtt_samples: Vec::new(),
1141            bandwidth_usage: 0,
1142            priority,
1143        };
1144
1145        // Add to active validations
1146        {
1147            let mut validations = self.active_validations.write().unwrap();
1148            validations.insert(target_address, session);
1149        }
1150
1151        // Update stats
1152        {
1153            let mut stats = self.stats.lock().unwrap();
1154            stats.validations_started += 1;
1155        }
1156
1157        debug!("Started bandwidth-aware validation for {}", target_address);
1158        Ok(())
1159    }
1160
1161    /// Check if new validation can be started based on bandwidth constraints
1162    async fn can_start_validation(&self) -> bool {
1163        let validations = self.active_validations.read().unwrap();
1164        let bandwidth_monitor = self.bandwidth_monitor.lock().unwrap();
1165
1166        // Check concurrent validation limit
1167        if validations.len() >= self.config.max_concurrent_validations {
1168            return false;
1169        }
1170
1171        // Check bandwidth utilization if adaptive validation is enabled
1172        if self.config.enable_adaptive_validation {
1173            let current_usage: u64 = validations
1174                .values()
1175                .map(|session| session.bandwidth_usage)
1176                .sum();
1177
1178            let available_bandwidth = bandwidth_monitor.current_bandwidth;
1179            let utilization = current_usage as f64 / available_bandwidth as f64;
1180
1181            if utilization > 0.8 {
1182                // 80% utilization threshold
1183                return false;
1184            }
1185        }
1186
1187        true
1188    }
1189
1190    /// Record validation packet transmission
1191    pub async fn record_packet_sent(
1192        &self,
1193        target_address: SocketAddr,
1194        packet_size: usize,
1195    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1196        let mut validations = self.active_validations.write().unwrap();
1197
1198        if let Some(session) = validations.get_mut(&target_address) {
1199            session.packets_sent += 1;
1200            session.total_bytes += packet_size as u64;
1201            session.bandwidth_usage += packet_size as u64;
1202        }
1203
1204        // Update bandwidth monitoring
1205        self.update_bandwidth_usage(packet_size as u64).await;
1206
1207        Ok(())
1208    }
1209
1210    /// Record validation packet reception
1211    pub async fn record_packet_received(
1212        &self,
1213        target_address: SocketAddr,
1214        rtt: Duration,
1215    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1216        let mut validations = self.active_validations.write().unwrap();
1217
1218        if let Some(session) = validations.get_mut(&target_address) {
1219            session.packets_received += 1;
1220            session.rtt_samples.push(rtt);
1221        }
1222
1223        Ok(())
1224    }
1225
1226    /// Update bandwidth usage monitoring
1227    async fn update_bandwidth_usage(&self, bytes_used: u64) {
1228        let mut monitor = self.bandwidth_monitor.lock().unwrap();
1229
1230        let now = Instant::now();
1231        let sample = BandwidthSample {
1232            timestamp: now,
1233            bytes_transferred: bytes_used,
1234            duration: now.duration_since(monitor.last_measurement),
1235            bandwidth: if monitor.last_measurement.elapsed().as_secs() > 0 {
1236                bytes_used / monitor.last_measurement.elapsed().as_secs()
1237            } else {
1238                0
1239            },
1240        };
1241
1242        monitor.bandwidth_samples.push_back(sample);
1243        if monitor.bandwidth_samples.len() > 100 {
1244            monitor.bandwidth_samples.pop_front();
1245        }
1246
1247        // Update current bandwidth estimate
1248        if !monitor.bandwidth_samples.is_empty() {
1249            let total_bytes: u64 = monitor
1250                .bandwidth_samples
1251                .iter()
1252                .map(|s| s.bytes_transferred)
1253                .sum();
1254            let total_time: Duration = monitor.bandwidth_samples.iter().map(|s| s.duration).sum();
1255
1256            if total_time.as_secs() > 0 {
1257                monitor.current_bandwidth = total_bytes / total_time.as_secs();
1258            }
1259        }
1260
1261        monitor.last_measurement = now;
1262    }
1263
1264    /// Complete validation session
1265    pub async fn complete_validation(
1266        &self,
1267        target_address: SocketAddr,
1268        success: bool,
1269    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1270        let session = {
1271            let mut validations = self.active_validations.write().unwrap();
1272            validations.remove(&target_address)
1273        };
1274
1275        if let Some(session) = session {
1276            let duration = session.started_at.elapsed();
1277
1278            // Update stats
1279            {
1280                let mut stats = self.stats.lock().unwrap();
1281                if success {
1282                    stats.validations_completed += 1;
1283                }
1284                stats.total_bandwidth_used += session.bandwidth_usage;
1285                stats.avg_validation_time = if stats.validations_completed > 0 {
1286                    Duration::from_millis(
1287                        (stats.avg_validation_time.as_millis() as u64
1288                            * (stats.validations_completed - 1)
1289                            + duration.as_millis() as u64)
1290                            / stats.validations_completed,
1291                    )
1292                } else {
1293                    duration
1294                };
1295
1296                if stats.total_bandwidth_used > 0 {
1297                    stats.bandwidth_efficiency = stats.validations_completed as f64
1298                        / stats.total_bandwidth_used as f64
1299                        * 1000.0; // per KB
1300                }
1301            }
1302
1303            debug!(
1304                "Completed validation for {} in {:?} (success: {})",
1305                target_address, duration, success
1306            );
1307        }
1308
1309        Ok(())
1310    }
1311
1312    /// Get bandwidth validation statistics
1313    pub async fn get_stats(&self) -> BandwidthValidationStats {
1314        self.stats.lock().unwrap().clone()
1315    }
1316}
1317
1318impl CongestionControlIntegrator {
1319    /// Create a new congestion control integrator
1320    pub fn new(config: CongestionIntegrationConfig) -> Self {
1321        Self {
1322            active_migrations: Arc::new(RwLock::new(HashMap::new())),
1323            congestion_state: Arc::new(Mutex::new(CongestionState {
1324                congestion_window: 10, // Initial cwnd
1325                ssthresh: 65535,
1326                rtt_measurements: VecDeque::new(),
1327                congestion_events: VecDeque::new(),
1328                congestion_level: 0.0,
1329            })),
1330            config,
1331            stats: Arc::new(Mutex::new(CongestionIntegrationStats::default())),
1332        }
1333    }
1334
1335    /// Start connection migration with congestion awareness
1336    pub async fn start_migration(
1337        &self,
1338        peer_id: PeerId,
1339        old_path: SocketAddr,
1340        new_path: SocketAddr,
1341    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1342        // Check if migration should be delayed due to congestion
1343        if self.config.enable_congestion_awareness {
1344            let congestion_state = self.congestion_state.lock().unwrap();
1345            if congestion_state.congestion_level > self.config.congestion_threshold {
1346                return Err("Migration delayed due to high congestion".into());
1347            }
1348        }
1349
1350        let session = MigrationSession {
1351            peer_id,
1352            old_path,
1353            new_path,
1354            started_at: Instant::now(),
1355            migration_state: MigrationState::Initiated,
1356            congestion_window: {
1357                let state = self.congestion_state.lock().unwrap();
1358                (state.congestion_window as f64 * self.config.cwnd_scaling_factor) as u32
1359            },
1360            rtt_estimate: Duration::from_millis(100), // Default RTT
1361            bandwidth_estimate: 1_000_000,            // 1 MB/s default
1362        };
1363
1364        // Add to active migrations
1365        {
1366            let mut migrations = self.active_migrations.write().unwrap();
1367            migrations.insert(peer_id, session);
1368        }
1369
1370        // Update stats
1371        {
1372            let mut stats = self.stats.lock().unwrap();
1373            stats.migrations_attempted += 1;
1374        }
1375
1376        info!(
1377            "Started congestion-aware migration for peer {:?}: {} -> {}",
1378            peer_id, old_path, new_path
1379        );
1380        Ok(())
1381    }
1382
1383    /// Update migration state based on congestion feedback
1384    pub async fn update_migration_state(
1385        &self,
1386        peer_id: PeerId,
1387        new_state: MigrationState,
1388        rtt: Option<Duration>,
1389        bandwidth: Option<u64>,
1390    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1391        let mut migrations = self.active_migrations.write().unwrap();
1392
1393        if let Some(session) = migrations.get_mut(&peer_id) {
1394            session.migration_state = new_state;
1395
1396            if let Some(rtt) = rtt {
1397                session.rtt_estimate = rtt;
1398
1399                // Update global congestion state
1400                let mut congestion_state = self.congestion_state.lock().unwrap();
1401                congestion_state.rtt_measurements.push_back(rtt);
1402                if congestion_state.rtt_measurements.len() > 50 {
1403                    congestion_state.rtt_measurements.pop_front();
1404                }
1405            }
1406
1407            if let Some(bw) = bandwidth {
1408                session.bandwidth_estimate = bw;
1409            }
1410
1411            // Check if migration completed
1412            if matches!(new_state, MigrationState::Completed) {
1413                let duration = session.started_at.elapsed();
1414
1415                // Update stats
1416                let mut stats = self.stats.lock().unwrap();
1417                stats.migrations_successful += 1;
1418                stats.avg_migration_time = if stats.migrations_successful > 0 {
1419                    Duration::from_millis(
1420                        (stats.avg_migration_time.as_millis() as u64
1421                            * (stats.migrations_successful - 1)
1422                            + duration.as_millis() as u64)
1423                            / stats.migrations_successful,
1424                    )
1425                } else {
1426                    duration
1427                };
1428
1429                debug!(
1430                    "Migration completed for peer {:?} in {:?}",
1431                    peer_id, duration
1432                );
1433            }
1434        }
1435
1436        Ok(())
1437    }
1438
1439    /// Record congestion event
1440    pub async fn record_congestion_event(&self, event_type: CongestionEventType, severity: f64) {
1441        let event = CongestionEvent {
1442            timestamp: Instant::now(),
1443            event_type,
1444            severity,
1445        };
1446
1447        let mut congestion_state = self.congestion_state.lock().unwrap();
1448        congestion_state.congestion_events.push_back(event);
1449
1450        // Keep only recent events
1451        if congestion_state.congestion_events.len() > 100 {
1452            congestion_state.congestion_events.pop_front();
1453        }
1454
1455        // Update congestion level based on recent events
1456        let recent_events: Vec<_> = congestion_state
1457            .congestion_events
1458            .iter()
1459            .filter(|e| e.timestamp.elapsed() < Duration::from_secs(10))
1460            .collect();
1461
1462        if !recent_events.is_empty() {
1463            let avg_severity: f64 =
1464                recent_events.iter().map(|e| e.severity).sum::<f64>() / recent_events.len() as f64;
1465
1466            congestion_state.congestion_level = avg_severity;
1467        }
1468
1469        // Adjust congestion window based on event
1470        match event_type {
1471            CongestionEventType::PacketLoss | CongestionEventType::Timeout => {
1472                congestion_state.ssthresh = congestion_state.congestion_window / 2;
1473                congestion_state.congestion_window = congestion_state.ssthresh;
1474            }
1475            CongestionEventType::ECNMark => {
1476                congestion_state.congestion_window =
1477                    (congestion_state.congestion_window as f64 * 0.8) as u32;
1478            }
1479            CongestionEventType::RTTIncrease => {
1480                // Gradual reduction for RTT increase
1481                congestion_state.congestion_window =
1482                    (congestion_state.congestion_window as f64 * 0.95) as u32;
1483            }
1484        }
1485
1486        debug!(
1487            "Recorded congestion event: {:?} (severity: {:.2}, new cwnd: {})",
1488            event_type, severity, congestion_state.congestion_window
1489        );
1490    }
1491
1492    /// Get congestion control integration statistics
1493    pub async fn get_stats(&self) -> CongestionIntegrationStats {
1494        self.stats.lock().unwrap().clone()
1495    }
1496}
1497
1498/// Network efficiency optimization manager that coordinates all network optimization components
1499#[derive(Debug)]
1500pub struct NetworkEfficiencyManager {
1501    parallel_discovery: ParallelDiscoveryCoordinator,
1502    adaptive_timeout: AdaptiveTimeoutManager,
1503    bandwidth_validator: BandwidthAwareValidator,
1504    congestion_integrator: CongestionControlIntegrator,
1505    is_running: bool,
1506}
1507
1508impl NetworkEfficiencyManager {
1509    /// Create a new network efficiency manager with default configurations
1510    pub fn new() -> Self {
1511        Self {
1512            parallel_discovery: ParallelDiscoveryCoordinator::new(
1513                ParallelDiscoveryConfig::default(),
1514            ),
1515            adaptive_timeout: AdaptiveTimeoutManager::new(),
1516            bandwidth_validator: BandwidthAwareValidator::new(BandwidthValidationConfig::default()),
1517            congestion_integrator: CongestionControlIntegrator::new(
1518                CongestionIntegrationConfig::default(),
1519            ),
1520            is_running: false,
1521        }
1522    }
1523
1524    /// Create a new network efficiency manager with custom configurations
1525    pub fn with_configs(
1526        discovery_config: ParallelDiscoveryConfig,
1527        validation_config: BandwidthValidationConfig,
1528        congestion_config: CongestionIntegrationConfig,
1529    ) -> Self {
1530        Self {
1531            parallel_discovery: ParallelDiscoveryCoordinator::new(discovery_config),
1532            adaptive_timeout: AdaptiveTimeoutManager::new(),
1533            bandwidth_validator: BandwidthAwareValidator::new(validation_config),
1534            congestion_integrator: CongestionControlIntegrator::new(congestion_config),
1535            is_running: false,
1536        }
1537    }
1538
1539    /// Start all network efficiency components
1540    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1541        if self.is_running {
1542            return Ok(());
1543        }
1544
1545        self.adaptive_timeout.start().await?;
1546
1547        self.is_running = true;
1548        info!("Network efficiency manager started");
1549        Ok(())
1550    }
1551
1552    /// Get parallel discovery coordinator reference
1553    pub fn parallel_discovery(&mut self) -> &mut ParallelDiscoveryCoordinator {
1554        &mut self.parallel_discovery
1555    }
1556
1557    /// Get adaptive timeout manager reference
1558    pub fn adaptive_timeout(&self) -> &AdaptiveTimeoutManager {
1559        &self.adaptive_timeout
1560    }
1561
1562    /// Get bandwidth validator reference
1563    pub fn bandwidth_validator(&self) -> &BandwidthAwareValidator {
1564        &self.bandwidth_validator
1565    }
1566
1567    /// Get congestion integrator reference
1568    pub fn congestion_integrator(&self) -> &CongestionControlIntegrator {
1569        &self.congestion_integrator
1570    }
1571
1572    /// Get comprehensive network efficiency statistics
1573    pub async fn get_comprehensive_stats(&self) -> NetworkEfficiencyStats {
1574        NetworkEfficiencyStats {
1575            parallel_discovery: self.parallel_discovery.get_stats().await,
1576            adaptive_timeout: self.adaptive_timeout.get_stats().await,
1577            bandwidth_validation: self.bandwidth_validator.get_stats().await,
1578            congestion_integration: self.congestion_integrator.get_stats().await,
1579        }
1580    }
1581
1582    /// Shutdown all network efficiency components
1583    pub async fn shutdown(&mut self) {
1584        if !self.is_running {
1585            return;
1586        }
1587
1588        self.parallel_discovery.shutdown().await;
1589        self.adaptive_timeout.shutdown().await;
1590
1591        self.is_running = false;
1592        info!("Network efficiency manager shutdown complete");
1593    }
1594}
1595
1596/// Comprehensive network efficiency statistics
1597#[derive(Debug, Clone)]
1598pub struct NetworkEfficiencyStats {
1599    pub parallel_discovery: ParallelDiscoveryStats,
1600    pub adaptive_timeout: AdaptiveTimeoutStats,
1601    pub bandwidth_validation: BandwidthValidationStats,
1602    pub congestion_integration: CongestionIntegrationStats,
1603}
1604
1605impl Default for NetworkEfficiencyManager {
1606    fn default() -> Self {
1607        Self::new()
1608    }
1609}