qudag_network/
lib.rs

1#![allow(missing_docs)]
2
3//! P2P networking layer with anonymous routing.
4//!
5//! This module provides the networking layer for the QuDAG protocol,
6//! implementing anonymous routing, P2P communication, and traffic obfuscation.
7
8pub mod circuit_breaker;
9pub mod connection;
10pub mod connection_pool;
11pub mod dark_resolver;
12pub mod discovery;
13pub mod dns;
14pub mod kademlia;
15pub mod message;
16pub mod metrics;
17pub mod nat_traversal;
18pub mod onion;
19// Optimization features disabled for initial release
20// pub mod optimized;
21pub mod p2p;
22pub mod peer;
23pub mod quantum_crypto;
24pub mod router;
25pub mod routing;
26pub mod shadow_address;
27pub mod traffic_obfuscation;
28pub mod transport;
29pub mod types;
30
31pub use dark_resolver::{DarkDomainRecord, DarkResolver, DarkResolverError};
32pub use discovery::{
33    DiscoveredPeer, DiscoveryConfig, DiscoveryEvent, DiscoveryMethod, DiscoveryStats,
34    KademliaPeerDiscovery,
35};
36pub use dns::{CloudflareClient, CloudflareConfig, DnsError, DnsManager, DnsRecord, RecordType};
37pub use kademlia::{BootstrapConfig, ContentRoutingConfig, KademliaDHT, PeerReputation};
38pub use message::MessageEnvelope;
39pub use nat_traversal::{
40    ConnectionType, ConnectionUpgradeManager, HolePunchCoordinator, HolePunchPhase, NatInfo,
41    NatPmpClient, NatTraversalConfig, NatTraversalError, NatTraversalManager, NatTraversalStats,
42    NatType, PortMapping, PortMappingMethod, PortMappingProtocol, RelayConnection, RelayManager,
43    RelayServer, StunClient, StunServer, TurnClient, TurnServer, UpgradeAttempt,
44};
45pub use onion::{
46    Circuit, CircuitManager, CircuitState, CircuitStats, DirectoryClient, HopMetadata, LayerFlags,
47    MLKEMOnionRouter, MetadataConfig, MetadataProtector, MixConfig, MixMessage, MixMessageType,
48    MixNode, MixNodeStats, NodeFlags, NodeInfo, OnionError, OnionLayer, OnionRouter,
49    ProtectedMetadata, TrafficAnalysisConfig, TrafficAnalysisResistance,
50};
51pub use p2p::{
52    NetworkConfig as P2PNetworkConfig, P2PCommand, P2PEvent, P2PHandle, P2PNode, QuDagRequest,
53    QuDagResponse,
54};
55pub use quantum_crypto::{
56    MlKemCiphertext, MlKemPublicKey, MlKemSecretKey, MlKemSecurityLevel, QuantumKeyExchange,
57    SharedSecret,
58};
59pub use router::{HopInfo, Router};
60pub use shadow_address::{
61    DefaultShadowAddressHandler, NetworkType, RotationPolicies, ShadowAddress, ShadowAddressError,
62    ShadowAddressGenerator, ShadowAddressManager, ShadowAddressMixer, ShadowAddressPool,
63    ShadowAddressResolver, ShadowFeatures, ShadowMetadata,
64};
65pub use traffic_obfuscation::{
66    ObfuscationPattern, ObfuscationStats, TrafficObfuscationConfig, TrafficObfuscator,
67    DEFAULT_MESSAGE_SIZE, STANDARD_MESSAGE_SIZES,
68};
69pub use transport::{AsyncTransport, Transport, TransportConfig, TransportError};
70pub use types::{
71    ConnectionStatus, LatencyMetrics, MessagePriority, NetworkAddress, NetworkError,
72    NetworkMessage, PeerId, QueueMetrics, RoutingStrategy, ThroughputMetrics,
73};
74
75use libp2p::PeerId as LibP2PPeerId;
76use std::collections::HashMap;
77use std::sync::Arc;
78use tokio::sync::{mpsc, RwLock};
79use tracing::{debug, error, info, warn};
80
81/// Comprehensive network manager for P2P operations
82pub struct NetworkManager {
83    /// Local peer ID
84    local_peer_id: Option<LibP2PPeerId>,
85    /// Connected peers
86    connected_peers: Arc<RwLock<HashMap<LibP2PPeerId, PeerMetadata>>>,
87    /// Message channel for inter-component communication
88    message_tx: Option<mpsc::Sender<NetworkEvent>>,
89    /// Network configuration
90    config: NetworkConfig,
91    /// Connection manager instance
92    connection_manager: Option<Arc<ConnectionManager>>,
93    /// Peer discovery service
94    discovery_service: Option<Arc<dyn PeerDiscoveryService>>,
95    /// Reputation manager
96    reputation_manager: Arc<RwLock<ReputationManager>>,
97    /// NAT traversal manager
98    nat_traversal_manager: Option<Arc<NatTraversalManager>>,
99}
100
101/// Network configuration for the manager
102#[derive(Debug, Clone)]
103pub struct NetworkConfig {
104    /// Maximum number of connections
105    pub max_connections: usize,
106    /// Connection timeout
107    pub connection_timeout: std::time::Duration,
108    /// Discovery interval
109    pub discovery_interval: std::time::Duration,
110    /// Bootstrap peers
111    pub bootstrap_peers: Vec<String>,
112    /// Enable DHT
113    pub enable_dht: bool,
114    /// Quantum-resistant mode
115    pub quantum_resistant: bool,
116    /// Enable NAT traversal
117    pub enable_nat_traversal: bool,
118    /// NAT traversal configuration
119    pub nat_traversal_config: Option<NatTraversalConfig>,
120}
121
122impl Default for NetworkConfig {
123    fn default() -> Self {
124        Self {
125            max_connections: 50,
126            connection_timeout: std::time::Duration::from_secs(30),
127            discovery_interval: std::time::Duration::from_secs(60),
128            bootstrap_peers: vec![],
129            enable_dht: true,
130            quantum_resistant: true,
131            enable_nat_traversal: true,
132            nat_traversal_config: None,
133        }
134    }
135}
136
137/// Peer metadata for tracking
138#[derive(Debug, Clone)]
139pub struct PeerMetadata {
140    /// Peer address information
141    pub address: String,
142    /// Connection timestamp
143    pub connected_at: std::time::Instant,
144    /// Last activity timestamp
145    pub last_activity: std::time::Instant,
146    /// Reputation score
147    pub reputation: f64,
148    /// Protocol version
149    pub protocol_version: u32,
150    /// Connection quality metrics
151    pub latency_ms: u64,
152}
153
154/// Network events for inter-component communication
155#[derive(Debug, Clone)]
156pub enum NetworkEvent {
157    /// Peer connected
158    PeerConnected(LibP2PPeerId),
159    /// Peer disconnected
160    PeerDisconnected(LibP2PPeerId),
161    /// Message received
162    MessageReceived { from: LibP2PPeerId, data: Vec<u8> },
163    /// Discovery update
164    DiscoveryUpdate(Vec<LibP2PPeerId>),
165    /// Network error
166    NetworkError(String),
167}
168
169/// Trait for peer discovery services
170pub trait PeerDiscoveryService: Send + Sync {
171    /// Start discovery service
172    fn start(&self) -> Result<(), NetworkError>;
173    /// Stop discovery service
174    fn stop(&self) -> Result<(), NetworkError>;
175    /// Get discovered peers
176    fn get_peers(&self) -> Vec<LibP2PPeerId>;
177    /// Add bootstrap peer
178    fn add_bootstrap_peer(&mut self, peer: String) -> Result<(), NetworkError>;
179}
180
181/// Reputation management for peers
182#[derive(Debug)]
183pub struct ReputationManager {
184    /// Peer reputation scores
185    scores: HashMap<LibP2PPeerId, f64>,
186    /// Blacklisted peers
187    blacklist: HashMap<LibP2PPeerId, std::time::Instant>,
188    /// Trusted peers
189    trusted: HashMap<LibP2PPeerId, std::time::Instant>,
190}
191
192impl Default for ReputationManager {
193    fn default() -> Self {
194        Self {
195            scores: HashMap::new(),
196            blacklist: HashMap::new(),
197            trusted: HashMap::new(),
198        }
199    }
200}
201
202impl ReputationManager {
203    /// Get reputation score for a peer
204    pub fn get_reputation(&self, peer_id: &LibP2PPeerId) -> f64 {
205        self.scores.get(peer_id).copied().unwrap_or(0.0)
206    }
207
208    /// Update reputation score
209    pub fn update_reputation(&mut self, peer_id: LibP2PPeerId, delta: f64) {
210        let current = self.scores.get(&peer_id).copied().unwrap_or(0.0);
211        let new_score = (current + delta).clamp(-100.0, 100.0);
212        self.scores.insert(peer_id, new_score);
213
214        // Auto-blacklist peers with very low reputation
215        if new_score < -50.0 {
216            self.blacklist.insert(peer_id, std::time::Instant::now());
217            warn!(
218                "Auto-blacklisted peer {:?} due to low reputation: {}",
219                peer_id, new_score
220            );
221        }
222    }
223
224    /// Check if peer is blacklisted
225    pub fn is_blacklisted(&self, peer_id: &LibP2PPeerId) -> bool {
226        self.blacklist.contains_key(peer_id)
227    }
228
229    /// Add peer to trusted list
230    pub fn add_trusted(&mut self, peer_id: LibP2PPeerId) {
231        self.trusted.insert(peer_id, std::time::Instant::now());
232        // Set high reputation for trusted peers
233        self.scores.insert(peer_id, 75.0);
234    }
235
236    /// Check if peer is trusted
237    pub fn is_trusted(&self, peer_id: &LibP2PPeerId) -> bool {
238        self.trusted.contains_key(peer_id)
239    }
240
241    /// Remove expired blacklist entries (24 hours)
242    pub fn cleanup_expired(&mut self) {
243        let now = std::time::Instant::now();
244        let expire_time = std::time::Duration::from_secs(24 * 60 * 60);
245
246        self.blacklist
247            .retain(|_, timestamp| now.duration_since(*timestamp) < expire_time);
248    }
249}
250
251impl Default for NetworkManager {
252    fn default() -> Self {
253        Self::new()
254    }
255}
256
257impl NetworkManager {
258    /// Create new network manager with default configuration
259    pub fn new() -> Self {
260        Self::with_config(NetworkConfig::default())
261    }
262
263    /// Create new network manager with custom configuration
264    pub fn with_config(config: NetworkConfig) -> Self {
265        Self {
266            local_peer_id: None,
267            connected_peers: Arc::new(RwLock::new(HashMap::new())),
268            message_tx: None,
269            config,
270            connection_manager: None,
271            discovery_service: None,
272            reputation_manager: Arc::new(RwLock::new(ReputationManager::default())),
273            nat_traversal_manager: None,
274        }
275    }
276
277    /// Initialize the network manager
278    pub async fn initialize(&mut self) -> Result<(), NetworkError> {
279        // Generate or load peer identity
280        self.local_peer_id = Some(LibP2PPeerId::random());
281
282        // Set up message channel
283        let (tx, mut rx) = mpsc::channel(1024);
284        self.message_tx = Some(tx);
285
286        // Initialize connection manager
287        let connection_manager = Arc::new(ConnectionManager::new(self.config.max_connections));
288        self.connection_manager = Some(connection_manager.clone());
289
290        // Initialize NAT traversal if enabled
291        if self.config.enable_nat_traversal {
292            let nat_config = self.config.nat_traversal_config.clone().unwrap_or_default();
293            let nat_manager = Arc::new(NatTraversalManager::new(
294                nat_config,
295                connection_manager.clone(),
296            ));
297
298            if let Err(e) = nat_manager.initialize().await {
299                warn!("NAT traversal initialization failed: {}", e);
300            } else {
301                info!("NAT traversal initialized successfully");
302            }
303
304            self.nat_traversal_manager = Some(nat_manager);
305        }
306
307        // Start background event processing
308        let connected_peers = Arc::clone(&self.connected_peers);
309        let reputation_manager = Arc::clone(&self.reputation_manager);
310
311        tokio::spawn(async move {
312            while let Some(event) = rx.recv().await {
313                Self::handle_network_event(event, &connected_peers, &reputation_manager).await;
314            }
315        });
316
317        info!(
318            "NetworkManager initialized with peer ID: {:?}",
319            self.local_peer_id
320        );
321        Ok(())
322    }
323
324    /// Handle network events in background task
325    async fn handle_network_event(
326        event: NetworkEvent,
327        connected_peers: &Arc<RwLock<HashMap<LibP2PPeerId, PeerMetadata>>>,
328        reputation_manager: &Arc<RwLock<ReputationManager>>,
329    ) {
330        match event {
331            NetworkEvent::PeerConnected(peer_id) => {
332                debug!("Handling peer connection: {:?}", peer_id);
333                let metadata = PeerMetadata {
334                    address: "unknown".to_string(),
335                    connected_at: std::time::Instant::now(),
336                    last_activity: std::time::Instant::now(),
337                    reputation: 0.0,
338                    protocol_version: 1,
339                    latency_ms: 0,
340                };
341                connected_peers.write().await.insert(peer_id, metadata);
342            }
343            NetworkEvent::PeerDisconnected(peer_id) => {
344                debug!("Handling peer disconnection: {:?}", peer_id);
345                connected_peers.write().await.remove(&peer_id);
346            }
347            NetworkEvent::MessageReceived { from, data: _ } => {
348                // Update last activity and reputation
349                if let Some(metadata) = connected_peers.write().await.get_mut(&from) {
350                    metadata.last_activity = std::time::Instant::now();
351                }
352                reputation_manager
353                    .write()
354                    .await
355                    .update_reputation(from, 0.1);
356            }
357            NetworkEvent::NetworkError(error) => {
358                error!("Network error: {}", error);
359            }
360            NetworkEvent::DiscoveryUpdate(peers) => {
361                debug!("Discovery update: {} new peers found", peers.len());
362            }
363        }
364    }
365
366    /// Connect to a peer
367    pub async fn connect_peer(&self, _peer_address: &str) -> Result<LibP2PPeerId, NetworkError> {
368        let peer_id = LibP2PPeerId::random(); // In real implementation, derive from address
369
370        // Check if peer is blacklisted
371        if self
372            .reputation_manager
373            .read()
374            .await
375            .is_blacklisted(&peer_id)
376        {
377            return Err(NetworkError::ConnectionError(
378                "Peer is blacklisted".to_string(),
379            ));
380        }
381
382        // Convert LibP2PPeerId to our PeerId type for connection manager
383        let peer_bytes = peer_id.to_bytes();
384        let mut bytes_array = [0u8; 32];
385        let len = peer_bytes.len().min(32);
386        bytes_array[..len].copy_from_slice(&peer_bytes[..len]);
387        let our_peer_id = crate::types::PeerId::from_bytes(bytes_array);
388
389        // Try NAT traversal if available
390        if let Some(nat_manager) = &self.nat_traversal_manager {
391            match nat_manager.connect_peer(our_peer_id).await {
392                Ok(()) => {
393                    info!("Connected to peer {:?} via NAT traversal", peer_id);
394                }
395                Err(e) => {
396                    warn!("NAT traversal failed for peer {:?}: {}", peer_id, e);
397                    // Fall back to direct connection
398                    if let Some(conn_mgr) = &self.connection_manager {
399                        conn_mgr.connect(our_peer_id).await?;
400                    }
401                }
402            }
403        } else if let Some(conn_mgr) = &self.connection_manager {
404            // Use regular connection manager
405            conn_mgr.connect(our_peer_id).await?;
406        }
407
408        // Notify about new connection
409        if let Some(tx) = &self.message_tx {
410            let _ = tx.send(NetworkEvent::PeerConnected(peer_id)).await;
411        }
412
413        info!("Successfully connected to peer: {:?}", peer_id);
414        Ok(peer_id)
415    }
416
417    /// Disconnect from a peer
418    pub async fn disconnect_peer(&self, peer_id: &LibP2PPeerId) -> Result<(), NetworkError> {
419        // Use connection manager to close connection
420        if let Some(conn_mgr) = &self.connection_manager {
421            let peer_bytes = peer_id.to_bytes();
422            let mut bytes_array = [0u8; 32];
423            let len = peer_bytes.len().min(32);
424            bytes_array[..len].copy_from_slice(&peer_bytes[..len]);
425            let our_peer_id = crate::types::PeerId::from_bytes(bytes_array);
426            conn_mgr.disconnect(&our_peer_id);
427        }
428
429        // Notify about disconnection
430        if let Some(tx) = &self.message_tx {
431            let _ = tx.send(NetworkEvent::PeerDisconnected(*peer_id)).await;
432        }
433
434        info!("Disconnected from peer: {:?}", peer_id);
435        Ok(())
436    }
437
438    /// Send message to a peer
439    pub async fn send_message(
440        &self,
441        peer_id: &LibP2PPeerId,
442        data: Vec<u8>,
443    ) -> Result<(), NetworkError> {
444        // Check if peer is connected
445        if !self.connected_peers.read().await.contains_key(peer_id) {
446            return Err(NetworkError::ConnectionError(
447                "Peer not connected".to_string(),
448            ));
449        }
450
451        // In real implementation, this would use the actual transport layer
452        debug!("Sending {} bytes to peer {:?}", data.len(), peer_id);
453
454        // Update peer activity
455        if let Some(metadata) = self.connected_peers.write().await.get_mut(peer_id) {
456            metadata.last_activity = std::time::Instant::now();
457        }
458
459        Ok(())
460    }
461
462    /// Get list of connected peers
463    pub async fn get_connected_peers(&self) -> Vec<LibP2PPeerId> {
464        self.connected_peers.read().await.keys().cloned().collect()
465    }
466
467    /// Get peer metadata
468    pub async fn get_peer_metadata(&self, peer_id: &LibP2PPeerId) -> Option<PeerMetadata> {
469        self.connected_peers.read().await.get(peer_id).cloned()
470    }
471
472    /// Get network statistics
473    pub async fn get_network_stats(&self) -> NetworkStats {
474        let connected_count = self.connected_peers.read().await.len();
475        let reputation_scores: Vec<f64> = {
476            let rep_mgr = self.reputation_manager.read().await;
477            rep_mgr.scores.values().cloned().collect()
478        };
479
480        let avg_reputation = if reputation_scores.is_empty() {
481            0.0
482        } else {
483            reputation_scores.iter().sum::<f64>() / reputation_scores.len() as f64
484        };
485
486        NetworkStats {
487            connected_peers: connected_count,
488            average_reputation: avg_reputation,
489            blacklisted_peers: self.reputation_manager.read().await.blacklist.len(),
490            trusted_peers: self.reputation_manager.read().await.trusted.len(),
491        }
492    }
493
494    /// Add peer to trusted list
495    pub async fn add_trusted_peer(&self, peer_id: LibP2PPeerId) {
496        self.reputation_manager.write().await.add_trusted(peer_id);
497        info!("Added trusted peer: {:?}", peer_id);
498    }
499
500    /// Blacklist a peer
501    pub async fn blacklist_peer(&self, peer_id: LibP2PPeerId) {
502        self.reputation_manager
503            .write()
504            .await
505            .update_reputation(peer_id, -100.0);
506
507        // Disconnect if currently connected
508        let _ = self.disconnect_peer(&peer_id).await;
509
510        warn!("Blacklisted peer: {:?}", peer_id);
511    }
512
513    /// Start peer discovery
514    pub async fn start_discovery(&mut self) -> Result<(), NetworkError> {
515        // TODO: Initialize and start discovery service
516        info!("Starting peer discovery");
517        Ok(())
518    }
519
520    /// Stop the network manager
521    pub async fn shutdown(&mut self) -> Result<(), NetworkError> {
522        info!("Shutting down NetworkManager");
523
524        // Disconnect all peers
525        let peers: Vec<_> = self.get_connected_peers().await;
526        for peer_id in peers {
527            let _ = self.disconnect_peer(&peer_id).await;
528        }
529
530        // Stop discovery service
531        if let Some(discovery) = &self.discovery_service {
532            discovery.stop()?;
533        }
534
535        // Shutdown NAT traversal
536        if let Some(nat_manager) = &self.nat_traversal_manager {
537            if let Err(e) = nat_manager.shutdown().await {
538                warn!("NAT traversal shutdown error: {}", e);
539            }
540        }
541
542        Ok(())
543    }
544
545    /// Get the local peer ID
546    pub fn local_peer_id(&self) -> Option<LibP2PPeerId> {
547        self.local_peer_id
548    }
549
550    /// Perform maintenance tasks
551    pub async fn maintenance(&self) {
552        // Cleanup expired blacklist entries
553        self.reputation_manager.write().await.cleanup_expired();
554
555        // Remove inactive peers (older than 5 minutes with no activity)
556        let now = std::time::Instant::now();
557        let timeout = std::time::Duration::from_secs(300);
558
559        let mut to_disconnect = Vec::new();
560        {
561            let peers = self.connected_peers.read().await;
562            for (peer_id, metadata) in peers.iter() {
563                if now.duration_since(metadata.last_activity) > timeout {
564                    to_disconnect.push(*peer_id);
565                }
566            }
567        }
568
569        for peer_id in to_disconnect {
570            warn!("Disconnecting inactive peer: {:?}", peer_id);
571            let _ = self.disconnect_peer(&peer_id).await;
572        }
573    }
574
575    /// Get NAT information
576    pub fn get_nat_info(&self) -> Option<NatInfo> {
577        self.nat_traversal_manager.as_ref()?.get_nat_info()
578    }
579
580    /// Create port mapping
581    pub async fn create_port_mapping(
582        &self,
583        local_port: u16,
584        external_port: u16,
585        protocol: crate::nat_traversal::PortMappingProtocol,
586    ) -> Result<PortMapping, NetworkError> {
587        if let Some(nat_manager) = &self.nat_traversal_manager {
588            nat_manager
589                .create_port_mapping(local_port, external_port, protocol)
590                .await
591                .map_err(|e| NetworkError::ConnectionError(e.to_string()))
592        } else {
593            Err(NetworkError::ConnectionError(
594                "NAT traversal not enabled".to_string(),
595            ))
596        }
597    }
598
599    /// Get NAT traversal statistics
600    pub fn get_nat_traversal_stats(&self) -> Option<NatTraversalStats> {
601        self.nat_traversal_manager.as_ref().map(|m| m.get_stats())
602    }
603}
604
605/// Network statistics
606#[derive(Debug, Clone)]
607pub struct NetworkStats {
608    /// Number of connected peers
609    pub connected_peers: usize,
610    /// Average reputation score
611    pub average_reputation: f64,
612    /// Number of blacklisted peers
613    pub blacklisted_peers: usize,
614    /// Number of trusted peers
615    pub trusted_peers: usize,
616}
617pub use circuit_breaker::{CircuitBreaker, CircuitState as CircuitBreakerState};
618pub use connection::{
619    ConnectionInfo, ConnectionManager, HealthStatistics, SecureConfig, SecureConnection,
620    TransportKeys, UnhealthyConnectionInfo,
621};