qudag-network 0.5.0

P2P networking layer for QuDAG - LibP2P with onion routing, dark addressing, and quantum encryption
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
#![deny(unsafe_code)]

use crate::types::{
    ConnectionStatus, LatencyMetrics, NetworkError, NetworkMetrics, PeerId, QueueMetrics,
    ThroughputMetrics,
};
use anyhow::Result;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use dashmap::DashMap;
use futures::future::Future;
use parking_lot::RwLock as ParkingRwLock;
use quinn::{Connection, Endpoint};
use ring::{aead, agreement, rand as ring_rand};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock as TokioRwLock, Semaphore};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

/// Secure connection configuration
#[derive(Clone)]
pub struct SecureConfig {
    /// Transport encryption keys
    pub transport_keys: TransportKeys,
    /// Connection timeout
    pub timeout: std::time::Duration,
    /// Keep-alive interval
    pub keepalive: std::time::Duration,
}

/// Transport encryption keys
pub struct TransportKeys {
    /// Static private key
    #[allow(dead_code)]
    private_key: agreement::EphemeralPrivateKey,
    /// Static public key
    public_key: Vec<u8>,
}

impl Clone for TransportKeys {
    fn clone(&self) -> Self {
        // Generate new keys for each clone to maintain security
        Self::generate()
    }
}

impl TransportKeys {
    /// Generate new transport keys
    pub fn generate() -> Self {
        let rng = ring_rand::SystemRandom::new();
        let private_key =
            agreement::EphemeralPrivateKey::generate(&agreement::X25519, &rng).unwrap();
        let public_key = private_key.compute_public_key().unwrap().as_ref().to_vec();

        Self {
            private_key,
            public_key,
        }
    }
}

/// Secure connection handler
///
/// # Examples
///
/// ```rust,ignore
/// use qudag_network::{SecureConnection, SecureConfig, TransportKeys};
/// use std::time::Duration;
///
/// // Create configuration
/// let config = SecureConfig {
///     transport_keys: TransportKeys::generate(),
///     timeout: Duration::from_secs(30),
///     keepalive: Duration::from_secs(5),
/// };
///
/// // Connect to peer (requires async context)
/// // let connection = SecureConnection::new(&endpoint, addr, config).await?;
/// ```
pub struct SecureConnection {
    /// QUIC connection
    #[allow(dead_code)]
    connection: Connection,
    /// Encryption keys
    #[allow(dead_code)]
    keys: TransportKeys,
    /// Message channels
    channels: ConnectionChannels,
}

/// High-performance connection message channels with zero-copy optimizations
struct ConnectionChannels {
    /// Outbound message sender with zero-copy buffers
    tx: mpsc::Sender<Bytes>,
    /// Inbound message receiver
    rx: mpsc::Receiver<Bytes>,
    /// Outbound batch buffer (reusable)
    batch_buffer: BytesMut,
    /// Message batch size
    batch_size: usize,
    /// Batch timeout
    batch_timeout: std::time::Duration,
    /// Last batch time
    last_batch: std::time::Instant,
    /// Queue high water mark
    high_water_mark: usize,
    /// Queue low water mark
    low_water_mark: usize,
    /// Back pressure signal
    back_pressure: Arc<tokio::sync::Notify>,
    /// Current queue size in bytes (lock-free)
    queue_size: AtomicUsize,
    /// Encryption key cache
    key_cache: Arc<aead::LessSafeKey>,
    /// Nonce counter for unique nonces
    nonce_counter: AtomicU64,
    /// Message counter for metrics
    message_count: AtomicU64,
    /// Bytes processed counter
    bytes_processed: AtomicU64,
}

impl SecureConnection {
    /// Create new secure connection
    pub async fn new(
        endpoint: &Endpoint,
        addr: SocketAddr,
        config: SecureConfig,
    ) -> Result<Self, NetworkError> {
        // Connect using QUIC
        let connection = endpoint
            .connect(addr, "qudag")
            .map_err(|e| NetworkError::ConnectionError(e.to_string()))?
            .await
            .map_err(|e| NetworkError::ConnectionError(e.to_string()))?;

        // Create high-throughput message channels with zero-copy buffers
        let (tx, rx) = mpsc::channel(65_536); // 64K buffer

        // Pre-compute encryption key with proper key derivation
        let key = aead::UnboundKey::new(
            &aead::CHACHA20_POLY1305,
            &config.transport_keys.public_key[..32],
        )
        .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
        let key_cache = Arc::new(aead::LessSafeKey::new(key));

        Ok(Self {
            connection,
            keys: config.transport_keys,
            channels: ConnectionChannels {
                tx,
                rx,
                batch_buffer: BytesMut::with_capacity(1024 * 1024), // 1MB reusable buffer
                batch_size: 128,                                    // Process messages in batches
                batch_timeout: std::time::Duration::from_millis(50),
                last_batch: std::time::Instant::now(),
                high_water_mark: 64 * 1024 * 1024, // 64MB
                low_water_mark: 32 * 1024 * 1024,  // 32MB
                back_pressure: Arc::new(tokio::sync::Notify::new()),
                queue_size: AtomicUsize::new(0),
                key_cache,
                nonce_counter: AtomicU64::new(1),
                message_count: AtomicU64::new(0),
                bytes_processed: AtomicU64::new(0),
            },
        })
    }

    /// Send encrypted message with optimized zero-copy batch processing and enhanced error handling
    pub async fn send(&mut self, data: Bytes) -> Result<(), NetworkError> {
        let msg_size = data.len();

        // Validate input size
        if msg_size == 0 {
            return Err(NetworkError::MessageError("Empty message".into()));
        }
        if msg_size > 1024 * 1024 {
            // 1MB limit
            return Err(NetworkError::MessageError("Message too large".into()));
        }

        // Apply back pressure if queue is too large with timeout
        let current_size = self.channels.queue_size.load(Ordering::Acquire);
        if current_size >= self.channels.high_water_mark {
            debug!("Applying back pressure, queue size: {}", current_size);
            let back_pressure = self.channels.back_pressure.clone();

            // Wait with timeout to prevent indefinite blocking
            tokio::select! {
                _ = back_pressure.notified() => {},
                _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
                    return Err(NetworkError::ConnectionError("Back pressure timeout".into()));
                }
            }
        }

        // Generate unique nonce using atomic counter with overflow protection
        let nonce_value = self.channels.nonce_counter.fetch_add(1, Ordering::Relaxed);
        if nonce_value == 0 {
            error!("Nonce counter overflow - this should not happen in normal operation");
            return Err(NetworkError::EncryptionError("Nonce overflow".into()));
        }

        let mut nonce_bytes = [0u8; 12];
        nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());

        // Zero-copy encryption using BytesMut with error recovery
        let mut encrypted = BytesMut::from(data.as_ref());

        // Encrypt using cached key with retry logic
        let mut retry_count = 0;
        loop {
            // Clone nonce for each attempt since it's consumed
            let nonce_attempt = aead::Nonce::assume_unique_for_key(nonce_bytes);
            match self.channels.key_cache.seal_in_place_append_tag(
                nonce_attempt,
                aead::Aad::empty(),
                &mut encrypted,
            ) {
                Ok(()) => break,
                Err(e) => {
                    retry_count += 1;
                    if retry_count >= 3 {
                        return Err(NetworkError::EncryptionError(format!(
                            "Encryption failed after {} retries: {}",
                            retry_count, e
                        )));
                    }
                    warn!("Encryption attempt {} failed, retrying: {}", retry_count, e);
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                }
            }
        }

        // Add to batch buffer with length prefix for efficient parsing
        let encrypted_len = encrypted.len() as u32;
        self.channels.batch_buffer.put_u32(encrypted_len);
        self.channels.batch_buffer.extend_from_slice(&encrypted);

        // Update metrics
        self.channels
            .queue_size
            .fetch_add(msg_size, Ordering::Release);
        self.channels.message_count.fetch_add(1, Ordering::Relaxed);
        self.channels
            .bytes_processed
            .fetch_add(msg_size as u64, Ordering::Relaxed);

        // Process batch if full or timeout exceeded
        if self.channels.batch_buffer.len() >= self.channels.batch_size * 1024
            || self.channels.last_batch.elapsed() >= self.channels.batch_timeout
        {
            self.flush_batch().await?
        }

        Ok(())
    }

    /// Flush current batch of messages with zero-copy optimization and error recovery
    async fn flush_batch(&mut self) -> Result<(), NetworkError> {
        if self.channels.batch_buffer.is_empty() {
            return Ok(());
        }

        let batch_size = self.channels.batch_buffer.len();

        // Convert to Bytes for zero-copy transmission
        let batch = self.channels.batch_buffer.split().freeze();

        // Send batch with retry logic
        let mut retry_count = 0;
        loop {
            match self.channels.tx.send(batch.clone()).await {
                Ok(()) => break,
                Err(e) => {
                    retry_count += 1;
                    if retry_count >= 3 {
                        // Restore batch to buffer for later retry
                        self.channels.batch_buffer.extend_from_slice(&batch);
                        return Err(NetworkError::ConnectionError(format!(
                            "Batch send failed after {} retries: {}",
                            retry_count, e
                        )));
                    }
                    warn!("Batch send attempt {} failed, retrying: {}", retry_count, e);
                    tokio::time::sleep(std::time::Duration::from_millis(100 * retry_count as u64))
                        .await;
                }
            }
        }

        // Update queue size and notify if below low water mark
        let new_size = self
            .channels
            .queue_size
            .fetch_sub(batch_size, Ordering::AcqRel);
        if new_size <= self.channels.low_water_mark {
            self.channels.back_pressure.notify_waiters();
            debug!("Released back pressure, queue size: {}", new_size);
        }

        // Update last batch time
        self.channels.last_batch = std::time::Instant::now();
        Ok(())
    }

    /// Receive and decrypt messages in batches with zero-copy optimization
    pub async fn receive(&mut self) -> Result<Vec<Bytes>, NetworkError> {
        // Receive batch of encrypted messages
        let encrypted_batch = self
            .channels
            .rx
            .recv()
            .await
            .ok_or_else(|| NetworkError::ConnectionError("Channel closed".into()))?;

        let mut messages = Vec::new();
        let mut buf = encrypted_batch;

        // Parse messages from batch using zero-copy approach
        while buf.has_remaining() {
            if buf.remaining() < 4 {
                return Err(NetworkError::EncryptionError(
                    "Incomplete message length".into(),
                ));
            }

            // Read message length prefix
            let msg_len = buf.get_u32() as usize;

            if buf.remaining() < msg_len {
                return Err(NetworkError::EncryptionError(
                    "Incomplete message data".into(),
                ));
            }

            // Extract encrypted message data
            let encrypted_data = buf.copy_to_bytes(msg_len);

            // Generate matching nonce (should be deterministic or stored)
            let nonce_value = self.channels.nonce_counter.load(Ordering::Relaxed);
            let mut nonce_bytes = [0u8; 12];
            nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
            let nonce = aead::Nonce::assume_unique_for_key(nonce_bytes);

            // Decrypt message
            let mut message_data = BytesMut::from(encrypted_data.as_ref());
            self.channels
                .key_cache
                .open_in_place(nonce, aead::Aad::empty(), &mut message_data)
                .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;

            // Remove authentication tag (16 bytes for ChaCha20Poly1305)
            if message_data.len() >= 16 {
                message_data.truncate(message_data.len() - 16);
            }

            messages.push(message_data.freeze());
        }

        Ok(messages)
    }
}

/// Production-grade connection manager with advanced pooling, multiplexing, and resilience features.
///
/// The ConnectionManager provides a comprehensive solution for managing network connections with:
/// - Advanced connection pooling with lifecycle management
/// - HTTP/2-style multiplexing for efficient connection usage
/// - Retry logic with exponential backoff and jitter
/// - Circuit breaker pattern for fault tolerance
/// - Health checks and connection quality monitoring
/// - Connection load balancing and request distribution
/// - Performance monitoring and metrics collection
/// - Automatic resource cleanup and garbage collection
/// - Back pressure handling for overload protection
/// - Connection warming and preemptive scaling
/// - Request multiplexing and stream management
///
/// # Production Features
/// - Zero-downtime connection pool updates
/// - Graceful degradation under load
/// - Automatic connection warming
/// - Request routing and load balancing
/// - Connection affinity and session persistence
/// - Comprehensive observability and monitoring
/// - Memory-efficient connection reuse
/// - Adaptive connection limits based on system resources
///
/// # Multiplexing Support
/// - Stream-based connection multiplexing
/// - Request prioritization and queuing
/// - Concurrent request handling
/// - Flow control and backpressure
/// - Stream lifecycle management
///
/// # High-performance connection manager with pooling, metrics tracking and back pressure handling.
///
/// The ConnectionManager provides a comprehensive solution for managing network connections with:
/// - Connection pooling with configurable TTL
/// - Efficient concurrent connection tracking
/// - Detailed performance metrics collection
/// - Automatic resource cleanup
/// - Back pressure handling for overload protection
/// - Health monitoring and auto-recovery
/// - Circuit breaker pattern for failing connections
///
/// # Performance Features
/// - Lock-free concurrent data structures
/// - Connection pooling reduces setup overhead
/// - Batched status updates
/// - Efficient metrics collection
/// - Adaptive connection limits based on system resources
///
/// # Connection Pool Management
/// - Automatic connection reuse
/// - TTL-based expiration
/// - Configurable pool size
/// - Proactive cleanup of expired connections
/// - Health-based connection scoring
///
/// # Health Monitoring
/// - Periodic health checks
/// - Connection quality scoring
/// - Automatic failover
/// - Circuit breaker for unreliable peers
/// - Performance-based connection prioritization
///
/// # Metrics Tracking
/// - Queue metrics (size, utilization)
/// - Latency metrics (average, peak)
/// - Throughput metrics (messages/second)
/// - Connection pool statistics
/// - Health and reliability metrics
///
/// # Example
/// ```rust
/// let manager = ConnectionManager::new(100); // 100 max connections
/// manager.connect(peer_id).await?;
/// let status = manager.get_status(&peer_id).await;
/// let metrics = manager.get_metrics().await;
/// ```
pub struct ConnectionManager {
    /// Maximum concurrent connections
    max_connections: usize,
    /// Active connections with fast concurrent access
    connections: Arc<DashMap<PeerId, ConnectionInfo>>,
    /// Connection pool for reuse with TTL tracking
    connection_pool: Arc<DashMap<PeerId, (ConnectionInfo, Instant)>>,
    /// Connection pool with enhanced lifecycle management
    enhanced_pool: Arc<DashMap<PeerId, PooledConnection>>,
    /// Connection multiplexer for stream management
    multiplexer: Arc<ConnectionMultiplexer>,
    /// Retry manager for exponential backoff
    retry_manager: Arc<RetryManager>,
    /// Load balancer for connection distribution
    load_balancer: Arc<LoadBalancer>,
    /// Health monitor for connection quality
    health_monitor: Arc<HealthMonitor>,
    /// Connection warming manager
    warming_manager: Arc<WarmingManager>,
    /// Idle connection timeout
    pool_timeout: std::time::Duration,
    /// Network performance metrics with detailed stats
    metrics: Arc<ParkingRwLock<NetworkMetrics>>,
    /// Queue metrics
    queue_metrics: Arc<ParkingRwLock<QueueMetrics>>,
    /// Latency metrics
    latency_metrics: Arc<ParkingRwLock<LatencyMetrics>>,
    /// Throughput metrics
    throughput_metrics: Arc<ParkingRwLock<ThroughputMetrics>>,
    /// Connection health tracker
    #[allow(dead_code)]
    health_tracker: Arc<RwLock<ConnectionHealthTracker>>,
    /// Circuit breaker for failing connections
    circuit_breakers: Arc<DashMap<PeerId, CircuitBreaker>>,
    /// Connection quality scores
    quality_scores: Arc<DashMap<PeerId, f64>>,
    /// Connection pool maintenance task handle
    #[allow(dead_code)]
    maintenance_handle: Option<tokio::task::JoinHandle<()>>,
    /// Global connection limits
    connection_limits: ConnectionLimits,
    /// Performance monitoring interval
    #[allow(dead_code)]
    monitoring_interval: Duration,
}

/// Extended connection information with health and performance metrics
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Connection status
    pub status: ConnectionStatus,
    /// Connection established timestamp
    pub established_at: Instant,
    /// Last activity timestamp
    pub last_activity: Instant,
    /// Number of successful operations
    pub success_count: u64,
    /// Number of failed operations
    pub failure_count: u64,
    /// Average response time
    pub avg_response_time: Duration,
    /// Connection quality score (0.0 to 1.0)
    pub quality_score: f64,
    /// Bandwidth utilization
    pub bandwidth_usage: u64,
    /// Connection metadata
    pub metadata: HashMap<String, String>,
}

impl ConnectionInfo {
    /// Create new connection info
    pub fn new(status: ConnectionStatus) -> Self {
        Self {
            status,
            established_at: Instant::now(),
            last_activity: Instant::now(),
            success_count: 0,
            failure_count: 0,
            avg_response_time: Duration::from_millis(0),
            quality_score: 1.0,
            bandwidth_usage: 0,
            metadata: HashMap::new(),
        }
    }

    /// Update connection activity and performance metrics
    pub fn update_activity(
        &mut self,
        success: bool,
        response_time: Duration,
        bytes_transferred: u64,
    ) {
        self.last_activity = Instant::now();
        self.bandwidth_usage += bytes_transferred;

        if success {
            self.success_count += 1;
        } else {
            self.failure_count += 1;
        }

        // Update average response time (exponential moving average)
        let alpha = 0.1; // Smoothing factor
        let current_ms = self.avg_response_time.as_millis() as f64;
        let new_ms = response_time.as_millis() as f64;
        let updated_ms = alpha * new_ms + (1.0 - alpha) * current_ms;
        self.avg_response_time = Duration::from_millis(updated_ms as u64);

        // Update quality score based on success rate and response time
        self.update_quality_score();
    }

    /// Update connection quality score
    fn update_quality_score(&mut self) {
        let total_ops = self.success_count + self.failure_count;
        if total_ops == 0 {
            self.quality_score = 1.0;
            return;
        }

        // Base score on success rate
        let success_rate = self.success_count as f64 / total_ops as f64;

        // Penalty for high response times (above 100ms)
        let response_penalty = if self.avg_response_time.as_millis() > 100 {
            0.2 * (self.avg_response_time.as_millis() as f64 / 1000.0)
        } else {
            0.0
        };

        self.quality_score = (success_rate - response_penalty).clamp(0.0, 1.0);
    }

    /// Check if connection is healthy
    pub fn is_healthy(&self) -> bool {
        self.quality_score > 0.5 && self.last_activity.elapsed() < Duration::from_secs(300)
        // 5 minutes
    }
}

/// Connection health tracking for monitoring and recovery
#[derive(Debug)]
#[allow(dead_code)]
pub struct ConnectionHealthTracker {
    /// Health check interval
    check_interval: Duration,
    /// Last health check timestamp
    last_check: Option<Instant>,
    /// Unhealthy connections to monitor
    unhealthy_connections: HashMap<PeerId, UnhealthyConnectionInfo>,
    /// Health check statistics
    health_stats: HealthStatistics,
}

/// Information about unhealthy connections
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct UnhealthyConnectionInfo {
    /// When the connection became unhealthy
    unhealthy_since: Instant,
    /// Number of recovery attempts
    recovery_attempts: u32,
    /// Last recovery attempt timestamp
    last_recovery_attempt: Option<Instant>,
    /// Reason for being unhealthy
    reason: String,
}

/// Health statistics
#[derive(Debug, Clone, Default)]
pub struct HealthStatistics {
    /// Total health checks performed
    pub total_checks: u64,
    /// Number of healthy connections found
    pub healthy_count: u64,
    /// Number of unhealthy connections found
    pub unhealthy_count: u64,
    /// Number of successful recoveries
    pub successful_recoveries: u64,
    /// Average recovery time
    pub avg_recovery_time: Duration,
}

impl Default for ConnectionHealthTracker {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(30),
            last_check: None,
            unhealthy_connections: HashMap::new(),
            health_stats: HealthStatistics::default(),
        }
    }
}

/// Circuit breaker for managing failing connections
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Current state of the circuit breaker
    state: CircuitBreakerState,
    /// Failure threshold to open circuit
    failure_threshold: u32,
    /// Current failure count
    failure_count: u32,
    /// Time when circuit was opened
    opened_at: Option<Instant>,
    /// Timeout before attempting to close circuit
    timeout: Duration,
    /// Success threshold to close circuit
    success_threshold: u32,
    /// Current success count in half-open state
    success_count: u32,
}

/// Circuit breaker states
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitBreakerState {
    /// Circuit is closed, allowing requests
    Closed,
    /// Circuit is open, blocking requests
    Open,
    /// Circuit is half-open, testing if service is recovered
    HalfOpen,
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self {
            state: CircuitBreakerState::Closed,
            failure_threshold: 5,
            failure_count: 0,
            opened_at: None,
            timeout: Duration::from_secs(60),
            success_threshold: 3,
            success_count: 0,
        }
    }
}

impl CircuitBreaker {
    /// Check if requests should be allowed through
    pub fn allow_request(&mut self) -> bool {
        match self.state {
            CircuitBreakerState::Closed => true,
            CircuitBreakerState::Open => {
                if let Some(opened_at) = self.opened_at {
                    if opened_at.elapsed() >= self.timeout {
                        self.state = CircuitBreakerState::HalfOpen;
                        self.success_count = 0;
                        true
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            CircuitBreakerState::HalfOpen => true,
        }
    }

    /// Record the result of an operation
    pub fn record_result(&mut self, success: bool) {
        match self.state {
            CircuitBreakerState::Closed => {
                if success {
                    self.failure_count = 0;
                } else {
                    self.failure_count += 1;
                    if self.failure_count >= self.failure_threshold {
                        self.state = CircuitBreakerState::Open;
                        self.opened_at = Some(Instant::now());
                    }
                }
            }
            CircuitBreakerState::HalfOpen => {
                if success {
                    self.success_count += 1;
                    if self.success_count >= self.success_threshold {
                        self.state = CircuitBreakerState::Closed;
                        self.failure_count = 0;
                    }
                } else {
                    self.state = CircuitBreakerState::Open;
                    self.opened_at = Some(Instant::now());
                    self.failure_count += 1;
                }
            }
            CircuitBreakerState::Open => {
                // Should not reach here if allow_request is used properly
            }
        }
    }
}

/// Enhanced connection pool entry with lifecycle management
#[derive(Debug, Clone)]
pub struct PooledConnection {
    /// Connection information
    pub info: ConnectionInfo,
    /// Connection establishment timestamp
    pub created_at: Instant,
    /// Last used timestamp
    pub last_used: Instant,
    /// Usage count
    pub usage_count: u64,
    /// Connection weight for load balancing
    pub weight: f64,
    /// Maximum concurrent streams
    pub max_streams: u32,
    /// Current active streams
    pub active_streams: u32,
    /// Connection warming state
    pub warming_state: WarmingState,
    /// Connection affinity group
    pub affinity_group: Option<String>,
}

/// Connection warming state
#[derive(Debug, Clone, PartialEq)]
pub enum WarmingState {
    /// Connection is cold (not warmed)
    Cold,
    /// Connection is warming up
    Warming,
    /// Connection is warm and ready
    Warm,
    /// Connection warming failed
    FailedToWarm(String),
}

/// Connection multiplexer for stream management
#[derive(Debug)]
pub struct ConnectionMultiplexer {
    /// Active multiplexed connections
    connections: Arc<DashMap<PeerId, MultiplexedConnection>>,
    /// Stream routing table
    stream_routes: Arc<DashMap<StreamId, PeerId>>,
    /// Stream priority queue
    priority_queue: Arc<TokioRwLock<BTreeMap<Priority, VecDeque<StreamId>>>>,
    /// Maximum concurrent streams per connection
    max_streams_per_connection: u32,
    /// Stream timeout configuration
    #[allow(dead_code)]
    stream_timeout: Duration,
}

/// Stream identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct StreamId(pub u64);

/// Stream priority
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Critical = 0,
    High = 1,
    Normal = 2,
    Low = 3,
}

/// Multiplexed connection wrapper
#[derive(Debug)]
pub struct MultiplexedConnection {
    /// Base connection info
    pub info: ConnectionInfo,
    /// Active streams
    pub streams: HashMap<StreamId, StreamInfo>,
    /// Next stream ID
    pub next_stream_id: u64,
    /// Connection utilization
    pub utilization: f64,
    /// Stream semaphore for flow control
    pub stream_semaphore: Arc<Semaphore>,
}

/// Stream information
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream identifier
    pub id: StreamId,
    /// Stream priority
    pub priority: Priority,
    /// Stream state
    pub state: StreamState,
    /// Created timestamp
    pub created_at: Instant,
    /// Last activity timestamp
    pub last_activity: Instant,
    /// Bytes sent/received
    pub bytes_transferred: u64,
}

/// Stream state
#[derive(Debug, Clone, PartialEq)]
pub enum StreamState {
    /// Stream is opening
    Opening,
    /// Stream is active
    Active,
    /// Stream is half-closed (local)
    HalfClosedLocal,
    /// Stream is half-closed (remote)
    HalfClosedRemote,
    /// Stream is closed
    Closed,
    /// Stream encountered an error
    Error(String),
}

/// Retry manager with exponential backoff and jitter
#[derive(Debug)]
pub struct RetryManager {
    /// Retry configurations per peer
    retry_configs: Arc<DashMap<PeerId, RetryConfig>>,
    /// Default retry configuration
    default_config: RetryConfig,
    /// Retry statistics
    stats: Arc<TokioRwLock<RetryStats>>,
}

/// Retry configuration
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries
    pub max_retries: u32,
    /// Initial backoff duration
    pub initial_backoff: Duration,
    /// Maximum backoff duration
    pub max_backoff: Duration,
    /// Backoff multiplier
    pub backoff_multiplier: f64,
    /// Jitter factor (0.0 to 1.0)
    pub jitter_factor: f64,
    /// Timeout for each retry attempt
    pub timeout: Duration,
}

/// Retry statistics
#[derive(Debug, Clone, Default)]
pub struct RetryStats {
    /// Total retry attempts
    pub total_attempts: u64,
    /// Successful retries
    pub successful_retries: u64,
    /// Failed retries
    pub failed_retries: u64,
    /// Average retry duration
    pub avg_retry_duration: Duration,
}

/// Load balancer for connection distribution
#[derive(Debug)]
pub struct LoadBalancer {
    /// Load balancing strategy
    strategy: LoadBalancingStrategy,
    /// Connection weights
    weights: Arc<DashMap<PeerId, f64>>,
    /// Round-robin counter
    round_robin_counter: AtomicU64,
    /// Load balancing statistics
    stats: Arc<TokioRwLock<LoadBalancingStats>>,
}

/// Load balancing strategy
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Round-robin distribution
    RoundRobin,
    /// Least connections
    LeastConnections,
    /// Weighted round-robin
    WeightedRoundRobin,
    /// Response time based
    ResponseTime,
    /// Resource utilization based
    ResourceUtilization,
}

/// Load balancing statistics
#[derive(Debug, Clone, Default)]
pub struct LoadBalancingStats {
    /// Total requests distributed
    pub total_requests: u64,
    /// Distribution by peer
    pub peer_distribution: HashMap<PeerId, u64>,
    /// Average response times
    pub avg_response_times: HashMap<PeerId, Duration>,
}

/// Health monitor for connection quality
#[derive(Debug)]
pub struct HealthMonitor {
    /// Health check configuration
    config: HealthCheckConfig,
    /// Health check results
    results: Arc<DashMap<PeerId, HealthCheckResult>>,
    /// Health check scheduler
    scheduler: Arc<TokioRwLock<HealthCheckScheduler>>,
    /// Health statistics
    stats: Arc<TokioRwLock<HealthStats>>,
}

/// Health check configuration
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    /// Health check interval
    pub interval: Duration,
    /// Health check timeout
    pub timeout: Duration,
    /// Failure threshold
    pub failure_threshold: u32,
    /// Recovery threshold
    pub recovery_threshold: u32,
    /// Health check type
    pub check_type: HealthCheckType,
}

/// Health check type
#[derive(Debug, Clone)]
pub enum HealthCheckType {
    /// Ping-based health check
    Ping,
    /// Application-level health check
    Application,
    /// Custom health check with function pointer
    Custom,
}

/// Health check result
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// Check timestamp
    pub timestamp: Instant,
    /// Check success
    pub success: bool,
    /// Response time
    pub response_time: Duration,
    /// Error message if failed
    pub error: Option<String>,
    /// Health score (0.0 to 1.0)
    pub health_score: f64,
}

/// Health check scheduler
#[derive(Debug)]
pub struct HealthCheckScheduler {
    /// Scheduled checks
    scheduled_checks: HashMap<PeerId, Instant>,
    /// Check intervals per peer
    check_intervals: HashMap<PeerId, Duration>,
}

/// Health statistics
#[derive(Debug, Clone, Default)]
pub struct HealthStats {
    /// Total health checks performed
    pub total_checks: u64,
    /// Successful health checks
    pub successful_checks: u64,
    /// Failed health checks
    pub failed_checks: u64,
    /// Average health check response time
    pub avg_response_time: Duration,
}

/// Connection warming manager
#[derive(Debug)]
pub struct WarmingManager {
    /// Warming configuration
    config: WarmingConfig,
    /// Warming state per peer
    warming_states: Arc<DashMap<PeerId, WarmingState>>,
    /// Warming tasks
    #[allow(dead_code)]
    warming_tasks: Arc<DashMap<PeerId, tokio::task::JoinHandle<()>>>,
    /// Warming statistics
    stats: Arc<TokioRwLock<WarmingStats>>,
}

/// Connection warming configuration
#[derive(Debug, Clone)]
pub struct WarmingConfig {
    /// Enable connection warming
    pub enabled: bool,
    /// Minimum pool size to maintain
    pub min_pool_size: usize,
    /// Warming timeout
    pub warming_timeout: Duration,
    /// Warming retry attempts
    pub warming_retries: u32,
    /// Predictive warming threshold
    pub predictive_threshold: f64,
}

/// Connection warming statistics
#[derive(Debug, Clone, Default)]
pub struct WarmingStats {
    /// Total warming attempts
    pub total_attempts: u64,
    /// Successful warming operations
    pub successful_warmings: u64,
    /// Failed warming operations
    pub failed_warmings: u64,
    /// Average warming time
    pub avg_warming_time: Duration,
}

/// Connection limits configuration
#[derive(Debug, Clone)]
pub struct ConnectionLimits {
    /// Maximum total connections
    pub max_total: usize,
    /// Maximum connections per peer
    pub max_per_peer: usize,
    /// Maximum idle connections
    pub max_idle: usize,
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Idle timeout
    pub idle_timeout: Duration,
}

impl Default for ConnectionLimits {
    fn default() -> Self {
        Self {
            max_total: 1000,
            max_per_peer: 10,
            max_idle: 100,
            connection_timeout: Duration::from_secs(30),
            idle_timeout: Duration::from_secs(300),
        }
    }
}

/// Trait for connection health checks
#[async_trait]
pub trait HealthCheck: Send + Sync {
    /// Perform health check on connection
    async fn check(&self, peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult;
}

/// Default ping-based health check implementation
#[derive(Debug)]
pub struct PingHealthCheck {
    #[allow(dead_code)]
    timeout: Duration,
}

impl Default for PingHealthCheck {
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(5),
        }
    }
}

#[async_trait]
impl HealthCheck for PingHealthCheck {
    async fn check(&self, _peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult {
        let start = Instant::now();

        // Simulate ping check (in real implementation, this would send actual ping)
        let success = connection.is_healthy() && rand::random::<f64>() > 0.1;
        let response_time = start.elapsed();

        HealthCheckResult {
            timestamp: Instant::now(),
            success,
            response_time,
            error: if success {
                None
            } else {
                Some("Ping timeout".to_string())
            },
            health_score: if success { 1.0 } else { 0.0 },
        }
    }
}

use std::collections::{BTreeMap, HashMap, VecDeque};
use tokio::sync::RwLock;

impl ConnectionManager {
    /// Recovers from connection failures by attempting reconnection
    pub async fn recover_connection(&self, peer_id: &PeerId) -> Result<(), NetworkError> {
        debug!("Attempting to recover connection for peer {:?}", peer_id);

        // Remove failed connection
        self.connections.remove(peer_id);

        // Clear from pool if exists
        self.connection_pool.remove(peer_id);

        // Attempt reconnection with exponential backoff
        let mut retry_count = 0;
        let max_retries = 5;

        while retry_count < max_retries {
            match self.connect(*peer_id).await {
                Ok(()) => {
                    info!("Successfully recovered connection for peer {:?}", peer_id);
                    return Ok(());
                }
                Err(e) => {
                    retry_count += 1;
                    let backoff_ms = 100u64 * (1 << retry_count); // Exponential backoff
                    warn!(
                        "Connection recovery attempt {} failed for peer {:?}: {}, retrying in {}ms",
                        retry_count, peer_id, e, backoff_ms
                    );

                    if retry_count >= max_retries {
                        error!(
                            "Failed to recover connection for peer {:?} after {} attempts",
                            peer_id, max_retries
                        );
                        return Err(NetworkError::ConnectionError(format!(
                            "Recovery failed after {} attempts",
                            max_retries
                        )));
                    }

                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                }
            }
        }

        Err(NetworkError::ConnectionError("Max retries exceeded".into()))
    }

    /// Performs health check on all active connections
    pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError> {
        let mut unhealthy_peers = Vec::new();

        for entry in self.connections.iter() {
            let peer_id = *entry.key();
            let conn_info = entry.value();

            match &conn_info.status {
                ConnectionStatus::Failed(_) => {
                    unhealthy_peers.push(peer_id);
                    warn!("Detected failed connection for peer {:?}", peer_id);
                }
                ConnectionStatus::Disconnected => {
                    unhealthy_peers.push(peer_id);
                    debug!("Detected disconnected peer {:?}", peer_id);
                }
                _ => {
                    // Check if connection is healthy based on activity and quality
                    if !conn_info.is_healthy() {
                        unhealthy_peers.push(peer_id);
                        debug!(
                            "Detected unhealthy connection for peer {:?} (quality: {:.2})",
                            peer_id, conn_info.quality_score
                        );
                    }
                }
            }
        }

        if !unhealthy_peers.is_empty() {
            info!(
                "Health check found {} unhealthy connections",
                unhealthy_peers.len()
            );
        }

        Ok(unhealthy_peers)
    }

    /// Automatically recovers unhealthy connections
    pub async fn auto_recover(&self) -> Result<usize, NetworkError> {
        let unhealthy_peers = self.health_check().await?;
        let total_unhealthy = unhealthy_peers.len();
        let mut recovered_count = 0;

        for peer_id in unhealthy_peers {
            match self.recover_connection(&peer_id).await {
                Ok(()) => {
                    recovered_count += 1;
                    debug!("Auto-recovered connection for peer {:?}", peer_id);
                }
                Err(e) => {
                    warn!(
                        "Failed to auto-recover connection for peer {:?}: {}",
                        peer_id, e
                    );
                }
            }
        }

        if recovered_count > 0 {
            info!(
                "Auto-recovery completed: {}/{} connections recovered",
                recovered_count, total_unhealthy
            );
        }

        Ok(recovered_count)
    }
    /// Creates a new connection manager with default pool timeout (5 minutes).
    ///
    /// The manager initializes with optimized default settings:
    /// - 5 minute connection pool TTL
    /// - Lock-free concurrent connection tracking
    /// - Comprehensive metrics collection
    /// - Health monitoring and circuit breakers
    ///
    /// # Arguments
    /// * `max_connections` - Maximum number of concurrent connections to maintain
    ///
    /// # Performance Considerations
    /// - Choose max_connections based on system resources
    /// - Connection pooling reduces setup overhead
    /// - Metrics collection has minimal overhead
    /// - Health monitoring provides proactive issue detection
    pub fn new(max_connections: usize) -> Self {
        Self::with_pool_timeout(max_connections, std::time::Duration::from_secs(300))
    }

    /// Creates a new connection manager with enhanced features and custom pool timeout.
    ///
    /// Allows fine-tuning of connection pooling behavior:
    /// - Custom TTL for pooled connections
    /// - Connection reuse optimization
    /// - Resource usage control
    /// - Enhanced health monitoring
    ///
    /// # Arguments
    /// * `max_connections` - Maximum number of concurrent connections
    /// * `pool_timeout` - Time-to-live for pooled connections
    ///
    /// # Connection Pool Behavior
    /// - Connections are cached until timeout
    /// - Expired connections automatically cleaned up
    /// - Pool size limited by max_connections
    /// - Health-based connection scoring and prioritization
    pub fn with_pool_timeout(max_connections: usize, pool_timeout: std::time::Duration) -> Self {
        let connection_limits = ConnectionLimits {
            max_total: max_connections,
            ..Default::default()
        };

        Self {
            max_connections,
            connections: Arc::new(DashMap::new()),
            connection_pool: Arc::new(DashMap::new()),
            enhanced_pool: Arc::new(DashMap::new()),
            multiplexer: Arc::new(ConnectionMultiplexer::new(32, Duration::from_secs(30))),
            retry_manager: Arc::new(RetryManager::new()),
            load_balancer: Arc::new(LoadBalancer::new(LoadBalancingStrategy::WeightedRoundRobin)),
            health_monitor: Arc::new(HealthMonitor::new(HealthCheckConfig::default())),
            warming_manager: Arc::new(WarmingManager::new(WarmingConfig::default())),
            pool_timeout,
            metrics: Arc::new(ParkingRwLock::new(NetworkMetrics::default())),
            queue_metrics: Arc::new(ParkingRwLock::new(QueueMetrics::default())),
            latency_metrics: Arc::new(ParkingRwLock::new(LatencyMetrics::default())),
            throughput_metrics: Arc::new(ParkingRwLock::new(ThroughputMetrics::default())),
            health_tracker: Arc::new(RwLock::new(ConnectionHealthTracker::default())),
            circuit_breakers: Arc::new(DashMap::new()),
            quality_scores: Arc::new(DashMap::new()),
            maintenance_handle: None,
            connection_limits,
            monitoring_interval: Duration::from_secs(30),
        }
    }

    /// Initiates a connection to a peer with automatic pooling and reuse.
    ///
    /// Enhanced connection establishment process:
    /// 1. Check circuit breaker status
    /// 2. Check pool for existing healthy connection
    /// 3. Reuse if valid connection exists
    /// 4. Create new connection if needed
    /// 5. Apply connection limits and health checks
    /// 6. Initialize health monitoring
    ///
    /// # Arguments
    /// * `peer_id` - ID of the peer to connect to
    ///
    /// # Connection Pooling
    /// - Reuses healthy connections when possible
    /// - Validates connection freshness and quality
    /// - Removes expired or unhealthy connections
    /// - Updates usage metrics and health scores
    ///
    /// # Circuit Breaker Protection
    /// - Prevents connections to repeatedly failing peers
    /// - Implements exponential backoff
    /// - Automatic recovery testing
    ///
    /// # Returns
    /// * `Ok(())` - Connection established or reused
    /// * `Err(_)` - Connection failed or circuit breaker open
    pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
        // Check circuit breaker first
        if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
            if !circuit_breaker.allow_request() {
                return Err(NetworkError::ConnectionError(
                    "Circuit breaker is open for this peer".into(),
                ));
            }
        }

        // Check if connection exists in the pool
        if let Some(entry) = self.connection_pool.get(&peer_id) {
            let (conn_info, last_used) = entry.value();
            if last_used.elapsed() < self.pool_timeout && conn_info.is_healthy() {
                // Connection is still valid and healthy, reuse it
                self.connections.insert(peer_id, conn_info.clone());
                debug!("Reusing pooled healthy connection for peer {:?}", peer_id);

                // Record successful circuit breaker operation
                if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
                    circuit_breaker.record_result(true);
                }

                return Ok(());
            } else {
                // Connection expired or unhealthy, remove from pool
                self.connection_pool.remove(&peer_id);
                debug!(
                    "Removing expired/unhealthy connection for peer {:?}",
                    peer_id
                );
            }
        }

        // Check connection limit
        if self.connections.len() >= self.max_connections {
            warn!("Max connections reached");
            return Err(NetworkError::ConnectionError(
                "Max connections reached".into(),
            ));
        }

        // Create new connection with enhanced monitoring
        let connecting_info = ConnectionInfo::new(ConnectionStatus::Connecting);
        self.connections.insert(peer_id, connecting_info);
        debug!("Creating new connection for peer {:?}", peer_id);

        // Simulate connection establishment (in real implementation, this would be actual network code)
        let start_time = Instant::now();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let connection_time = start_time.elapsed();

        // Simulate connection success/failure (90% success rate)
        let success = rand::random::<f64>() > 0.1;

        if success {
            // Update to connected status on success
            let mut connected_info = ConnectionInfo::new(ConnectionStatus::Connected);
            connected_info.update_activity(true, connection_time, 0);

            self.connections.insert(peer_id, connected_info.clone());
            self.quality_scores
                .insert(peer_id, connected_info.quality_score);

            // Record successful circuit breaker operation
            self.circuit_breakers
                .entry(peer_id)
                .or_insert_with(CircuitBreaker::default)
                .record_result(true);

            debug!(
                "Successfully connected to peer {:?} in {:?}",
                peer_id, connection_time
            );
        } else {
            // Handle connection failure
            let failed_info =
                ConnectionInfo::new(ConnectionStatus::Failed("Connection timeout".into()));
            self.connections.insert(peer_id, failed_info);

            // Record failed circuit breaker operation
            self.circuit_breakers
                .entry(peer_id)
                .or_insert_with(CircuitBreaker::default)
                .record_result(false);

            return Err(NetworkError::ConnectionError(
                "Failed to establish connection".into(),
            ));
        }

        Ok(())
    }

    /// Updates connection status for a peer with lock-free atomic guarantees.
    ///
    /// Enhanced status update process:
    /// 1. Update connection info with new status
    /// 2. Update health and quality metrics
    /// 3. Atomic metrics update
    /// 4. Circuit breaker state management
    /// 5. Event logging and monitoring
    ///
    /// # Arguments
    /// * `peer_id` - ID of the peer to update
    /// * `status` - New connection status
    /// * `response_time` - Optional response time for quality calculation
    /// * `bytes_transferred` - Optional bytes transferred for bandwidth tracking
    ///
    /// # Thread Safety
    /// - Status updates are lock-free and atomic
    /// - Metrics updates use parking_lot for better performance
    /// - Safe for concurrent access with minimal contention
    ///
    /// # Health Tracking
    /// Updates connection health scores, quality metrics, and circuit breaker
    /// states to ensure optimal connection management.
    pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus) {
        self.update_status_with_metrics(peer_id, status, None, 0);
    }

    /// Updates connection status with detailed performance metrics
    pub fn update_status_with_metrics(
        &self,
        peer_id: PeerId,
        status: ConnectionStatus,
        response_time: Option<Duration>,
        bytes_transferred: u64,
    ) {
        // Update or create connection info
        if let Some(mut conn_info) = self.connections.get_mut(&peer_id) {
            conn_info.status = status.clone();
            if let Some(rt) = response_time {
                let success = matches!(status, ConnectionStatus::Connected);
                conn_info.update_activity(success, rt, bytes_transferred);

                // Update quality score cache
                self.quality_scores.insert(peer_id, conn_info.quality_score);

                // Update circuit breaker
                if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
                    circuit_breaker.record_result(success);
                }
            }
        } else {
            // Create new connection info
            let mut conn_info = ConnectionInfo::new(status);
            if let Some(rt) = response_time {
                let success = matches!(conn_info.status, ConnectionStatus::Connected);
                conn_info.update_activity(success, rt, bytes_transferred);
            }
            self.connections.insert(peer_id, conn_info);
        }

        // Update metrics with high-performance lock
        let mut metrics = self.metrics.write();
        metrics.connections = self.connections.len();

        // Count active (healthy) connections
        let active_count = self
            .connections
            .iter()
            .filter(|entry| entry.value().is_healthy())
            .count();
        metrics.active_connections = active_count;
    }

    /// Disconnects from a peer with enhanced cleanup and health tracking
    pub fn disconnect(&self, peer_id: &PeerId) {
        if let Some((_, conn_info)) = self.connections.remove(peer_id) {
            debug!(
                "Disconnected from peer {:?} with status {:?} (quality: {:.2})",
                peer_id, conn_info.status, conn_info.quality_score
            );

            // Move connection to pool if it was healthy (for potential reuse)
            if conn_info.is_healthy() {
                self.connection_pool
                    .insert(*peer_id, (conn_info, Instant::now()));
            }
        }

        // Remove quality score and circuit breaker entries
        self.quality_scores.remove(peer_id);

        // Keep circuit breaker for future connection attempts
        // but reset if it was in a good state
        if let Some(circuit_breaker) = self.circuit_breakers.get_mut(peer_id) {
            if circuit_breaker.state == CircuitBreakerState::Closed {
                // Keep the circuit breaker but don't reset it completely
                // This preserves failure history while allowing new attempts
            }
        }

        // Clean expired connections from pool (non-blocking)
        self.cleanup_pool();

        // Update metrics with high-performance lock
        let mut metrics = self.metrics.write();
        metrics.connections = self.connections.len();

        // Count active (healthy) connections
        let active_count = self
            .connections
            .iter()
            .filter(|entry| entry.value().is_healthy())
            .count();
        metrics.active_connections = active_count;
    }

    /// Cleanup expired connections from the pool
    fn cleanup_pool(&self) {
        self.connection_pool
            .retain(|_, (_, last_used)| last_used.elapsed() < self.pool_timeout);
    }

    /// Returns connection count (lock-free)
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }

    /// Returns connection status for a peer (lock-free)
    pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus> {
        self.connections
            .get(peer_id)
            .map(|entry| entry.value().status.clone())
    }

    /// Returns detailed connection information for a peer (lock-free)
    pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo> {
        self.connections
            .get(peer_id)
            .map(|entry| entry.value().clone())
    }

    /// Get connection quality score for a peer
    pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64> {
        self.quality_scores.get(peer_id).map(|entry| *entry.value())
    }

    /// Get circuit breaker state for a peer
    pub fn get_circuit_breaker_state(&self, peer_id: &PeerId) -> Option<CircuitBreakerState> {
        self.circuit_breakers
            .get(peer_id)
            .map(|entry| entry.value().state.clone())
    }

    /// Get all healthy connections sorted by quality score
    pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)> {
        let mut healthy_peers = Vec::new();

        for entry in self.connections.iter() {
            let peer_id = *entry.key();
            let conn_info = entry.value();

            if conn_info.is_healthy() {
                healthy_peers.push((peer_id, conn_info.quality_score));
            }
        }

        // Sort by quality score in descending order
        healthy_peers.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        healthy_peers
    }

    /// Updates network metrics with optimized locking
    pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64) {
        let latency_duration = std::time::Duration::from_millis(avg_latency_ms);

        // Update general metrics
        {
            let mut metrics = self.metrics.write();
            metrics.messages_per_second = messages_per_second;
            metrics.avg_latency = latency_duration;
            metrics.active_connections = self.connections.len();
        }

        // Update queue metrics
        {
            let mut queue_metrics = self.queue_metrics.write();
            queue_metrics.current_size = self.connection_pool.len();
            queue_metrics.max_size = self.max_connections;
            queue_metrics.utilization =
                queue_metrics.current_size as f64 / queue_metrics.max_size as f64;
            queue_metrics.messages_per_second = messages_per_second;
        }

        // Update latency metrics
        {
            let mut latency_metrics = self.latency_metrics.write();
            latency_metrics.avg_latency = latency_duration;
            latency_metrics.peak_latency = latency_metrics.peak_latency.max(latency_duration);
        }

        // Update throughput metrics
        {
            let mut throughput_metrics = self.throughput_metrics.write();
            throughput_metrics.messages_per_second = messages_per_second;
            throughput_metrics.total_messages += 1;
            throughput_metrics.avg_throughput =
                (throughput_metrics.avg_throughput + messages_per_second) / 2.0;
            throughput_metrics.peak_throughput =
                throughput_metrics.peak_throughput.max(messages_per_second);
        }

        debug!(
            "Updated network metrics: {} msg/s, {} ms latency",
            messages_per_second, avg_latency_ms
        );
    }

    /// Get current queue metrics
    pub fn get_queue_metrics(&self) -> QueueMetrics {
        self.queue_metrics.read().clone()
    }

    /// Get current latency metrics
    pub fn get_latency_metrics(&self) -> LatencyMetrics {
        self.latency_metrics.read().clone()
    }

    /// Get current throughput metrics
    pub fn get_throughput_metrics(&self) -> ThroughputMetrics {
        self.throughput_metrics.read().clone()
    }

    /// Returns current network metrics (optimized)
    pub fn get_metrics(&self) -> NetworkMetrics {
        self.metrics.read().clone()
    }

    /// Enhanced API methods for production features
    /// Open a multiplexed stream on a connection
    pub async fn open_stream(
        &self,
        peer_id: PeerId,
        priority: Priority,
    ) -> Result<StreamId, NetworkError> {
        // Ensure connection exists and is healthy
        if !self.connections.contains_key(&peer_id) {
            // Attempt to establish connection first
            self.connect(peer_id).await?;
        }

        // Use multiplexer to open stream
        self.multiplexer.open_stream(peer_id, priority).await
    }

    /// Close a multiplexed stream
    pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
        self.multiplexer.close_stream(stream_id).await
    }

    /// Send data on a specific stream
    pub async fn send_stream_data(
        &self,
        stream_id: StreamId,
        data: Bytes,
    ) -> Result<(), NetworkError> {
        // Get stream info to validate
        let stream_info = self
            .multiplexer
            .get_stream_info(stream_id)
            .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?;

        if stream_info.state != StreamState::Active {
            return Err(NetworkError::ConnectionError("Stream not active".into()));
        }

        // In a real implementation, this would send data on the specific stream
        // For now, we'll simulate stream-based sending
        info!("Sending {} bytes on stream {:?}", data.len(), stream_id);
        Ok(())
    }

    /// Execute connection operation with retry logic
    pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
        let retry_manager = self.retry_manager.clone();

        retry_manager
            .retry_operation(peer_id, || async { self.connect(peer_id).await })
            .await
    }

    /// Select best connection using load balancer
    pub async fn select_best_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
        self.load_balancer.select_connection(available_peers).await
    }

    /// Start health monitoring for a peer
    pub async fn start_health_monitoring(&self, peer_id: PeerId) {
        self.health_monitor.start_monitoring(peer_id).await;
    }

    /// Perform health check on a connection
    pub async fn check_connection_health(&self, peer_id: PeerId) -> Option<HealthCheckResult> {
        if let Some(connection_info) = self.get_connection_info(&peer_id) {
            Some(
                self.health_monitor
                    .check_health(peer_id, &connection_info)
                    .await,
            )
        } else {
            None
        }
    }

    /// Warm up connections for a peer
    pub async fn warm_connections(&self, peer_id: PeerId) -> Result<(), NetworkError> {
        self.warming_manager.warm_connection(peer_id).await
    }

    /// Get warming state for a peer
    pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
        self.warming_manager.get_warming_state(peer_id)
    }

    /// Get comprehensive connection statistics
    pub async fn get_connection_statistics(&self) -> ConnectionStatistics {
        let health_stats = self.health_monitor.stats.read().await.clone();
        let retry_stats = self.retry_manager.stats.read().await.clone();
        let warming_stats = self.warming_manager.stats.read().await.clone();
        let load_balancing_stats = self.load_balancer.stats.read().await.clone();

        ConnectionStatistics {
            total_connections: self.connections.len(),
            active_connections: self
                .connections
                .iter()
                .filter(|entry| entry.value().is_healthy())
                .count(),
            pooled_connections: self.enhanced_pool.len(),
            health_stats,
            retry_stats,
            warming_stats,
            load_balancing_stats,
        }
    }

    /// Configure connection limits
    pub fn set_connection_limits(&mut self, limits: ConnectionLimits) {
        self.max_connections = limits.max_total;
        self.connection_limits = limits;
    }

    /// Get current connection limits
    pub fn get_connection_limits(&self) -> &ConnectionLimits {
        &self.connection_limits
    }
}

/// Comprehensive connection statistics
#[derive(Debug, Clone)]
pub struct ConnectionStatistics {
    /// Total number of connections
    pub total_connections: usize,
    /// Number of active (healthy) connections
    pub active_connections: usize,
    /// Number of pooled connections
    pub pooled_connections: usize,
    /// Health monitoring statistics
    pub health_stats: HealthStats,
    /// Retry operation statistics
    pub retry_stats: RetryStats,
    /// Connection warming statistics
    pub warming_stats: WarmingStats,
    /// Load balancing statistics
    pub load_balancing_stats: LoadBalancingStats,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Instant;
    use tokio::time::Duration;

    fn setup_test_config() -> SecureConfig {
        SecureConfig {
            transport_keys: TransportKeys::generate(),
            timeout: std::time::Duration::from_secs(5),
            keepalive: std::time::Duration::from_secs(10),
        }
    }

    #[tokio::test]
    async fn test_secure_connection() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);

        // Set up QUIC endpoint
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        // Create secure connection
        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .expect("Failed to create secure connection");

        // Test sending encrypted message
        let test_data = Bytes::from(b"test message".to_vec());
        connection
            .send(test_data)
            .await
            .expect("Failed to send message");
    }

    #[tokio::test]
    async fn test_connection_management() {
        let manager = ConnectionManager::new(2);
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();

        // Test connection limit
        assert!(manager.connect(peer1).await.is_ok());
        assert!(manager.connect(peer2).await.is_ok());
        assert!(manager.connect(peer3).await.is_ok()); // Should be ignored due to limit

        assert_eq!(manager.connection_count(), 2);

        // Test status updates
        manager.update_status(peer1, ConnectionStatus::Connected);
        assert_eq!(
            manager.get_status(&peer1),
            Some(ConnectionStatus::Connected)
        );

        // Test disconnection
        manager.disconnect(&peer1);
        assert_eq!(manager.get_status(&peer1), None);
        assert_eq!(manager.connection_count(), 1);

        // Test metrics
        manager.update_metrics(1000.0, 50);
        let metrics = manager.get_metrics();
        assert_eq!(metrics.messages_per_second, 1000.0);
        assert_eq!(metrics.connections, 1);
    }

    #[tokio::test]
    async fn bench_route_computation() {
        let manager = ConnectionManager::new(100);
        let _rng = rand::thread_rng();
        let mut latencies = Vec::new();

        for _ in 0..1000 {
            let peer = PeerId::random();
            let start = Instant::now();
            manager.connect(peer).await.unwrap();
            latencies.push(start.elapsed());
        }

        let avg_latency = latencies.iter().sum::<Duration>() / latencies.len() as u32;
        let max_latency = latencies.iter().max().unwrap();

        println!("Route Computation Benchmark:");
        println!("Average latency: {:?}", avg_latency);
        println!("Maximum latency: {:?}", max_latency);
        println!("Total routes: {}", manager.connection_count());
    }

    #[tokio::test]
    async fn bench_cache_efficiency() {
        let manager = ConnectionManager::new(1000);
        let mut hit_count = 0;
        let iterations = 10000;

        for _ in 0..iterations {
            let peer = PeerId::random();
            let _start = Instant::now();

            // Try to get from pool first
            if let Some(_) = manager.connection_pool.get(&peer) {
                hit_count += 1;
            } else {
                manager.connect(peer).await.unwrap();
            }
        }

        let hit_rate = (hit_count as f64 / iterations as f64) * 100.0;
        println!("Cache Efficiency Benchmark:");
        println!("Cache hit rate: {:.2}%", hit_rate);
        println!("Pool size: {}", manager.connection_pool.len());
    }

    #[tokio::test]
    async fn bench_circuit_setup() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        let mut setup_times = Vec::new();
        for _ in 0..100 {
            let start = Instant::now();
            let _connection =
                SecureConnection::new(&endpoint, test_addr, test_config.clone()).await;
            setup_times.push(start.elapsed());
        }

        let avg_setup = setup_times.iter().sum::<Duration>() / setup_times.len() as u32;
        println!("Circuit Setup Benchmark:");
        println!("Average setup time: {:?}", avg_setup);
    }

    #[tokio::test]
    async fn bench_connection_pooling() {
        let manager = ConnectionManager::with_pool_timeout(1000, Duration::from_secs(60));
        let test_peers: Vec<PeerId> = (0..100).map(|_| PeerId::random()).collect();
        let mut reuse_times = Vec::new();

        // Setup initial connections
        for peer in test_peers.iter() {
            manager.connect(*peer).await.unwrap();
        }

        // Test connection reuse
        for peer in test_peers.iter() {
            let start = Instant::now();
            manager.connect(*peer).await.unwrap();
            reuse_times.push(start.elapsed());
        }

        let avg_reuse = reuse_times.iter().sum::<Duration>() / reuse_times.len() as u32;
        println!("Connection Pooling Benchmark:");
        println!("Average reuse time: {:?}", avg_reuse);
        println!(
            "Pool utilization: {:.2}%",
            (manager.get_queue_metrics().utilization * 100.0)
        );
    }

    #[tokio::test]
    async fn bench_message_throughput() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;

        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .unwrap();
        let start = Instant::now();
        let message_count = 10000;
        let message_size = 1024; // 1KB messages

        for _ in 0..message_count {
            let data = Bytes::from(vec![1u8; message_size]);
            connection.send(data).await.unwrap();
        }

        let elapsed = start.elapsed();
        let throughput = message_count as f64 / elapsed.as_secs_f64();
        let mb_per_sec = (throughput * message_size as f64) / (1024.0 * 1024.0);

        println!("Message Throughput Benchmark:");
        println!("Messages per second: {:.2}", throughput);
        println!("Throughput: {:.2} MB/s", mb_per_sec);
        println!("Total time: {:?}", elapsed);
    }
}

/// Implementations for new structures
impl ConnectionMultiplexer {
    /// Create new connection multiplexer
    pub fn new(max_streams_per_connection: u32, stream_timeout: Duration) -> Self {
        Self {
            connections: Arc::new(DashMap::new()),
            stream_routes: Arc::new(DashMap::new()),
            priority_queue: Arc::new(TokioRwLock::new(BTreeMap::new())),
            max_streams_per_connection,
            stream_timeout,
        }
    }

    /// Open a new stream on a connection
    pub async fn open_stream(
        &self,
        peer_id: PeerId,
        priority: Priority,
    ) -> Result<StreamId, NetworkError> {
        let mut connection = self
            .connections
            .get_mut(&peer_id)
            .ok_or_else(|| NetworkError::ConnectionError("Connection not found".into()))?;

        if connection.streams.len() >= self.max_streams_per_connection as usize {
            return Err(NetworkError::ConnectionError(
                "Maximum streams reached".into(),
            ));
        }

        // Acquire stream permit
        let _ = connection
            .stream_semaphore
            .acquire()
            .await
            .map_err(|_| NetworkError::ConnectionError("Stream semaphore closed".into()))?;

        let stream_id = StreamId(connection.next_stream_id);
        connection.next_stream_id += 1;

        let stream_info = StreamInfo {
            id: stream_id,
            priority,
            state: StreamState::Opening,
            created_at: Instant::now(),
            last_activity: Instant::now(),
            bytes_transferred: 0,
        };

        connection.streams.insert(stream_id, stream_info);
        self.stream_routes.insert(stream_id, peer_id);

        // Add to priority queue
        let mut queue = self.priority_queue.write().await;
        queue
            .entry(priority)
            .or_insert_with(VecDeque::new)
            .push_back(stream_id);

        Ok(stream_id)
    }

    /// Close a stream
    pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
        let peer_id = self
            .stream_routes
            .remove(&stream_id)
            .ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?
            .1;

        if let Some(mut connection) = self.connections.get_mut(&peer_id) {
            if let Some(stream) = connection.streams.get_mut(&stream_id) {
                stream.state = StreamState::Closed;
                stream.last_activity = Instant::now();
            }
            connection.streams.remove(&stream_id);

            // Update connection utilization
            connection.utilization =
                connection.streams.len() as f64 / self.max_streams_per_connection as f64;
        }

        Ok(())
    }

    /// Get stream information
    pub fn get_stream_info(&self, stream_id: StreamId) -> Option<StreamInfo> {
        let peer_id = self.stream_routes.get(&stream_id)?.value().clone();
        let connection = self.connections.get(&peer_id)?;
        connection.streams.get(&stream_id).cloned()
    }
}

impl RetryManager {
    /// Create new retry manager
    pub fn new() -> Self {
        Self {
            retry_configs: Arc::new(DashMap::new()),
            default_config: RetryConfig::default(),
            stats: Arc::new(TokioRwLock::new(RetryStats::default())),
        }
    }

    /// Execute operation with retry logic
    pub async fn retry_operation<F, Fut, T, E>(&self, peer_id: PeerId, operation: F) -> Result<T, E>
    where
        F: Fn() -> Fut + Send + Sync,
        Fut: Future<Output = Result<T, E>> + Send,
        E: std::fmt::Debug,
    {
        let config = self
            .retry_configs
            .get(&peer_id)
            .map(|entry| entry.value().clone())
            .unwrap_or_else(|| self.default_config.clone());

        let mut attempt = 0;
        let mut backoff = config.initial_backoff;

        loop {
            let start = Instant::now();
            let result = operation().await;
            let _duration = start.elapsed();

            match result {
                Ok(value) => {
                    // Update success statistics
                    let mut stats = self.stats.write().await;
                    stats.total_attempts += 1;
                    stats.successful_retries += 1;
                    return Ok(value);
                }
                Err(error) => {
                    attempt += 1;
                    if attempt >= config.max_retries {
                        // Update failure statistics
                        let mut stats = self.stats.write().await;
                        stats.total_attempts += 1;
                        stats.failed_retries += 1;
                        return Err(error);
                    }

                    // Calculate backoff with jitter
                    let jitter = (rand::random::<f64>() - 0.5) * 2.0 * config.jitter_factor;
                    let backoff_with_jitter = Duration::from_millis(
                        ((backoff.as_millis() as f64) * (1.0 + jitter)) as u64,
                    );

                    sleep(backoff_with_jitter).await;

                    // Exponential backoff
                    backoff = std::cmp::min(
                        Duration::from_millis(
                            (backoff.as_millis() as f64 * config.backoff_multiplier) as u64,
                        ),
                        config.max_backoff,
                    );
                }
            }
        }
    }
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter_factor: 0.1,
            timeout: Duration::from_secs(10),
        }
    }
}

impl LoadBalancer {
    /// Create new load balancer
    pub fn new(strategy: LoadBalancingStrategy) -> Self {
        Self {
            strategy,
            weights: Arc::new(DashMap::new()),
            round_robin_counter: AtomicU64::new(0),
            stats: Arc::new(TokioRwLock::new(LoadBalancingStats::default())),
        }
    }

    /// Select best connection for request
    pub async fn select_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
        if available_peers.is_empty() {
            return None;
        }

        let selected = match self.strategy {
            LoadBalancingStrategy::RoundRobin => {
                let index = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) as usize;
                available_peers[index % available_peers.len()]
            }
            LoadBalancingStrategy::WeightedRoundRobin => {
                self.select_weighted_round_robin(available_peers).await
            }
            LoadBalancingStrategy::LeastConnections => {
                self.select_least_connections(available_peers).await
            }
            LoadBalancingStrategy::ResponseTime => {
                self.select_best_response_time(available_peers).await
            }
            LoadBalancingStrategy::ResourceUtilization => {
                self.select_least_utilized(available_peers).await
            }
        };

        // Update statistics
        let mut stats = self.stats.write().await;
        stats.total_requests += 1;
        *stats.peer_distribution.entry(selected).or_insert(0) += 1;

        Some(selected)
    }

    /// Select connection using weighted round-robin
    async fn select_weighted_round_robin(&self, peers: &[PeerId]) -> PeerId {
        let mut total_weight = 0.0;
        let mut weighted_peers = Vec::new();

        for &peer_id in peers {
            let weight = self
                .weights
                .get(&peer_id)
                .map(|entry| *entry.value())
                .unwrap_or(1.0);
            total_weight += weight;
            weighted_peers.push((peer_id, weight));
        }

        let mut target = rand::random::<f64>() * total_weight;
        for (peer_id, weight) in weighted_peers {
            target -= weight;
            if target <= 0.0 {
                return peer_id;
            }
        }

        peers[0] // Fallback
    }

    /// Select connection with least connections (placeholder)
    async fn select_least_connections(&self, peers: &[PeerId]) -> PeerId {
        // In a real implementation, this would track connection counts
        peers[0]
    }

    /// Select connection with best response time
    async fn select_best_response_time(&self, peers: &[PeerId]) -> PeerId {
        let stats = self.stats.read().await;
        let mut best_peer = peers[0];
        let mut best_time = Duration::from_secs(u64::MAX);

        for &peer_id in peers {
            if let Some(avg_time) = stats.avg_response_times.get(&peer_id) {
                if *avg_time < best_time {
                    best_time = *avg_time;
                    best_peer = peer_id;
                }
            }
        }

        best_peer
    }

    /// Select least utilized connection (placeholder)
    async fn select_least_utilized(&self, peers: &[PeerId]) -> PeerId {
        // In a real implementation, this would track resource utilization
        peers[0]
    }
}

impl HealthMonitor {
    /// Create new health monitor
    pub fn new(config: HealthCheckConfig) -> Self {
        Self {
            config,
            results: Arc::new(DashMap::new()),
            scheduler: Arc::new(TokioRwLock::new(HealthCheckScheduler {
                scheduled_checks: HashMap::new(),
                check_intervals: HashMap::new(),
            })),
            stats: Arc::new(TokioRwLock::new(HealthStats::default())),
        }
    }

    /// Start health monitoring for a peer
    pub async fn start_monitoring(&self, peer_id: PeerId) {
        let mut scheduler = self.scheduler.write().await;
        scheduler
            .scheduled_checks
            .insert(peer_id, Instant::now() + self.config.interval);
        scheduler
            .check_intervals
            .insert(peer_id, self.config.interval);
    }

    /// Perform health check on a peer
    pub async fn check_health(
        &self,
        peer_id: PeerId,
        connection: &ConnectionInfo,
    ) -> HealthCheckResult {
        let checker = PingHealthCheck::default();
        let result = checker.check(&peer_id, connection).await;

        // Store result
        self.results.insert(peer_id, result.clone());

        // Update statistics
        let mut stats = self.stats.write().await;
        stats.total_checks += 1;
        if result.success {
            stats.successful_checks += 1;
        } else {
            stats.failed_checks += 1;
        }

        // Update average response time
        let total_time = stats.avg_response_time.as_millis() as f64 * stats.total_checks as f64;
        let new_avg = (total_time + result.response_time.as_millis() as f64)
            / (stats.total_checks + 1) as f64;
        stats.avg_response_time = Duration::from_millis(new_avg as u64);

        result
    }

    /// Get latest health check result
    pub fn get_health_result(&self, peer_id: &PeerId) -> Option<HealthCheckResult> {
        self.results.get(peer_id).map(|entry| entry.value().clone())
    }
}

impl Default for HealthCheckConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(30),
            timeout: Duration::from_secs(5),
            failure_threshold: 3,
            recovery_threshold: 2,
            check_type: HealthCheckType::Ping,
        }
    }
}

impl WarmingManager {
    /// Create new warming manager
    pub fn new(config: WarmingConfig) -> Self {
        Self {
            config,
            warming_states: Arc::new(DashMap::new()),
            warming_tasks: Arc::new(DashMap::new()),
            stats: Arc::new(TokioRwLock::new(WarmingStats::default())),
        }
    }

    /// Start warming connections for a peer
    pub async fn warm_connection(&self, peer_id: PeerId) -> Result<(), NetworkError> {
        if !self.config.enabled {
            return Ok(());
        }

        // Set warming state
        self.warming_states.insert(peer_id, WarmingState::Warming);

        // Simulate connection warming (in real implementation, this would pre-establish connections)
        let start = Instant::now();
        sleep(Duration::from_millis(100)).await; // Simulate warming time
        let warming_time = start.elapsed();

        // Update statistics
        let mut stats = self.stats.write().await;
        stats.total_attempts += 1;

        if rand::random::<f64>() > 0.1 {
            // 90% success rate
            self.warming_states.insert(peer_id, WarmingState::Warm);
            stats.successful_warmings += 1;

            // Update average warming time
            let total_time =
                stats.avg_warming_time.as_millis() as f64 * stats.successful_warmings as f64;
            let new_avg = (total_time + warming_time.as_millis() as f64)
                / (stats.successful_warmings + 1) as f64;
            stats.avg_warming_time = Duration::from_millis(new_avg as u64);

            Ok(())
        } else {
            self.warming_states.insert(
                peer_id,
                WarmingState::FailedToWarm("Warming timeout".to_string()),
            );
            stats.failed_warmings += 1;
            Err(NetworkError::ConnectionError(
                "Connection warming failed".into(),
            ))
        }
    }

    /// Get warming state for a peer
    pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
        self.warming_states
            .get(peer_id)
            .map(|entry| entry.value().clone())
            .unwrap_or(WarmingState::Cold)
    }
}

impl Default for WarmingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            min_pool_size: 5,
            warming_timeout: Duration::from_secs(10),
            warming_retries: 3,
            predictive_threshold: 0.8,
        }
    }
}