1#![allow(missing_docs)]
2
3pub mod circuit_breaker;
9pub mod connection;
10pub mod connection_pool;
11pub mod dark_resolver;
12pub mod discovery;
13pub mod dns;
14pub mod kademlia;
15pub mod message;
16pub mod metrics;
17pub mod nat_traversal;
18pub mod onion;
19pub mod p2p;
22pub mod peer;
23pub mod quantum_crypto;
24pub mod router;
25pub mod routing;
26pub mod shadow_address;
27pub mod traffic_obfuscation;
28pub mod transport;
29pub mod types;
30
31pub use dark_resolver::{DarkDomainRecord, DarkResolver, DarkResolverError};
32pub use discovery::{
33 DiscoveredPeer, DiscoveryConfig, DiscoveryEvent, DiscoveryMethod, DiscoveryStats,
34 KademliaPeerDiscovery,
35};
36pub use dns::{CloudflareClient, CloudflareConfig, DnsError, DnsManager, DnsRecord, RecordType};
37pub use kademlia::{BootstrapConfig, ContentRoutingConfig, KademliaDHT, PeerReputation};
38pub use message::MessageEnvelope;
39pub use nat_traversal::{
40 ConnectionType, ConnectionUpgradeManager, HolePunchCoordinator, HolePunchPhase, NatInfo,
41 NatPmpClient, NatTraversalConfig, NatTraversalError, NatTraversalManager, NatTraversalStats,
42 NatType, PortMapping, PortMappingMethod, PortMappingProtocol, RelayConnection, RelayManager,
43 RelayServer, StunClient, StunServer, TurnClient, TurnServer, UpgradeAttempt,
44};
45pub use onion::{
46 Circuit, CircuitManager, CircuitState, CircuitStats, DirectoryClient, HopMetadata, LayerFlags,
47 MLKEMOnionRouter, MetadataConfig, MetadataProtector, MixConfig, MixMessage, MixMessageType,
48 MixNode, MixNodeStats, NodeFlags, NodeInfo, OnionError, OnionLayer, OnionRouter,
49 ProtectedMetadata, TrafficAnalysisConfig, TrafficAnalysisResistance,
50};
51pub use p2p::{
52 NetworkConfig as P2PNetworkConfig, P2PCommand, P2PEvent, P2PHandle, P2PNode, QuDagRequest,
53 QuDagResponse,
54};
55pub use quantum_crypto::{
56 MlKemCiphertext, MlKemPublicKey, MlKemSecretKey, MlKemSecurityLevel, QuantumKeyExchange,
57 SharedSecret,
58};
59pub use router::{HopInfo, Router};
60pub use shadow_address::{
61 DefaultShadowAddressHandler, NetworkType, RotationPolicies, ShadowAddress, ShadowAddressError,
62 ShadowAddressGenerator, ShadowAddressManager, ShadowAddressMixer, ShadowAddressPool,
63 ShadowAddressResolver, ShadowFeatures, ShadowMetadata,
64};
65pub use traffic_obfuscation::{
66 ObfuscationPattern, ObfuscationStats, TrafficObfuscationConfig, TrafficObfuscator,
67 DEFAULT_MESSAGE_SIZE, STANDARD_MESSAGE_SIZES,
68};
69pub use transport::{AsyncTransport, Transport, TransportConfig, TransportError};
70pub use types::{
71 ConnectionStatus, LatencyMetrics, MessagePriority, NetworkAddress, NetworkError,
72 NetworkMessage, PeerId, QueueMetrics, RoutingStrategy, ThroughputMetrics,
73};
74
75use libp2p::PeerId as LibP2PPeerId;
76use std::collections::HashMap;
77use std::sync::Arc;
78use tokio::sync::{mpsc, RwLock};
79use tracing::{debug, error, info, warn};
80
81pub struct NetworkManager {
83 local_peer_id: Option<LibP2PPeerId>,
85 connected_peers: Arc<RwLock<HashMap<LibP2PPeerId, PeerMetadata>>>,
87 message_tx: Option<mpsc::Sender<NetworkEvent>>,
89 config: NetworkConfig,
91 connection_manager: Option<Arc<ConnectionManager>>,
93 discovery_service: Option<Arc<dyn PeerDiscoveryService>>,
95 reputation_manager: Arc<RwLock<ReputationManager>>,
97 nat_traversal_manager: Option<Arc<NatTraversalManager>>,
99}
100
101#[derive(Debug, Clone)]
103pub struct NetworkConfig {
104 pub max_connections: usize,
106 pub connection_timeout: std::time::Duration,
108 pub discovery_interval: std::time::Duration,
110 pub bootstrap_peers: Vec<String>,
112 pub enable_dht: bool,
114 pub quantum_resistant: bool,
116 pub enable_nat_traversal: bool,
118 pub nat_traversal_config: Option<NatTraversalConfig>,
120}
121
122impl Default for NetworkConfig {
123 fn default() -> Self {
124 Self {
125 max_connections: 50,
126 connection_timeout: std::time::Duration::from_secs(30),
127 discovery_interval: std::time::Duration::from_secs(60),
128 bootstrap_peers: vec![],
129 enable_dht: true,
130 quantum_resistant: true,
131 enable_nat_traversal: true,
132 nat_traversal_config: None,
133 }
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct PeerMetadata {
140 pub address: String,
142 pub connected_at: std::time::Instant,
144 pub last_activity: std::time::Instant,
146 pub reputation: f64,
148 pub protocol_version: u32,
150 pub latency_ms: u64,
152}
153
154#[derive(Debug, Clone)]
156pub enum NetworkEvent {
157 PeerConnected(LibP2PPeerId),
159 PeerDisconnected(LibP2PPeerId),
161 MessageReceived { from: LibP2PPeerId, data: Vec<u8> },
163 DiscoveryUpdate(Vec<LibP2PPeerId>),
165 NetworkError(String),
167}
168
169pub trait PeerDiscoveryService: Send + Sync {
171 fn start(&self) -> Result<(), NetworkError>;
173 fn stop(&self) -> Result<(), NetworkError>;
175 fn get_peers(&self) -> Vec<LibP2PPeerId>;
177 fn add_bootstrap_peer(&mut self, peer: String) -> Result<(), NetworkError>;
179}
180
181#[derive(Debug)]
183pub struct ReputationManager {
184 scores: HashMap<LibP2PPeerId, f64>,
186 blacklist: HashMap<LibP2PPeerId, std::time::Instant>,
188 trusted: HashMap<LibP2PPeerId, std::time::Instant>,
190}
191
192impl Default for ReputationManager {
193 fn default() -> Self {
194 Self {
195 scores: HashMap::new(),
196 blacklist: HashMap::new(),
197 trusted: HashMap::new(),
198 }
199 }
200}
201
202impl ReputationManager {
203 pub fn get_reputation(&self, peer_id: &LibP2PPeerId) -> f64 {
205 self.scores.get(peer_id).copied().unwrap_or(0.0)
206 }
207
208 pub fn update_reputation(&mut self, peer_id: LibP2PPeerId, delta: f64) {
210 let current = self.scores.get(&peer_id).copied().unwrap_or(0.0);
211 let new_score = (current + delta).clamp(-100.0, 100.0);
212 self.scores.insert(peer_id, new_score);
213
214 if new_score < -50.0 {
216 self.blacklist.insert(peer_id, std::time::Instant::now());
217 warn!(
218 "Auto-blacklisted peer {:?} due to low reputation: {}",
219 peer_id, new_score
220 );
221 }
222 }
223
224 pub fn is_blacklisted(&self, peer_id: &LibP2PPeerId) -> bool {
226 self.blacklist.contains_key(peer_id)
227 }
228
229 pub fn add_trusted(&mut self, peer_id: LibP2PPeerId) {
231 self.trusted.insert(peer_id, std::time::Instant::now());
232 self.scores.insert(peer_id, 75.0);
234 }
235
236 pub fn is_trusted(&self, peer_id: &LibP2PPeerId) -> bool {
238 self.trusted.contains_key(peer_id)
239 }
240
241 pub fn cleanup_expired(&mut self) {
243 let now = std::time::Instant::now();
244 let expire_time = std::time::Duration::from_secs(24 * 60 * 60);
245
246 self.blacklist
247 .retain(|_, timestamp| now.duration_since(*timestamp) < expire_time);
248 }
249}
250
251impl Default for NetworkManager {
252 fn default() -> Self {
253 Self::new()
254 }
255}
256
257impl NetworkManager {
258 pub fn new() -> Self {
260 Self::with_config(NetworkConfig::default())
261 }
262
263 pub fn with_config(config: NetworkConfig) -> Self {
265 Self {
266 local_peer_id: None,
267 connected_peers: Arc::new(RwLock::new(HashMap::new())),
268 message_tx: None,
269 config,
270 connection_manager: None,
271 discovery_service: None,
272 reputation_manager: Arc::new(RwLock::new(ReputationManager::default())),
273 nat_traversal_manager: None,
274 }
275 }
276
277 pub async fn initialize(&mut self) -> Result<(), NetworkError> {
279 self.local_peer_id = Some(LibP2PPeerId::random());
281
282 let (tx, mut rx) = mpsc::channel(1024);
284 self.message_tx = Some(tx);
285
286 let connection_manager = Arc::new(ConnectionManager::new(self.config.max_connections));
288 self.connection_manager = Some(connection_manager.clone());
289
290 if self.config.enable_nat_traversal {
292 let nat_config = self.config.nat_traversal_config.clone().unwrap_or_default();
293 let nat_manager = Arc::new(NatTraversalManager::new(
294 nat_config,
295 connection_manager.clone(),
296 ));
297
298 if let Err(e) = nat_manager.initialize().await {
299 warn!("NAT traversal initialization failed: {}", e);
300 } else {
301 info!("NAT traversal initialized successfully");
302 }
303
304 self.nat_traversal_manager = Some(nat_manager);
305 }
306
307 let connected_peers = Arc::clone(&self.connected_peers);
309 let reputation_manager = Arc::clone(&self.reputation_manager);
310
311 tokio::spawn(async move {
312 while let Some(event) = rx.recv().await {
313 Self::handle_network_event(event, &connected_peers, &reputation_manager).await;
314 }
315 });
316
317 info!(
318 "NetworkManager initialized with peer ID: {:?}",
319 self.local_peer_id
320 );
321 Ok(())
322 }
323
324 async fn handle_network_event(
326 event: NetworkEvent,
327 connected_peers: &Arc<RwLock<HashMap<LibP2PPeerId, PeerMetadata>>>,
328 reputation_manager: &Arc<RwLock<ReputationManager>>,
329 ) {
330 match event {
331 NetworkEvent::PeerConnected(peer_id) => {
332 debug!("Handling peer connection: {:?}", peer_id);
333 let metadata = PeerMetadata {
334 address: "unknown".to_string(),
335 connected_at: std::time::Instant::now(),
336 last_activity: std::time::Instant::now(),
337 reputation: 0.0,
338 protocol_version: 1,
339 latency_ms: 0,
340 };
341 connected_peers.write().await.insert(peer_id, metadata);
342 }
343 NetworkEvent::PeerDisconnected(peer_id) => {
344 debug!("Handling peer disconnection: {:?}", peer_id);
345 connected_peers.write().await.remove(&peer_id);
346 }
347 NetworkEvent::MessageReceived { from, data: _ } => {
348 if let Some(metadata) = connected_peers.write().await.get_mut(&from) {
350 metadata.last_activity = std::time::Instant::now();
351 }
352 reputation_manager
353 .write()
354 .await
355 .update_reputation(from, 0.1);
356 }
357 NetworkEvent::NetworkError(error) => {
358 error!("Network error: {}", error);
359 }
360 NetworkEvent::DiscoveryUpdate(peers) => {
361 debug!("Discovery update: {} new peers found", peers.len());
362 }
363 }
364 }
365
366 pub async fn connect_peer(&self, _peer_address: &str) -> Result<LibP2PPeerId, NetworkError> {
368 let peer_id = LibP2PPeerId::random(); if self
372 .reputation_manager
373 .read()
374 .await
375 .is_blacklisted(&peer_id)
376 {
377 return Err(NetworkError::ConnectionError(
378 "Peer is blacklisted".to_string(),
379 ));
380 }
381
382 let peer_bytes = peer_id.to_bytes();
384 let mut bytes_array = [0u8; 32];
385 let len = peer_bytes.len().min(32);
386 bytes_array[..len].copy_from_slice(&peer_bytes[..len]);
387 let our_peer_id = crate::types::PeerId::from_bytes(bytes_array);
388
389 if let Some(nat_manager) = &self.nat_traversal_manager {
391 match nat_manager.connect_peer(our_peer_id).await {
392 Ok(()) => {
393 info!("Connected to peer {:?} via NAT traversal", peer_id);
394 }
395 Err(e) => {
396 warn!("NAT traversal failed for peer {:?}: {}", peer_id, e);
397 if let Some(conn_mgr) = &self.connection_manager {
399 conn_mgr.connect(our_peer_id).await?;
400 }
401 }
402 }
403 } else if let Some(conn_mgr) = &self.connection_manager {
404 conn_mgr.connect(our_peer_id).await?;
406 }
407
408 if let Some(tx) = &self.message_tx {
410 let _ = tx.send(NetworkEvent::PeerConnected(peer_id)).await;
411 }
412
413 info!("Successfully connected to peer: {:?}", peer_id);
414 Ok(peer_id)
415 }
416
417 pub async fn disconnect_peer(&self, peer_id: &LibP2PPeerId) -> Result<(), NetworkError> {
419 if let Some(conn_mgr) = &self.connection_manager {
421 let peer_bytes = peer_id.to_bytes();
422 let mut bytes_array = [0u8; 32];
423 let len = peer_bytes.len().min(32);
424 bytes_array[..len].copy_from_slice(&peer_bytes[..len]);
425 let our_peer_id = crate::types::PeerId::from_bytes(bytes_array);
426 conn_mgr.disconnect(&our_peer_id);
427 }
428
429 if let Some(tx) = &self.message_tx {
431 let _ = tx.send(NetworkEvent::PeerDisconnected(*peer_id)).await;
432 }
433
434 info!("Disconnected from peer: {:?}", peer_id);
435 Ok(())
436 }
437
438 pub async fn send_message(
440 &self,
441 peer_id: &LibP2PPeerId,
442 data: Vec<u8>,
443 ) -> Result<(), NetworkError> {
444 if !self.connected_peers.read().await.contains_key(peer_id) {
446 return Err(NetworkError::ConnectionError(
447 "Peer not connected".to_string(),
448 ));
449 }
450
451 debug!("Sending {} bytes to peer {:?}", data.len(), peer_id);
453
454 if let Some(metadata) = self.connected_peers.write().await.get_mut(peer_id) {
456 metadata.last_activity = std::time::Instant::now();
457 }
458
459 Ok(())
460 }
461
462 pub async fn get_connected_peers(&self) -> Vec<LibP2PPeerId> {
464 self.connected_peers.read().await.keys().cloned().collect()
465 }
466
467 pub async fn get_peer_metadata(&self, peer_id: &LibP2PPeerId) -> Option<PeerMetadata> {
469 self.connected_peers.read().await.get(peer_id).cloned()
470 }
471
472 pub async fn get_network_stats(&self) -> NetworkStats {
474 let connected_count = self.connected_peers.read().await.len();
475 let reputation_scores: Vec<f64> = {
476 let rep_mgr = self.reputation_manager.read().await;
477 rep_mgr.scores.values().cloned().collect()
478 };
479
480 let avg_reputation = if reputation_scores.is_empty() {
481 0.0
482 } else {
483 reputation_scores.iter().sum::<f64>() / reputation_scores.len() as f64
484 };
485
486 NetworkStats {
487 connected_peers: connected_count,
488 average_reputation: avg_reputation,
489 blacklisted_peers: self.reputation_manager.read().await.blacklist.len(),
490 trusted_peers: self.reputation_manager.read().await.trusted.len(),
491 }
492 }
493
494 pub async fn add_trusted_peer(&self, peer_id: LibP2PPeerId) {
496 self.reputation_manager.write().await.add_trusted(peer_id);
497 info!("Added trusted peer: {:?}", peer_id);
498 }
499
500 pub async fn blacklist_peer(&self, peer_id: LibP2PPeerId) {
502 self.reputation_manager
503 .write()
504 .await
505 .update_reputation(peer_id, -100.0);
506
507 let _ = self.disconnect_peer(&peer_id).await;
509
510 warn!("Blacklisted peer: {:?}", peer_id);
511 }
512
513 pub async fn start_discovery(&mut self) -> Result<(), NetworkError> {
515 info!("Starting peer discovery");
517 Ok(())
518 }
519
520 pub async fn shutdown(&mut self) -> Result<(), NetworkError> {
522 info!("Shutting down NetworkManager");
523
524 let peers: Vec<_> = self.get_connected_peers().await;
526 for peer_id in peers {
527 let _ = self.disconnect_peer(&peer_id).await;
528 }
529
530 if let Some(discovery) = &self.discovery_service {
532 discovery.stop()?;
533 }
534
535 if let Some(nat_manager) = &self.nat_traversal_manager {
537 if let Err(e) = nat_manager.shutdown().await {
538 warn!("NAT traversal shutdown error: {}", e);
539 }
540 }
541
542 Ok(())
543 }
544
545 pub fn local_peer_id(&self) -> Option<LibP2PPeerId> {
547 self.local_peer_id
548 }
549
550 pub async fn maintenance(&self) {
552 self.reputation_manager.write().await.cleanup_expired();
554
555 let now = std::time::Instant::now();
557 let timeout = std::time::Duration::from_secs(300);
558
559 let mut to_disconnect = Vec::new();
560 {
561 let peers = self.connected_peers.read().await;
562 for (peer_id, metadata) in peers.iter() {
563 if now.duration_since(metadata.last_activity) > timeout {
564 to_disconnect.push(*peer_id);
565 }
566 }
567 }
568
569 for peer_id in to_disconnect {
570 warn!("Disconnecting inactive peer: {:?}", peer_id);
571 let _ = self.disconnect_peer(&peer_id).await;
572 }
573 }
574
575 pub fn get_nat_info(&self) -> Option<NatInfo> {
577 self.nat_traversal_manager.as_ref()?.get_nat_info()
578 }
579
580 pub async fn create_port_mapping(
582 &self,
583 local_port: u16,
584 external_port: u16,
585 protocol: crate::nat_traversal::PortMappingProtocol,
586 ) -> Result<PortMapping, NetworkError> {
587 if let Some(nat_manager) = &self.nat_traversal_manager {
588 nat_manager
589 .create_port_mapping(local_port, external_port, protocol)
590 .await
591 .map_err(|e| NetworkError::ConnectionError(e.to_string()))
592 } else {
593 Err(NetworkError::ConnectionError(
594 "NAT traversal not enabled".to_string(),
595 ))
596 }
597 }
598
599 pub fn get_nat_traversal_stats(&self) -> Option<NatTraversalStats> {
601 self.nat_traversal_manager.as_ref().map(|m| m.get_stats())
602 }
603}
604
605#[derive(Debug, Clone)]
607pub struct NetworkStats {
608 pub connected_peers: usize,
610 pub average_reputation: f64,
612 pub blacklisted_peers: usize,
614 pub trusted_peers: usize,
616}
617pub use circuit_breaker::{CircuitBreaker, CircuitState as CircuitBreakerState};
618pub use connection::{
619 ConnectionInfo, ConnectionManager, HealthStatistics, SecureConfig, SecureConnection,
620 TransportKeys, UnhealthyConnectionInfo,
621};