1#![deny(unsafe_code)]
2
3use crate::types::{
4 ConnectionStatus, LatencyMetrics, NetworkError, NetworkMetrics, PeerId, QueueMetrics,
5 ThroughputMetrics,
6};
7use anyhow::Result;
8use async_trait::async_trait;
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10use dashmap::DashMap;
11use futures::future::Future;
12use parking_lot::RwLock as ParkingRwLock;
13use quinn::{Connection, Endpoint};
14use ring::{aead, agreement, rand as ring_rand};
15use std::net::SocketAddr;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc, RwLock as TokioRwLock, Semaphore};
20use tokio::time::sleep;
21use tracing::{debug, error, info, warn};
22
/// Configuration for establishing a [`SecureConnection`].
#[derive(Clone)]
pub struct SecureConfig {
    /// Ephemeral X25519 key material used to key the transport AEAD.
    pub transport_keys: TransportKeys,
    /// Connection timeout (not read in this file; consumed by callers).
    pub timeout: std::time::Duration,
    /// Keep-alive interval (not read in this file; consumed by callers).
    pub keepalive: std::time::Duration,
}
33
/// Ephemeral X25519 keypair used to key the transport cipher.
pub struct TransportKeys {
    /// Private half; currently unused directly (no ECDH agreement is
    /// performed anywhere in this file).
    #[allow(dead_code)]
    private_key: agreement::EphemeralPrivateKey,
    /// Serialized public key (32 bytes for X25519).
    public_key: Vec<u8>,
}
42
impl Clone for TransportKeys {
    /// NOTE: `agreement::EphemeralPrivateKey` cannot be copied, so "cloning"
    /// generates a brand-new keypair. The clone therefore does NOT share key
    /// material with the original — callers must not assume otherwise.
    fn clone(&self) -> Self {
        Self::generate()
    }
}
49
50impl TransportKeys {
51 pub fn generate() -> Self {
53 let rng = ring_rand::SystemRandom::new();
54 let private_key =
55 agreement::EphemeralPrivateKey::generate(&agreement::X25519, &rng).unwrap();
56 let public_key = private_key.compute_public_key().unwrap().as_ref().to_vec();
57
58 Self {
59 private_key,
60 public_key,
61 }
62 }
63}
64
/// A QUIC connection wrapped with AEAD encryption and message batching.
pub struct SecureConnection {
    /// Underlying QUIC connection (kept alive; not used directly yet).
    #[allow(dead_code)]
    connection: Connection,
    /// Key material this connection was created with.
    #[allow(dead_code)]
    keys: TransportKeys,
    /// Batching, back-pressure, and crypto state for this connection.
    channels: ConnectionChannels,
}
93
/// Per-connection plumbing: outbound batching, back pressure, AEAD key and
/// nonce state, and traffic counters.
struct ConnectionChannels {
    /// Outbound batch sender.
    tx: mpsc::Sender<Bytes>,
    /// Inbound batch receiver.
    rx: mpsc::Receiver<Bytes>,
    /// Accumulates length-prefixed encrypted messages until flushed.
    batch_buffer: BytesMut,
    /// Flush threshold multiplier (interpreted as KiB in `send`).
    batch_size: usize,
    /// Maximum age of a batch before a flush is forced.
    batch_timeout: std::time::Duration,
    /// When the last batch was flushed.
    last_batch: std::time::Instant,
    /// Queued-bytes level above which senders block (back pressure).
    high_water_mark: usize,
    /// Queued-bytes level below which blocked senders are released.
    low_water_mark: usize,
    /// Wakes senders parked on back pressure.
    back_pressure: Arc<tokio::sync::Notify>,
    /// Approximate number of bytes currently queued.
    queue_size: AtomicUsize,
    /// Cached ChaCha20-Poly1305 key used for seal/open.
    key_cache: Arc<aead::LessSafeKey>,
    /// Monotonic nonce counter (starts at 1; observing 0 signals wrap-around).
    nonce_counter: AtomicU64,
    /// Total messages sent through this connection.
    message_count: AtomicU64,
    /// Total plaintext bytes processed.
    bytes_processed: AtomicU64,
}
125
126impl SecureConnection {
127 pub async fn new(
129 endpoint: &Endpoint,
130 addr: SocketAddr,
131 config: SecureConfig,
132 ) -> Result<Self, NetworkError> {
133 let connection = endpoint
135 .connect(addr, "qudag")
136 .map_err(|e| NetworkError::ConnectionError(e.to_string()))?
137 .await
138 .map_err(|e| NetworkError::ConnectionError(e.to_string()))?;
139
140 let (tx, rx) = mpsc::channel(65_536); let key = aead::UnboundKey::new(
145 &aead::CHACHA20_POLY1305,
146 &config.transport_keys.public_key[..32],
147 )
148 .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
149 let key_cache = Arc::new(aead::LessSafeKey::new(key));
150
151 Ok(Self {
152 connection,
153 keys: config.transport_keys,
154 channels: ConnectionChannels {
155 tx,
156 rx,
157 batch_buffer: BytesMut::with_capacity(1024 * 1024), batch_size: 128, batch_timeout: std::time::Duration::from_millis(50),
160 last_batch: std::time::Instant::now(),
161 high_water_mark: 64 * 1024 * 1024, low_water_mark: 32 * 1024 * 1024, back_pressure: Arc::new(tokio::sync::Notify::new()),
164 queue_size: AtomicUsize::new(0),
165 key_cache,
166 nonce_counter: AtomicU64::new(1),
167 message_count: AtomicU64::new(0),
168 bytes_processed: AtomicU64::new(0),
169 },
170 })
171 }
172
173 pub async fn send(&mut self, data: Bytes) -> Result<(), NetworkError> {
175 let msg_size = data.len();
176
177 if msg_size == 0 {
179 return Err(NetworkError::MessageError("Empty message".into()));
180 }
181 if msg_size > 1024 * 1024 {
182 return Err(NetworkError::MessageError("Message too large".into()));
184 }
185
186 let current_size = self.channels.queue_size.load(Ordering::Acquire);
188 if current_size >= self.channels.high_water_mark {
189 debug!("Applying back pressure, queue size: {}", current_size);
190 let back_pressure = self.channels.back_pressure.clone();
191
192 tokio::select! {
194 _ = back_pressure.notified() => {},
195 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
196 return Err(NetworkError::ConnectionError("Back pressure timeout".into()));
197 }
198 }
199 }
200
201 let nonce_value = self.channels.nonce_counter.fetch_add(1, Ordering::Relaxed);
203 if nonce_value == 0 {
204 error!("Nonce counter overflow - this should not happen in normal operation");
205 return Err(NetworkError::EncryptionError("Nonce overflow".into()));
206 }
207
208 let mut nonce_bytes = [0u8; 12];
209 nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
210
211 let mut encrypted = BytesMut::from(data.as_ref());
213
214 let mut retry_count = 0;
216 loop {
217 let nonce_attempt = aead::Nonce::assume_unique_for_key(nonce_bytes);
219 match self.channels.key_cache.seal_in_place_append_tag(
220 nonce_attempt,
221 aead::Aad::empty(),
222 &mut encrypted,
223 ) {
224 Ok(()) => break,
225 Err(e) => {
226 retry_count += 1;
227 if retry_count >= 3 {
228 return Err(NetworkError::EncryptionError(format!(
229 "Encryption failed after {} retries: {}",
230 retry_count, e
231 )));
232 }
233 warn!("Encryption attempt {} failed, retrying: {}", retry_count, e);
234 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
235 }
236 }
237 }
238
239 let encrypted_len = encrypted.len() as u32;
241 self.channels.batch_buffer.put_u32(encrypted_len);
242 self.channels.batch_buffer.extend_from_slice(&encrypted);
243
244 self.channels
246 .queue_size
247 .fetch_add(msg_size, Ordering::Release);
248 self.channels.message_count.fetch_add(1, Ordering::Relaxed);
249 self.channels
250 .bytes_processed
251 .fetch_add(msg_size as u64, Ordering::Relaxed);
252
253 if self.channels.batch_buffer.len() >= self.channels.batch_size * 1024
255 || self.channels.last_batch.elapsed() >= self.channels.batch_timeout
256 {
257 self.flush_batch().await?
258 }
259
260 Ok(())
261 }
262
263 async fn flush_batch(&mut self) -> Result<(), NetworkError> {
265 if self.channels.batch_buffer.is_empty() {
266 return Ok(());
267 }
268
269 let batch_size = self.channels.batch_buffer.len();
270
271 let batch = self.channels.batch_buffer.split().freeze();
273
274 let mut retry_count = 0;
276 loop {
277 match self.channels.tx.send(batch.clone()).await {
278 Ok(()) => break,
279 Err(e) => {
280 retry_count += 1;
281 if retry_count >= 3 {
282 self.channels.batch_buffer.extend_from_slice(&batch);
284 return Err(NetworkError::ConnectionError(format!(
285 "Batch send failed after {} retries: {}",
286 retry_count, e
287 )));
288 }
289 warn!("Batch send attempt {} failed, retrying: {}", retry_count, e);
290 tokio::time::sleep(std::time::Duration::from_millis(100 * retry_count as u64))
291 .await;
292 }
293 }
294 }
295
296 let new_size = self
298 .channels
299 .queue_size
300 .fetch_sub(batch_size, Ordering::AcqRel);
301 if new_size <= self.channels.low_water_mark {
302 self.channels.back_pressure.notify_waiters();
303 debug!("Released back pressure, queue size: {}", new_size);
304 }
305
306 self.channels.last_batch = std::time::Instant::now();
308 Ok(())
309 }
310
311 pub async fn receive(&mut self) -> Result<Vec<Bytes>, NetworkError> {
313 let encrypted_batch = self
315 .channels
316 .rx
317 .recv()
318 .await
319 .ok_or_else(|| NetworkError::ConnectionError("Channel closed".into()))?;
320
321 let mut messages = Vec::new();
322 let mut buf = encrypted_batch;
323
324 while buf.has_remaining() {
326 if buf.remaining() < 4 {
327 return Err(NetworkError::EncryptionError(
328 "Incomplete message length".into(),
329 ));
330 }
331
332 let msg_len = buf.get_u32() as usize;
334
335 if buf.remaining() < msg_len {
336 return Err(NetworkError::EncryptionError(
337 "Incomplete message data".into(),
338 ));
339 }
340
341 let encrypted_data = buf.copy_to_bytes(msg_len);
343
344 let nonce_value = self.channels.nonce_counter.load(Ordering::Relaxed);
346 let mut nonce_bytes = [0u8; 12];
347 nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
348 let nonce = aead::Nonce::assume_unique_for_key(nonce_bytes);
349
350 let mut message_data = BytesMut::from(encrypted_data.as_ref());
352 self.channels
353 .key_cache
354 .open_in_place(nonce, aead::Aad::empty(), &mut message_data)
355 .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
356
357 if message_data.len() >= 16 {
359 message_data.truncate(message_data.len() - 16);
360 }
361
362 messages.push(message_data.freeze());
363 }
364
365 Ok(messages)
366 }
367}
368
/// Central manager for peer connections: pooling, multiplexing, retries,
/// load balancing, health monitoring, warming, and metrics collection.
pub struct ConnectionManager {
    /// Hard cap on simultaneously tracked connections.
    max_connections: usize,
    /// Live connections keyed by peer.
    connections: Arc<DashMap<PeerId, ConnectionInfo>>,
    /// Recently disconnected but healthy connections parked for reuse,
    /// stamped with when they were parked.
    connection_pool: Arc<DashMap<PeerId, (ConnectionInfo, Instant)>>,
    /// Richer pool entries carrying usage/warming metadata.
    enhanced_pool: Arc<DashMap<PeerId, PooledConnection>>,
    /// Stream multiplexing across connections.
    multiplexer: Arc<ConnectionMultiplexer>,
    /// Per-peer retry policies and statistics.
    retry_manager: Arc<RetryManager>,
    /// Peer selection strategy state.
    load_balancer: Arc<LoadBalancer>,
    /// Periodic health checking.
    health_monitor: Arc<HealthMonitor>,
    /// Connection pre-warming.
    warming_manager: Arc<WarmingManager>,
    /// How long a pooled connection stays eligible for reuse.
    pool_timeout: std::time::Duration,
    /// Aggregate network metrics.
    metrics: Arc<ParkingRwLock<NetworkMetrics>>,
    /// Queue utilization metrics.
    queue_metrics: Arc<ParkingRwLock<QueueMetrics>>,
    /// Latency metrics (average/peak).
    latency_metrics: Arc<ParkingRwLock<LatencyMetrics>>,
    /// Throughput metrics.
    throughput_metrics: Arc<ParkingRwLock<ThroughputMetrics>>,
    /// Health tracking state (not consulted in this file yet).
    #[allow(dead_code)]
    health_tracker: Arc<RwLock<ConnectionHealthTracker>>,
    /// Per-peer circuit breakers guarding against reconnect storms.
    circuit_breakers: Arc<DashMap<PeerId, CircuitBreaker>>,
    /// Cached quality score per peer (mirrors `ConnectionInfo::quality_score`).
    quality_scores: Arc<DashMap<PeerId, f64>>,
    /// Background maintenance task handle, when spawned.
    #[allow(dead_code)]
    maintenance_handle: Option<tokio::task::JoinHandle<()>>,
    /// Structural limits (totals, per-peer, idle timeouts).
    connection_limits: ConnectionLimits,
    /// Intended monitoring cadence (not consulted in this file yet).
    #[allow(dead_code)]
    monitoring_interval: Duration,
}
492
/// Per-connection bookkeeping: status, activity timestamps, and derived
/// quality metrics.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Current lifecycle status.
    pub status: ConnectionStatus,
    /// When the connection was first created.
    pub established_at: Instant,
    /// Last time any activity was recorded.
    pub last_activity: Instant,
    /// Number of successful operations.
    pub success_count: u64,
    /// Number of failed operations.
    pub failure_count: u64,
    /// Exponential moving average of response times.
    pub avg_response_time: Duration,
    /// Derived health score in [0, 1] (see `update_quality_score`).
    pub quality_score: f64,
    /// Cumulative bytes transferred.
    pub bandwidth_usage: u64,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
}
515
516impl ConnectionInfo {
517 pub fn new(status: ConnectionStatus) -> Self {
519 Self {
520 status,
521 established_at: Instant::now(),
522 last_activity: Instant::now(),
523 success_count: 0,
524 failure_count: 0,
525 avg_response_time: Duration::from_millis(0),
526 quality_score: 1.0,
527 bandwidth_usage: 0,
528 metadata: HashMap::new(),
529 }
530 }
531
532 pub fn update_activity(
534 &mut self,
535 success: bool,
536 response_time: Duration,
537 bytes_transferred: u64,
538 ) {
539 self.last_activity = Instant::now();
540 self.bandwidth_usage += bytes_transferred;
541
542 if success {
543 self.success_count += 1;
544 } else {
545 self.failure_count += 1;
546 }
547
548 let alpha = 0.1; let current_ms = self.avg_response_time.as_millis() as f64;
551 let new_ms = response_time.as_millis() as f64;
552 let updated_ms = alpha * new_ms + (1.0 - alpha) * current_ms;
553 self.avg_response_time = Duration::from_millis(updated_ms as u64);
554
555 self.update_quality_score();
557 }
558
559 fn update_quality_score(&mut self) {
561 let total_ops = self.success_count + self.failure_count;
562 if total_ops == 0 {
563 self.quality_score = 1.0;
564 return;
565 }
566
567 let success_rate = self.success_count as f64 / total_ops as f64;
569
570 let response_penalty = if self.avg_response_time.as_millis() > 100 {
572 0.2 * (self.avg_response_time.as_millis() as f64 / 1000.0)
573 } else {
574 0.0
575 };
576
577 self.quality_score = (success_rate - response_penalty).clamp(0.0, 1.0);
578 }
579
580 pub fn is_healthy(&self) -> bool {
582 self.quality_score > 0.5 && self.last_activity.elapsed() < Duration::from_secs(300)
583 }
585}
586
/// Tracks per-peer health-check state and recovery bookkeeping.
#[derive(Debug)]
#[allow(dead_code)]
pub struct ConnectionHealthTracker {
    /// How often health checks run.
    check_interval: Duration,
    /// When the last check ran, if any.
    last_check: Option<Instant>,
    /// Peers currently considered unhealthy, with recovery progress.
    unhealthy_connections: HashMap<PeerId, UnhealthyConnectionInfo>,
    /// Aggregate counters across checks.
    health_stats: HealthStatistics,
}
600
/// Recovery bookkeeping for one peer flagged unhealthy.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct UnhealthyConnectionInfo {
    /// When the peer was first flagged unhealthy.
    unhealthy_since: Instant,
    /// Recovery attempts made so far.
    recovery_attempts: u32,
    /// When recovery was last attempted, if ever.
    last_recovery_attempt: Option<Instant>,
    /// Human-readable reason the peer was flagged.
    reason: String,
}
614
/// Aggregate health-check counters.
#[derive(Debug, Clone, Default)]
pub struct HealthStatistics {
    /// Total checks performed.
    pub total_checks: u64,
    /// Checks that found the connection healthy.
    pub healthy_count: u64,
    /// Checks that found the connection unhealthy.
    pub unhealthy_count: u64,
    /// Recoveries that succeeded.
    pub successful_recoveries: u64,
    /// Average time a recovery took.
    pub avg_recovery_time: Duration,
}
629
630impl Default for ConnectionHealthTracker {
631 fn default() -> Self {
632 Self {
633 check_interval: Duration::from_secs(30),
634 last_check: None,
635 unhealthy_connections: HashMap::new(),
636 health_stats: HealthStatistics::default(),
637 }
638 }
639}
640
/// Per-peer circuit breaker that stops reconnect attempts to failing peers.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Current breaker state.
    state: CircuitBreakerState,
    /// Consecutive failures that trip the breaker open.
    failure_threshold: u32,
    /// Consecutive failures observed while closed.
    failure_count: u32,
    /// When the breaker last opened.
    opened_at: Option<Instant>,
    /// How long the breaker stays open before probing (half-open).
    timeout: Duration,
    /// Half-open successes required to close again.
    success_threshold: u32,
    /// Successes observed while half-open.
    success_count: u32,
}
659
/// States of the standard closed/open/half-open circuit-breaker pattern.
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitBreakerState {
    /// Traffic flows normally.
    Closed,
    /// Requests are refused until the open timeout elapses.
    Open,
    /// Trial traffic is allowed to test whether the peer has recovered.
    HalfOpen,
}
670
671impl Default for CircuitBreaker {
672 fn default() -> Self {
673 Self {
674 state: CircuitBreakerState::Closed,
675 failure_threshold: 5,
676 failure_count: 0,
677 opened_at: None,
678 timeout: Duration::from_secs(60),
679 success_threshold: 3,
680 success_count: 0,
681 }
682 }
683}
684
685impl CircuitBreaker {
686 pub fn allow_request(&mut self) -> bool {
688 match self.state {
689 CircuitBreakerState::Closed => true,
690 CircuitBreakerState::Open => {
691 if let Some(opened_at) = self.opened_at {
692 if opened_at.elapsed() >= self.timeout {
693 self.state = CircuitBreakerState::HalfOpen;
694 self.success_count = 0;
695 true
696 } else {
697 false
698 }
699 } else {
700 false
701 }
702 }
703 CircuitBreakerState::HalfOpen => true,
704 }
705 }
706
707 pub fn record_result(&mut self, success: bool) {
709 match self.state {
710 CircuitBreakerState::Closed => {
711 if success {
712 self.failure_count = 0;
713 } else {
714 self.failure_count += 1;
715 if self.failure_count >= self.failure_threshold {
716 self.state = CircuitBreakerState::Open;
717 self.opened_at = Some(Instant::now());
718 }
719 }
720 }
721 CircuitBreakerState::HalfOpen => {
722 if success {
723 self.success_count += 1;
724 if self.success_count >= self.success_threshold {
725 self.state = CircuitBreakerState::Closed;
726 self.failure_count = 0;
727 }
728 } else {
729 self.state = CircuitBreakerState::Open;
730 self.opened_at = Some(Instant::now());
731 self.failure_count += 1;
732 }
733 }
734 CircuitBreakerState::Open => {
735 }
737 }
738 }
739}
740
/// Pool entry carrying reuse and warming metadata alongside the connection.
#[derive(Debug, Clone)]
pub struct PooledConnection {
    /// Underlying connection bookkeeping.
    pub info: ConnectionInfo,
    /// When this pool entry was created.
    pub created_at: Instant,
    /// Last time the entry was handed out.
    pub last_used: Instant,
    /// How many times it has been reused.
    pub usage_count: u64,
    /// Load-balancing weight.
    pub weight: f64,
    /// Maximum concurrent streams allowed.
    pub max_streams: u32,
    /// Streams currently open.
    pub active_streams: u32,
    /// Pre-warming progress for this entry.
    pub warming_state: WarmingState,
    /// Optional affinity group label for placement decisions.
    pub affinity_group: Option<String>,
}
763
/// Lifecycle of connection pre-warming.
#[derive(Debug, Clone, PartialEq)]
pub enum WarmingState {
    /// Not warmed yet.
    Cold,
    /// Warm-up in progress.
    Warming,
    /// Ready for immediate use.
    Warm,
    /// Warm-up failed, with the reason.
    FailedToWarm(String),
}
776
/// Routes prioritized streams across a set of multiplexed connections.
#[derive(Debug)]
pub struct ConnectionMultiplexer {
    /// Multiplexed connection state per peer.
    connections: Arc<DashMap<PeerId, MultiplexedConnection>>,
    /// Which peer each stream belongs to.
    stream_routes: Arc<DashMap<StreamId, PeerId>>,
    /// Streams queued for service, grouped by priority.
    priority_queue: Arc<TokioRwLock<BTreeMap<Priority, VecDeque<StreamId>>>>,
    /// Cap on concurrent streams per connection.
    max_streams_per_connection: u32,
    /// Per-stream timeout (not consulted in this file yet).
    #[allow(dead_code)]
    stream_timeout: Duration,
}
792
/// Opaque identifier for a multiplexed stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct StreamId(pub u64);
796
/// Stream scheduling priority; a lower discriminant sorts first (more urgent).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Critical = 0,
    High = 1,
    Normal = 2,
    Low = 3,
}
805
/// A single connection carrying multiple concurrent streams.
#[derive(Debug)]
pub struct MultiplexedConnection {
    /// Connection bookkeeping.
    pub info: ConnectionInfo,
    /// Open streams keyed by id.
    pub streams: HashMap<StreamId, StreamInfo>,
    /// Next stream id to allocate.
    pub next_stream_id: u64,
    /// Current utilization estimate.
    pub utilization: f64,
    /// Bounds the number of concurrently open streams.
    pub stream_semaphore: Arc<Semaphore>,
}
820
/// Bookkeeping for one multiplexed stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream identifier.
    pub id: StreamId,
    /// Scheduling priority.
    pub priority: Priority,
    /// Current lifecycle state.
    pub state: StreamState,
    /// When the stream was opened.
    pub created_at: Instant,
    /// Last observed activity.
    pub last_activity: Instant,
    /// Total bytes carried on this stream.
    pub bytes_transferred: u64,
}
837
/// Lifecycle states of a multiplexed stream.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamState {
    /// Being established.
    Opening,
    /// Open for traffic in both directions.
    Active,
    /// Local side closed; remote may still send.
    HalfClosedLocal,
    /// Remote side closed; local may still send.
    HalfClosedRemote,
    /// Fully closed.
    Closed,
    /// Failed, with the reason.
    Error(String),
}
854
/// Holds per-peer retry policies plus aggregate retry statistics.
#[derive(Debug)]
pub struct RetryManager {
    /// Peer-specific overrides of the retry policy.
    retry_configs: Arc<DashMap<PeerId, RetryConfig>>,
    /// Policy applied when no peer-specific override exists.
    default_config: RetryConfig,
    /// Aggregate retry counters.
    stats: Arc<TokioRwLock<RetryStats>>,
}
865
/// Retry policy: capped exponential backoff with jitter.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum retry attempts.
    pub max_retries: u32,
    /// Backoff before the first retry.
    pub initial_backoff: Duration,
    /// Upper bound on any single backoff.
    pub max_backoff: Duration,
    /// Multiplier applied to the backoff per attempt.
    pub backoff_multiplier: f64,
    /// Fraction of random jitter added to each backoff.
    pub jitter_factor: f64,
    /// Overall deadline for the retried operation.
    pub timeout: Duration,
}
882
/// Aggregate retry counters.
#[derive(Debug, Clone, Default)]
pub struct RetryStats {
    /// Total attempts made, including first tries.
    pub total_attempts: u64,
    /// Retries that eventually succeeded.
    pub successful_retries: u64,
    /// Retries that exhausted their budget.
    pub failed_retries: u64,
    /// Average wall time spent per retried operation.
    pub avg_retry_duration: Duration,
}
895
/// Selects peers for new work according to a configurable strategy.
#[derive(Debug)]
pub struct LoadBalancer {
    /// Active selection strategy.
    strategy: LoadBalancingStrategy,
    /// Per-peer weights used by weighted strategies.
    weights: Arc<DashMap<PeerId, f64>>,
    /// Monotonic counter backing round-robin selection.
    round_robin_counter: AtomicU64,
    /// Aggregate distribution statistics.
    stats: Arc<TokioRwLock<LoadBalancingStats>>,
}
908
/// Available peer-selection strategies.
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Cycle through peers in order.
    RoundRobin,
    /// Prefer the peer with the fewest active connections.
    LeastConnections,
    /// Round-robin scaled by per-peer weight.
    WeightedRoundRobin,
    /// Prefer the fastest-responding peer.
    ResponseTime,
    /// Prefer the least-utilized peer.
    ResourceUtilization,
}
923
/// Aggregate load-balancing distribution statistics.
#[derive(Debug, Clone, Default)]
pub struct LoadBalancingStats {
    /// Total requests routed.
    pub total_requests: u64,
    /// How many requests each peer received.
    pub peer_distribution: HashMap<PeerId, u64>,
    /// Observed average response time per peer.
    pub avg_response_times: HashMap<PeerId, Duration>,
}
934
/// Runs health checks against peers and records the outcomes.
#[derive(Debug)]
pub struct HealthMonitor {
    /// Check cadence, thresholds, and probe type.
    config: HealthCheckConfig,
    /// Latest check result per peer.
    results: Arc<DashMap<PeerId, HealthCheckResult>>,
    /// Upcoming check schedule.
    scheduler: Arc<TokioRwLock<HealthCheckScheduler>>,
    /// Aggregate check counters.
    stats: Arc<TokioRwLock<HealthStats>>,
}
947
/// Configuration for periodic health checks.
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    /// Time between checks.
    pub interval: Duration,
    /// Per-check timeout.
    pub timeout: Duration,
    /// Consecutive failures before a peer is flagged unhealthy.
    pub failure_threshold: u32,
    /// Consecutive successes before a flagged peer is considered recovered.
    pub recovery_threshold: u32,
    /// Which kind of probe to run.
    pub check_type: HealthCheckType,
}
962
/// Kinds of health probes.
#[derive(Debug, Clone)]
pub enum HealthCheckType {
    /// Transport-level reachability probe.
    Ping,
    /// Application-level probe.
    Application,
    /// Caller-supplied probe.
    Custom,
}
973
/// Outcome of one health probe.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// When the probe completed.
    pub timestamp: Instant,
    /// Whether the probe succeeded.
    pub success: bool,
    /// How long the probe took.
    pub response_time: Duration,
    /// Failure description, when unsuccessful.
    pub error: Option<String>,
    /// Score in [0, 1] derived from the probe.
    pub health_score: f64,
}
988
/// Tracks when each peer is next due for a health check.
#[derive(Debug)]
pub struct HealthCheckScheduler {
    /// Next scheduled check time per peer.
    scheduled_checks: HashMap<PeerId, Instant>,
    /// Per-peer check cadence overrides.
    check_intervals: HashMap<PeerId, Duration>,
}
997
/// Aggregate health-check counters across all peers.
#[derive(Debug, Clone, Default)]
pub struct HealthStats {
    /// Total probes run.
    pub total_checks: u64,
    /// Probes that succeeded.
    pub successful_checks: u64,
    /// Probes that failed.
    pub failed_checks: u64,
    /// Average probe response time.
    pub avg_response_time: Duration,
}
1010
/// Manages pre-warming of pooled connections.
#[derive(Debug)]
pub struct WarmingManager {
    /// Warming policy (pool size, timeouts, retries).
    config: WarmingConfig,
    /// Current warming state per peer.
    warming_states: Arc<DashMap<PeerId, WarmingState>>,
    /// In-flight warming tasks per peer.
    #[allow(dead_code)]
    warming_tasks: Arc<DashMap<PeerId, tokio::task::JoinHandle<()>>>,
    /// Aggregate warming counters.
    stats: Arc<TokioRwLock<WarmingStats>>,
}
1024
/// Policy controlling connection pre-warming.
#[derive(Debug, Clone)]
pub struct WarmingConfig {
    /// Master switch for warming.
    pub enabled: bool,
    /// Minimum number of warm connections to maintain.
    pub min_pool_size: usize,
    /// Deadline for a single warm-up.
    pub warming_timeout: Duration,
    /// Retries per warm-up before giving up.
    pub warming_retries: u32,
    /// Threshold used by predictive warming decisions.
    pub predictive_threshold: f64,
}
1039
/// Aggregate warming counters.
#[derive(Debug, Clone, Default)]
pub struct WarmingStats {
    /// Warm-ups attempted.
    pub total_attempts: u64,
    /// Warm-ups that completed.
    pub successful_warmings: u64,
    /// Warm-ups that failed.
    pub failed_warmings: u64,
    /// Average time a successful warm-up took.
    pub avg_warming_time: Duration,
}
1052
/// Structural limits applied to the connection manager.
#[derive(Debug, Clone)]
pub struct ConnectionLimits {
    /// Maximum connections overall.
    pub max_total: usize,
    /// Maximum connections per peer.
    pub max_per_peer: usize,
    /// Maximum idle connections kept pooled.
    pub max_idle: usize,
    /// Deadline for establishing a connection.
    pub connection_timeout: Duration,
    /// Idle duration after which a connection is dropped.
    pub idle_timeout: Duration,
}
1067
1068impl Default for ConnectionLimits {
1069 fn default() -> Self {
1070 Self {
1071 max_total: 1000,
1072 max_per_peer: 10,
1073 max_idle: 100,
1074 connection_timeout: Duration::from_secs(30),
1075 idle_timeout: Duration::from_secs(300),
1076 }
1077 }
1078}
1079
/// Pluggable health probe run against a single peer's connection.
#[async_trait]
pub trait HealthCheck: Send + Sync {
    /// Probe `connection` for `peer_id` and report the outcome.
    async fn check(&self, peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult;
}
1086
/// Health check that simulates a ping probe (see the `HealthCheck` impl).
#[derive(Debug)]
pub struct PingHealthCheck {
    /// Probe timeout (currently unused by the simulated check).
    #[allow(dead_code)]
    timeout: Duration,
}
1093
1094impl Default for PingHealthCheck {
1095 fn default() -> Self {
1096 Self {
1097 timeout: Duration::from_secs(5),
1098 }
1099 }
1100}
1101
1102#[async_trait]
1103impl HealthCheck for PingHealthCheck {
1104 async fn check(&self, _peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult {
1105 let start = Instant::now();
1106
1107 let success = connection.is_healthy() && rand::random::<f64>() > 0.1;
1109 let response_time = start.elapsed();
1110
1111 HealthCheckResult {
1112 timestamp: Instant::now(),
1113 success,
1114 response_time,
1115 error: if success {
1116 None
1117 } else {
1118 Some("Ping timeout".to_string())
1119 },
1120 health_score: if success { 1.0 } else { 0.0 },
1121 }
1122 }
1123}
1124
1125use std::collections::{BTreeMap, HashMap, VecDeque};
1126use tokio::sync::RwLock;
1127
1128impl ConnectionManager {
    /// Tear down any cached state for `peer_id` and try to re-establish the
    /// connection, with exponential backoff across up to 5 attempts.
    ///
    /// # Errors
    /// `NetworkError::ConnectionError` when all attempts fail.
    pub async fn recover_connection(&self, peer_id: &PeerId) -> Result<(), NetworkError> {
        debug!("Attempting to recover connection for peer {:?}", peer_id);

        // Drop the live entry and any pooled copy so `connect` starts fresh.
        self.connections.remove(peer_id);

        self.connection_pool.remove(peer_id);

        let mut retry_count = 0;
        let max_retries = 5;

        while retry_count < max_retries {
            match self.connect(*peer_id).await {
                Ok(()) => {
                    info!("Successfully recovered connection for peer {:?}", peer_id);
                    return Ok(());
                }
                Err(e) => {
                    retry_count += 1;
                    // Exponential backoff: 200 ms, 400 ms, 800 ms, 1600 ms.
                    let backoff_ms = 100u64 * (1 << retry_count);
                    warn!(
                        "Connection recovery attempt {} failed for peer {:?}: {}, retrying in {}ms",
                        retry_count, peer_id, e, backoff_ms
                    );

                    if retry_count >= max_retries {
                        error!(
                            "Failed to recover connection for peer {:?} after {} attempts",
                            peer_id, max_retries
                        );
                        return Err(NetworkError::ConnectionError(format!(
                            "Recovery failed after {} attempts",
                            max_retries
                        )));
                    }

                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                }
            }
        }

        // Effectively unreachable: the loop either returns Ok or errors out
        // on the final attempt above; kept as a defensive fallback.
        Err(NetworkError::ConnectionError("Max retries exceeded".into()))
    }
1175
1176 pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError> {
1178 let mut unhealthy_peers = Vec::new();
1179
1180 for entry in self.connections.iter() {
1181 let peer_id = *entry.key();
1182 let conn_info = entry.value();
1183
1184 match &conn_info.status {
1185 ConnectionStatus::Failed(_) => {
1186 unhealthy_peers.push(peer_id);
1187 warn!("Detected failed connection for peer {:?}", peer_id);
1188 }
1189 ConnectionStatus::Disconnected => {
1190 unhealthy_peers.push(peer_id);
1191 debug!("Detected disconnected peer {:?}", peer_id);
1192 }
1193 _ => {
1194 if !conn_info.is_healthy() {
1196 unhealthy_peers.push(peer_id);
1197 debug!(
1198 "Detected unhealthy connection for peer {:?} (quality: {:.2})",
1199 peer_id, conn_info.quality_score
1200 );
1201 }
1202 }
1203 }
1204 }
1205
1206 if !unhealthy_peers.is_empty() {
1207 info!(
1208 "Health check found {} unhealthy connections",
1209 unhealthy_peers.len()
1210 );
1211 }
1212
1213 Ok(unhealthy_peers)
1214 }
1215
1216 pub async fn auto_recover(&self) -> Result<usize, NetworkError> {
1218 let unhealthy_peers = self.health_check().await?;
1219 let total_unhealthy = unhealthy_peers.len();
1220 let mut recovered_count = 0;
1221
1222 for peer_id in unhealthy_peers {
1223 match self.recover_connection(&peer_id).await {
1224 Ok(()) => {
1225 recovered_count += 1;
1226 debug!("Auto-recovered connection for peer {:?}", peer_id);
1227 }
1228 Err(e) => {
1229 warn!(
1230 "Failed to auto-recover connection for peer {:?}: {}",
1231 peer_id, e
1232 );
1233 }
1234 }
1235 }
1236
1237 if recovered_count > 0 {
1238 info!(
1239 "Auto-recovery completed: {}/{} connections recovered",
1240 recovered_count, total_unhealthy
1241 );
1242 }
1243
1244 Ok(recovered_count)
1245 }
    /// Create a manager capped at `max_connections`, using the default
    /// 300-second pool timeout.
    pub fn new(max_connections: usize) -> Self {
        Self::with_pool_timeout(max_connections, std::time::Duration::from_secs(300))
    }
1265
    /// Create a manager capped at `max_connections`, with pooled connections
    /// expiring after `pool_timeout` of disuse.
    pub fn with_pool_timeout(max_connections: usize, pool_timeout: std::time::Duration) -> Self {
        // Propagate the cap into the structural limits as well.
        let connection_limits = ConnectionLimits {
            max_total: max_connections,
            ..Default::default()
        };

        Self {
            max_connections,
            connections: Arc::new(DashMap::new()),
            connection_pool: Arc::new(DashMap::new()),
            enhanced_pool: Arc::new(DashMap::new()),
            // Up to 32 streams per connection, 30 s stream timeout.
            multiplexer: Arc::new(ConnectionMultiplexer::new(32, Duration::from_secs(30))),
            retry_manager: Arc::new(RetryManager::new()),
            load_balancer: Arc::new(LoadBalancer::new(LoadBalancingStrategy::WeightedRoundRobin)),
            health_monitor: Arc::new(HealthMonitor::new(HealthCheckConfig::default())),
            warming_manager: Arc::new(WarmingManager::new(WarmingConfig::default())),
            pool_timeout,
            metrics: Arc::new(ParkingRwLock::new(NetworkMetrics::default())),
            queue_metrics: Arc::new(ParkingRwLock::new(QueueMetrics::default())),
            latency_metrics: Arc::new(ParkingRwLock::new(LatencyMetrics::default())),
            throughput_metrics: Arc::new(ParkingRwLock::new(ThroughputMetrics::default())),
            health_tracker: Arc::new(RwLock::new(ConnectionHealthTracker::default())),
            circuit_breakers: Arc::new(DashMap::new()),
            quality_scores: Arc::new(DashMap::new()),
            maintenance_handle: None,
            connection_limits,
            monitoring_interval: Duration::from_secs(30),
        }
    }
1312
1313 pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1341 if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
1343 if !circuit_breaker.allow_request() {
1344 return Err(NetworkError::ConnectionError(
1345 "Circuit breaker is open for this peer".into(),
1346 ));
1347 }
1348 }
1349
1350 if let Some(entry) = self.connection_pool.get(&peer_id) {
1352 let (conn_info, last_used) = entry.value();
1353 if last_used.elapsed() < self.pool_timeout && conn_info.is_healthy() {
1354 self.connections.insert(peer_id, conn_info.clone());
1356 debug!("Reusing pooled healthy connection for peer {:?}", peer_id);
1357
1358 if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
1360 circuit_breaker.record_result(true);
1361 }
1362
1363 return Ok(());
1364 } else {
1365 self.connection_pool.remove(&peer_id);
1367 debug!(
1368 "Removing expired/unhealthy connection for peer {:?}",
1369 peer_id
1370 );
1371 }
1372 }
1373
1374 if self.connections.len() >= self.max_connections {
1376 warn!("Max connections reached");
1377 return Err(NetworkError::ConnectionError(
1378 "Max connections reached".into(),
1379 ));
1380 }
1381
1382 let connecting_info = ConnectionInfo::new(ConnectionStatus::Connecting);
1384 self.connections.insert(peer_id, connecting_info);
1385 debug!("Creating new connection for peer {:?}", peer_id);
1386
1387 let start_time = Instant::now();
1389 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1390 let connection_time = start_time.elapsed();
1391
1392 let success = rand::random::<f64>() > 0.1;
1394
1395 if success {
1396 let mut connected_info = ConnectionInfo::new(ConnectionStatus::Connected);
1398 connected_info.update_activity(true, connection_time, 0);
1399
1400 self.connections.insert(peer_id, connected_info.clone());
1401 self.quality_scores
1402 .insert(peer_id, connected_info.quality_score);
1403
1404 self.circuit_breakers
1406 .entry(peer_id)
1407 .or_insert_with(CircuitBreaker::default)
1408 .record_result(true);
1409
1410 debug!(
1411 "Successfully connected to peer {:?} in {:?}",
1412 peer_id, connection_time
1413 );
1414 } else {
1415 let failed_info =
1417 ConnectionInfo::new(ConnectionStatus::Failed("Connection timeout".into()));
1418 self.connections.insert(peer_id, failed_info);
1419
1420 self.circuit_breakers
1422 .entry(peer_id)
1423 .or_insert_with(CircuitBreaker::default)
1424 .record_result(false);
1425
1426 return Err(NetworkError::ConnectionError(
1427 "Failed to establish connection".into(),
1428 ));
1429 }
1430
1431 Ok(())
1432 }
1433
    /// Update a peer's status without recording response-time or bandwidth
    /// metrics (convenience wrapper over `update_status_with_metrics`).
    pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus) {
        self.update_status_with_metrics(peer_id, status, None, 0);
    }
1460
    /// Update a peer's status and, when `response_time` is provided, fold the
    /// outcome into its activity metrics, quality score, and circuit breaker.
    /// Creates a fresh entry if the peer was not yet tracked, then refreshes
    /// the aggregate connection counts.
    pub fn update_status_with_metrics(
        &self,
        peer_id: PeerId,
        status: ConnectionStatus,
        response_time: Option<Duration>,
        bytes_transferred: u64,
    ) {
        if let Some(mut conn_info) = self.connections.get_mut(&peer_id) {
            conn_info.status = status.clone();
            if let Some(rt) = response_time {
                // Only a Connected status counts as a success.
                let success = matches!(status, ConnectionStatus::Connected);
                conn_info.update_activity(success, rt, bytes_transferred);

                self.quality_scores.insert(peer_id, conn_info.quality_score);

                if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
                    circuit_breaker.record_result(success);
                }
            }
        } else {
            // First sighting of this peer: start fresh bookkeeping.
            let mut conn_info = ConnectionInfo::new(status);
            if let Some(rt) = response_time {
                let success = matches!(conn_info.status, ConnectionStatus::Connected);
                conn_info.update_activity(success, rt, bytes_transferred);
            }
            self.connections.insert(peer_id, conn_info);
        }

        // Refresh aggregate counts; `active_connections` counts only healthy
        // connections.
        let mut metrics = self.metrics.write();
        metrics.connections = self.connections.len();

        let active_count = self
            .connections
            .iter()
            .filter(|entry| entry.value().is_healthy())
            .count();
        metrics.active_connections = active_count;
    }
1506
1507 pub fn disconnect(&self, peer_id: &PeerId) {
1509 if let Some((_, conn_info)) = self.connections.remove(peer_id) {
1510 debug!(
1511 "Disconnected from peer {:?} with status {:?} (quality: {:.2})",
1512 peer_id, conn_info.status, conn_info.quality_score
1513 );
1514
1515 if conn_info.is_healthy() {
1517 self.connection_pool
1518 .insert(*peer_id, (conn_info, Instant::now()));
1519 }
1520 }
1521
1522 self.quality_scores.remove(peer_id);
1524
1525 if let Some(circuit_breaker) = self.circuit_breakers.get_mut(peer_id) {
1528 if circuit_breaker.state == CircuitBreakerState::Closed {
1529 }
1532 }
1533
1534 self.cleanup_pool();
1536
1537 let mut metrics = self.metrics.write();
1539 metrics.connections = self.connections.len();
1540
1541 let active_count = self
1543 .connections
1544 .iter()
1545 .filter(|entry| entry.value().is_healthy())
1546 .count();
1547 metrics.active_connections = active_count;
1548 }
1549
    /// Drop pooled connections whose last use exceeds `pool_timeout`.
    fn cleanup_pool(&self) {
        self.connection_pool
            .retain(|_, (_, last_used)| last_used.elapsed() < self.pool_timeout);
    }
1555
    /// Number of currently tracked connections, regardless of status.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
1560
    /// Current status of the peer's tracked connection, if any.
    pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus> {
        self.connections
            .get(peer_id)
            .map(|entry| entry.value().status.clone())
    }
1567
    /// Snapshot of the peer's full connection bookkeeping, if tracked.
    pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo> {
        self.connections
            .get(peer_id)
            .map(|entry| entry.value().clone())
    }
1574
    /// Cached quality score for the peer, if one has been recorded.
    pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64> {
        self.quality_scores.get(peer_id).map(|entry| *entry.value())
    }
1579
    /// Current circuit-breaker state for the peer, if a breaker exists.
    pub fn get_circuit_breaker_state(&self, peer_id: &PeerId) -> Option<CircuitBreakerState> {
        self.circuit_breakers
            .get(peer_id)
            .map(|entry| entry.value().state.clone())
    }
1586
1587 pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)> {
1589 let mut healthy_peers = Vec::new();
1590
1591 for entry in self.connections.iter() {
1592 let peer_id = *entry.key();
1593 let conn_info = entry.value();
1594
1595 if conn_info.is_healthy() {
1596 healthy_peers.push((peer_id, conn_info.quality_score));
1597 }
1598 }
1599
1600 healthy_peers.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1602 healthy_peers
1603 }
1604
1605 pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64) {
1607 let latency_duration = std::time::Duration::from_millis(avg_latency_ms);
1608
1609 {
1611 let mut metrics = self.metrics.write();
1612 metrics.messages_per_second = messages_per_second;
1613 metrics.avg_latency = latency_duration;
1614 metrics.active_connections = self.connections.len();
1615 }
1616
1617 {
1619 let mut queue_metrics = self.queue_metrics.write();
1620 queue_metrics.current_size = self.connection_pool.len();
1621 queue_metrics.max_size = self.max_connections;
1622 queue_metrics.utilization =
1623 queue_metrics.current_size as f64 / queue_metrics.max_size as f64;
1624 queue_metrics.messages_per_second = messages_per_second;
1625 }
1626
1627 {
1629 let mut latency_metrics = self.latency_metrics.write();
1630 latency_metrics.avg_latency = latency_duration;
1631 latency_metrics.peak_latency = latency_metrics.peak_latency.max(latency_duration);
1632 }
1633
1634 {
1636 let mut throughput_metrics = self.throughput_metrics.write();
1637 throughput_metrics.messages_per_second = messages_per_second;
1638 throughput_metrics.total_messages += 1;
1639 throughput_metrics.avg_throughput =
1640 (throughput_metrics.avg_throughput + messages_per_second) / 2.0;
1641 throughput_metrics.peak_throughput =
1642 throughput_metrics.peak_throughput.max(messages_per_second);
1643 }
1644
1645 debug!(
1646 "Updated network metrics: {} msg/s, {} ms latency",
1647 messages_per_second, avg_latency_ms
1648 );
1649 }
1650
1651 pub fn get_queue_metrics(&self) -> QueueMetrics {
1653 self.queue_metrics.read().clone()
1654 }
1655
1656 pub fn get_latency_metrics(&self) -> LatencyMetrics {
1658 self.latency_metrics.read().clone()
1659 }
1660
1661 pub fn get_throughput_metrics(&self) -> ThroughputMetrics {
1663 self.throughput_metrics.read().clone()
1664 }
1665
1666 pub fn get_metrics(&self) -> NetworkMetrics {
1668 self.metrics.read().clone()
1669 }
1670
1671 pub async fn open_stream(
1674 &self,
1675 peer_id: PeerId,
1676 priority: Priority,
1677 ) -> Result<StreamId, NetworkError> {
1678 if !self.connections.contains_key(&peer_id) {
1680 self.connect(peer_id).await?;
1682 }
1683
1684 self.multiplexer.open_stream(peer_id, priority).await
1686 }
1687
1688 pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
1690 self.multiplexer.close_stream(stream_id).await
1691 }
1692
1693 pub async fn send_stream_data(
1695 &self,
1696 stream_id: StreamId,
1697 data: Bytes,
1698 ) -> Result<(), NetworkError> {
1699 let stream_info = self
1701 .multiplexer
1702 .get_stream_info(stream_id)
1703 .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?;
1704
1705 if stream_info.state != StreamState::Active {
1706 return Err(NetworkError::ConnectionError("Stream not active".into()));
1707 }
1708
1709 info!("Sending {} bytes on stream {:?}", data.len(), stream_id);
1712 Ok(())
1713 }
1714
1715 pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1717 let retry_manager = self.retry_manager.clone();
1718
1719 retry_manager
1720 .retry_operation(peer_id, || async { self.connect(peer_id).await })
1721 .await
1722 }
1723
1724 pub async fn select_best_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
1726 self.load_balancer.select_connection(available_peers).await
1727 }
1728
1729 pub async fn start_health_monitoring(&self, peer_id: PeerId) {
1731 self.health_monitor.start_monitoring(peer_id).await;
1732 }
1733
1734 pub async fn check_connection_health(&self, peer_id: PeerId) -> Option<HealthCheckResult> {
1736 if let Some(connection_info) = self.get_connection_info(&peer_id) {
1737 Some(
1738 self.health_monitor
1739 .check_health(peer_id, &connection_info)
1740 .await,
1741 )
1742 } else {
1743 None
1744 }
1745 }
1746
1747 pub async fn warm_connections(&self, peer_id: PeerId) -> Result<(), NetworkError> {
1749 self.warming_manager.warm_connection(peer_id).await
1750 }
1751
1752 pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
1754 self.warming_manager.get_warming_state(peer_id)
1755 }
1756
1757 pub async fn get_connection_statistics(&self) -> ConnectionStatistics {
1759 let health_stats = self.health_monitor.stats.read().await.clone();
1760 let retry_stats = self.retry_manager.stats.read().await.clone();
1761 let warming_stats = self.warming_manager.stats.read().await.clone();
1762 let load_balancing_stats = self.load_balancer.stats.read().await.clone();
1763
1764 ConnectionStatistics {
1765 total_connections: self.connections.len(),
1766 active_connections: self
1767 .connections
1768 .iter()
1769 .filter(|entry| entry.value().is_healthy())
1770 .count(),
1771 pooled_connections: self.enhanced_pool.len(),
1772 health_stats,
1773 retry_stats,
1774 warming_stats,
1775 load_balancing_stats,
1776 }
1777 }
1778
1779 pub fn set_connection_limits(&mut self, limits: ConnectionLimits) {
1781 self.max_connections = limits.max_total;
1782 self.connection_limits = limits;
1783 }
1784
1785 pub fn get_connection_limits(&self) -> &ConnectionLimits {
1787 &self.connection_limits
1788 }
1789}
1790
/// Point-in-time snapshot of connection-manager state, aggregating counts
/// with the statistics kept by each sub-manager.
#[derive(Debug, Clone)]
pub struct ConnectionStatistics {
    /// Total connections currently tracked (healthy or not).
    pub total_connections: usize,
    /// Subset of tracked connections whose `is_healthy()` is true.
    pub active_connections: usize,
    /// Entries in the enhanced connection pool.
    pub pooled_connections: usize,
    /// Health-check counters from the `HealthMonitor`.
    pub health_stats: HealthStats,
    /// Retry counters from the `RetryManager`.
    pub retry_stats: RetryStats,
    /// Warming counters from the `WarmingManager`.
    pub warming_stats: WarmingStats,
    /// Request-distribution counters from the `LoadBalancer`.
    pub load_balancing_stats: LoadBalancingStats,
}
1809
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Instant;
    use tokio::time::Duration;

    // Shared fixture: fresh transport keys with short timeouts.
    fn setup_test_config() -> SecureConfig {
        SecureConfig {
            transport_keys: TransportKeys::generate(),
            timeout: std::time::Duration::from_secs(5),
            keepalive: std::time::Duration::from_secs(10),
        }
    }

    // Smoke test: a SecureConnection can be created against a local
    // endpoint and accepts a send without error.
    #[tokio::test]
    async fn test_secure_connection() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);

        // NOTE(review): `ServerConfig::default()` — presumably quinn's;
        // whether it carries usable certs is not visible here.
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .expect("Failed to create secure connection");

        let test_data = Bytes::from(b"test message".to_vec());
        connection
            .send(test_data)
            .await
            .expect("Failed to send message");
    }

    // Connect/disconnect lifecycle plus metric bookkeeping with a
    // capacity of 2 connections.
    #[tokio::test]
    async fn test_connection_management() {
        let manager = ConnectionManager::new(2);
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        assert!(manager.connect(peer1).await.is_ok());
        assert!(manager.connect(peer2).await.is_ok());
        // Third connect succeeds but the count stays capped at the limit.
        assert!(manager.connect(peer3).await.is_ok()); assert_eq!(manager.connection_count(), 2);

        manager.update_status(peer1, ConnectionStatus::Connected);
        assert_eq!(
            manager.get_status(&peer1),
            Some(ConnectionStatus::Connected)
        );

        // Disconnecting removes status and decrements the count.
        manager.disconnect(&peer1);
        assert_eq!(manager.get_status(&peer1), None);
        assert_eq!(manager.connection_count(), 1);

        manager.update_metrics(1000.0, 50);
        let metrics = manager.get_metrics();
        assert_eq!(metrics.messages_per_second, 1000.0);
        assert_eq!(metrics.connections, 1);
    }

    // Benchmark (prints only, no assertions): time to connect 1000 peers.
    #[tokio::test]
    async fn bench_route_computation() {
        let manager = ConnectionManager::new(100);
        let _rng = rand::thread_rng();
        let mut latencies = Vec::new();

        for _ in 0..1000 {
            let peer = PeerId::random();
            let start = Instant::now();
            manager.connect(peer).await.unwrap();
            latencies.push(start.elapsed());
        }

        let avg_latency = latencies.iter().sum::<Duration>() / latencies.len() as u32;
        let max_latency = latencies.iter().max().unwrap();

        println!("Route Computation Benchmark:");
        println!("Average latency: {:?}", avg_latency);
        println!("Maximum latency: {:?}", max_latency);
        println!("Total routes: {}", manager.connection_count());
    }

    // Benchmark (prints only): pool hit rate for random — hence almost
    // always missing — peer lookups.
    #[tokio::test]
    async fn bench_cache_efficiency() {
        let manager = ConnectionManager::new(1000);
        let mut hit_count = 0;
        let iterations = 10000;

        for _ in 0..iterations {
            let peer = PeerId::random();
            let _start = Instant::now();

            if let Some(_) = manager.connection_pool.get(&peer) {
                hit_count += 1;
            } else {
                manager.connect(peer).await.unwrap();
            }
        }

        let hit_rate = (hit_count as f64 / iterations as f64) * 100.0;
        println!("Cache Efficiency Benchmark:");
        println!("Cache hit rate: {:.2}%", hit_rate);
        println!("Pool size: {}", manager.connection_pool.len());
    }

    // Benchmark (prints only): average SecureConnection setup time over
    // 100 attempts; failures are ignored (result discarded).
    #[tokio::test]
    async fn bench_circuit_setup() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        let mut setup_times = Vec::new();
        for _ in 0..100 {
            let start = Instant::now();
            let _connection =
                SecureConnection::new(&endpoint, test_addr, test_config.clone()).await;
            setup_times.push(start.elapsed());
        }

        let avg_setup = setup_times.iter().sum::<Duration>() / setup_times.len() as u32;
        println!("Circuit Setup Benchmark:");
        println!("Average setup time: {:?}", avg_setup);
    }

    // Benchmark (prints only): reconnecting known peers should hit the
    // pool, so the second pass measures reuse latency.
    #[tokio::test]
    async fn bench_connection_pooling() {
        let manager = ConnectionManager::with_pool_timeout(1000, Duration::from_secs(60));
        let test_peers: Vec<PeerId> = (0..100).map(|_| PeerId::random()).collect();
        let mut reuse_times = Vec::new();

        // First pass: populate the pool.
        for peer in test_peers.iter() {
            manager.connect(*peer).await.unwrap();
        }

        // Second pass: time pooled reconnects.
        for peer in test_peers.iter() {
            let start = Instant::now();
            manager.connect(*peer).await.unwrap();
            reuse_times.push(start.elapsed());
        }

        let avg_reuse = reuse_times.iter().sum::<Duration>() / reuse_times.len() as u32;
        println!("Connection Pooling Benchmark:");
        println!("Average reuse time: {:?}", avg_reuse);
        println!(
            "Pool utilization: {:.2}%",
            (manager.get_queue_metrics().utilization * 100.0)
        );
    }

    // Benchmark (prints only): sustained send throughput of 10k 1KiB
    // messages over one SecureConnection.
    #[tokio::test]
    async fn bench_message_throughput() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .unwrap();
        let start = Instant::now();
        let message_count = 10000;
        let message_size = 1024; for _ in 0..message_count {
            let data = Bytes::from(vec![1u8; message_size]);
            connection.send(data).await.unwrap();
        }

        let elapsed = start.elapsed();
        let throughput = message_count as f64 / elapsed.as_secs_f64();
        let mb_per_sec = (throughput * message_size as f64) / (1024.0 * 1024.0);

        println!("Message Throughput Benchmark:");
        println!("Messages per second: {:.2}", throughput);
        println!("Throughput: {:.2} MB/s", mb_per_sec);
        println!("Total time: {:?}", elapsed);
    }
}
2008
impl ConnectionMultiplexer {
    /// Create a multiplexer with empty routing tables and the given per-
    /// connection stream cap and timeout.
    pub fn new(max_streams_per_connection: u32, stream_timeout: Duration) -> Self {
        Self {
            connections: Arc::new(DashMap::new()),
            stream_routes: Arc::new(DashMap::new()),
            priority_queue: Arc::new(TokioRwLock::new(BTreeMap::new())),
            max_streams_per_connection,
            stream_timeout,
        }
    }

    /// Open a new stream on the existing connection to `peer_id`.
    ///
    /// Fails when no connection is tracked, the stream cap is reached, or
    /// the stream semaphore has been closed.
    pub async fn open_stream(
        &self,
        peer_id: PeerId,
        priority: Priority,
    ) -> Result<StreamId, NetworkError> {
        // NOTE(review): this DashMap write guard is held across the
        // `.await` below — if another task touches the same shard while we
        // wait, this can deadlock. Confirm single-threaded-per-peer usage.
        let mut connection = self
            .connections
            .get_mut(&peer_id)
            .ok_or_else(|| NetworkError::ConnectionError("Connection not found".into()))?;

        if connection.streams.len() >= self.max_streams_per_connection as usize {
            return Err(NetworkError::ConnectionError(
                "Maximum streams reached".into(),
            ));
        }

        // NOTE(review): `let _ =` drops the permit immediately, so the
        // semaphore releases right away and enforces nothing beyond the
        // explicit len() check above — looks unintentional, verify.
        let _ = connection
            .stream_semaphore
            .acquire()
            .await
            .map_err(|_| NetworkError::ConnectionError("Stream semaphore closed".into()))?;

        // Allocate the next monotonically increasing stream id.
        let stream_id = StreamId(connection.next_stream_id);
        connection.next_stream_id += 1;

        let stream_info = StreamInfo {
            id: stream_id,
            priority,
            state: StreamState::Opening,
            created_at: Instant::now(),
            last_activity: Instant::now(),
            bytes_transferred: 0,
        };

        // Register the stream on the connection and in the reverse route map.
        connection.streams.insert(stream_id, stream_info);
        self.stream_routes.insert(stream_id, peer_id);

        // Enqueue by priority for later scheduling.
        let mut queue = self.priority_queue.write().await;
        queue
            .entry(priority)
            .or_insert_with(VecDeque::new)
            .push_back(stream_id);

        Ok(stream_id)
    }

    /// Close `stream_id`: remove its route, drop it from its connection's
    /// stream table, and refresh that connection's utilization ratio.
    pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
        // Removing the route also yields the owning peer id.
        let peer_id = self
            .stream_routes
            .remove(&stream_id)
            .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?
            .1;

        if let Some(mut connection) = self.connections.get_mut(&peer_id) {
            // Mark closed before removal (visible only if something else
            // holds a clone of the StreamInfo).
            if let Some(stream) = connection.streams.get_mut(&stream_id) {
                stream.state = StreamState::Closed;
                stream.last_activity = Instant::now();
            }
            connection.streams.remove(&stream_id);

            connection.utilization =
                connection.streams.len() as f64 / self.max_streams_per_connection as f64;
        }

        Ok(())
    }

    /// Look up a stream's info by id via the route map; `None` when the
    /// stream or its connection is unknown.
    pub fn get_stream_info(&self, stream_id: StreamId) -> Option<StreamInfo> {
        let peer_id = self.stream_routes.get(&stream_id)?.value().clone();
        let connection = self.connections.get(&peer_id)?;
        connection.streams.get(&stream_id).cloned()
    }
}
2101
2102impl RetryManager {
2103 pub fn new() -> Self {
2105 Self {
2106 retry_configs: Arc::new(DashMap::new()),
2107 default_config: RetryConfig::default(),
2108 stats: Arc::new(TokioRwLock::new(RetryStats::default())),
2109 }
2110 }
2111
2112 pub async fn retry_operation<F, Fut, T, E>(&self, peer_id: PeerId, operation: F) -> Result<T, E>
2114 where
2115 F: Fn() -> Fut + Send + Sync,
2116 Fut: Future<Output = Result<T, E>> + Send,
2117 E: std::fmt::Debug,
2118 {
2119 let config = self
2120 .retry_configs
2121 .get(&peer_id)
2122 .map(|entry| entry.value().clone())
2123 .unwrap_or_else(|| self.default_config.clone());
2124
2125 let mut attempt = 0;
2126 let mut backoff = config.initial_backoff;
2127
2128 loop {
2129 let start = Instant::now();
2130 let result = operation().await;
2131 let _duration = start.elapsed();
2132
2133 match result {
2134 Ok(value) => {
2135 let mut stats = self.stats.write().await;
2137 stats.total_attempts += 1;
2138 stats.successful_retries += 1;
2139 return Ok(value);
2140 }
2141 Err(error) => {
2142 attempt += 1;
2143 if attempt >= config.max_retries {
2144 let mut stats = self.stats.write().await;
2146 stats.total_attempts += 1;
2147 stats.failed_retries += 1;
2148 return Err(error);
2149 }
2150
2151 let jitter = (rand::random::<f64>() - 0.5) * 2.0 * config.jitter_factor;
2153 let backoff_with_jitter = Duration::from_millis(
2154 ((backoff.as_millis() as f64) * (1.0 + jitter)) as u64,
2155 );
2156
2157 sleep(backoff_with_jitter).await;
2158
2159 backoff = std::cmp::min(
2161 Duration::from_millis(
2162 (backoff.as_millis() as f64 * config.backoff_multiplier) as u64,
2163 ),
2164 config.max_backoff,
2165 );
2166 }
2167 }
2168 }
2169 }
2170}
2171
2172impl Default for RetryConfig {
2173 fn default() -> Self {
2174 Self {
2175 max_retries: 3,
2176 initial_backoff: Duration::from_millis(100),
2177 max_backoff: Duration::from_secs(30),
2178 backoff_multiplier: 2.0,
2179 jitter_factor: 0.1,
2180 timeout: Duration::from_secs(10),
2181 }
2182 }
2183}
2184
impl LoadBalancer {
    /// Create a balancer with the given strategy, empty weights, and zeroed
    /// round-robin counter and stats.
    pub fn new(strategy: LoadBalancingStrategy) -> Self {
        Self {
            strategy,
            weights: Arc::new(DashMap::new()),
            round_robin_counter: AtomicU64::new(0),
            stats: Arc::new(TokioRwLock::new(LoadBalancingStats::default())),
        }
    }

    /// Pick one peer from `available_peers` per the configured strategy and
    /// record the choice in the distribution stats. `None` on empty input.
    pub async fn select_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
        if available_peers.is_empty() {
            return None;
        }

        let selected = match self.strategy {
            LoadBalancingStrategy::RoundRobin => {
                // Relaxed is fine: the counter only spreads load, no ordering needed.
                let index = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) as usize;
                available_peers[index % available_peers.len()]
            }
            LoadBalancingStrategy::WeightedRoundRobin => {
                self.select_weighted_round_robin(available_peers).await
            }
            LoadBalancingStrategy::LeastConnections => {
                self.select_least_connections(available_peers).await
            }
            LoadBalancingStrategy::ResponseTime => {
                self.select_best_response_time(available_peers).await
            }
            LoadBalancingStrategy::ResourceUtilization => {
                self.select_least_utilized(available_peers).await
            }
        };

        let mut stats = self.stats.write().await;
        stats.total_requests += 1;
        *stats.peer_distribution.entry(selected).or_insert(0) += 1;

        Some(selected)
    }

    /// Roulette-wheel selection: peers without a registered weight count as 1.0.
    /// Callers guarantee `peers` is non-empty.
    async fn select_weighted_round_robin(&self, peers: &[PeerId]) -> PeerId {
        let mut total_weight = 0.0;
        let mut weighted_peers = Vec::new();

        for &peer_id in peers {
            let weight = self
                .weights
                .get(&peer_id)
                .map(|entry| *entry.value())
                .unwrap_or(1.0);
            total_weight += weight;
            weighted_peers.push((peer_id, weight));
        }

        // Draw a point on [0, total_weight) and walk until it falls in a slot.
        let mut target = rand::random::<f64>() * total_weight;
        for (peer_id, weight) in weighted_peers {
            target -= weight;
            if target <= 0.0 {
                return peer_id;
            }
        }

        // Fallback for float rounding leaving target > 0 after the loop.
        peers[0] }

    /// NOTE(review): stub — always returns the first peer; no per-peer
    /// connection counts are consulted yet.
    async fn select_least_connections(&self, peers: &[PeerId]) -> PeerId {
        peers[0]
    }

    /// Pick the peer with the lowest recorded average response time; peers
    /// with no recorded time are skipped, defaulting to the first peer.
    async fn select_best_response_time(&self, peers: &[PeerId]) -> PeerId {
        let stats = self.stats.read().await;
        let mut best_peer = peers[0];
        let mut best_time = Duration::from_secs(u64::MAX);

        for &peer_id in peers {
            if let Some(avg_time) = stats.avg_response_times.get(&peer_id) {
                if *avg_time < best_time {
                    best_time = *avg_time;
                    best_peer = peer_id;
                }
            }
        }

        best_peer
    }

    /// NOTE(review): stub — always returns the first peer; no utilization
    /// data is consulted yet.
    async fn select_least_utilized(&self, peers: &[PeerId]) -> PeerId {
        peers[0]
    }
}
2285
2286impl HealthMonitor {
2287 pub fn new(config: HealthCheckConfig) -> Self {
2289 Self {
2290 config,
2291 results: Arc::new(DashMap::new()),
2292 scheduler: Arc::new(TokioRwLock::new(HealthCheckScheduler {
2293 scheduled_checks: HashMap::new(),
2294 check_intervals: HashMap::new(),
2295 })),
2296 stats: Arc::new(TokioRwLock::new(HealthStats::default())),
2297 }
2298 }
2299
2300 pub async fn start_monitoring(&self, peer_id: PeerId) {
2302 let mut scheduler = self.scheduler.write().await;
2303 scheduler
2304 .scheduled_checks
2305 .insert(peer_id, Instant::now() + self.config.interval);
2306 scheduler
2307 .check_intervals
2308 .insert(peer_id, self.config.interval);
2309 }
2310
2311 pub async fn check_health(
2313 &self,
2314 peer_id: PeerId,
2315 connection: &ConnectionInfo,
2316 ) -> HealthCheckResult {
2317 let checker = PingHealthCheck::default();
2318 let result = checker.check(&peer_id, connection).await;
2319
2320 self.results.insert(peer_id, result.clone());
2322
2323 let mut stats = self.stats.write().await;
2325 stats.total_checks += 1;
2326 if result.success {
2327 stats.successful_checks += 1;
2328 } else {
2329 stats.failed_checks += 1;
2330 }
2331
2332 let total_time = stats.avg_response_time.as_millis() as f64 * stats.total_checks as f64;
2334 let new_avg = (total_time + result.response_time.as_millis() as f64)
2335 / (stats.total_checks + 1) as f64;
2336 stats.avg_response_time = Duration::from_millis(new_avg as u64);
2337
2338 result
2339 }
2340
2341 pub fn get_health_result(&self, peer_id: &PeerId) -> Option<HealthCheckResult> {
2343 self.results.get(peer_id).map(|entry| entry.value().clone())
2344 }
2345}
2346
2347impl Default for HealthCheckConfig {
2348 fn default() -> Self {
2349 Self {
2350 interval: Duration::from_secs(30),
2351 timeout: Duration::from_secs(5),
2352 failure_threshold: 3,
2353 recovery_threshold: 2,
2354 check_type: HealthCheckType::Ping,
2355 }
2356 }
2357}
2358
2359impl WarmingManager {
2360 pub fn new(config: WarmingConfig) -> Self {
2362 Self {
2363 config,
2364 warming_states: Arc::new(DashMap::new()),
2365 warming_tasks: Arc::new(DashMap::new()),
2366 stats: Arc::new(TokioRwLock::new(WarmingStats::default())),
2367 }
2368 }
2369
2370 pub async fn warm_connection(&self, peer_id: PeerId) -> Result<(), NetworkError> {
2372 if !self.config.enabled {
2373 return Ok(());
2374 }
2375
2376 self.warming_states.insert(peer_id, WarmingState::Warming);
2378
2379 let start = Instant::now();
2381 sleep(Duration::from_millis(100)).await; let warming_time = start.elapsed();
2383
2384 let mut stats = self.stats.write().await;
2386 stats.total_attempts += 1;
2387
2388 if rand::random::<f64>() > 0.1 {
2389 self.warming_states.insert(peer_id, WarmingState::Warm);
2391 stats.successful_warmings += 1;
2392
2393 let total_time =
2395 stats.avg_warming_time.as_millis() as f64 * stats.successful_warmings as f64;
2396 let new_avg = (total_time + warming_time.as_millis() as f64)
2397 / (stats.successful_warmings + 1) as f64;
2398 stats.avg_warming_time = Duration::from_millis(new_avg as u64);
2399
2400 Ok(())
2401 } else {
2402 self.warming_states.insert(
2403 peer_id,
2404 WarmingState::FailedToWarm("Warming timeout".to_string()),
2405 );
2406 stats.failed_warmings += 1;
2407 Err(NetworkError::ConnectionError(
2408 "Connection warming failed".into(),
2409 ))
2410 }
2411 }
2412
2413 pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
2415 self.warming_states
2416 .get(peer_id)
2417 .map(|entry| entry.value().clone())
2418 .unwrap_or(WarmingState::Cold)
2419 }
2420}
2421
2422impl Default for WarmingConfig {
2423 fn default() -> Self {
2424 Self {
2425 enabled: true,
2426 min_pool_size: 5,
2427 warming_timeout: Duration::from_secs(10),
2428 warming_retries: 3,
2429 predictive_threshold: 0.8,
2430 }
2431 }
2432}