qudag_network/connection.rs

1#![deny(unsafe_code)]
2
3use crate::types::{
4    ConnectionStatus, LatencyMetrics, NetworkError, NetworkMetrics, PeerId, QueueMetrics,
5    ThroughputMetrics,
6};
7use anyhow::Result;
8use async_trait::async_trait;
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10use dashmap::DashMap;
11use futures::future::Future;
12use parking_lot::RwLock as ParkingRwLock;
13use quinn::{Connection, Endpoint};
14use ring::{aead, agreement, rand as ring_rand};
15use std::net::SocketAddr;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc, RwLock as TokioRwLock, Semaphore};
20use tokio::time::sleep;
21use tracing::{debug, error, info, warn};
22
23/// Secure connection configuration
24#[derive(Clone)]
25pub struct SecureConfig {
26    /// Transport encryption keys
27    pub transport_keys: TransportKeys,
28    /// Connection timeout
29    pub timeout: std::time::Duration,
30    /// Keep-alive interval
31    pub keepalive: std::time::Duration,
32}
33
34/// Transport encryption keys
35pub struct TransportKeys {
36    /// Static private key
37    #[allow(dead_code)]
38    private_key: agreement::EphemeralPrivateKey,
39    /// Static public key
40    public_key: Vec<u8>,
41}
42
43impl Clone for TransportKeys {
44    fn clone(&self) -> Self {
        // Generate a fresh key pair for each clone; ephemeral private keys cannot be
        // copied, so a clone receives new key material rather than a duplicate.
46        Self::generate()
47    }
48}
49
50impl TransportKeys {
    /// Generate new transport keys.
    ///
    /// # Panics
    /// Panics if the system RNG fails to produce an X25519 key pair.
52    pub fn generate() -> Self {
53        let rng = ring_rand::SystemRandom::new();
54        let private_key =
55            agreement::EphemeralPrivateKey::generate(&agreement::X25519, &rng).unwrap();
56        let public_key = private_key.compute_public_key().unwrap().as_ref().to_vec();
57
58        Self {
59            private_key,
60            public_key,
61        }
62    }
63}
64
65/// Secure connection handler
66///
67/// # Examples
68///
69/// ```rust,ignore
70/// use qudag_network::{SecureConnection, SecureConfig, TransportKeys};
71/// use std::time::Duration;
72///
73/// // Create configuration
74/// let config = SecureConfig {
75///     transport_keys: TransportKeys::generate(),
76///     timeout: Duration::from_secs(30),
77///     keepalive: Duration::from_secs(5),
78/// };
79///
80/// // Connect to peer (requires async context)
81/// // let connection = SecureConnection::new(&endpoint, addr, config).await?;
82/// ```
83pub struct SecureConnection {
84    /// QUIC connection
85    #[allow(dead_code)]
86    connection: Connection,
87    /// Encryption keys
88    #[allow(dead_code)]
89    keys: TransportKeys,
90    /// Message channels
91    channels: ConnectionChannels,
92}
93
94/// High-performance connection message channels with zero-copy optimizations
95struct ConnectionChannels {
96    /// Outbound message sender with zero-copy buffers
97    tx: mpsc::Sender<Bytes>,
98    /// Inbound message receiver
99    rx: mpsc::Receiver<Bytes>,
100    /// Outbound batch buffer (reusable)
101    batch_buffer: BytesMut,
    /// Batch flush threshold, in KiB of buffered data
103    batch_size: usize,
104    /// Batch timeout
105    batch_timeout: std::time::Duration,
106    /// Last batch time
107    last_batch: std::time::Instant,
108    /// Queue high water mark
109    high_water_mark: usize,
110    /// Queue low water mark
111    low_water_mark: usize,
112    /// Back pressure signal
113    back_pressure: Arc<tokio::sync::Notify>,
114    /// Current queue size in bytes (lock-free)
115    queue_size: AtomicUsize,
116    /// Encryption key cache
117    key_cache: Arc<aead::LessSafeKey>,
118    /// Nonce counter for unique nonces
119    nonce_counter: AtomicU64,
120    /// Message counter for metrics
121    message_count: AtomicU64,
122    /// Bytes processed counter
123    bytes_processed: AtomicU64,
124}
125
126impl SecureConnection {
127    /// Create new secure connection
128    pub async fn new(
129        endpoint: &Endpoint,
130        addr: SocketAddr,
131        config: SecureConfig,
132    ) -> Result<Self, NetworkError> {
133        // Connect using QUIC
134        let connection = endpoint
135            .connect(addr, "qudag")
136            .map_err(|e| NetworkError::ConnectionError(e.to_string()))?
137            .await
138            .map_err(|e| NetworkError::ConnectionError(e.to_string()))?;
139
        // Create high-throughput message channels with zero-copy buffers
        let (tx, rx) = mpsc::channel(65_536); // capacity of 65,536 messages

        // Build the AEAD key from the first 32 bytes of the local public key.
        // NOTE: a production deployment should derive the symmetric key from an
        // X25519 key agreement with the peer (plus a KDF) rather than from raw
        // public key bytes.
        let key = aead::UnboundKey::new(
            &aead::CHACHA20_POLY1305,
            &config.transport_keys.public_key[..32],
        )
        .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
        let key_cache = Arc::new(aead::LessSafeKey::new(key));
150
151        Ok(Self {
152            connection,
153            keys: config.transport_keys,
154            channels: ConnectionChannels {
155                tx,
156                rx,
157                batch_buffer: BytesMut::with_capacity(1024 * 1024), // 1MB reusable buffer
                batch_size: 128, // flush once ~128 KiB has been buffered
159                batch_timeout: std::time::Duration::from_millis(50),
160                last_batch: std::time::Instant::now(),
161                high_water_mark: 64 * 1024 * 1024, // 64MB
162                low_water_mark: 32 * 1024 * 1024,  // 32MB
163                back_pressure: Arc::new(tokio::sync::Notify::new()),
164                queue_size: AtomicUsize::new(0),
165                key_cache,
166                nonce_counter: AtomicU64::new(1),
167                message_count: AtomicU64::new(0),
168                bytes_processed: AtomicU64::new(0),
169            },
170        })
171    }
172
    /// Encrypts a message and queues it for batched, back-pressure-aware transmission.
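    ///
    /// A usage sketch (assumes an established `SecureConnection` named `connection`
    /// and an async context):
    ///
    /// ```rust,ignore
    /// use bytes::Bytes;
    ///
    /// // Messages must be non-empty and at most 1 MiB.
    /// connection.send(Bytes::from_static(b"hello")).await?;
    /// ```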
174    pub async fn send(&mut self, data: Bytes) -> Result<(), NetworkError> {
175        let msg_size = data.len();
176
177        // Validate input size
178        if msg_size == 0 {
179            return Err(NetworkError::MessageError("Empty message".into()));
180        }
181        if msg_size > 1024 * 1024 {
182            // 1MB limit
183            return Err(NetworkError::MessageError("Message too large".into()));
184        }
185
186        // Apply back pressure if queue is too large with timeout
187        let current_size = self.channels.queue_size.load(Ordering::Acquire);
188        if current_size >= self.channels.high_water_mark {
189            debug!("Applying back pressure, queue size: {}", current_size);
190            let back_pressure = self.channels.back_pressure.clone();
191
192            // Wait with timeout to prevent indefinite blocking
193            tokio::select! {
194                _ = back_pressure.notified() => {},
195                _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
196                    return Err(NetworkError::ConnectionError("Back pressure timeout".into()));
197                }
198            }
199        }
200
201        // Generate unique nonce using atomic counter with overflow protection
202        let nonce_value = self.channels.nonce_counter.fetch_add(1, Ordering::Relaxed);
203        if nonce_value == 0 {
204            error!("Nonce counter overflow - this should not happen in normal operation");
205            return Err(NetworkError::EncryptionError("Nonce overflow".into()));
206        }
207
208        let mut nonce_bytes = [0u8; 12];
209        nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
210
        // Copy the payload into a mutable buffer so it can be encrypted in place
        // (seal_in_place_append_tag appends the 16-byte authentication tag).
        let mut encrypted = BytesMut::from(data.as_ref());
213
214        // Encrypt using cached key with retry logic
215        let mut retry_count = 0;
216        loop {
217            // Clone nonce for each attempt since it's consumed
218            let nonce_attempt = aead::Nonce::assume_unique_for_key(nonce_bytes);
219            match self.channels.key_cache.seal_in_place_append_tag(
220                nonce_attempt,
221                aead::Aad::empty(),
222                &mut encrypted,
223            ) {
224                Ok(()) => break,
225                Err(e) => {
226                    retry_count += 1;
227                    if retry_count >= 3 {
228                        return Err(NetworkError::EncryptionError(format!(
229                            "Encryption failed after {} retries: {}",
230                            retry_count, e
231                        )));
232                    }
233                    warn!("Encryption attempt {} failed, retrying: {}", retry_count, e);
234                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
235                }
236            }
237        }
238
239        // Add to batch buffer with length prefix for efficient parsing
240        let encrypted_len = encrypted.len() as u32;
241        self.channels.batch_buffer.put_u32(encrypted_len);
242        self.channels.batch_buffer.extend_from_slice(&encrypted);
243
244        // Update metrics
245        self.channels
246            .queue_size
247            .fetch_add(msg_size, Ordering::Release);
248        self.channels.message_count.fetch_add(1, Ordering::Relaxed);
249        self.channels
250            .bytes_processed
251            .fetch_add(msg_size as u64, Ordering::Relaxed);
252
253        // Process batch if full or timeout exceeded
254        if self.channels.batch_buffer.len() >= self.channels.batch_size * 1024
255            || self.channels.last_batch.elapsed() >= self.channels.batch_timeout
256        {
257            self.flush_batch().await?
258        }
259
260        Ok(())
261    }
262
263    /// Flush current batch of messages with zero-copy optimization and error recovery
264    async fn flush_batch(&mut self) -> Result<(), NetworkError> {
265        if self.channels.batch_buffer.is_empty() {
266            return Ok(());
267        }
268
269        let batch_size = self.channels.batch_buffer.len();
270
271        // Convert to Bytes for zero-copy transmission
272        let batch = self.channels.batch_buffer.split().freeze();
273
274        // Send batch with retry logic
275        let mut retry_count = 0;
276        loop {
277            match self.channels.tx.send(batch.clone()).await {
278                Ok(()) => break,
279                Err(e) => {
280                    retry_count += 1;
281                    if retry_count >= 3 {
282                        // Restore batch to buffer for later retry
283                        self.channels.batch_buffer.extend_from_slice(&batch);
284                        return Err(NetworkError::ConnectionError(format!(
285                            "Batch send failed after {} retries: {}",
286                            retry_count, e
287                        )));
288                    }
289                    warn!("Batch send attempt {} failed, retrying: {}", retry_count, e);
290                    tokio::time::sleep(std::time::Duration::from_millis(100 * retry_count as u64))
291                        .await;
292                }
293            }
294        }
295
        // Update the queue size and release back pressure once below the low water mark.
        // The atomic update returns the previous value, so recompute the new size;
        // saturating_sub guards against underflow, since the encrypted batch is slightly
        // larger than the plaintext byte counts that were added in send().
        let new_size = self
            .channels
            .queue_size
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
                Some(size.saturating_sub(batch_size))
            })
            .map(|previous| previous.saturating_sub(batch_size))
            .unwrap_or(0);
        if new_size <= self.channels.low_water_mark {
            self.channels.back_pressure.notify_waiters();
            debug!("Released back pressure, queue size: {}", new_size);
        }
305
306        // Update last batch time
307        self.channels.last_batch = std::time::Instant::now();
308        Ok(())
309    }
310
311    /// Receive and decrypt messages in batches with zero-copy optimization
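    ///
    /// A receive-loop sketch (assumes an established `SecureConnection` named
    /// `connection` and an async context):
    ///
    /// ```rust,ignore
    /// let messages = connection.receive().await?;
    /// for msg in messages {
    ///     println!("received {} bytes", msg.len());
    /// }
    /// ```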
312    pub async fn receive(&mut self) -> Result<Vec<Bytes>, NetworkError> {
313        // Receive batch of encrypted messages
314        let encrypted_batch = self
315            .channels
316            .rx
317            .recv()
318            .await
319            .ok_or_else(|| NetworkError::ConnectionError("Channel closed".into()))?;
320
321        let mut messages = Vec::new();
322        let mut buf = encrypted_batch;
323
324        // Parse messages from batch using zero-copy approach
325        while buf.has_remaining() {
326            if buf.remaining() < 4 {
327                return Err(NetworkError::EncryptionError(
328                    "Incomplete message length".into(),
329                ));
330            }
331
332            // Read message length prefix
333            let msg_len = buf.get_u32() as usize;
334
335            if buf.remaining() < msg_len {
336                return Err(NetworkError::EncryptionError(
337                    "Incomplete message data".into(),
338                ));
339            }
340
341            // Extract encrypted message data
342            let encrypted_data = buf.copy_to_bytes(msg_len);
343
            // Reconstruct the nonce from the local counter.
            // NOTE: this only matches the sender's per-message nonce if both sides
            // advance their counters in lockstep; a real protocol should transmit or
            // deterministically derive the nonce alongside each message.
345            let nonce_value = self.channels.nonce_counter.load(Ordering::Relaxed);
346            let mut nonce_bytes = [0u8; 12];
347            nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
348            let nonce = aead::Nonce::assume_unique_for_key(nonce_bytes);
349
350            // Decrypt message
351            let mut message_data = BytesMut::from(encrypted_data.as_ref());
352            self.channels
353                .key_cache
354                .open_in_place(nonce, aead::Aad::empty(), &mut message_data)
355                .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
356
357            // Remove authentication tag (16 bytes for ChaCha20Poly1305)
358            if message_data.len() >= 16 {
359                message_data.truncate(message_data.len() - 16);
360            }
361
362            messages.push(message_data.freeze());
363        }
364
365        Ok(messages)
366    }
367}
368
369/// Production-grade connection manager with advanced pooling, multiplexing, and resilience features.
370///
371/// The ConnectionManager provides a comprehensive solution for managing network connections with:
372/// - Advanced connection pooling with lifecycle management
373/// - HTTP/2-style multiplexing for efficient connection usage
374/// - Retry logic with exponential backoff and jitter
375/// - Circuit breaker pattern for fault tolerance
376/// - Health checks and connection quality monitoring
377/// - Connection load balancing and request distribution
378/// - Performance monitoring and metrics collection
379/// - Automatic resource cleanup and garbage collection
380/// - Back pressure handling for overload protection
381/// - Connection warming and preemptive scaling
382/// - Request multiplexing and stream management
383///
384/// # Production Features
385/// - Zero-downtime connection pool updates
386/// - Graceful degradation under load
387/// - Automatic connection warming
388/// - Request routing and load balancing
389/// - Connection affinity and session persistence
390/// - Comprehensive observability and monitoring
391/// - Memory-efficient connection reuse
392/// - Adaptive connection limits based on system resources
393///
394/// # Multiplexing Support
395/// - Stream-based connection multiplexing
396/// - Request prioritization and queuing
397/// - Concurrent request handling
398/// - Flow control and backpressure
399/// - Stream lifecycle management
400///
412/// # Performance Features
413/// - Lock-free concurrent data structures
414/// - Connection pooling reduces setup overhead
415/// - Batched status updates
416/// - Efficient metrics collection
417/// - Adaptive connection limits based on system resources
418///
419/// # Connection Pool Management
420/// - Automatic connection reuse
421/// - TTL-based expiration
422/// - Configurable pool size
423/// - Proactive cleanup of expired connections
424/// - Health-based connection scoring
425///
426/// # Health Monitoring
427/// - Periodic health checks
428/// - Connection quality scoring
429/// - Automatic failover
430/// - Circuit breaker for unreliable peers
431/// - Performance-based connection prioritization
432///
433/// # Metrics Tracking
434/// - Queue metrics (size, utilization)
435/// - Latency metrics (average, peak)
436/// - Throughput metrics (messages/second)
437/// - Connection pool statistics
438/// - Health and reliability metrics
439///
/// # Example
/// ```rust,ignore
/// let manager = ConnectionManager::new(100); // 100 max connections
/// manager.connect(peer_id).await?;
/// let status = manager.get_status(&peer_id); // synchronous, lock-free lookup
/// let metrics = manager.get_metrics().await;
/// ```
447pub struct ConnectionManager {
448    /// Maximum concurrent connections
449    max_connections: usize,
450    /// Active connections with fast concurrent access
451    connections: Arc<DashMap<PeerId, ConnectionInfo>>,
452    /// Connection pool for reuse with TTL tracking
453    connection_pool: Arc<DashMap<PeerId, (ConnectionInfo, Instant)>>,
454    /// Connection pool with enhanced lifecycle management
455    enhanced_pool: Arc<DashMap<PeerId, PooledConnection>>,
456    /// Connection multiplexer for stream management
457    multiplexer: Arc<ConnectionMultiplexer>,
458    /// Retry manager for exponential backoff
459    retry_manager: Arc<RetryManager>,
460    /// Load balancer for connection distribution
461    load_balancer: Arc<LoadBalancer>,
462    /// Health monitor for connection quality
463    health_monitor: Arc<HealthMonitor>,
464    /// Connection warming manager
465    warming_manager: Arc<WarmingManager>,
466    /// Idle connection timeout
467    pool_timeout: std::time::Duration,
468    /// Network performance metrics with detailed stats
469    metrics: Arc<ParkingRwLock<NetworkMetrics>>,
470    /// Queue metrics
471    queue_metrics: Arc<ParkingRwLock<QueueMetrics>>,
472    /// Latency metrics
473    latency_metrics: Arc<ParkingRwLock<LatencyMetrics>>,
474    /// Throughput metrics
475    throughput_metrics: Arc<ParkingRwLock<ThroughputMetrics>>,
476    /// Connection health tracker
477    #[allow(dead_code)]
478    health_tracker: Arc<RwLock<ConnectionHealthTracker>>,
479    /// Circuit breaker for failing connections
480    circuit_breakers: Arc<DashMap<PeerId, CircuitBreaker>>,
481    /// Connection quality scores
482    quality_scores: Arc<DashMap<PeerId, f64>>,
483    /// Connection pool maintenance task handle
484    #[allow(dead_code)]
485    maintenance_handle: Option<tokio::task::JoinHandle<()>>,
486    /// Global connection limits
487    connection_limits: ConnectionLimits,
488    /// Performance monitoring interval
489    #[allow(dead_code)]
490    monitoring_interval: Duration,
491}
492
493/// Extended connection information with health and performance metrics
494#[derive(Debug, Clone)]
495pub struct ConnectionInfo {
496    /// Connection status
497    pub status: ConnectionStatus,
498    /// Connection established timestamp
499    pub established_at: Instant,
500    /// Last activity timestamp
501    pub last_activity: Instant,
502    /// Number of successful operations
503    pub success_count: u64,
504    /// Number of failed operations
505    pub failure_count: u64,
506    /// Average response time
507    pub avg_response_time: Duration,
508    /// Connection quality score (0.0 to 1.0)
509    pub quality_score: f64,
510    /// Bandwidth utilization
511    pub bandwidth_usage: u64,
512    /// Connection metadata
513    pub metadata: HashMap<String, String>,
514}
515
516impl ConnectionInfo {
517    /// Create new connection info
518    pub fn new(status: ConnectionStatus) -> Self {
519        Self {
520            status,
521            established_at: Instant::now(),
522            last_activity: Instant::now(),
523            success_count: 0,
524            failure_count: 0,
525            avg_response_time: Duration::from_millis(0),
526            quality_score: 1.0,
527            bandwidth_usage: 0,
528            metadata: HashMap::new(),
529        }
530    }
531
532    /// Update connection activity and performance metrics
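    ///
    /// A usage sketch:
    ///
    /// ```rust,ignore
    /// let mut info = ConnectionInfo::new(ConnectionStatus::Connected);
    /// // Record a successful 20 ms round trip that transferred 1 KiB.
    /// info.update_activity(true, Duration::from_millis(20), 1024);
    /// assert!(info.is_healthy());
    /// ```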
533    pub fn update_activity(
534        &mut self,
535        success: bool,
536        response_time: Duration,
537        bytes_transferred: u64,
538    ) {
539        self.last_activity = Instant::now();
540        self.bandwidth_usage += bytes_transferred;
541
542        if success {
543            self.success_count += 1;
544        } else {
545            self.failure_count += 1;
546        }
547
548        // Update average response time (exponential moving average)
549        let alpha = 0.1; // Smoothing factor
550        let current_ms = self.avg_response_time.as_millis() as f64;
551        let new_ms = response_time.as_millis() as f64;
552        let updated_ms = alpha * new_ms + (1.0 - alpha) * current_ms;
553        self.avg_response_time = Duration::from_millis(updated_ms as u64);
554
555        // Update quality score based on success rate and response time
556        self.update_quality_score();
557    }
558
559    /// Update connection quality score
560    fn update_quality_score(&mut self) {
561        let total_ops = self.success_count + self.failure_count;
562        if total_ops == 0 {
563            self.quality_score = 1.0;
564            return;
565        }
566
567        // Base score on success rate
568        let success_rate = self.success_count as f64 / total_ops as f64;
569
570        // Penalty for high response times (above 100ms)
571        let response_penalty = if self.avg_response_time.as_millis() > 100 {
572            0.2 * (self.avg_response_time.as_millis() as f64 / 1000.0)
573        } else {
574            0.0
575        };
576
577        self.quality_score = (success_rate - response_penalty).clamp(0.0, 1.0);
578    }
579
580    /// Check if connection is healthy
581    pub fn is_healthy(&self) -> bool {
582        self.quality_score > 0.5 && self.last_activity.elapsed() < Duration::from_secs(300)
583        // 5 minutes
584    }
585}
586
587/// Connection health tracking for monitoring and recovery
588#[derive(Debug)]
589#[allow(dead_code)]
590pub struct ConnectionHealthTracker {
591    /// Health check interval
592    check_interval: Duration,
593    /// Last health check timestamp
594    last_check: Option<Instant>,
595    /// Unhealthy connections to monitor
596    unhealthy_connections: HashMap<PeerId, UnhealthyConnectionInfo>,
597    /// Health check statistics
598    health_stats: HealthStatistics,
599}
600
601/// Information about unhealthy connections
602#[derive(Debug, Clone)]
603#[allow(dead_code)]
604pub struct UnhealthyConnectionInfo {
605    /// When the connection became unhealthy
606    unhealthy_since: Instant,
607    /// Number of recovery attempts
608    recovery_attempts: u32,
609    /// Last recovery attempt timestamp
610    last_recovery_attempt: Option<Instant>,
611    /// Reason for being unhealthy
612    reason: String,
613}
614
615/// Health statistics
616#[derive(Debug, Clone, Default)]
617pub struct HealthStatistics {
618    /// Total health checks performed
619    pub total_checks: u64,
620    /// Number of healthy connections found
621    pub healthy_count: u64,
622    /// Number of unhealthy connections found
623    pub unhealthy_count: u64,
624    /// Number of successful recoveries
625    pub successful_recoveries: u64,
626    /// Average recovery time
627    pub avg_recovery_time: Duration,
628}
629
630impl Default for ConnectionHealthTracker {
631    fn default() -> Self {
632        Self {
633            check_interval: Duration::from_secs(30),
634            last_check: None,
635            unhealthy_connections: HashMap::new(),
636            health_stats: HealthStatistics::default(),
637        }
638    }
639}
640
641/// Circuit breaker for managing failing connections
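///
/// A sketch of the closed-to-open transition using the default thresholds
/// (5 consecutive failures open the circuit; it stays open for 60 seconds):
///
/// ```rust,ignore
/// let mut breaker = CircuitBreaker::default();
/// for _ in 0..5 {
///     assert!(breaker.allow_request());
///     breaker.record_result(false);
/// }
/// // The circuit is now open, so requests are rejected until the timeout elapses.
/// assert!(!breaker.allow_request());
/// ```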
642#[derive(Debug, Clone)]
643pub struct CircuitBreaker {
644    /// Current state of the circuit breaker
645    state: CircuitBreakerState,
646    /// Failure threshold to open circuit
647    failure_threshold: u32,
648    /// Current failure count
649    failure_count: u32,
650    /// Time when circuit was opened
651    opened_at: Option<Instant>,
652    /// Timeout before attempting to close circuit
653    timeout: Duration,
654    /// Success threshold to close circuit
655    success_threshold: u32,
656    /// Current success count in half-open state
657    success_count: u32,
658}
659
660/// Circuit breaker states
661#[derive(Debug, Clone, PartialEq)]
662pub enum CircuitBreakerState {
663    /// Circuit is closed, allowing requests
664    Closed,
665    /// Circuit is open, blocking requests
666    Open,
667    /// Circuit is half-open, testing if service is recovered
668    HalfOpen,
669}
670
671impl Default for CircuitBreaker {
672    fn default() -> Self {
673        Self {
674            state: CircuitBreakerState::Closed,
675            failure_threshold: 5,
676            failure_count: 0,
677            opened_at: None,
678            timeout: Duration::from_secs(60),
679            success_threshold: 3,
680            success_count: 0,
681        }
682    }
683}
684
685impl CircuitBreaker {
686    /// Check if requests should be allowed through
687    pub fn allow_request(&mut self) -> bool {
688        match self.state {
689            CircuitBreakerState::Closed => true,
690            CircuitBreakerState::Open => {
691                if let Some(opened_at) = self.opened_at {
692                    if opened_at.elapsed() >= self.timeout {
693                        self.state = CircuitBreakerState::HalfOpen;
694                        self.success_count = 0;
695                        true
696                    } else {
697                        false
698                    }
699                } else {
700                    false
701                }
702            }
703            CircuitBreakerState::HalfOpen => true,
704        }
705    }
706
707    /// Record the result of an operation
708    pub fn record_result(&mut self, success: bool) {
709        match self.state {
710            CircuitBreakerState::Closed => {
711                if success {
712                    self.failure_count = 0;
713                } else {
714                    self.failure_count += 1;
715                    if self.failure_count >= self.failure_threshold {
716                        self.state = CircuitBreakerState::Open;
717                        self.opened_at = Some(Instant::now());
718                    }
719                }
720            }
721            CircuitBreakerState::HalfOpen => {
722                if success {
723                    self.success_count += 1;
724                    if self.success_count >= self.success_threshold {
725                        self.state = CircuitBreakerState::Closed;
726                        self.failure_count = 0;
727                    }
728                } else {
729                    self.state = CircuitBreakerState::Open;
730                    self.opened_at = Some(Instant::now());
731                    self.failure_count += 1;
732                }
733            }
734            CircuitBreakerState::Open => {
735                // Should not reach here if allow_request is used properly
736            }
737        }
738    }
739}
740
741/// Enhanced connection pool entry with lifecycle management
742#[derive(Debug, Clone)]
743pub struct PooledConnection {
744    /// Connection information
745    pub info: ConnectionInfo,
746    /// Connection establishment timestamp
747    pub created_at: Instant,
748    /// Last used timestamp
749    pub last_used: Instant,
750    /// Usage count
751    pub usage_count: u64,
752    /// Connection weight for load balancing
753    pub weight: f64,
754    /// Maximum concurrent streams
755    pub max_streams: u32,
756    /// Current active streams
757    pub active_streams: u32,
758    /// Connection warming state
759    pub warming_state: WarmingState,
760    /// Connection affinity group
761    pub affinity_group: Option<String>,
762}
763
764/// Connection warming state
765#[derive(Debug, Clone, PartialEq)]
766pub enum WarmingState {
767    /// Connection is cold (not warmed)
768    Cold,
769    /// Connection is warming up
770    Warming,
771    /// Connection is warm and ready
772    Warm,
773    /// Connection warming failed
774    FailedToWarm(String),
775}
776
777/// Connection multiplexer for stream management
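///
/// A construction sketch using the same parameters the connection manager in this
/// module passes to `ConnectionMultiplexer::new` (32 streams per connection,
/// 30-second stream timeout):
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let multiplexer = ConnectionMultiplexer::new(32, Duration::from_secs(30));
/// ```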
778#[derive(Debug)]
779pub struct ConnectionMultiplexer {
780    /// Active multiplexed connections
781    connections: Arc<DashMap<PeerId, MultiplexedConnection>>,
782    /// Stream routing table
783    stream_routes: Arc<DashMap<StreamId, PeerId>>,
784    /// Stream priority queue
785    priority_queue: Arc<TokioRwLock<BTreeMap<Priority, VecDeque<StreamId>>>>,
786    /// Maximum concurrent streams per connection
787    max_streams_per_connection: u32,
788    /// Stream timeout configuration
789    #[allow(dead_code)]
790    stream_timeout: Duration,
791}
792
793/// Stream identifier
794#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
795pub struct StreamId(pub u64);
796
797/// Stream priority
798#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
799pub enum Priority {
800    Critical = 0,
801    High = 1,
802    Normal = 2,
803    Low = 3,
804}
805
806/// Multiplexed connection wrapper
807#[derive(Debug)]
808pub struct MultiplexedConnection {
809    /// Base connection info
810    pub info: ConnectionInfo,
811    /// Active streams
812    pub streams: HashMap<StreamId, StreamInfo>,
813    /// Next stream ID
814    pub next_stream_id: u64,
815    /// Connection utilization
816    pub utilization: f64,
817    /// Stream semaphore for flow control
818    pub stream_semaphore: Arc<Semaphore>,
819}
820
821/// Stream information
822#[derive(Debug, Clone)]
823pub struct StreamInfo {
824    /// Stream identifier
825    pub id: StreamId,
826    /// Stream priority
827    pub priority: Priority,
828    /// Stream state
829    pub state: StreamState,
830    /// Created timestamp
831    pub created_at: Instant,
832    /// Last activity timestamp
833    pub last_activity: Instant,
834    /// Bytes sent/received
835    pub bytes_transferred: u64,
836}
837
838/// Stream state
839#[derive(Debug, Clone, PartialEq)]
840pub enum StreamState {
841    /// Stream is opening
842    Opening,
843    /// Stream is active
844    Active,
845    /// Stream is half-closed (local)
846    HalfClosedLocal,
847    /// Stream is half-closed (remote)
848    HalfClosedRemote,
849    /// Stream is closed
850    Closed,
851    /// Stream encountered an error
852    Error(String),
853}
854
855/// Retry manager with exponential backoff and jitter
856#[derive(Debug)]
857pub struct RetryManager {
858    /// Retry configurations per peer
859    retry_configs: Arc<DashMap<PeerId, RetryConfig>>,
860    /// Default retry configuration
861    default_config: RetryConfig,
862    /// Retry statistics
863    stats: Arc<TokioRwLock<RetryStats>>,
864}
865
866/// Retry configuration
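///
/// A configuration sketch. Typically the delay for attempt `n` grows as
/// `initial_backoff * backoff_multiplier^n`, capped at `max_backoff`, with up to
/// `jitter_factor` of random spread (an illustrative description, not necessarily
/// the exact formula used by `RetryManager`):
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let config = RetryConfig {
///     max_retries: 5,
///     initial_backoff: Duration::from_millis(100),
///     max_backoff: Duration::from_secs(10),
///     backoff_multiplier: 2.0,
///     jitter_factor: 0.2,
///     timeout: Duration::from_secs(5),
/// };
/// ```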
867#[derive(Debug, Clone)]
868pub struct RetryConfig {
869    /// Maximum number of retries
870    pub max_retries: u32,
871    /// Initial backoff duration
872    pub initial_backoff: Duration,
873    /// Maximum backoff duration
874    pub max_backoff: Duration,
875    /// Backoff multiplier
876    pub backoff_multiplier: f64,
877    /// Jitter factor (0.0 to 1.0)
878    pub jitter_factor: f64,
879    /// Timeout for each retry attempt
880    pub timeout: Duration,
881}
882
883/// Retry statistics
884#[derive(Debug, Clone, Default)]
885pub struct RetryStats {
886    /// Total retry attempts
887    pub total_attempts: u64,
888    /// Successful retries
889    pub successful_retries: u64,
890    /// Failed retries
891    pub failed_retries: u64,
892    /// Average retry duration
893    pub avg_retry_duration: Duration,
894}
895
896/// Load balancer for connection distribution
897#[derive(Debug)]
898pub struct LoadBalancer {
899    /// Load balancing strategy
900    strategy: LoadBalancingStrategy,
901    /// Connection weights
902    weights: Arc<DashMap<PeerId, f64>>,
903    /// Round-robin counter
904    round_robin_counter: AtomicU64,
905    /// Load balancing statistics
906    stats: Arc<TokioRwLock<LoadBalancingStats>>,
907}
908
909/// Load balancing strategy
910#[derive(Debug, Clone)]
911pub enum LoadBalancingStrategy {
912    /// Round-robin distribution
913    RoundRobin,
914    /// Least connections
915    LeastConnections,
916    /// Weighted round-robin
917    WeightedRoundRobin,
918    /// Response time based
919    ResponseTime,
920    /// Resource utilization based
921    ResourceUtilization,
922}
923
924/// Load balancing statistics
925#[derive(Debug, Clone, Default)]
926pub struct LoadBalancingStats {
927    /// Total requests distributed
928    pub total_requests: u64,
929    /// Distribution by peer
930    pub peer_distribution: HashMap<PeerId, u64>,
931    /// Average response times
932    pub avg_response_times: HashMap<PeerId, Duration>,
933}
934
935/// Health monitor for connection quality
936#[derive(Debug)]
937pub struct HealthMonitor {
938    /// Health check configuration
939    config: HealthCheckConfig,
940    /// Health check results
941    results: Arc<DashMap<PeerId, HealthCheckResult>>,
942    /// Health check scheduler
943    scheduler: Arc<TokioRwLock<HealthCheckScheduler>>,
944    /// Health statistics
945    stats: Arc<TokioRwLock<HealthStats>>,
946}
947
948/// Health check configuration
949#[derive(Debug, Clone)]
950pub struct HealthCheckConfig {
951    /// Health check interval
952    pub interval: Duration,
953    /// Health check timeout
954    pub timeout: Duration,
955    /// Failure threshold
956    pub failure_threshold: u32,
957    /// Recovery threshold
958    pub recovery_threshold: u32,
959    /// Health check type
960    pub check_type: HealthCheckType,
961}
962
963/// Health check type
964#[derive(Debug, Clone)]
965pub enum HealthCheckType {
966    /// Ping-based health check
967    Ping,
968    /// Application-level health check
969    Application,
970    /// Custom health check with function pointer
971    Custom,
972}
973
974/// Health check result
975#[derive(Debug, Clone)]
976pub struct HealthCheckResult {
977    /// Check timestamp
978    pub timestamp: Instant,
979    /// Check success
980    pub success: bool,
981    /// Response time
982    pub response_time: Duration,
983    /// Error message if failed
984    pub error: Option<String>,
985    /// Health score (0.0 to 1.0)
986    pub health_score: f64,
987}
988
989/// Health check scheduler
990#[derive(Debug)]
991pub struct HealthCheckScheduler {
992    /// Scheduled checks
993    scheduled_checks: HashMap<PeerId, Instant>,
994    /// Check intervals per peer
995    check_intervals: HashMap<PeerId, Duration>,
996}
997
998/// Health statistics
999#[derive(Debug, Clone, Default)]
1000pub struct HealthStats {
1001    /// Total health checks performed
1002    pub total_checks: u64,
1003    /// Successful health checks
1004    pub successful_checks: u64,
1005    /// Failed health checks
1006    pub failed_checks: u64,
1007    /// Average health check response time
1008    pub avg_response_time: Duration,
1009}
1010
1011/// Connection warming manager
1012#[derive(Debug)]
1013pub struct WarmingManager {
1014    /// Warming configuration
1015    config: WarmingConfig,
1016    /// Warming state per peer
1017    warming_states: Arc<DashMap<PeerId, WarmingState>>,
1018    /// Warming tasks
1019    #[allow(dead_code)]
1020    warming_tasks: Arc<DashMap<PeerId, tokio::task::JoinHandle<()>>>,
1021    /// Warming statistics
1022    stats: Arc<TokioRwLock<WarmingStats>>,
1023}
1024
1025/// Connection warming configuration
1026#[derive(Debug, Clone)]
1027pub struct WarmingConfig {
1028    /// Enable connection warming
1029    pub enabled: bool,
1030    /// Minimum pool size to maintain
1031    pub min_pool_size: usize,
1032    /// Warming timeout
1033    pub warming_timeout: Duration,
1034    /// Warming retry attempts
1035    pub warming_retries: u32,
1036    /// Predictive warming threshold
1037    pub predictive_threshold: f64,
1038}
1039
1040/// Connection warming statistics
1041#[derive(Debug, Clone, Default)]
1042pub struct WarmingStats {
1043    /// Total warming attempts
1044    pub total_attempts: u64,
1045    /// Successful warming operations
1046    pub successful_warmings: u64,
1047    /// Failed warming operations
1048    pub failed_warmings: u64,
1049    /// Average warming time
1050    pub avg_warming_time: Duration,
1051}
1052
1053/// Connection limits configuration
1054#[derive(Debug, Clone)]
1055pub struct ConnectionLimits {
1056    /// Maximum total connections
1057    pub max_total: usize,
1058    /// Maximum connections per peer
1059    pub max_per_peer: usize,
1060    /// Maximum idle connections
1061    pub max_idle: usize,
1062    /// Connection timeout
1063    pub connection_timeout: Duration,
1064    /// Idle timeout
1065    pub idle_timeout: Duration,
1066}
1067
1068impl Default for ConnectionLimits {
1069    fn default() -> Self {
1070        Self {
1071            max_total: 1000,
1072            max_per_peer: 10,
1073            max_idle: 100,
1074            connection_timeout: Duration::from_secs(30),
1075            idle_timeout: Duration::from_secs(300),
1076        }
1077    }
1078}
1079
1080/// Trait for connection health checks
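///
/// A sketch of a custom implementation (the `AlwaysHealthy` type is hypothetical,
/// shown only to illustrate the trait contract):
///
/// ```rust,ignore
/// use async_trait::async_trait;
///
/// struct AlwaysHealthy;
///
/// #[async_trait]
/// impl HealthCheck for AlwaysHealthy {
///     async fn check(&self, _peer_id: &PeerId, _connection: &ConnectionInfo) -> HealthCheckResult {
///         HealthCheckResult {
///             timestamp: Instant::now(),
///             success: true,
///             response_time: Duration::from_millis(0),
///             error: None,
///             health_score: 1.0,
///         }
///     }
/// }
/// ```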
1081#[async_trait]
1082pub trait HealthCheck: Send + Sync {
1083    /// Perform health check on connection
1084    async fn check(&self, peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult;
1085}
1086
1087/// Default ping-based health check implementation
1088#[derive(Debug)]
1089pub struct PingHealthCheck {
1090    #[allow(dead_code)]
1091    timeout: Duration,
1092}
1093
1094impl Default for PingHealthCheck {
1095    fn default() -> Self {
1096        Self {
1097            timeout: Duration::from_secs(5),
1098        }
1099    }
1100}
1101
1102#[async_trait]
1103impl HealthCheck for PingHealthCheck {
1104    async fn check(&self, _peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult {
1105        let start = Instant::now();
1106
1107        // Simulate ping check (in real implementation, this would send actual ping)
1108        let success = connection.is_healthy() && rand::random::<f64>() > 0.1;
1109        let response_time = start.elapsed();
1110
1111        HealthCheckResult {
1112            timestamp: Instant::now(),
1113            success,
1114            response_time,
1115            error: if success {
1116                None
1117            } else {
1118                Some("Ping timeout".to_string())
1119            },
1120            health_score: if success { 1.0 } else { 0.0 },
1121        }
1122    }
1123}
1124
1125use std::collections::{BTreeMap, HashMap, VecDeque};
1126use tokio::sync::RwLock;
1127
1128impl ConnectionManager {
1129    /// Recovers from connection failures by attempting reconnection
1130    pub async fn recover_connection(&self, peer_id: &PeerId) -> Result<(), NetworkError> {
1131        debug!("Attempting to recover connection for peer {:?}", peer_id);
1132
1133        // Remove failed connection
1134        self.connections.remove(peer_id);
1135
1136        // Clear from pool if exists
1137        self.connection_pool.remove(peer_id);
1138
1139        // Attempt reconnection with exponential backoff
1140        let mut retry_count = 0;
1141        let max_retries = 5;
1142
1143        while retry_count < max_retries {
1144            match self.connect(*peer_id).await {
1145                Ok(()) => {
1146                    info!("Successfully recovered connection for peer {:?}", peer_id);
1147                    return Ok(());
1148                }
1149                Err(e) => {
1150                    retry_count += 1;
1151                    let backoff_ms = 100u64 * (1 << retry_count); // Exponential backoff
1152                    warn!(
1153                        "Connection recovery attempt {} failed for peer {:?}: {}, retrying in {}ms",
1154                        retry_count, peer_id, e, backoff_ms
1155                    );
1156
1157                    if retry_count >= max_retries {
1158                        error!(
1159                            "Failed to recover connection for peer {:?} after {} attempts",
1160                            peer_id, max_retries
1161                        );
1162                        return Err(NetworkError::ConnectionError(format!(
1163                            "Recovery failed after {} attempts",
1164                            max_retries
1165                        )));
1166                    }
1167
1168                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
1169                }
1170            }
1171        }
1172
1173        Err(NetworkError::ConnectionError("Max retries exceeded".into()))
1174    }
1175
1176    /// Performs health check on all active connections
1177    pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError> {
1178        let mut unhealthy_peers = Vec::new();
1179
1180        for entry in self.connections.iter() {
1181            let peer_id = *entry.key();
1182            let conn_info = entry.value();
1183
1184            match &conn_info.status {
1185                ConnectionStatus::Failed(_) => {
1186                    unhealthy_peers.push(peer_id);
1187                    warn!("Detected failed connection for peer {:?}", peer_id);
1188                }
1189                ConnectionStatus::Disconnected => {
1190                    unhealthy_peers.push(peer_id);
1191                    debug!("Detected disconnected peer {:?}", peer_id);
1192                }
1193                _ => {
1194                    // Check if connection is healthy based on activity and quality
1195                    if !conn_info.is_healthy() {
1196                        unhealthy_peers.push(peer_id);
1197                        debug!(
1198                            "Detected unhealthy connection for peer {:?} (quality: {:.2})",
1199                            peer_id, conn_info.quality_score
1200                        );
1201                    }
1202                }
1203            }
1204        }
1205
1206        if !unhealthy_peers.is_empty() {
1207            info!(
1208                "Health check found {} unhealthy connections",
1209                unhealthy_peers.len()
1210            );
1211        }
1212
1213        Ok(unhealthy_peers)
1214    }
1215
1216    /// Automatically recovers unhealthy connections
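    ///
    /// A maintenance-loop sketch (assumes a shared `manager` handle and an async context):
    ///
    /// ```rust,ignore
    /// loop {
    ///     tokio::time::sleep(std::time::Duration::from_secs(30)).await;
    ///     let recovered = manager.auto_recover().await?;
    ///     debug!("auto-recovery pass restored {} connections", recovered);
    /// }
    /// ```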
1217    pub async fn auto_recover(&self) -> Result<usize, NetworkError> {
1218        let unhealthy_peers = self.health_check().await?;
1219        let total_unhealthy = unhealthy_peers.len();
1220        let mut recovered_count = 0;
1221
1222        for peer_id in unhealthy_peers {
1223            match self.recover_connection(&peer_id).await {
1224                Ok(()) => {
1225                    recovered_count += 1;
1226                    debug!("Auto-recovered connection for peer {:?}", peer_id);
1227                }
1228                Err(e) => {
1229                    warn!(
1230                        "Failed to auto-recover connection for peer {:?}: {}",
1231                        peer_id, e
1232                    );
1233                }
1234            }
1235        }
1236
1237        if recovered_count > 0 {
1238            info!(
1239                "Auto-recovery completed: {}/{} connections recovered",
1240                recovered_count, total_unhealthy
1241            );
1242        }
1243
1244        Ok(recovered_count)
    }

    /// Creates a new connection manager with default pool timeout (5 minutes).
1247    ///
1248    /// The manager initializes with optimized default settings:
1249    /// - 5 minute connection pool TTL
1250    /// - Lock-free concurrent connection tracking
1251    /// - Comprehensive metrics collection
1252    /// - Health monitoring and circuit breakers
1253    ///
1254    /// # Arguments
1255    /// * `max_connections` - Maximum number of concurrent connections to maintain
1256    ///
1257    /// # Performance Considerations
1258    /// - Choose max_connections based on system resources
1259    /// - Connection pooling reduces setup overhead
1260    /// - Metrics collection has minimal overhead
1261    /// - Health monitoring provides proactive issue detection
1262    pub fn new(max_connections: usize) -> Self {
1263        Self::with_pool_timeout(max_connections, std::time::Duration::from_secs(300))
1264    }
1265
1266    /// Creates a new connection manager with enhanced features and custom pool timeout.
1267    ///
1268    /// Allows fine-tuning of connection pooling behavior:
1269    /// - Custom TTL for pooled connections
1270    /// - Connection reuse optimization
1271    /// - Resource usage control
1272    /// - Enhanced health monitoring
1273    ///
1274    /// # Arguments
1275    /// * `max_connections` - Maximum number of concurrent connections
1276    /// * `pool_timeout` - Time-to-live for pooled connections
1277    ///
1278    /// # Connection Pool Behavior
1279    /// - Connections are cached until timeout
1280    /// - Expired connections automatically cleaned up
1281    /// - Pool size limited by max_connections
1282    /// - Health-based connection scoring and prioritization
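    ///
    /// # Examples
    /// A construction sketch:
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// // 200 connections max, 60-second pool TTL.
    /// let manager = ConnectionManager::with_pool_timeout(200, Duration::from_secs(60));
    /// ```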
1283    pub fn with_pool_timeout(max_connections: usize, pool_timeout: std::time::Duration) -> Self {
1284        let connection_limits = ConnectionLimits {
1285            max_total: max_connections,
1286            ..Default::default()
1287        };
1288
1289        Self {
1290            max_connections,
1291            connections: Arc::new(DashMap::new()),
1292            connection_pool: Arc::new(DashMap::new()),
1293            enhanced_pool: Arc::new(DashMap::new()),
1294            multiplexer: Arc::new(ConnectionMultiplexer::new(32, Duration::from_secs(30))),
1295            retry_manager: Arc::new(RetryManager::new()),
1296            load_balancer: Arc::new(LoadBalancer::new(LoadBalancingStrategy::WeightedRoundRobin)),
1297            health_monitor: Arc::new(HealthMonitor::new(HealthCheckConfig::default())),
1298            warming_manager: Arc::new(WarmingManager::new(WarmingConfig::default())),
1299            pool_timeout,
1300            metrics: Arc::new(ParkingRwLock::new(NetworkMetrics::default())),
1301            queue_metrics: Arc::new(ParkingRwLock::new(QueueMetrics::default())),
1302            latency_metrics: Arc::new(ParkingRwLock::new(LatencyMetrics::default())),
1303            throughput_metrics: Arc::new(ParkingRwLock::new(ThroughputMetrics::default())),
1304            health_tracker: Arc::new(RwLock::new(ConnectionHealthTracker::default())),
1305            circuit_breakers: Arc::new(DashMap::new()),
1306            quality_scores: Arc::new(DashMap::new()),
1307            maintenance_handle: None,
1308            connection_limits,
1309            monitoring_interval: Duration::from_secs(30),
1310        }
1311    }
1312
1313    /// Initiates a connection to a peer with automatic pooling and reuse.
1314    ///
1315    /// Enhanced connection establishment process:
1316    /// 1. Check circuit breaker status
1317    /// 2. Check pool for existing healthy connection
1318    /// 3. Reuse if valid connection exists
1319    /// 4. Create new connection if needed
1320    /// 5. Apply connection limits and health checks
1321    /// 6. Initialize health monitoring
1322    ///
1323    /// # Arguments
1324    /// * `peer_id` - ID of the peer to connect to
1325    ///
1326    /// # Connection Pooling
1327    /// - Reuses healthy connections when possible
1328    /// - Validates connection freshness and quality
1329    /// - Removes expired or unhealthy connections
1330    /// - Updates usage metrics and health scores
1331    ///
1332    /// # Circuit Breaker Protection
1333    /// - Prevents connections to repeatedly failing peers
1334    /// - Implements exponential backoff
1335    /// - Automatic recovery testing
1336    ///
1337    /// # Returns
1338    /// * `Ok(())` - Connection established or reused
1339    /// * `Err(_)` - Connection failed or circuit breaker open
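    ///
    /// A usage sketch with circuit-breaker-aware error handling (assumes an async
    /// context and an existing `manager`):
    ///
    /// ```rust,ignore
    /// match manager.connect(peer_id).await {
    ///     Ok(()) => { /* connection established or reused from the pool */ }
    ///     Err(e) => warn!("connect to {:?} failed: {}", peer_id, e),
    /// }
    /// ```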
1340    pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1341        // Check circuit breaker first
1342        if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
1343            if !circuit_breaker.allow_request() {
1344                return Err(NetworkError::ConnectionError(
1345                    "Circuit breaker is open for this peer".into(),
1346                ));
1347            }
1348        }
1349
1350        // Check if connection exists in the pool
1351        if let Some(entry) = self.connection_pool.get(&peer_id) {
1352            let (conn_info, last_used) = entry.value();
1353            if last_used.elapsed() < self.pool_timeout && conn_info.is_healthy() {
1354                // Connection is still valid and healthy, reuse it
1355                self.connections.insert(peer_id, conn_info.clone());
1356                debug!("Reusing pooled healthy connection for peer {:?}", peer_id);
1357
1358                // Record successful circuit breaker operation
1359                if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
1360                    circuit_breaker.record_result(true);
1361                }
1362
1363                return Ok(());
1364            } else {
1365                // Connection expired or unhealthy, remove from pool
1366                self.connection_pool.remove(&peer_id);
1367                debug!(
1368                    "Removing expired/unhealthy connection for peer {:?}",
1369                    peer_id
1370                );
1371            }
1372        }
1373
1374        // Check connection limit
1375        if self.connections.len() >= self.max_connections {
1376            warn!("Max connections reached");
1377            return Err(NetworkError::ConnectionError(
1378                "Max connections reached".into(),
1379            ));
1380        }
1381
1382        // Create new connection with enhanced monitoring
1383        let connecting_info = ConnectionInfo::new(ConnectionStatus::Connecting);
1384        self.connections.insert(peer_id, connecting_info);
1385        debug!("Creating new connection for peer {:?}", peer_id);
1386
1387        // Simulate connection establishment (in real implementation, this would be actual network code)
1388        let start_time = Instant::now();
1389        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1390        let connection_time = start_time.elapsed();
1391
1392        // Simulate connection success/failure (90% success rate)
1393        let success = rand::random::<f64>() > 0.1;
1394
1395        if success {
1396            // Update to connected status on success
1397            let mut connected_info = ConnectionInfo::new(ConnectionStatus::Connected);
1398            connected_info.update_activity(true, connection_time, 0);
1399
1400            self.connections.insert(peer_id, connected_info.clone());
1401            self.quality_scores
1402                .insert(peer_id, connected_info.quality_score);
1403
1404            // Record successful circuit breaker operation
1405            self.circuit_breakers
1406                .entry(peer_id)
1407                .or_insert_with(CircuitBreaker::default)
1408                .record_result(true);
1409
1410            debug!(
1411                "Successfully connected to peer {:?} in {:?}",
1412                peer_id, connection_time
1413            );
1414        } else {
1415            // Handle connection failure
1416            let failed_info =
1417                ConnectionInfo::new(ConnectionStatus::Failed("Connection timeout".into()));
1418            self.connections.insert(peer_id, failed_info);
1419
1420            // Record failed circuit breaker operation
1421            self.circuit_breakers
1422                .entry(peer_id)
1423                .or_insert_with(CircuitBreaker::default)
1424                .record_result(false);
1425
1426            return Err(NetworkError::ConnectionError(
1427                "Failed to establish connection".into(),
1428            ));
1429        }
1430
1431        Ok(())
1432    }
1433
1434    /// Updates connection status for a peer with lock-free atomic guarantees.
1435    ///
1436    /// Enhanced status update process:
1437    /// 1. Update connection info with new status
1438    /// 2. Update health and quality metrics
1439    /// 3. Atomic metrics update
1440    /// 4. Circuit breaker state management
1441    /// 5. Event logging and monitoring
1442    ///
1443    /// # Arguments
1444    /// * `peer_id` - ID of the peer to update
1445    /// * `status` - New connection status
    ///
    /// To also record response time and bytes transferred, use
    /// [`Self::update_status_with_metrics`].
1448    ///
1449    /// # Thread Safety
1450    /// - Status updates are lock-free and atomic
1451    /// - Metrics updates use parking_lot for better performance
1452    /// - Safe for concurrent access with minimal contention
1453    ///
1454    /// # Health Tracking
1455    /// Updates connection health scores, quality metrics, and circuit breaker
1456    /// states to ensure optimal connection management.
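    ///
    /// A usage sketch:
    ///
    /// ```rust,ignore
    /// manager.update_status(peer_id, ConnectionStatus::Connected);
    /// // Or, to also record performance data:
    /// manager.update_status_with_metrics(
    ///     peer_id,
    ///     ConnectionStatus::Connected,
    ///     Some(Duration::from_millis(12)),
    ///     4096,
    /// );
    /// ```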
1457    pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus) {
1458        self.update_status_with_metrics(peer_id, status, None, 0);
1459    }
1460
1461    /// Updates connection status with detailed performance metrics
1462    pub fn update_status_with_metrics(
1463        &self,
1464        peer_id: PeerId,
1465        status: ConnectionStatus,
1466        response_time: Option<Duration>,
1467        bytes_transferred: u64,
1468    ) {
1469        // Update or create connection info
1470        if let Some(mut conn_info) = self.connections.get_mut(&peer_id) {
1471            conn_info.status = status.clone();
1472            if let Some(rt) = response_time {
1473                let success = matches!(status, ConnectionStatus::Connected);
1474                conn_info.update_activity(success, rt, bytes_transferred);
1475
1476                // Update quality score cache
1477                self.quality_scores.insert(peer_id, conn_info.quality_score);
1478
1479                // Update circuit breaker
1480                if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
1481                    circuit_breaker.record_result(success);
1482                }
1483            }
1484        } else {
1485            // Create new connection info
1486            let mut conn_info = ConnectionInfo::new(status);
1487            if let Some(rt) = response_time {
1488                let success = matches!(conn_info.status, ConnectionStatus::Connected);
1489                conn_info.update_activity(success, rt, bytes_transferred);
1490            }
1491            self.connections.insert(peer_id, conn_info);
1492        }
1493
1494        // Update metrics with high-performance lock
1495        let mut metrics = self.metrics.write();
1496        metrics.connections = self.connections.len();
1497
1498        // Count active (healthy) connections
1499        let active_count = self
1500            .connections
1501            .iter()
1502            .filter(|entry| entry.value().is_healthy())
1503            .count();
1504        metrics.active_connections = active_count;
1505    }
1506
1507    /// Disconnects from a peer with enhanced cleanup and health tracking
1508    pub fn disconnect(&self, peer_id: &PeerId) {
1509        if let Some((_, conn_info)) = self.connections.remove(peer_id) {
1510            debug!(
1511                "Disconnected from peer {:?} with status {:?} (quality: {:.2})",
1512                peer_id, conn_info.status, conn_info.quality_score
1513            );
1514
1515            // Move connection to pool if it was healthy (for potential reuse)
1516            if conn_info.is_healthy() {
1517                self.connection_pool
1518                    .insert(*peer_id, (conn_info, Instant::now()));
1519            }
1520        }
1521
1522        // Remove the cached quality score for this peer
1523        self.quality_scores.remove(peer_id);
1524
1525        // The circuit breaker entry is intentionally kept so that its failure
1526        // history is preserved and can inform future connection attempts.
1533
1534        // Clean expired connections from pool (non-blocking)
1535        self.cleanup_pool();
1536
1537        // Update metrics with high-performance lock
1538        let mut metrics = self.metrics.write();
1539        metrics.connections = self.connections.len();
1540
1541        // Count active (healthy) connections
1542        let active_count = self
1543            .connections
1544            .iter()
1545            .filter(|entry| entry.value().is_healthy())
1546            .count();
1547        metrics.active_connections = active_count;
1548    }
1549
1550    /// Cleanup expired connections from the pool
1551    fn cleanup_pool(&self) {
1552        self.connection_pool
1553            .retain(|_, (_, last_used)| last_used.elapsed() < self.pool_timeout);
1554    }
1555
1556    /// Returns connection count (lock-free)
1557    pub fn connection_count(&self) -> usize {
1558        self.connections.len()
1559    }
1560
1561    /// Returns connection status for a peer (lock-free)
1562    pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus> {
1563        self.connections
1564            .get(peer_id)
1565            .map(|entry| entry.value().status.clone())
1566    }
1567
1568    /// Returns detailed connection information for a peer (lock-free)
1569    pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo> {
1570        self.connections
1571            .get(peer_id)
1572            .map(|entry| entry.value().clone())
1573    }
1574
1575    /// Get connection quality score for a peer
1576    pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64> {
1577        self.quality_scores.get(peer_id).map(|entry| *entry.value())
1578    }
1579
1580    /// Get circuit breaker state for a peer
1581    pub fn get_circuit_breaker_state(&self, peer_id: &PeerId) -> Option<CircuitBreakerState> {
1582        self.circuit_breakers
1583            .get(peer_id)
1584            .map(|entry| entry.value().state.clone())
1585    }
1586
1587    /// Get all healthy connections sorted by quality score
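    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an existing `ConnectionManager` named `manager`)
    /// that picks the highest-quality healthy peer, if any:
    ///
    /// ```rust,ignore
    /// if let Some((best_peer, score)) = manager.get_healthy_connections().first().copied() {
    ///     println!("best peer {:?} with quality {:.2}", best_peer, score);
    /// }
    /// ```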
1588    pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)> {
1589        let mut healthy_peers = Vec::new();
1590
1591        for entry in self.connections.iter() {
1592            let peer_id = *entry.key();
1593            let conn_info = entry.value();
1594
1595            if conn_info.is_healthy() {
1596                healthy_peers.push((peer_id, conn_info.quality_score));
1597            }
1598        }
1599
1600        // Sort by quality score in descending order
1601        healthy_peers.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1602        healthy_peers
1603    }
1604
1605    /// Updates network metrics with optimized locking
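    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an existing `ConnectionManager` named `manager`)
    /// that publishes a 1000 msg/s sample with a 50 ms average latency and reads
    /// back the derived throughput metrics:
    ///
    /// ```rust,ignore
    /// manager.update_metrics(1000.0, 50);
    /// let throughput = manager.get_throughput_metrics();
    /// println!("peak throughput: {:.2} msg/s", throughput.peak_throughput);
    /// ```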
1606    pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64) {
1607        let latency_duration = std::time::Duration::from_millis(avg_latency_ms);
1608
1609        // Update general metrics
1610        {
1611            let mut metrics = self.metrics.write();
1612            metrics.messages_per_second = messages_per_second;
1613            metrics.avg_latency = latency_duration;
1614            // Count only healthy connections as "active", consistent with update_status
            metrics.active_connections =
                self.connections.iter().filter(|e| e.value().is_healthy()).count();
1615        }
1616
1617        // Update queue metrics
1618        {
1619            let mut queue_metrics = self.queue_metrics.write();
1620            queue_metrics.current_size = self.connection_pool.len();
1621            queue_metrics.max_size = self.max_connections;
1622            queue_metrics.utilization =
1623                queue_metrics.current_size as f64 / queue_metrics.max_size as f64;
1624            queue_metrics.messages_per_second = messages_per_second;
1625        }
1626
1627        // Update latency metrics
1628        {
1629            let mut latency_metrics = self.latency_metrics.write();
1630            latency_metrics.avg_latency = latency_duration;
1631            latency_metrics.peak_latency = latency_metrics.peak_latency.max(latency_duration);
1632        }
1633
1634        // Update throughput metrics
1635        {
1636            let mut throughput_metrics = self.throughput_metrics.write();
1637            throughput_metrics.messages_per_second = messages_per_second;
1638            throughput_metrics.total_messages += 1;
1639            throughput_metrics.avg_throughput =
1640                (throughput_metrics.avg_throughput + messages_per_second) / 2.0;
1641            throughput_metrics.peak_throughput =
1642                throughput_metrics.peak_throughput.max(messages_per_second);
1643        }
1644
1645        debug!(
1646            "Updated network metrics: {} msg/s, {} ms latency",
1647            messages_per_second, avg_latency_ms
1648        );
1649    }
1650
1651    /// Get current queue metrics
1652    pub fn get_queue_metrics(&self) -> QueueMetrics {
1653        self.queue_metrics.read().clone()
1654    }
1655
1656    /// Get current latency metrics
1657    pub fn get_latency_metrics(&self) -> LatencyMetrics {
1658        self.latency_metrics.read().clone()
1659    }
1660
1661    /// Get current throughput metrics
1662    pub fn get_throughput_metrics(&self) -> ThroughputMetrics {
1663        self.throughput_metrics.read().clone()
1664    }
1665
1666    /// Returns current network metrics (optimized)
1667    pub fn get_metrics(&self) -> NetworkMetrics {
1668        self.metrics.read().clone()
1669    }
1670
1671    // Enhanced API methods for production features
1672    /// Open a multiplexed stream on a connection
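    ///
    /// # Examples
    ///
    /// A minimal sketch of the stream lifecycle (assuming an async context, an
    /// existing `ConnectionManager` named `manager`, and a `Priority::High`
    /// variant as defined elsewhere in this crate):
    ///
    /// ```rust,ignore
    /// let stream_id = manager.open_stream(peer_id, Priority::High).await?;
    /// // Once the stream is active, data can be sent on it:
    /// manager.send_stream_data(stream_id, Bytes::from_static(b"hello")).await?;
    /// manager.close_stream(stream_id).await?;
    /// ```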
1673    pub async fn open_stream(
1674        &self,
1675        peer_id: PeerId,
1676        priority: Priority,
1677    ) -> Result<StreamId, NetworkError> {
1678        // Ensure connection exists and is healthy
1679        if !self.connections.contains_key(&peer_id) {
1680            // Attempt to establish connection first
1681            self.connect(peer_id).await?;
1682        }
1683
1684        // Use multiplexer to open stream
1685        self.multiplexer.open_stream(peer_id, priority).await
1686    }
1687
1688    /// Close a multiplexed stream
1689    pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
1690        self.multiplexer.close_stream(stream_id).await
1691    }
1692
1693    /// Send data on a specific stream
1694    pub async fn send_stream_data(
1695        &self,
1696        stream_id: StreamId,
1697        data: Bytes,
1698    ) -> Result<(), NetworkError> {
1699        // Get stream info to validate
1700        let stream_info = self
1701            .multiplexer
1702            .get_stream_info(stream_id)
1703            .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?;
1704
1705        if stream_info.state != StreamState::Active {
1706            return Err(NetworkError::ConnectionError("Stream not active".into()));
1707        }
1708
1709        // In a real implementation, this would send data on the specific stream
1710        // For now, we'll simulate stream-based sending
1711        info!("Sending {} bytes on stream {:?}", data.len(), stream_id);
1712        Ok(())
1713    }
1714
1715    /// Execute connection operation with retry logic
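    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context and an existing
    /// `ConnectionManager` named `manager`) that retries an initial connection
    /// using the manager's default backoff policy:
    ///
    /// ```rust,ignore
    /// if let Err(e) = manager.retry_connect(peer_id).await {
    ///     eprintln!("giving up on peer {:?}: {:?}", peer_id, e);
    /// }
    /// ```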
1716    pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1717        let retry_manager = self.retry_manager.clone();
1718
1719        retry_manager
1720            .retry_operation(peer_id, || async { self.connect(peer_id).await })
1721            .await
1722    }
1723
1724    /// Select best connection using load balancer
1725    pub async fn select_best_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
1726        self.load_balancer.select_connection(available_peers).await
1727    }
1728
1729    /// Start health monitoring for a peer
1730    pub async fn start_health_monitoring(&self, peer_id: PeerId) {
1731        self.health_monitor.start_monitoring(peer_id).await;
1732    }
1733
1734    /// Perform health check on a connection
1735    pub async fn check_connection_health(&self, peer_id: PeerId) -> Option<HealthCheckResult> {
1736        if let Some(connection_info) = self.get_connection_info(&peer_id) {
1737            Some(
1738                self.health_monitor
1739                    .check_health(peer_id, &connection_info)
1740                    .await,
1741            )
1742        } else {
1743            None
1744        }
1745    }
1746
1747    /// Warm up connections for a peer
1748    pub async fn warm_connections(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1749        self.warming_manager.warm_connection(peer_id).await
1750    }
1751
1752    /// Get warming state for a peer
1753    pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
1754        self.warming_manager.get_warming_state(peer_id)
1755    }
1756
1757    /// Get comprehensive connection statistics
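    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context and an existing
    /// `ConnectionManager` named `manager`) that samples aggregate statistics
    /// for monitoring:
    ///
    /// ```rust,ignore
    /// let stats = manager.get_connection_statistics().await;
    /// println!(
    ///     "connections: {} total / {} active / {} pooled",
    ///     stats.total_connections, stats.active_connections, stats.pooled_connections
    /// );
    /// ```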
1758    pub async fn get_connection_statistics(&self) -> ConnectionStatistics {
1759        let health_stats = self.health_monitor.stats.read().await.clone();
1760        let retry_stats = self.retry_manager.stats.read().await.clone();
1761        let warming_stats = self.warming_manager.stats.read().await.clone();
1762        let load_balancing_stats = self.load_balancer.stats.read().await.clone();
1763
1764        ConnectionStatistics {
1765            total_connections: self.connections.len(),
1766            active_connections: self
1767                .connections
1768                .iter()
1769                .filter(|entry| entry.value().is_healthy())
1770                .count(),
1771            pooled_connections: self.enhanced_pool.len(),
1772            health_stats,
1773            retry_stats,
1774            warming_stats,
1775            load_balancing_stats,
1776        }
1777    }
1778
1779    /// Configure connection limits
1780    pub fn set_connection_limits(&mut self, limits: ConnectionLimits) {
1781        self.max_connections = limits.max_total;
1782        self.connection_limits = limits;
1783    }
1784
1785    /// Get current connection limits
1786    pub fn get_connection_limits(&self) -> &ConnectionLimits {
1787        &self.connection_limits
1788    }
1789}
1790
1791/// Comprehensive connection statistics
1792#[derive(Debug, Clone)]
1793pub struct ConnectionStatistics {
1794    /// Total number of connections
1795    pub total_connections: usize,
1796    /// Number of active (healthy) connections
1797    pub active_connections: usize,
1798    /// Number of pooled connections
1799    pub pooled_connections: usize,
1800    /// Health monitoring statistics
1801    pub health_stats: HealthStats,
1802    /// Retry operation statistics
1803    pub retry_stats: RetryStats,
1804    /// Connection warming statistics
1805    pub warming_stats: WarmingStats,
1806    /// Load balancing statistics
1807    pub load_balancing_stats: LoadBalancingStats,
1808}
1809
1810#[cfg(test)]
1811mod tests {
1812    use super::*;
    use quinn::ServerConfig;
1813    use std::net::{IpAddr, Ipv4Addr};
1814    use std::time::Instant;
1815    use tokio::time::Duration;
1816
1817    fn setup_test_config() -> SecureConfig {
1818        SecureConfig {
1819            transport_keys: TransportKeys::generate(),
1820            timeout: std::time::Duration::from_secs(5),
1821            keepalive: std::time::Duration::from_secs(10),
1822        }
1823    }
1824
1825    #[tokio::test]
1826    async fn test_secure_connection() {
1827        let test_config = setup_test_config();
1828        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
1829
1830        // Set up QUIC endpoint
1831        let server_config = ServerConfig::default();
1832        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
1833            .unwrap()
1834            .0;
1835
1836        // Create secure connection
1837        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
1838            .await
1839            .expect("Failed to create secure connection");
1840
1841        // Test sending encrypted message
1842        let test_data = Bytes::from(b"test message".to_vec());
1843        connection
1844            .send(test_data)
1845            .await
1846            .expect("Failed to send message");
1847    }
1848
1849    #[tokio::test]
1850    async fn test_connection_management() {
1851        let manager = ConnectionManager::new(2);
1852        let peer1 = PeerId::random();
1853        let peer2 = PeerId::random();
1854        let peer3 = PeerId::random();
1855
1856        // Test connection limit
1857        assert!(manager.connect(peer1).await.is_ok());
1858        assert!(manager.connect(peer2).await.is_ok());
1859        assert!(manager.connect(peer3).await.is_ok()); // Should be ignored due to limit
1860
1861        assert_eq!(manager.connection_count(), 2);
1862
1863        // Test status updates
1864        manager.update_status(peer1, ConnectionStatus::Connected);
1865        assert_eq!(
1866            manager.get_status(&peer1),
1867            Some(ConnectionStatus::Connected)
1868        );
1869
1870        // Test disconnection
1871        manager.disconnect(&peer1);
1872        assert_eq!(manager.get_status(&peer1), None);
1873        assert_eq!(manager.connection_count(), 1);
1874
1875        // Test metrics
1876        manager.update_metrics(1000.0, 50);
1877        let metrics = manager.get_metrics();
1878        assert_eq!(metrics.messages_per_second, 1000.0);
1879        assert_eq!(metrics.connections, 1);
1880    }
1881
1882    #[tokio::test]
1883    async fn bench_route_computation() {
1884        let manager = ConnectionManager::new(100);
1885        let _rng = rand::thread_rng();
1886        let mut latencies = Vec::new();
1887
1888        for _ in 0..1000 {
1889            let peer = PeerId::random();
1890            let start = Instant::now();
1891            manager.connect(peer).await.unwrap();
1892            latencies.push(start.elapsed());
1893        }
1894
1895        let avg_latency = latencies.iter().sum::<Duration>() / latencies.len() as u32;
1896        let max_latency = latencies.iter().max().unwrap();
1897
1898        println!("Route Computation Benchmark:");
1899        println!("Average latency: {:?}", avg_latency);
1900        println!("Maximum latency: {:?}", max_latency);
1901        println!("Total routes: {}", manager.connection_count());
1902    }
1903
1904    #[tokio::test]
1905    async fn bench_cache_efficiency() {
1906        let manager = ConnectionManager::new(1000);
1907        let mut hit_count = 0;
1908        let iterations = 10000;
1909
1910        for _ in 0..iterations {
1911            let peer = PeerId::random();
1912            let _start = Instant::now();
1913
1914            // Try to get from pool first
1915            if manager.connection_pool.get(&peer).is_some() {
1916                hit_count += 1;
1917            } else {
1918                manager.connect(peer).await.unwrap();
1919            }
1920        }
1921
1922        let hit_rate = (hit_count as f64 / iterations as f64) * 100.0;
1923        println!("Cache Efficiency Benchmark:");
1924        println!("Cache hit rate: {:.2}%", hit_rate);
1925        println!("Pool size: {}", manager.connection_pool.len());
1926    }
1927
1928    #[tokio::test]
1929    async fn bench_circuit_setup() {
1930        let test_config = setup_test_config();
1931        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
1932        let server_config = ServerConfig::default();
1933        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
1934            .unwrap()
1935            .0;
1936
1937        let mut setup_times = Vec::new();
1938        for _ in 0..100 {
1939            let start = Instant::now();
1940            let _connection =
1941                SecureConnection::new(&endpoint, test_addr, test_config.clone()).await;
1942            setup_times.push(start.elapsed());
1943        }
1944
1945        let avg_setup = setup_times.iter().sum::<Duration>() / setup_times.len() as u32;
1946        println!("Circuit Setup Benchmark:");
1947        println!("Average setup time: {:?}", avg_setup);
1948    }
1949
1950    #[tokio::test]
1951    async fn bench_connection_pooling() {
1952        let manager = ConnectionManager::with_pool_timeout(1000, Duration::from_secs(60));
1953        let test_peers: Vec<PeerId> = (0..100).map(|_| PeerId::random()).collect();
1954        let mut reuse_times = Vec::new();
1955
1956        // Setup initial connections
1957        for peer in test_peers.iter() {
1958            manager.connect(*peer).await.unwrap();
1959        }
1960
1961        // Test connection reuse
1962        for peer in test_peers.iter() {
1963            let start = Instant::now();
1964            manager.connect(*peer).await.unwrap();
1965            reuse_times.push(start.elapsed());
1966        }
1967
1968        let avg_reuse = reuse_times.iter().sum::<Duration>() / reuse_times.len() as u32;
1969        println!("Connection Pooling Benchmark:");
1970        println!("Average reuse time: {:?}", avg_reuse);
1971        println!(
1972            "Pool utilization: {:.2}%",
1973            (manager.get_queue_metrics().utilization * 100.0)
1974        );
1975    }
1976
1977    #[tokio::test]
1978    async fn bench_message_throughput() {
1979        let test_config = setup_test_config();
1980        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
1981        let server_config = ServerConfig::default();
1982        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
1983            .unwrap()
1984            .0;
1985
1986        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
1987            .await
1988            .unwrap();
1989        let start = Instant::now();
1990        let message_count = 10000;
1991        let message_size = 1024; // 1KB messages
1992
1993        for _ in 0..message_count {
1994            let data = Bytes::from(vec![1u8; message_size]);
1995            connection.send(data).await.unwrap();
1996        }
1997
1998        let elapsed = start.elapsed();
1999        let throughput = message_count as f64 / elapsed.as_secs_f64();
2000        let mb_per_sec = (throughput * message_size as f64) / (1024.0 * 1024.0);
2001
2002        println!("Message Throughput Benchmark:");
2003        println!("Messages per second: {:.2}", throughput);
2004        println!("Throughput: {:.2} MB/s", mb_per_sec);
2005        println!("Total time: {:?}", elapsed);
2006    }
2007}
2008
2009// Implementations for the multiplexing, retry, load-balancing, health-monitoring, and warming components
2010impl ConnectionMultiplexer {
2011    /// Create new connection multiplexer
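    ///
    /// # Examples
    ///
    /// A minimal sketch (the parameter values are illustrative only):
    ///
    /// ```rust,ignore
    /// // Allow up to 64 concurrent streams per connection and time out
    /// // idle streams after 30 seconds.
    /// let multiplexer = ConnectionMultiplexer::new(64, Duration::from_secs(30));
    /// ```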
2012    pub fn new(max_streams_per_connection: u32, stream_timeout: Duration) -> Self {
2013        Self {
2014            connections: Arc::new(DashMap::new()),
2015            stream_routes: Arc::new(DashMap::new()),
2016            priority_queue: Arc::new(TokioRwLock::new(BTreeMap::new())),
2017            max_streams_per_connection,
2018            stream_timeout,
2019        }
2020    }
2021
2022    /// Open a new stream on a connection
2023    pub async fn open_stream(
2024        &self,
2025        peer_id: PeerId,
2026        priority: Priority,
2027    ) -> Result<StreamId, NetworkError> {
2028        let mut connection = self
2029            .connections
2030            .get_mut(&peer_id)
2031            .ok_or_else(|| NetworkError::ConnectionError("Connection not found".into()))?;
2032
2033        if connection.streams.len() >= self.max_streams_per_connection as usize {
2034            return Err(NetworkError::ConnectionError(
2035                "Maximum streams reached".into(),
2036            ));
2037        }
2038
2039        // Briefly acquire a stream permit as a throttle point; the permit is dropped
        // immediately (bound to `_`), so the stream-count check above enforces the limit
2040        let _ = connection
2041            .stream_semaphore
2042            .acquire()
2043            .await
2044            .map_err(|_| NetworkError::ConnectionError("Stream semaphore closed".into()))?;
2045
2046        let stream_id = StreamId(connection.next_stream_id);
2047        connection.next_stream_id += 1;
2048
2049        let stream_info = StreamInfo {
2050            id: stream_id,
2051            priority,
2052            state: StreamState::Opening,
2053            created_at: Instant::now(),
2054            last_activity: Instant::now(),
2055            bytes_transferred: 0,
2056        };
2057
2058        connection.streams.insert(stream_id, stream_info);
2059        self.stream_routes.insert(stream_id, peer_id);
2060
2061        // Add to priority queue
2062        let mut queue = self.priority_queue.write().await;
2063        queue
2064            .entry(priority)
2065            .or_insert_with(VecDeque::new)
2066            .push_back(stream_id);
2067
2068        Ok(stream_id)
2069    }
2070
2071    /// Close a stream
2072    pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
2073        let peer_id = self
2074            .stream_routes
2075            .remove(&stream_id)
2076            .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?
2077            .1;
2078
2079        if let Some(mut connection) = self.connections.get_mut(&peer_id) {
2080            if let Some(stream) = connection.streams.get_mut(&stream_id) {
2081                stream.state = StreamState::Closed;
2082                stream.last_activity = Instant::now();
2083            }
2084            connection.streams.remove(&stream_id);
2085
2086            // Update connection utilization
2087            connection.utilization =
2088                connection.streams.len() as f64 / self.max_streams_per_connection as f64;
2089        }
2090
2091        Ok(())
2092    }
2093
2094    /// Get stream information
2095    pub fn get_stream_info(&self, stream_id: StreamId) -> Option<StreamInfo> {
2096        let peer_id = self.stream_routes.get(&stream_id)?.value().clone();
2097        let connection = self.connections.get(&peer_id)?;
2098        connection.streams.get(&stream_id).cloned()
2099    }
2100}
2101
2102impl RetryManager {
2103    /// Create new retry manager
2104    pub fn new() -> Self {
2105        Self {
2106            retry_configs: Arc::new(DashMap::new()),
2107            default_config: RetryConfig::default(),
2108            stats: Arc::new(TokioRwLock::new(RetryStats::default())),
2109        }
2110    }
2111
2112    /// Execute operation with retry logic
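    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context; `ping_peer` is a hypothetical
    /// async function returning `Result<(), NetworkError>`):
    ///
    /// ```rust,ignore
    /// let retry_manager = RetryManager::new();
    /// retry_manager
    ///     .retry_operation(peer_id, || async { ping_peer(peer_id).await })
    ///     .await?;
    /// ```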
2113    pub async fn retry_operation<F, Fut, T, E>(&self, peer_id: PeerId, operation: F) -> Result<T, E>
2114    where
2115        F: Fn() -> Fut + Send + Sync,
2116        Fut: Future<Output = Result<T, E>> + Send,
2117        E: std::fmt::Debug,
2118    {
2119        let config = self
2120            .retry_configs
2121            .get(&peer_id)
2122            .map(|entry| entry.value().clone())
2123            .unwrap_or_else(|| self.default_config.clone());
2124
2125        let mut attempt = 0;
2126        let mut backoff = config.initial_backoff;
2127
2128        loop {
2129            let start = Instant::now();
2130            let result = operation().await;
2131            let _duration = start.elapsed();
2132
2133            match result {
2134                Ok(value) => {
2135                    // Update success statistics; only count a successful retry
                    // if at least one retry was actually needed
2136                    let mut stats = self.stats.write().await;
2137                    stats.total_attempts += 1;
2138                    if attempt > 0 {
                        stats.successful_retries += 1;
                    }
2139                    return Ok(value);
2140                }
2141                Err(error) => {
2142                    attempt += 1;
2143                    if attempt >= config.max_retries {
2144                        // Update failure statistics
2145                        let mut stats = self.stats.write().await;
2146                        stats.total_attempts += 1;
2147                        stats.failed_retries += 1;
2148                        return Err(error);
2149                    }
2150
2151                    // Calculate backoff with jitter
2152                    let jitter = (rand::random::<f64>() - 0.5) * 2.0 * config.jitter_factor;
2153                    let backoff_with_jitter = Duration::from_millis(
2154                        ((backoff.as_millis() as f64) * (1.0 + jitter)) as u64,
2155                    );
2156
2157                    sleep(backoff_with_jitter).await;
2158
2159                    // Exponential backoff
2160                    backoff = std::cmp::min(
2161                        Duration::from_millis(
2162                            (backoff.as_millis() as f64 * config.backoff_multiplier) as u64,
2163                        ),
2164                        config.max_backoff,
2165                    );
2166                }
2167            }
2168        }
2169    }
2170}
2171
2172impl Default for RetryConfig {
2173    fn default() -> Self {
2174        Self {
2175            max_retries: 3,
2176            initial_backoff: Duration::from_millis(100),
2177            max_backoff: Duration::from_secs(30),
2178            backoff_multiplier: 2.0,
2179            jitter_factor: 0.1,
2180            timeout: Duration::from_secs(10),
2181        }
2182    }
2183}
2184
2185impl LoadBalancer {
2186    /// Create new load balancer
2187    pub fn new(strategy: LoadBalancingStrategy) -> Self {
2188        Self {
2189            strategy,
2190            weights: Arc::new(DashMap::new()),
2191            round_robin_counter: AtomicU64::new(0),
2192            stats: Arc::new(TokioRwLock::new(LoadBalancingStats::default())),
2193        }
2194    }
2195
2196    /// Select best connection for request
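    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context and an existing
    /// `ConnectionManager` named `manager`) that round-robins over the currently
    /// healthy peers:
    ///
    /// ```rust,ignore
    /// let balancer = LoadBalancer::new(LoadBalancingStrategy::RoundRobin);
    /// let peers: Vec<PeerId> = manager
    ///     .get_healthy_connections()
    ///     .into_iter()
    ///     .map(|(peer, _score)| peer)
    ///     .collect();
    /// if let Some(target) = balancer.select_connection(&peers).await {
    ///     println!("routing next request to {:?}", target);
    /// }
    /// ```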
2197    pub async fn select_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
2198        if available_peers.is_empty() {
2199            return None;
2200        }
2201
2202        let selected = match self.strategy {
2203            LoadBalancingStrategy::RoundRobin => {
2204                let index = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) as usize;
2205                available_peers[index % available_peers.len()]
2206            }
2207            LoadBalancingStrategy::WeightedRoundRobin => {
2208                self.select_weighted_round_robin(available_peers).await
2209            }
2210            LoadBalancingStrategy::LeastConnections => {
2211                self.select_least_connections(available_peers).await
2212            }
2213            LoadBalancingStrategy::ResponseTime => {
2214                self.select_best_response_time(available_peers).await
2215            }
2216            LoadBalancingStrategy::ResourceUtilization => {
2217                self.select_least_utilized(available_peers).await
2218            }
2219        };
2220
2221        // Update statistics
2222        let mut stats = self.stats.write().await;
2223        stats.total_requests += 1;
2224        *stats.peer_distribution.entry(selected).or_insert(0) += 1;
2225
2226        Some(selected)
2227    }
2228
2229    /// Select a connection by weight-proportional random sampling
    /// (an approximation of weighted round-robin)
2230    async fn select_weighted_round_robin(&self, peers: &[PeerId]) -> PeerId {
2231        let mut total_weight = 0.0;
2232        let mut weighted_peers = Vec::new();
2233
2234        for &peer_id in peers {
2235            let weight = self
2236                .weights
2237                .get(&peer_id)
2238                .map(|entry| *entry.value())
2239                .unwrap_or(1.0);
2240            total_weight += weight;
2241            weighted_peers.push((peer_id, weight));
2242        }
2243
2244        let mut target = rand::random::<f64>() * total_weight;
2245        for (peer_id, weight) in weighted_peers {
2246            target -= weight;
2247            if target <= 0.0 {
2248                return peer_id;
2249            }
2250        }
2251
2252        peers[0] // Fallback
2253    }
2254
2255    /// Select connection with least connections (placeholder)
2256    async fn select_least_connections(&self, peers: &[PeerId]) -> PeerId {
2257        // In a real implementation, this would track connection counts
2258        peers[0]
2259    }
2260
2261    /// Select connection with best response time
2262    async fn select_best_response_time(&self, peers: &[PeerId]) -> PeerId {
2263        let stats = self.stats.read().await;
2264        let mut best_peer = peers[0];
2265        let mut best_time = Duration::MAX;
2266
2267        for &peer_id in peers {
2268            if let Some(avg_time) = stats.avg_response_times.get(&peer_id) {
2269                if *avg_time < best_time {
2270                    best_time = *avg_time;
2271                    best_peer = peer_id;
2272                }
2273            }
2274        }
2275
2276        best_peer
2277    }
2278
2279    /// Select least utilized connection (placeholder)
2280    async fn select_least_utilized(&self, peers: &[PeerId]) -> PeerId {
2281        // In a real implementation, this would track resource utilization
2282        peers[0]
2283    }
2284}
2285
2286impl HealthMonitor {
2287    /// Create new health monitor
2288    pub fn new(config: HealthCheckConfig) -> Self {
2289        Self {
2290            config,
2291            results: Arc::new(DashMap::new()),
2292            scheduler: Arc::new(TokioRwLock::new(HealthCheckScheduler {
2293                scheduled_checks: HashMap::new(),
2294                check_intervals: HashMap::new(),
2295            })),
2296            stats: Arc::new(TokioRwLock::new(HealthStats::default())),
2297        }
2298    }
2299
2300    /// Start health monitoring for a peer
2301    pub async fn start_monitoring(&self, peer_id: PeerId) {
2302        let mut scheduler = self.scheduler.write().await;
2303        scheduler
2304            .scheduled_checks
2305            .insert(peer_id, Instant::now() + self.config.interval);
2306        scheduler
2307            .check_intervals
2308            .insert(peer_id, self.config.interval);
2309    }
2310
2311    /// Perform health check on a peer
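    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context, an existing
    /// `ConnectionManager` named `manager`, and a known `peer_id`):
    ///
    /// ```rust,ignore
    /// let monitor = HealthMonitor::new(HealthCheckConfig::default());
    /// if let Some(info) = manager.get_connection_info(&peer_id) {
    ///     let result = monitor.check_health(peer_id, &info).await;
    ///     println!("healthy: {} ({:?})", result.success, result.response_time);
    /// }
    /// ```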
2312    pub async fn check_health(
2313        &self,
2314        peer_id: PeerId,
2315        connection: &ConnectionInfo,
2316    ) -> HealthCheckResult {
2317        let checker = PingHealthCheck::default();
2318        let result = checker.check(&peer_id, connection).await;
2319
2320        // Store result
2321        self.results.insert(peer_id, result.clone());
2322
2323        // Update statistics
2324        let mut stats = self.stats.write().await;
2325        stats.total_checks += 1;
2326        if result.success {
2327            stats.successful_checks += 1;
2328        } else {
2329            stats.failed_checks += 1;
2330        }
2331
2332        // Update running average response time (`total_checks` was already incremented above)
2333        let total_time =
            stats.avg_response_time.as_millis() as f64 * (stats.total_checks - 1) as f64;
2334        let new_avg = (total_time + result.response_time.as_millis() as f64)
2335            / stats.total_checks as f64;
2336        stats.avg_response_time = Duration::from_millis(new_avg as u64);
2337
2338        result
2339    }
2340
2341    /// Get latest health check result
2342    pub fn get_health_result(&self, peer_id: &PeerId) -> Option<HealthCheckResult> {
2343        self.results.get(peer_id).map(|entry| entry.value().clone())
2344    }
2345}
2346
2347impl Default for HealthCheckConfig {
2348    fn default() -> Self {
2349        Self {
2350            interval: Duration::from_secs(30),
2351            timeout: Duration::from_secs(5),
2352            failure_threshold: 3,
2353            recovery_threshold: 2,
2354            check_type: HealthCheckType::Ping,
2355        }
2356    }
2357}
2358
2359impl WarmingManager {
2360    /// Create new warming manager
2361    pub fn new(config: WarmingConfig) -> Self {
2362        Self {
2363            config,
2364            warming_states: Arc::new(DashMap::new()),
2365            warming_tasks: Arc::new(DashMap::new()),
2366            stats: Arc::new(TokioRwLock::new(WarmingStats::default())),
2367        }
2368    }
2369
2370    /// Start warming connections for a peer
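    ///
    /// # Examples
    ///
    /// A minimal sketch (assuming an async context and a known `peer_id`) that
    /// pre-warms a peer before routing traffic to it:
    ///
    /// ```rust,ignore
    /// let warming = WarmingManager::new(WarmingConfig::default());
    /// warming.warm_connection(peer_id).await?;
    /// assert!(matches!(warming.get_warming_state(&peer_id), WarmingState::Warm));
    /// ```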
2371    pub async fn warm_connection(&self, peer_id: PeerId) -> Result<(), NetworkError> {
2372        if !self.config.enabled {
2373            return Ok(());
2374        }
2375
2376        // Set warming state
2377        self.warming_states.insert(peer_id, WarmingState::Warming);
2378
2379        // Simulate connection warming (in real implementation, this would pre-establish connections)
2380        let start = Instant::now();
2381        sleep(Duration::from_millis(100)).await; // Simulate warming time
2382        let warming_time = start.elapsed();
2383
2384        // Update statistics
2385        let mut stats = self.stats.write().await;
2386        stats.total_attempts += 1;
2387
2388        if rand::random::<f64>() > 0.1 {
2389            // 90% success rate
2390            self.warming_states.insert(peer_id, WarmingState::Warm);
2391            stats.successful_warmings += 1;
2392
2393            // Update running average warming time (`successful_warmings` was already incremented above)
2394            let total_time = stats.avg_warming_time.as_millis() as f64
2395                * (stats.successful_warmings - 1) as f64;
2396            let new_avg = (total_time + warming_time.as_millis() as f64)
2397                / stats.successful_warmings as f64;
2398            stats.avg_warming_time = Duration::from_millis(new_avg as u64);
2399
2400            Ok(())
2401        } else {
2402            self.warming_states.insert(
2403                peer_id,
2404                WarmingState::FailedToWarm("Warming timeout".to_string()),
2405            );
2406            stats.failed_warmings += 1;
2407            Err(NetworkError::ConnectionError(
2408                "Connection warming failed".into(),
2409            ))
2410        }
2411    }
2412
2413    /// Get warming state for a peer
2414    pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
2415        self.warming_states
2416            .get(peer_id)
2417            .map(|entry| entry.value().clone())
2418            .unwrap_or(WarmingState::Cold)
2419    }
2420}
2421
2422impl Default for WarmingConfig {
2423    fn default() -> Self {
2424        Self {
2425            enabled: true,
2426            min_pool_size: 5,
2427            warming_timeout: Duration::from_secs(10),
2428            warming_retries: 3,
2429            predictive_threshold: 0.8,
2430        }
2431    }
2432}