saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, trace, warn};
43
44/// Configuration for a P2P node
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47    /// Local peer ID for this node
48    pub peer_id: Option<PeerId>,
49
50    /// Addresses to listen on for incoming connections
51    pub listen_addrs: Vec<std::net::SocketAddr>,
52
53    /// Primary listen address (for compatibility)
54    pub listen_addr: std::net::SocketAddr,
55
56    /// Bootstrap peers to connect to on startup (legacy)
57    pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59    /// Bootstrap peers as strings (for integration tests)
60    pub bootstrap_peers_str: Vec<String>,
61
62    /// Enable IPv6 support
63    pub enable_ipv6: bool,
64
65    // MCP removed; will be redesigned later
66    /// Connection timeout duration
67    pub connection_timeout: Duration,
68
69    /// Keep-alive interval for connections
70    pub keep_alive_interval: Duration,
71
72    /// Maximum number of concurrent connections
73    pub max_connections: usize,
74
75    /// Maximum number of incoming connections
76    pub max_incoming_connections: usize,
77
78    /// DHT configuration
79    pub dht_config: DHTConfig,
80
81    /// Security configuration
82    pub security_config: SecurityConfig,
83
84    /// Production hardening configuration
85    pub production_config: Option<ProductionConfig>,
86
87    /// Bootstrap cache configuration
88    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90    /// Optional IP diversity configuration for Sybil protection tuning.
91    ///
92    /// When set, this configuration is used by bootstrap peer discovery and
93    /// other diversity-enforcing subsystems. If `None`, defaults are used.
94    pub diversity_config: Option<crate::security::IPDiversityConfig>,
95}
96
97/// DHT-specific configuration
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct DHTConfig {
100    /// Kademlia K parameter (bucket size)
101    pub k_value: usize,
102
103    /// Kademlia alpha parameter (parallelism)
104    pub alpha_value: usize,
105
106    /// DHT record TTL
107    pub record_ttl: Duration,
108
109    /// DHT refresh interval
110    pub refresh_interval: Duration,
111}
112
113/// Security configuration
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct SecurityConfig {
116    /// Enable noise protocol for encryption
117    pub enable_noise: bool,
118
119    /// Enable TLS for secure transport
120    pub enable_tls: bool,
121
122    /// Trust level for peer verification
123    pub trust_level: TrustLevel,
124}
125
126/// Trust level for peer verification
127#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
128pub enum TrustLevel {
129    /// No verification required
130    None,
131    /// Basic peer ID verification
132    Basic,
133    /// Full cryptographic verification
134    Full,
135}
136
137impl NodeConfig {
138    /// Create a new NodeConfig with default values
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if default addresses cannot be parsed
143    pub fn new() -> Result<Self> {
144        // Load config and use its defaults
145        let config = Config::default();
146
147        // Parse the default listen address
148        let listen_addr = config.listen_socket_addr()?;
149
150        // Create listen addresses based on config
151        let mut listen_addrs = vec![];
152
153        // Add IPv6 address if enabled
154        if config.network.ipv6_enabled {
155            let ipv6_addr = std::net::SocketAddr::new(
156                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
157                listen_addr.port(),
158            );
159            listen_addrs.push(ipv6_addr);
160        }
161
162        // Always add IPv4
163        let ipv4_addr = std::net::SocketAddr::new(
164            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
165            listen_addr.port(),
166        );
167        listen_addrs.push(ipv4_addr);
168
169        Ok(Self {
170            peer_id: None,
171            listen_addrs,
172            listen_addr,
173            bootstrap_peers: Vec::new(),
174            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
175            enable_ipv6: config.network.ipv6_enabled,
176
177            connection_timeout: Duration::from_secs(config.network.connection_timeout),
178            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
179            max_connections: config.network.max_connections,
180            max_incoming_connections: config.security.connection_limit as usize,
181            dht_config: DHTConfig::default(),
182            security_config: SecurityConfig::default(),
183            production_config: None,
184            bootstrap_cache_config: None,
185            diversity_config: None,
186            // identity_config: None,
187        })
188    }
189}
190
191impl Default for NodeConfig {
192    fn default() -> Self {
193        // Use config defaults for network settings
194        let config = Config::default();
195
196        // Parse the default listen address - use safe fallback if parsing fails
197        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
198            std::net::SocketAddr::new(
199                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
200                9000,
201            )
202        });
203
204        Self {
205            peer_id: None,
206            listen_addrs: vec![
207                std::net::SocketAddr::new(
208                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
209                    listen_addr.port(),
210                ),
211                std::net::SocketAddr::new(
212                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
213                    listen_addr.port(),
214                ),
215            ],
216            listen_addr,
217            bootstrap_peers: Vec::new(),
218            bootstrap_peers_str: Vec::new(),
219            enable_ipv6: config.network.ipv6_enabled,
220
221            connection_timeout: Duration::from_secs(config.network.connection_timeout),
222            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
223            max_connections: config.network.max_connections,
224            max_incoming_connections: config.security.connection_limit as usize,
225            dht_config: DHTConfig::default(),
226            security_config: SecurityConfig::default(),
227            production_config: None, // Use default production config if enabled
228            bootstrap_cache_config: None,
229            diversity_config: None,
230            // identity_config: None, // Use default identity config if enabled
231        }
232    }
233}
234
235impl NodeConfig {
236    /// Create NodeConfig from Config
237    pub fn from_config(config: &Config) -> Result<Self> {
238        let listen_addr = config.listen_socket_addr()?;
239        let bootstrap_addrs = config.bootstrap_addrs()?;
240
241        let mut node_config = Self {
242            peer_id: None,
243            listen_addrs: vec![listen_addr],
244            listen_addr,
245            bootstrap_peers: bootstrap_addrs
246                .iter()
247                .map(|addr| addr.socket_addr())
248                .collect(),
249            bootstrap_peers_str: config
250                .network
251                .bootstrap_nodes
252                .iter()
253                .map(|addr| addr.to_string())
254                .collect(),
255            enable_ipv6: config.network.ipv6_enabled,
256
257            connection_timeout: Duration::from_secs(config.network.connection_timeout),
258            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
259            max_connections: config.network.max_connections,
260            max_incoming_connections: config.security.connection_limit as usize,
261            dht_config: DHTConfig {
262                k_value: 20,
263                alpha_value: 3,
264                record_ttl: Duration::from_secs(3600),
265                refresh_interval: Duration::from_secs(900),
266            },
267            security_config: SecurityConfig {
268                enable_noise: true,
269                enable_tls: true,
270                trust_level: TrustLevel::Basic,
271            },
272            production_config: Some(ProductionConfig {
273                max_connections: config.network.max_connections,
274                max_memory_bytes: 0,  // unlimited
275                max_bandwidth_bps: 0, // unlimited
276                connection_timeout: Duration::from_secs(config.network.connection_timeout),
277                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
278                health_check_interval: Duration::from_secs(30),
279                metrics_interval: Duration::from_secs(60),
280                enable_performance_tracking: true,
281                enable_auto_cleanup: true,
282                shutdown_timeout: Duration::from_secs(30),
283                rate_limits: crate::production::RateLimitConfig::default(),
284            }),
285            bootstrap_cache_config: None,
286            diversity_config: None,
287            // identity_config: Some(IdentityManagerConfig {
288            //     cache_ttl: Duration::from_secs(3600),
289            //     challenge_timeout: Duration::from_secs(30),
290            // }),
291        };
292
293        // Add IPv6 listen address if enabled
294        if config.network.ipv6_enabled {
295            node_config.listen_addrs.push(std::net::SocketAddr::new(
296                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
297                listen_addr.port(),
298            ));
299        }
300
301        Ok(node_config)
302    }
303
304    /// Try to build a NodeConfig from a listen address string
305    pub fn with_listen_addr(addr: &str) -> Result<Self> {
306        let listen_addr: std::net::SocketAddr = addr
307            .parse()
308            .map_err(|e: std::net::AddrParseError| {
309                NetworkError::InvalidAddress(e.to_string().into())
310            })
311            .map_err(P2PError::Network)?;
312        let cfg = NodeConfig {
313            listen_addr,
314            listen_addrs: vec![listen_addr],
315            diversity_config: None,
316            ..Default::default()
317        };
318        Ok(cfg)
319    }
320}
321
322impl Default for DHTConfig {
323    fn default() -> Self {
324        Self {
325            k_value: 20,
326            alpha_value: 5,
327            record_ttl: Duration::from_secs(3600), // 1 hour
328            refresh_interval: Duration::from_secs(600), // 10 minutes
329        }
330    }
331}
332
333impl Default for SecurityConfig {
334    fn default() -> Self {
335        Self {
336            enable_noise: true,
337            enable_tls: true,
338            trust_level: TrustLevel::Basic,
339        }
340    }
341}
342
343/// Information about a connected peer
344#[derive(Debug, Clone)]
345pub struct PeerInfo {
346    /// Peer identifier
347    pub peer_id: PeerId,
348
349    /// Peer's addresses
350    pub addresses: Vec<String>,
351
352    /// Connection timestamp
353    pub connected_at: Instant,
354
355    /// Last seen timestamp
356    pub last_seen: Instant,
357
358    /// Connection status
359    pub status: ConnectionStatus,
360
361    /// Supported protocols
362    pub protocols: Vec<String>,
363
364    /// Number of heartbeats received
365    pub heartbeat_count: u64,
366}
367
/// Lifecycle state of a connection to a peer.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// The connection is still being established.
    Connecting,
    /// The connection is established and active.
    Connected,
    /// The connection is in the process of closing.
    Disconnecting,
    /// The connection has been closed.
    Disconnected,
    /// The connection failed; the payload carries the failure description.
    Failed(String),
}
382
383/// Network events that can occur
384#[derive(Debug, Clone)]
385pub enum NetworkEvent {
386    /// A new peer has connected
387    PeerConnected {
388        /// The identifier of the newly connected peer
389        peer_id: PeerId,
390        /// The network addresses where the peer can be reached
391        addresses: Vec<String>,
392    },
393
394    /// A peer has disconnected
395    PeerDisconnected {
396        /// The identifier of the disconnected peer
397        peer_id: PeerId,
398        /// The reason for the disconnection
399        reason: String,
400    },
401
402    /// A message was received from a peer
403    MessageReceived {
404        /// The identifier of the sending peer
405        peer_id: PeerId,
406        /// The protocol used for the message
407        protocol: String,
408        /// The raw message data
409        data: Vec<u8>,
410    },
411
412    /// A connection attempt failed
413    ConnectionFailed {
414        /// The identifier of the peer (if known)
415        peer_id: Option<PeerId>,
416        /// The address where connection was attempted
417        address: String,
418        /// The error message describing the failure
419        error: String,
420    },
421
422    /// DHT record was stored
423    DHTRecordStored {
424        /// The DHT key where the record was stored
425        key: Vec<u8>,
426        /// The value that was stored
427        value: Vec<u8>,
428    },
429
430    /// DHT record was retrieved
431    DHTRecordRetrieved {
432        /// The DHT key that was queried
433        key: Vec<u8>,
434        /// The retrieved value, if found
435        value: Option<Vec<u8>>,
436    },
437}
438
439/// Network events that can occur in the P2P system
440///
441/// Events are broadcast to all listeners and provide real-time
442/// notifications of network state changes and message arrivals.
443#[derive(Debug, Clone)]
444pub enum P2PEvent {
445    /// Message received from a peer on a specific topic
446    Message {
447        /// Topic or channel the message was sent on
448        topic: String,
449        /// Peer ID of the message sender
450        source: PeerId,
451        /// Raw message data payload
452        data: Vec<u8>,
453    },
454    /// A new peer has connected to the network
455    PeerConnected(PeerId),
456    /// A peer has disconnected from the network
457    PeerDisconnected(PeerId),
458}
459
460/// Main P2P node structure
461/// Main P2P network node that manages connections, routing, and communication
462///
463/// This struct represents a complete P2P network participant that can:
464/// - Connect to other peers via QUIC transport
465/// - Participate in distributed hash table (DHT) operations
466/// - Send and receive messages through various protocols
467/// - Handle network events and peer lifecycle
468/// - Provide MCP (Model Context Protocol) services
469pub struct P2PNode {
470    /// Node configuration
471    config: NodeConfig,
472
473    /// Our peer ID
474    peer_id: PeerId,
475
476    /// Connected peers
477    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
478
479    /// Network event broadcaster
480    event_tx: broadcast::Sender<P2PEvent>,
481
482    /// Listen addresses
483    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
484
485    /// Node start time
486    start_time: Instant,
487
488    /// Running state
489    running: RwLock<bool>,
490
491    /// DHT instance (optional)
492    dht: Option<Arc<RwLock<DHT>>>,
493
494    /// Production resource manager (optional)
495    resource_manager: Option<Arc<ResourceManager>>,
496
497    /// Bootstrap cache manager for peer discovery
498    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
499
500    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
501    dual_node: Arc<DualStackNetworkNode>,
502
503    /// Rate limiter for connection and request throttling
504    #[allow(dead_code)]
505    rate_limiter: Arc<RateLimiter>,
506
507    /// Active connections (tracked by peer_id)
508    /// This set is synchronized with ant-quic's connection state via event monitoring
509    active_connections: Arc<RwLock<HashSet<PeerId>>>,
510
511    /// Security dashboard for monitoring
512    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
513
514    /// Connection lifecycle monitor task handle
515    #[allow(dead_code)]
516    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
517
518    /// Keepalive task handle
519    #[allow(dead_code)]
520    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
521
522    /// Shutdown flag for background tasks
523    #[allow(dead_code)]
524    shutdown: Arc<AtomicBool>,
525
526    /// GeoIP provider for connection validation
527    #[allow(dead_code)]
528    geo_provider: Arc<BgpGeoProvider>,
529}
530
/// Map a wildcard bind address to the loopback address of the same family.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only meaningful for
/// binding; ant-quic rejects them as connection targets because nothing can
/// connect TO an unspecified address. For local connections they are
/// rewritten as follows, keeping the port:
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - any other address is returned unchanged
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address that is usable as a connection target
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    match addr {
        SocketAddr::V4(v4) if v4.ip().is_unspecified() => {
            SocketAddr::from((Ipv4Addr::LOCALHOST, v4.port()))
        }
        SocketAddr::V6(v6) if v6.ip().is_unspecified() => {
            SocketAddr::from((Ipv6Addr::LOCALHOST, v6.port()))
        }
        // Already a concrete address: nothing to rewrite.
        concrete => concrete,
    }
}
561
562impl P2PNode {
563    /// Minimal constructor for tests that avoids real networking
564    pub fn new_for_tests() -> Result<Self> {
565        let (event_tx, _) = broadcast::channel(16);
566        Ok(Self {
567            config: NodeConfig::default(),
568            peer_id: "test_peer".to_string(),
569            peers: Arc::new(RwLock::new(HashMap::new())),
570            event_tx,
571            listen_addrs: RwLock::new(Vec::new()),
572            start_time: Instant::now(),
573            running: RwLock::new(false),
574            dht: None,
575            resource_manager: None,
576            bootstrap_manager: None,
577            dual_node: {
578                // Bind dual-stack nodes on ephemeral ports for tests
579                let v6: Option<std::net::SocketAddr> = "[::1]:0"
580                    .parse()
581                    .ok()
582                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
583                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
584                let handle = tokio::runtime::Handle::current();
585                let dual_attempt = handle.block_on(
586                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
587                );
588                let dual = match dual_attempt {
589                    Ok(d) => d,
590                    Err(_e1) => {
591                        // Fallback to IPv4-only ephemeral bind
592                        let fallback = handle.block_on(
593                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
594                                None,
595                                "127.0.0.1:0".parse().ok(),
596                            ),
597                        );
598                        match fallback {
599                            Ok(d) => d,
600                            Err(e2) => {
601                                return Err(P2PError::Network(NetworkError::BindError(
602                                    format!("Failed to create dual-stack network node: {}", e2)
603                                        .into(),
604                                )));
605                            }
606                        }
607                    }
608                };
609                Arc::new(dual)
610            },
611            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
612                max_requests: 100,
613                burst_size: 100,
614                window: std::time::Duration::from_secs(1),
615                ..Default::default()
616            })),
617            active_connections: Arc::new(RwLock::new(HashSet::new())),
618            connection_monitor_handle: Arc::new(RwLock::new(None)),
619            keepalive_handle: Arc::new(RwLock::new(None)),
620            shutdown: Arc::new(AtomicBool::new(false)),
621            geo_provider: Arc::new(BgpGeoProvider::new()),
622            security_dashboard: None,
623        })
624    }
625    /// Create a new P2P node with the given configuration
626    pub async fn new(config: NodeConfig) -> Result<Self> {
627        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
628            // Generate a random peer ID for now
629            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
630        });
631
632        let (event_tx, _) = broadcast::channel(1000);
633
634        // Initialize and register a TrustWeightedKademlia DHT for the global API
635        // Use a deterministic local NodeId derived from the peer_id
636        {
637            use blake3::Hasher;
638            let mut hasher = Hasher::new();
639            hasher.update(peer_id.as_bytes());
640            let digest = hasher.finalize();
641            let mut nid = [0u8; 32];
642            nid.copy_from_slice(digest.as_bytes());
643            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
644                crate::identity::node_identity::NodeId::from_bytes(nid),
645            ));
646            // TODO: Update to use new clean API
647            // let _ = crate::api::set_dht_instance(twdht);
648        }
649
650        // Initialize DHT if needed
651        let (dht, security_dashboard) = if true {
652            // Assuming DHT is always enabled for now, or check config
653            let _dht_config = crate::dht::DHTConfig {
654                replication_factor: config.dht_config.k_value,
655                bucket_size: config.dht_config.k_value,
656                alpha: config.dht_config.alpha_value,
657                record_ttl: config.dht_config.record_ttl,
658                bucket_refresh_interval: config.dht_config.refresh_interval,
659                republish_interval: config.dht_config.refresh_interval,
660                max_distance: 160,
661            };
662            // Convert peer_id String to NodeId
663            let peer_bytes = peer_id.as_bytes();
664            let mut node_id_bytes = [0u8; 32];
665            let len = peer_bytes.len().min(32);
666            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
667            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
668            let dht_instance = DHT::new(node_id).map_err(|e| {
669                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
670                    e.to_string().into(),
671                ))
672            })?;
673            dht_instance.start_maintenance_tasks();
674
675            // Create Security Dashboard
676            let security_metrics = dht_instance.security_metrics();
677            let dashboard = crate::dht::metrics::SecurityDashboard::new(
678                security_metrics,
679                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
680                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
681                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
682            );
683
684            (
685                Some(Arc::new(RwLock::new(dht_instance))),
686                Some(Arc::new(dashboard)),
687            )
688        } else {
689            (None, None)
690        };
691
692        // MCP removed
693
694        // Initialize production resource manager if configured
695        let resource_manager = config
696            .production_config
697            .clone()
698            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
699
700        // Initialize bootstrap cache manager
701        let diversity_config = config.diversity_config.clone().unwrap_or_default();
702        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
703            match BootstrapManager::with_full_config(
704                cache_config.clone(),
705                crate::rate_limit::JoinRateLimiterConfig::default(),
706                diversity_config.clone(),
707            )
708            .await
709            {
710                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
711                Err(e) => {
712                    warn!(
713                        "Failed to initialize bootstrap manager: {}, continuing without cache",
714                        e
715                    );
716                    None
717                }
718            }
719        } else {
720            match BootstrapManager::with_full_config(
721                crate::bootstrap::CacheConfig::default(),
722                crate::rate_limit::JoinRateLimiterConfig::default(),
723                diversity_config,
724            )
725            .await
726            {
727                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
728                Err(e) => {
729                    warn!(
730                        "Failed to initialize bootstrap manager: {}, continuing without cache",
731                        e
732                    );
733                    None
734                }
735            }
736        };
737
738        // Initialize dual-stack ant-quic nodes
739        // Determine bind addresses
740        let (v6_opt, v4_opt) = {
741            let port = config.listen_addr.port();
742            let ip = config.listen_addr.ip();
743
744            let v4_addr = if ip.is_ipv4() {
745                Some(std::net::SocketAddr::new(ip, port))
746            } else {
747                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
748                // But for now let's just stick to defaults if not specified
749                Some(std::net::SocketAddr::new(
750                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
751                    port,
752                ))
753            };
754
755            let v6_addr = if config.enable_ipv6 {
756                if ip.is_ipv6() {
757                    Some(std::net::SocketAddr::new(ip, port))
758                } else {
759                    Some(std::net::SocketAddr::new(
760                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
761                        port,
762                    ))
763                }
764            } else {
765                None
766            };
767            (v6_addr, v4_addr)
768        };
769
770        let dual_node = Arc::new(
771            DualStackNetworkNode::new(v6_opt, v4_opt)
772                .await
773                .map_err(|e| {
774                    P2PError::Transport(crate::error::TransportError::SetupFailed(
775                        format!("Failed to create dual-stack network nodes: {}", e).into(),
776                    ))
777                })?,
778        );
779
780        // Initialize rate limiter with default config
781        let rate_limiter = Arc::new(RateLimiter::new(
782            crate::validation::RateLimitConfig::default(),
783        ));
784
785        // Create active connections tracker
786        let active_connections = Arc::new(RwLock::new(HashSet::new()));
787
788        // Initialize GeoIP provider
789        let geo_provider = Arc::new(BgpGeoProvider::new());
790
791        // Create peers map
792        let peers = Arc::new(RwLock::new(HashMap::new()));
793
794        // Start connection lifecycle monitor
795        let connection_monitor_handle = {
796            let active_conns = Arc::clone(&active_connections);
797            let peers_map = Arc::clone(&peers);
798            let event_tx_clone = event_tx.clone();
799            let dual_node_clone = Arc::clone(&dual_node);
800            let geo_provider_clone = Arc::clone(&geo_provider);
801            let peer_id_clone = peer_id.clone();
802
803            let handle = tokio::spawn(async move {
804                Self::connection_lifecycle_monitor(
805                    dual_node_clone,
806                    active_conns,
807                    peers_map,
808                    event_tx_clone,
809                    geo_provider_clone,
810                    peer_id_clone,
811                )
812                .await;
813            });
814
815            Arc::new(RwLock::new(Some(handle)))
816        };
817
818        // Spawn keepalive task
819        let shutdown = Arc::new(AtomicBool::new(false));
820        let keepalive_handle = {
821            let active_conns = Arc::clone(&active_connections);
822            let dual_node_clone = Arc::clone(&dual_node);
823            let shutdown_clone = Arc::clone(&shutdown);
824
825            let handle = tokio::spawn(async move {
826                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
827            });
828
829            Arc::new(RwLock::new(Some(handle)))
830        };
831
832        let node = Self {
833            config,
834            peer_id,
835            peers,
836            event_tx,
837            listen_addrs: RwLock::new(Vec::new()),
838            start_time: Instant::now(),
839            running: RwLock::new(false),
840            dht,
841            resource_manager,
842            bootstrap_manager,
843            dual_node,
844            rate_limiter,
845            active_connections,
846            security_dashboard,
847            connection_monitor_handle,
848            keepalive_handle,
849            shutdown,
850            geo_provider,
851        };
852        info!("Created P2P node with peer ID: {}", node.peer_id);
853
854        // Start the network listeners to populate listen addresses
855        node.start_network_listeners().await?;
856
857        // Update the connection monitor with actual peers reference
858        node.start_connection_monitor().await;
859
860        Ok(node)
861    }
862
863    /// Create a new node builder
864    pub fn builder() -> NodeBuilder {
865        NodeBuilder::new()
866    }
867
868    /// Get the peer ID of this node
869    pub fn peer_id(&self) -> &PeerId {
870        &self.peer_id
871    }
872
873    pub fn local_addr(&self) -> Option<String> {
874        self.listen_addrs
875            .try_read()
876            .ok()
877            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
878    }
879
880    pub async fn subscribe(&self, topic: &str) -> Result<()> {
881        // In a real implementation, this would register the topic with the pubsub mechanism.
882        // For now, we just log it.
883        info!("Subscribed to topic: {}", topic);
884        Ok(())
885    }
886
887    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
888        info!(
889            "Publishing message to topic: {} ({} bytes)",
890            topic,
891            data.len()
892        );
893
894        // Get list of connected peers
895        let peer_list: Vec<PeerId> = {
896            let peers_guard = self.peers.read().await;
897            peers_guard.keys().cloned().collect()
898        };
899
900        if peer_list.is_empty() {
901            debug!("No peers connected, message will only be sent to local subscribers");
902        } else {
903            // Send message to all connected peers
904            let mut send_count = 0;
905            for peer_id in &peer_list {
906                match self.send_message(peer_id, topic, data.to_vec()).await {
907                    Ok(_) => {
908                        send_count += 1;
909                        debug!("Sent message to peer: {}", peer_id);
910                    }
911                    Err(e) => {
912                        warn!("Failed to send message to peer {}: {}", peer_id, e);
913                    }
914                }
915            }
916            info!(
917                "Published message to {}/{} connected peers",
918                send_count,
919                peer_list.len()
920            );
921        }
922
923        // Also send to local subscribers (for local echo and testing)
924        let event = P2PEvent::Message {
925            topic: topic.to_string(),
926            source: self.peer_id.clone(),
927            data: data.to_vec(),
928        };
929        let _ = self.event_tx.send(event);
930
931        Ok(())
932    }
933
934    /// Get the node configuration
935    pub fn config(&self) -> &NodeConfig {
936        &self.config
937    }
938
939    /// Start the P2P node
940    pub async fn start(&self) -> Result<()> {
941        info!("Starting P2P node...");
942
943        // Start production resource manager if configured
944        if let Some(ref resource_manager) = self.resource_manager {
945            resource_manager.start().await.map_err(|e| {
946                P2PError::Network(crate::error::NetworkError::ProtocolError(
947                    format!("Failed to start resource manager: {e}").into(),
948                ))
949            })?;
950            info!("Production resource manager started");
951        }
952
953        // Start bootstrap manager background tasks
954        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
955            let mut manager = bootstrap_manager.write().await;
956            manager.start_background_tasks().await.map_err(|e| {
957                P2PError::Network(crate::error::NetworkError::ProtocolError(
958                    format!("Failed to start bootstrap manager: {e}").into(),
959                ))
960            })?;
961            info!("Bootstrap cache manager started");
962        }
963
964        // Set running state
965        *self.running.write().await = true;
966
967        // Start listening on configured addresses using transport layer
968        self.start_network_listeners().await?;
969
970        // Log current listen addresses
971        let listen_addrs = self.listen_addrs.read().await;
972        info!("P2P node started on addresses: {:?}", *listen_addrs);
973
974        // MCP removed
975
976        // Start message receiving system
977        self.start_message_receiving_system().await?;
978
979        // Connect to bootstrap peers
980        self.connect_bootstrap_peers().await?;
981
982        Ok(())
983    }
984
985    /// Start network listeners on configured addresses
986    async fn start_network_listeners(&self) -> Result<()> {
987        info!("Starting dual-stack listeners (ant-quic)...");
988        // Update our listen_addrs from the dual node bindings
989        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
990            P2PError::Transport(crate::error::TransportError::SetupFailed(
991                format!("Failed to get local addresses: {}", e).into(),
992            ))
993        })?;
994        {
995            let mut la = self.listen_addrs.write().await;
996            *la = addrs.clone();
997        }
998
999        // Spawn a background accept loop that handles incoming connections from either stack
1000        let event_tx = self.event_tx.clone();
1001        let peers = self.peers.clone();
1002        let active_connections = self.active_connections.clone();
1003        let rate_limiter = self.rate_limiter.clone();
1004        let dual = self.dual_node.clone();
1005        tokio::spawn(async move {
1006            loop {
1007                match dual.accept_any().await {
1008                    Ok((ant_peer_id, remote_sock)) => {
1009                        let peer_id =
1010                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1011                        let remote_addr = NetworkAddress::from(remote_sock);
1012                        // Optional: basic IP rate limiting
1013                        let _ = rate_limiter.check_ip(&remote_sock.ip());
1014                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1015                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1016                        active_connections.write().await.insert(peer_id);
1017                    }
1018                    Err(e) => {
1019                        warn!("Accept failed: {}", e);
1020                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1021                    }
1022                }
1023            }
1024        });
1025
1026        info!("Dual-stack listeners active on: {:?}", addrs);
1027        Ok(())
1028    }
1029
1030    /// Start a listener on a specific socket address
1031    #[allow(dead_code)]
1032    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1033        // use crate::transport::{Transport}; // Unused during migration
1034
1035        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1036        /*
1037        // Try QUIC first (preferred transport)
1038        match crate::transport::QuicTransport::new(Default::default()) {
1039            Ok(quic_transport) => {
1040                match quic_transport.listen(NetworkAddress::new(addr)).await {
1041                    Ok(listen_addrs) => {
1042                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1043
1044                        // Store the actual listening addresses in the node
1045                        {
1046                            let mut node_listen_addrs = self.listen_addrs.write().await;
1047                            // Don't clear - accumulate addresses from multiple listeners
1048                            node_listen_addrs.push(listen_addrs.socket_addr());
1049                        }
1050
1051                        // Start accepting connections in background
1052                        self.start_connection_acceptor(
1053                            Arc::new(quic_transport),
1054                            addr,
1055                            crate::transport::TransportType::QUIC
1056                        ).await?;
1057
1058                        return Ok(());
1059                    }
1060                    Err(e) => {
1061                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1062                    }
1063                }
1064            }
1065            Err(e) => {
1066                warn!("Failed to create QUIC transport for listening: {}", e);
1067            }
1068        }
1069        */
1070
1071        warn!("QUIC transport temporarily disabled during ant-quic migration");
1072        // No TCP fallback - QUIC only
1073        Err(crate::P2PError::Transport(
1074            crate::error::TransportError::SetupFailed(
1075                format!(
1076                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1077                )
1078                .into(),
1079            ),
1080        ))
1081    }
1082
1083    /// Start connection acceptor background task
1084    #[allow(dead_code)] // Deprecated during ant-quic migration
1085    async fn start_connection_acceptor(
1086        &self,
1087        transport: Arc<dyn crate::transport::Transport>,
1088        addr: std::net::SocketAddr,
1089        transport_type: crate::transport::TransportType,
1090    ) -> Result<()> {
1091        info!(
1092            "Starting connection acceptor for {:?} on {}",
1093            transport_type, addr
1094        );
1095
1096        // Clone necessary data for the background task
1097        let event_tx = self.event_tx.clone();
1098        let _peer_id = self.peer_id.clone();
1099        let peers = Arc::clone(&self.peers);
1100        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1101
1102        let rate_limiter = Arc::clone(&self.rate_limiter);
1103
1104        // Spawn background task to accept incoming connections
1105        tokio::spawn(async move {
1106            loop {
1107                match transport.accept().await {
1108                    Ok(connection) => {
1109                        let remote_addr = connection.remote_addr();
1110                        let connection_peer_id =
1111                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1112
1113                        // Apply rate limiting for incoming connections
1114                        let socket_addr = remote_addr.socket_addr();
1115                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1116                            // Connection dropped automatically when it goes out of scope
1117                            continue;
1118                        }
1119
1120                        info!(
1121                            "Accepted {:?} connection from {} (peer: {})",
1122                            transport_type, remote_addr, connection_peer_id
1123                        );
1124
1125                        // Generate peer connected event
1126                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1127
1128                        // Store the peer connection
1129                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1130
1131                        // Spawn task to handle this specific connection's messages
1132                        spawn_connection_handler(
1133                            connection,
1134                            connection_peer_id,
1135                            event_tx.clone(),
1136                            Arc::clone(&peers),
1137                        );
1138                    }
1139                    Err(e) => {
1140                        warn!(
1141                            "Failed to accept {:?} connection on {}: {}",
1142                            transport_type, addr, e
1143                        );
1144
1145                        // Brief pause before retrying to avoid busy loop
1146                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1147                    }
1148                }
1149            }
1150        });
1151
1152        info!(
1153            "Connection acceptor background task started for {:?} on {}",
1154            transport_type, addr
1155        );
1156        Ok(())
1157    }
1158
1159    /// Start the message receiving system with background tasks
1160    async fn start_message_receiving_system(&self) -> Result<()> {
1161        info!("Starting message receiving system");
1162        let dual = self.dual_node.clone();
1163        let event_tx = self.event_tx.clone();
1164
1165        tokio::spawn(async move {
1166            loop {
1167                match dual.receive_any().await {
1168                    Ok((_peer_id, bytes)) => {
1169                        // Expect the JSON message wrapper from create_protocol_message
1170                        #[allow(clippy::collapsible_if)]
1171                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1172                            if let (Some(protocol), Some(data), Some(from)) = (
1173                                value.get("protocol").and_then(|v| v.as_str()),
1174                                value.get("data").and_then(|v| v.as_array()),
1175                                value.get("from").and_then(|v| v.as_str()),
1176                            ) {
1177                                let payload: Vec<u8> = data
1178                                    .iter()
1179                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1180                                    .collect();
1181                                let _ = event_tx.send(P2PEvent::Message {
1182                                    topic: protocol.to_string(),
1183                                    source: from.to_string(),
1184                                    data: payload,
1185                                });
1186                            }
1187                        }
1188                    }
1189                    Err(e) => {
1190                        warn!("Receive error: {}", e);
1191                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1192                    }
1193                }
1194            }
1195        });
1196
1197        Ok(())
1198    }
1199
1200    /// Handle a received message and generate appropriate events
1201    #[allow(dead_code)]
1202    async fn handle_received_message(
1203        &self,
1204        message_data: Vec<u8>,
1205        peer_id: &PeerId,
1206        _protocol: &str,
1207        event_tx: &broadcast::Sender<P2PEvent>,
1208    ) -> Result<()> {
1209        // MCP removed: no special protocol handling
1210
1211        // Parse the message format we created in create_protocol_message
1212        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1213            Ok(message) => {
1214                if let (Some(protocol), Some(data), Some(from)) = (
1215                    message.get("protocol").and_then(|v| v.as_str()),
1216                    message.get("data").and_then(|v| v.as_array()),
1217                    message.get("from").and_then(|v| v.as_str()),
1218                ) {
1219                    // Convert data array back to bytes
1220                    let data_bytes: Vec<u8> = data
1221                        .iter()
1222                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1223                        .collect();
1224
1225                    // Generate message event
1226                    let event = P2PEvent::Message {
1227                        topic: protocol.to_string(),
1228                        source: from.to_string(),
1229                        data: data_bytes,
1230                    };
1231
1232                    let _ = event_tx.send(event);
1233                    debug!("Generated message event from peer: {}", peer_id);
1234                }
1235            }
1236            Err(e) => {
1237                warn!("Failed to parse received message from {}: {}", peer_id, e);
1238            }
1239        }
1240
1241        Ok(())
1242    }
1243
1244    // MCP removed
1245
1246    // MCP removed
1247
1248    /// Run the P2P node (blocks until shutdown)
1249    pub async fn run(&self) -> Result<()> {
1250        if !*self.running.read().await {
1251            self.start().await?;
1252        }
1253
1254        info!("P2P node running...");
1255
1256        // Main event loop
1257        loop {
1258            if !*self.running.read().await {
1259                break;
1260            }
1261
1262            // Perform periodic tasks
1263            self.periodic_tasks().await?;
1264
1265            // Sleep for a short interval
1266            tokio::time::sleep(Duration::from_millis(100)).await;
1267        }
1268
1269        info!("P2P node stopped");
1270        Ok(())
1271    }
1272
1273    /// Stop the P2P node
1274    pub async fn stop(&self) -> Result<()> {
1275        info!("Stopping P2P node...");
1276
1277        // Set running state to false
1278        *self.running.write().await = false;
1279
1280        // Disconnect all peers
1281        self.disconnect_all_peers().await?;
1282
1283        // Shutdown production resource manager if configured
1284        if let Some(ref resource_manager) = self.resource_manager {
1285            resource_manager.shutdown().await.map_err(|e| {
1286                P2PError::Network(crate::error::NetworkError::ProtocolError(
1287                    format!("Failed to shutdown resource manager: {e}").into(),
1288                ))
1289            })?;
1290            info!("Production resource manager stopped");
1291        }
1292
1293        info!("P2P node stopped");
1294        Ok(())
1295    }
1296
1297    /// Graceful shutdown alias for tests
1298    pub async fn shutdown(&self) -> Result<()> {
1299        self.stop().await
1300    }
1301
1302    /// Check if the node is running
1303    pub async fn is_running(&self) -> bool {
1304        *self.running.read().await
1305    }
1306
1307    /// Get the current listen addresses
1308    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1309        self.listen_addrs.read().await.clone()
1310    }
1311
1312    /// Get connected peers
1313    pub async fn connected_peers(&self) -> Vec<PeerId> {
1314        self.peers.read().await.keys().cloned().collect()
1315    }
1316
1317    /// Get peer count
1318    pub async fn peer_count(&self) -> usize {
1319        self.peers.read().await.len()
1320    }
1321
1322    /// Get peer info
1323    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1324        self.peers.read().await.get(peer_id).cloned()
1325    }
1326
1327    /// Get the peer ID for a given socket address, if connected
1328    ///
1329    /// This method searches through all connected peers to find one that has
1330    /// the specified address in its address list.
1331    ///
1332    /// # Arguments
1333    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1334    ///
1335    /// # Returns
1336    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1337    /// * `None` - If no peer with this address is currently connected
1338    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1339        // Parse the address to a SocketAddr for comparison
1340        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1341
1342        let peers = self.peers.read().await;
1343
1344        // Search through all connected peers
1345        for (peer_id, peer_info) in peers.iter() {
1346            // Check if this peer has a matching address
1347            for peer_addr in &peer_info.addresses {
1348                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1349                    && peer_socket == socket_addr
1350                {
1351                    return Some(peer_id.clone());
1352                }
1353            }
1354        }
1355
1356        None
1357    }
1358
1359    /// List all active connections with their peer IDs and addresses
1360    ///
1361    /// # Returns
1362    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1363    /// contains all known addresses for that peer.
1364    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1365        let peers = self.peers.read().await;
1366
1367        peers
1368            .iter()
1369            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1370            .collect()
1371    }
1372
1373    /// Remove a peer from the peers map
1374    ///
1375    /// This method removes a peer from the internal peers map. It should be used
1376    /// when a connection is no longer valid (e.g., after detecting that the underlying
1377    /// ant-quic connection has closed).
1378    ///
1379    /// # Arguments
1380    /// * `peer_id` - The ID of the peer to remove
1381    ///
1382    /// # Returns
1383    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1384    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1385        // Remove from active connections tracking
1386        self.active_connections.write().await.remove(peer_id);
1387        // Remove from peers map and return whether it existed
1388        self.peers.write().await.remove(peer_id).is_some()
1389    }
1390
1391    /// Check if a peer is connected
1392    ///
1393    /// This method checks if the peer ID exists in the peers map. Note that this
1394    /// only verifies the peer is registered - it does not guarantee the underlying
1395    /// ant-quic connection is still active. For connection validation, use `send_message`
1396    /// which will fail if the connection is closed.
1397    ///
1398    /// # Arguments
1399    /// * `peer_id` - The ID of the peer to check
1400    ///
1401    /// # Returns
1402    /// `true` if the peer exists in the peers map, `false` otherwise
1403    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1404        self.peers.read().await.contains_key(peer_id)
1405    }
1406
1407    /// Connect to a peer
1408    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1409        info!("Connecting to peer at: {}", address);
1410
1411        // Check production limits if resource manager is enabled
1412        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1413            Some(resource_manager.acquire_connection().await?)
1414        } else {
1415            None
1416        };
1417
1418        // Parse the address to SocketAddr format
1419        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1420            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1421                format!("{}: {}", address, e).into(),
1422            ))
1423        })?;
1424
1425        // Normalize wildcard addresses to loopback for local connections
1426        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1427        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1428        if normalized_addr != socket_addr {
1429            info!(
1430                "Normalized wildcard address {} to loopback {}",
1431                socket_addr, normalized_addr
1432            );
1433        }
1434
1435        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1436        let addr_list = vec![normalized_addr];
1437        let peer_id = match tokio::time::timeout(
1438            self.config.connection_timeout,
1439            self.dual_node.connect_happy_eyeballs(&addr_list),
1440        )
1441        .await
1442        {
1443            Ok(Ok(peer)) => {
1444                let connected_peer_id =
1445                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1446                info!("Successfully connected to peer: {}", connected_peer_id);
1447                connected_peer_id
1448            }
1449            Ok(Err(e)) => {
1450                warn!("Failed to connect to peer at {}: {}", address, e);
1451                let sanitized_address = address.replace(['/', ':'], "_");
1452                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1453                warn!(
1454                    "Using demo peer ID: {} (transport connection failed)",
1455                    demo_peer_id
1456                );
1457                demo_peer_id
1458            }
1459            Err(_) => {
1460                warn!(
1461                    "Timed out connecting to peer at {} after {:?}",
1462                    address, self.config.connection_timeout
1463                );
1464                let sanitized_address = address.replace(['/', ':'], "_");
1465                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1466                demo_peer_id
1467            }
1468        };
1469
1470        // Create peer info with connection details
1471        let peer_info = PeerInfo {
1472            peer_id: peer_id.clone(),
1473            addresses: vec![address.to_string()],
1474            connected_at: Instant::now(),
1475            last_seen: Instant::now(),
1476            status: ConnectionStatus::Connected,
1477            protocols: vec!["p2p-foundation/1.0".to_string()],
1478            heartbeat_count: 0,
1479        };
1480
1481        // Store peer information
1482        self.peers.write().await.insert(peer_id.clone(), peer_info);
1483
1484        // Add to active connections tracking
1485        // This is critical for is_connection_active() to work correctly
1486        self.active_connections
1487            .write()
1488            .await
1489            .insert(peer_id.clone());
1490
1491        // Record bandwidth usage if resource manager is enabled
1492        if let Some(ref resource_manager) = self.resource_manager {
1493            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1494        }
1495
1496        // Emit connection event
1497        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1498
1499        info!("Connected to peer: {}", peer_id);
1500        Ok(peer_id)
1501    }
1502
1503    /// Disconnect from a peer
1504    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1505        info!("Disconnecting from peer: {}", peer_id);
1506
1507        // Remove from active connections
1508        self.active_connections.write().await.remove(peer_id);
1509
1510        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1511            peer_info.status = ConnectionStatus::Disconnected;
1512
1513            // Emit event
1514            let _ = self
1515                .event_tx
1516                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1517
1518            info!("Disconnected from peer: {}", peer_id);
1519        }
1520
1521        Ok(())
1522    }
1523
1524    /// Check if a connection to a peer is active
1525    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1526        self.active_connections.read().await.contains(peer_id)
1527    }
1528
1529    /// Send a message to a peer
1530    pub async fn send_message(
1531        &self,
1532        peer_id: &PeerId,
1533        protocol: &str,
1534        data: Vec<u8>,
1535    ) -> Result<()> {
1536        debug!(
1537            "Sending message to peer {} on protocol {}",
1538            peer_id, protocol
1539        );
1540
1541        // Check rate limits if resource manager is enabled
1542        if let Some(ref resource_manager) = self.resource_manager
1543            && !resource_manager
1544                .check_rate_limit(peer_id, "message")
1545                .await?
1546        {
1547            return Err(P2PError::ResourceExhausted(
1548                format!("Rate limit exceeded for peer {}", peer_id).into(),
1549            ));
1550        }
1551
1552        // Check if peer exists in peers map
1553        if !self.peers.read().await.contains_key(peer_id) {
1554            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1555                peer_id.to_string().into(),
1556            )));
1557        }
1558
1559        // **NEW**: Check if the ant-quic connection is actually active
1560        // This is the critical fix for the connection state synchronization issue
1561        if !self.is_connection_active(peer_id).await {
1562            debug!(
1563                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1564                peer_id
1565            );
1566
1567            // Clean up stale peer entry
1568            self.remove_peer(peer_id).await;
1569
1570            return Err(P2PError::Network(
1571                crate::error::NetworkError::ConnectionClosed {
1572                    peer_id: peer_id.to_string().into(),
1573                },
1574            ));
1575        }
1576
1577        // MCP removed: no special-case protocol validation
1578
1579        // Record bandwidth usage if resource manager is enabled
1580        if let Some(ref resource_manager) = self.resource_manager {
1581            resource_manager.record_bandwidth(data.len() as u64, 0);
1582        }
1583
1584        // Create protocol message wrapper
1585        let _message_data = self.create_protocol_message(protocol, data)?;
1586
1587        // Send via ant-quic dual-node
1588        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1589        tokio::time::timeout(self.config.connection_timeout, send_fut)
1590            .await
1591            .map_err(|_| {
1592                P2PError::Transport(crate::error::TransportError::StreamError(
1593                    "Timed out sending message".into(),
1594                ))
1595            })?
1596            .map_err(|e| {
1597                P2PError::Transport(crate::error::TransportError::StreamError(
1598                    e.to_string().into(),
1599                ))
1600            })
1601    }
1602
1603    /// Create a protocol message wrapper
1604    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1605        use serde_json::json;
1606
1607        let timestamp = std::time::SystemTime::now()
1608            .duration_since(std::time::UNIX_EPOCH)
1609            .map_err(|e| {
1610                P2PError::Network(NetworkError::ProtocolError(
1611                    format!("System time error: {}", e).into(),
1612                ))
1613            })?
1614            .as_secs();
1615
1616        // Create a simple message format for P2P communication
1617        let message = json!({
1618            "protocol": protocol,
1619            "data": data,
1620            "from": self.peer_id,
1621            "timestamp": timestamp
1622        });
1623
1624        serde_json::to_vec(&message).map_err(|e| {
1625            P2PError::Transport(crate::error::TransportError::StreamError(
1626                format!("Failed to serialize message: {e}").into(),
1627            ))
1628        })
1629    }
1630
1631    // Note: async listen_addrs() already exists above for fetching listen addresses
1632}
1633
1634/// Create a protocol message wrapper (static version for background tasks)
1635#[allow(dead_code)]
1636fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1637    use serde_json::json;
1638
1639    let timestamp = std::time::SystemTime::now()
1640        .duration_since(std::time::UNIX_EPOCH)
1641        .map_err(|e| {
1642            P2PError::Network(NetworkError::ProtocolError(
1643                format!("System time error: {}", e).into(),
1644            ))
1645        })?
1646        .as_secs();
1647
1648    // Create a simple message format for P2P communication
1649    let message = json!({
1650        "protocol": protocol,
1651        "data": data,
1652        "timestamp": timestamp
1653    });
1654
1655    serde_json::to_vec(&message).map_err(|e| {
1656        P2PError::Transport(crate::error::TransportError::StreamError(
1657            format!("Failed to serialize message: {e}").into(),
1658        ))
1659    })
1660}
1661
1662impl P2PNode {
1663    /// Subscribe to network events
1664    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1665        self.event_tx.subscribe()
1666    }
1667
1668    /// Backwards-compat event stream accessor for tests
1669    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1670        self.subscribe_events()
1671    }
1672
1673    /// Get node uptime
1674    pub fn uptime(&self) -> Duration {
1675        self.start_time.elapsed()
1676    }
1677
1678    // MCP removed: all MCP tool/service methods removed
1679
1680    // /// Handle MCP remote tool call with network integration
1681
1682    // /// List tools available on a specific remote peer
1683
1684    // /// Get MCP server statistics
1685
1686    /// Get production resource metrics
1687    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1688        if let Some(ref resource_manager) = self.resource_manager {
1689            Ok(resource_manager.get_metrics().await)
1690        } else {
1691            Err(P2PError::Network(
1692                crate::error::NetworkError::ProtocolError(
1693                    "Production resource manager not enabled".to_string().into(),
1694                ),
1695            ))
1696        }
1697    }
1698
1699    /// Connection lifecycle monitor task - processes ant-quic connection events
1700    /// and updates active_connections HashSet and peers map
1701    async fn connection_lifecycle_monitor(
1702        dual_node: Arc<DualStackNetworkNode>,
1703        active_connections: Arc<RwLock<HashSet<String>>>,
1704        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1705        event_tx: broadcast::Sender<P2PEvent>,
1706        geo_provider: Arc<BgpGeoProvider>,
1707        local_peer_id: String,
1708    ) {
1709        use crate::transport::ant_quic_adapter::ConnectionEvent;
1710
1711        let mut event_rx = dual_node.subscribe_connection_events();
1712
1713        info!("Connection lifecycle monitor started");
1714
1715        loop {
1716            match event_rx.recv().await {
1717                Ok(event) => {
1718                    match event {
1719                        ConnectionEvent::Established {
1720                            peer_id,
1721                            remote_address,
1722                        } => {
1723                            let peer_id_str =
1724                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1725                            debug!(
1726                                "Connection established: peer={}, addr={}",
1727                                peer_id_str, remote_address
1728                            );
1729
1730                            // **GeoIP Validation**
1731                            // Check if the peer's IP is allowed
1732                            let ip = remote_address.ip();
1733                            let is_rejected = match ip {
1734                                std::net::IpAddr::V4(v4) => {
1735                                    // Check if it's a hosting provider or VPN
1736                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1737                                        geo_provider.is_hosting_asn(asn)
1738                                            || geo_provider.is_vpn_asn(asn)
1739                                    } else {
1740                                        false
1741                                    }
1742                                }
1743                                std::net::IpAddr::V6(v6) => {
1744                                    let info = geo_provider.lookup(v6);
1745                                    info.is_hosting_provider || info.is_vpn_provider
1746                                }
1747                            };
1748
1749                            if is_rejected {
1750                                info!(
1751                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1752                                    peer_id_str, remote_address
1753                                );
1754
1755                                // Create rejection message
1756                                let rejection = RejectionMessage {
1757                                    reason: RejectionReason::GeoIpPolicy,
1758                                    message:
1759                                        "Connection rejected: Hosting/VPN providers not allowed"
1760                                            .to_string(),
1761                                    suggested_target: None, // Could suggest a different region if we knew more
1762                                };
1763
1764                                // Serialize message
1765                                if let Ok(data) = serde_json::to_vec(&rejection) {
1766                                    // Create protocol message
1767                                    let timestamp = std::time::SystemTime::now()
1768                                        .duration_since(std::time::UNIX_EPOCH)
1769                                        .unwrap_or_default()
1770                                        .as_secs();
1771
1772                                    let message = serde_json::json!({
1773                                        "protocol": "control",
1774                                        "data": data,
1775                                        "from": local_peer_id,
1776                                        "timestamp": timestamp
1777                                    });
1778
1779                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1780                                        // Send rejection message
1781                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
1782                                        // which might fail if we haven't fully registered the peer yet
1783                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1784
1785                                        // Give it a moment to send before disconnecting?
1786                                        // ant-quic might handle this, but a small yield is safe
1787                                        tokio::task::yield_now().await;
1788                                    }
1789                                }
1790
1791                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
1792                                // For now, we just don't add it to active connections, effectively ignoring it
1793                                // Ideally we should actively close the connection
1794                                continue;
1795                            }
1796
1797                            // Add to active connections
1798                            active_connections.write().await.insert(peer_id_str.clone());
1799
1800                            // Update peer info or insert new
1801                            let mut peers_lock = peers.write().await;
1802                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1803                                peer_info.status = ConnectionStatus::Connected;
1804                                peer_info.connected_at = Instant::now();
1805                            } else {
1806                                // New incoming peer
1807                                debug!("Registering new incoming peer: {}", peer_id_str);
1808                                peers_lock.insert(
1809                                    peer_id_str.clone(),
1810                                    PeerInfo {
1811                                        peer_id: peer_id_str.clone(),
1812                                        addresses: vec![remote_address.to_string()],
1813                                        status: ConnectionStatus::Connected,
1814                                        last_seen: Instant::now(),
1815                                        connected_at: Instant::now(),
1816                                        protocols: Vec::new(),
1817                                        heartbeat_count: 0,
1818                                    },
1819                                );
1820                            }
1821
1822                            // Broadcast connection event
1823                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1824                        }
1825                        ConnectionEvent::Lost { peer_id, reason } => {
1826                            let peer_id_str =
1827                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1828                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1829
1830                            // Remove from active connections
1831                            active_connections.write().await.remove(&peer_id_str);
1832
1833                            // Update peer info status
1834                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1835                                peer_info.status = ConnectionStatus::Disconnected;
1836                                peer_info.last_seen = Instant::now();
1837                            }
1838
1839                            // Broadcast disconnection event
1840                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1841                        }
1842                        ConnectionEvent::Failed { peer_id, reason } => {
1843                            let peer_id_str =
1844                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1845                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1846
1847                            // Remove from active connections
1848                            active_connections.write().await.remove(&peer_id_str);
1849
1850                            // Update peer info status
1851                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1852                                peer_info.status = ConnectionStatus::Failed(reason.clone());
1853                            }
1854
1855                            // Broadcast disconnection event
1856                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1857                        }
1858                    }
1859                }
1860                Err(broadcast::error::RecvError::Lagged(skipped)) => {
1861                    warn!(
1862                        "Connection event monitor lagged, skipped {} events",
1863                        skipped
1864                    );
1865                    continue;
1866                }
1867                Err(broadcast::error::RecvError::Closed) => {
1868                    info!("Connection event channel closed, stopping monitor");
1869                    break;
1870                }
1871            }
1872        }
1873
1874        info!("Connection lifecycle monitor stopped");
1875    }
1876
1877    /// Start connection monitor (called after node initialization)
1878    async fn start_connection_monitor(&self) {
1879        // The monitor task is already spawned in new() with a temporary peers map
1880        // This method is a placeholder for future enhancements where we might
1881        // need to restart the monitor or provide it with updated references
1882        debug!("Connection monitor already running from initialization");
1883    }
1884
1885    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1886    ///
1887    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1888    /// message every 15 seconds (half the timeout) to all active connections to prevent
1889    /// them from timing out during periods of inactivity.
1890    async fn keepalive_task(
1891        active_connections: Arc<RwLock<HashSet<String>>>,
1892        dual_node: Arc<DualStackNetworkNode>,
1893        shutdown: Arc<AtomicBool>,
1894    ) {
1895        use tokio::time::{Duration, interval};
1896
1897        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1898        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1899
1900        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1901        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1902
1903        info!(
1904            "Keepalive task started (interval: {}s)",
1905            KEEPALIVE_INTERVAL_SECS
1906        );
1907
1908        loop {
1909            // Check shutdown flag first
1910            if shutdown.load(Ordering::Relaxed) {
1911                info!("Keepalive task shutting down");
1912                break;
1913            }
1914
1915            interval.tick().await;
1916
1917            // Get snapshot of active connections
1918            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1919
1920            if peers.is_empty() {
1921                trace!("Keepalive: no active connections");
1922                continue;
1923            }
1924
1925            debug!("Sending keepalive to {} active connections", peers.len());
1926
1927            // Send keepalive to each peer
1928            for peer_id in peers {
1929                match dual_node
1930                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1931                    .await
1932                {
1933                    Ok(_) => {
1934                        trace!("Keepalive sent to peer: {}", peer_id);
1935                    }
1936                    Err(e) => {
1937                        debug!(
1938                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
1939                            peer_id, e
1940                        );
1941                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1942                    }
1943                }
1944            }
1945        }
1946
1947        info!("Keepalive task stopped");
1948    }
1949
1950    /// Check system health
1951    pub async fn health_check(&self) -> Result<()> {
1952        if let Some(ref resource_manager) = self.resource_manager {
1953            resource_manager.health_check().await
1954        } else {
1955            // Basic health check without resource manager
1956            let peer_count = self.peer_count().await;
1957            if peer_count > self.config.max_connections {
1958                Err(P2PError::Network(
1959                    crate::error::NetworkError::ProtocolError(
1960                        format!("Too many connections: {peer_count}").into(),
1961                    ),
1962                ))
1963            } else {
1964                Ok(())
1965            }
1966        }
1967    }
1968
1969    /// Get production configuration (if enabled)
1970    pub fn production_config(&self) -> Option<&ProductionConfig> {
1971        self.config.production_config.as_ref()
1972    }
1973
1974    /// Check if production hardening is enabled
1975    pub fn is_production_mode(&self) -> bool {
1976        self.resource_manager.is_some()
1977    }
1978
1979    /// Get DHT reference
1980    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1981        self.dht.as_ref()
1982    }
1983
1984    /// Store a value in the DHT
1985    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1986        if let Some(ref dht) = self.dht {
1987            let mut dht_instance = dht.write().await;
1988            let dht_key = crate::dht::DhtKey::from_bytes(key);
1989            dht_instance
1990                .store(&dht_key, value.clone())
1991                .await
1992                .map_err(|e| {
1993                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1994                        format!("{:?}: {e}", key).into(),
1995                    ))
1996                })?;
1997
1998            Ok(())
1999        } else {
2000            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2001                "DHT not enabled".to_string().into(),
2002            )))
2003        }
2004    }
2005
2006    /// Retrieve a value from the DHT
2007    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2008        if let Some(ref dht) = self.dht {
2009            let dht_instance = dht.read().await;
2010            let dht_key = crate::dht::DhtKey::from_bytes(key);
2011            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2012                P2PError::Dht(crate::error::DhtError::StoreFailed(
2013                    format!("Retrieve failed: {e}").into(),
2014                ))
2015            })?;
2016
2017            Ok(record_result)
2018        } else {
2019            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2020                "DHT not enabled".to_string().into(),
2021            )))
2022        }
2023    }
2024
2025    /// Add a discovered peer to the bootstrap cache
2026    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2027        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2028            let mut manager = bootstrap_manager.write().await;
2029            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2030                .iter()
2031                .filter_map(|addr| addr.parse().ok())
2032                .collect();
2033            let contact = ContactEntry::new(peer_id, socket_addresses);
2034            manager.add_contact(contact).await.map_err(|e| {
2035                P2PError::Network(crate::error::NetworkError::ProtocolError(
2036                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2037                ))
2038            })?;
2039        }
2040        Ok(())
2041    }
2042
2043    /// Update connection metrics for a peer in the bootstrap cache
2044    pub async fn update_peer_metrics(
2045        &self,
2046        peer_id: &PeerId,
2047        success: bool,
2048        latency_ms: Option<u64>,
2049        _error: Option<String>,
2050    ) -> Result<()> {
2051        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2052            let mut manager = bootstrap_manager.write().await;
2053
2054            // Create quality metrics based on the connection result
2055            let metrics = QualityMetrics {
2056                success_rate: if success { 1.0 } else { 0.0 },
2057                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2058                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2059                last_connection_attempt: chrono::Utc::now(),
2060                last_successful_connection: if success {
2061                    chrono::Utc::now()
2062                } else {
2063                    chrono::Utc::now() - chrono::Duration::hours(1)
2064                },
2065                uptime_score: 0.5,
2066            };
2067
2068            manager
2069                .update_contact_metrics(peer_id, metrics)
2070                .await
2071                .map_err(|e| {
2072                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2073                        format!("Failed to update peer metrics: {e}").into(),
2074                    ))
2075                })?;
2076        }
2077        Ok(())
2078    }
2079
2080    /// Get bootstrap cache statistics
2081    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2082        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2083            let manager = bootstrap_manager.read().await;
2084            let stats = manager.get_stats().await.map_err(|e| {
2085                P2PError::Network(crate::error::NetworkError::ProtocolError(
2086                    format!("Failed to get bootstrap stats: {e}").into(),
2087                ))
2088            })?;
2089            Ok(Some(stats))
2090        } else {
2091            Ok(None)
2092        }
2093    }
2094
2095    /// Get the number of cached bootstrap peers
2096    pub async fn cached_peer_count(&self) -> usize {
2097        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2098            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2099        {
2100            return stats.total_contacts;
2101        }
2102        0
2103    }
2104
2105    /// Connect to bootstrap peers
2106    async fn connect_bootstrap_peers(&self) -> Result<()> {
2107        let mut bootstrap_contacts = Vec::new();
2108        let mut used_cache = false;
2109        let mut seen_addresses = std::collections::HashSet::new();
2110
2111        // CLI-provided bootstrap peers take priority - always include them first
2112        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2113            self.config.bootstrap_peers_str.clone()
2114        } else {
2115            // Convert Multiaddr to strings
2116            self.config
2117                .bootstrap_peers
2118                .iter()
2119                .map(|addr| addr.to_string())
2120                .collect::<Vec<_>>()
2121        };
2122
2123        if !cli_bootstrap_peers.is_empty() {
2124            info!(
2125                "Using {} CLI-provided bootstrap peers (priority)",
2126                cli_bootstrap_peers.len()
2127            );
2128            for addr in &cli_bootstrap_peers {
2129                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2130                    seen_addresses.insert(socket_addr);
2131                    let contact = ContactEntry::new(
2132                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2133                        vec![socket_addr],
2134                    );
2135                    bootstrap_contacts.push(contact);
2136                } else {
2137                    warn!("Invalid bootstrap address format: {}", addr);
2138                }
2139            }
2140        }
2141
2142        // Supplement with cached bootstrap peers (after CLI peers)
2143        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2144            let manager = bootstrap_manager.read().await;
2145            match manager.get_bootstrap_peers(20).await {
2146                // Try to get top 20 quality peers
2147                Ok(contacts) => {
2148                    if !contacts.is_empty() {
2149                        let mut added_from_cache = 0;
2150                        for contact in contacts {
2151                            // Only add if we haven't already added this address from CLI
2152                            let new_addresses: Vec<_> = contact
2153                                .addresses
2154                                .iter()
2155                                .filter(|addr| !seen_addresses.contains(addr))
2156                                .copied()
2157                                .collect();
2158
2159                            if !new_addresses.is_empty() {
2160                                for addr in &new_addresses {
2161                                    seen_addresses.insert(*addr);
2162                                }
2163                                let mut contact = contact.clone();
2164                                contact.addresses = new_addresses;
2165                                bootstrap_contacts.push(contact);
2166                                added_from_cache += 1;
2167                            }
2168                        }
2169                        if added_from_cache > 0 {
2170                            info!(
2171                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2172                                added_from_cache
2173                            );
2174                            used_cache = true;
2175                        }
2176                    }
2177                }
2178                Err(e) => {
2179                    warn!("Failed to get cached bootstrap peers: {}", e);
2180                }
2181            }
2182        }
2183
2184        if bootstrap_contacts.is_empty() {
2185            info!("No bootstrap peers configured and no cached peers available");
2186            return Ok(());
2187        }
2188
2189        // Connect to bootstrap peers
2190        let mut successful_connections = 0;
2191        for contact in bootstrap_contacts {
2192            for addr in &contact.addresses {
2193                match self.connect_peer(&addr.to_string()).await {
2194                    Ok(peer_id) => {
2195                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2196                        successful_connections += 1;
2197
2198                        // Update bootstrap cache with successful connection
2199                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2200                            let mut manager = bootstrap_manager.write().await;
2201                            let mut updated_contact = contact.clone();
2202                            updated_contact.peer_id = peer_id.clone();
2203                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2204
2205                            if let Err(e) = manager.add_contact(updated_contact).await {
2206                                warn!("Failed to update bootstrap cache: {}", e);
2207                            }
2208                        }
2209                        break; // Successfully connected, move to next contact
2210                    }
2211                    Err(e) => {
2212                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2213
2214                        // Update bootstrap cache with failed connection
2215                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2216                            let mut manager = bootstrap_manager.write().await;
2217                            let mut updated_contact = contact.clone();
2218                            updated_contact.update_connection_result(
2219                                false,
2220                                None,
2221                                Some(e.to_string()),
2222                            );
2223
2224                            if let Err(e) = manager.add_contact(updated_contact).await {
2225                                warn!("Failed to update bootstrap cache: {}", e);
2226                            }
2227                        }
2228                    }
2229                }
2230            }
2231        }
2232
2233        if successful_connections == 0 {
2234            if !used_cache {
2235                warn!("Failed to connect to any bootstrap peers");
2236            }
2237            return Err(P2PError::Network(NetworkError::ConnectionFailed {
2238                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
2239                reason: "Failed to connect to any bootstrap peers".into(),
2240            }));
2241        }
2242        info!(
2243            "Successfully connected to {} bootstrap peers",
2244            successful_connections
2245        );
2246
2247        Ok(())
2248    }
2249
2250    /// Disconnect from all peers
2251    async fn disconnect_all_peers(&self) -> Result<()> {
2252        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2253
2254        for peer_id in peer_ids {
2255            self.disconnect_peer(&peer_id).await?;
2256        }
2257
2258        Ok(())
2259    }
2260
2261    /// Perform periodic maintenance tasks
2262    async fn periodic_tasks(&self) -> Result<()> {
2263        // Update peer last seen timestamps
2264        // Remove stale connections
2265        // Perform DHT maintenance
2266        // This is a placeholder for now
2267
2268        Ok(())
2269    }
2270}
2271
2272/// Network sender trait for sending messages
2273#[async_trait::async_trait]
2274pub trait NetworkSender: Send + Sync {
2275    /// Send a message to a specific peer
2276    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2277
2278    /// Get our local peer ID
2279    fn local_peer_id(&self) -> &PeerId;
2280}
2281
2282/// Lightweight wrapper for P2PNode to implement NetworkSender
2283#[derive(Clone)]
2284pub struct P2PNetworkSender {
2285    peer_id: PeerId,
2286    // Use channels for async communication with the P2P node
2287    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2288}
2289
2290impl P2PNetworkSender {
2291    pub fn new(
2292        peer_id: PeerId,
2293        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2294    ) -> Self {
2295        Self { peer_id, send_tx }
2296    }
2297}
2298
2299/// Implementation of NetworkSender trait for P2PNetworkSender
2300#[async_trait::async_trait]
2301impl NetworkSender for P2PNetworkSender {
2302    /// Send a message to a specific peer via the P2P network
2303    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2304        self.send_tx
2305            .send((peer_id.clone(), protocol.to_string(), data))
2306            .map_err(|_| {
2307                P2PError::Network(crate::error::NetworkError::ProtocolError(
2308                    "Failed to send message via channel".to_string().into(),
2309                ))
2310            })?;
2311        Ok(())
2312    }
2313
2314    /// Get our local peer ID
2315    fn local_peer_id(&self) -> &PeerId {
2316        &self.peer_id
2317    }
2318}
2319
2320/// Builder pattern for creating P2P nodes
2321pub struct NodeBuilder {
2322    config: NodeConfig,
2323}
2324
2325impl Default for NodeBuilder {
2326    fn default() -> Self {
2327        Self::new()
2328    }
2329}
2330
2331impl NodeBuilder {
2332    /// Create a new node builder
2333    pub fn new() -> Self {
2334        Self {
2335            config: NodeConfig::default(),
2336        }
2337    }
2338
2339    /// Set the peer ID
2340    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2341        self.config.peer_id = Some(peer_id);
2342        self
2343    }
2344
2345    /// Add a listen address
2346    pub fn listen_on(mut self, addr: &str) -> Self {
2347        if let Ok(multiaddr) = addr.parse() {
2348            self.config.listen_addrs.push(multiaddr);
2349        }
2350        self
2351    }
2352
2353    /// Add a bootstrap peer
2354    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2355        if let Ok(multiaddr) = addr.parse() {
2356            self.config.bootstrap_peers.push(multiaddr);
2357        }
2358        self.config.bootstrap_peers_str.push(addr.to_string());
2359        self
2360    }
2361
2362    /// Enable IPv6 support
2363    pub fn with_ipv6(mut self, enable: bool) -> Self {
2364        self.config.enable_ipv6 = enable;
2365        self
2366    }
2367
2368    // MCP removed: builder methods deleted
2369
2370    /// Set connection timeout
2371    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2372        self.config.connection_timeout = timeout;
2373        self
2374    }
2375
2376    /// Set maximum connections
2377    pub fn with_max_connections(mut self, max: usize) -> Self {
2378        self.config.max_connections = max;
2379        self
2380    }
2381
2382    /// Enable production mode with default configuration
2383    pub fn with_production_mode(mut self) -> Self {
2384        self.config.production_config = Some(ProductionConfig::default());
2385        self
2386    }
2387
2388    /// Configure production settings
2389    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2390        self.config.production_config = Some(production_config);
2391        self
2392    }
2393
2394    /// Configure IP diversity limits for Sybil protection.
2395    pub fn with_diversity_config(
2396        mut self,
2397        diversity_config: crate::security::IPDiversityConfig,
2398    ) -> Self {
2399        self.config.diversity_config = Some(diversity_config);
2400        self
2401    }
2402
2403    /// Configure DHT settings
2404    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2405        self.config.dht_config = dht_config;
2406        self
2407    }
2408
2409    /// Enable DHT with default configuration
2410    pub fn with_default_dht(mut self) -> Self {
2411        self.config.dht_config = DHTConfig::default();
2412        self
2413    }
2414
2415    /// Build the P2P node
2416    pub async fn build(self) -> Result<P2PNode> {
2417        P2PNode::new(self.config).await
2418    }
2419}
2420
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a `BootstrapManager` from `config`, falling back to the default
    /// cache configuration and the default diversity configuration when the
    /// corresponding `NodeConfig` fields are unset.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();

        // Pick the cache configuration once so a single constructor call suffices.
        let cache_config = match config.bootstrap_cache_config.as_ref() {
            Some(explicit) => explicit.clone(),
            None => crate::bootstrap::CacheConfig::default(),
        };

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// A diversity configuration set on `NodeConfig` must be the one the
    /// bootstrap manager ends up with.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let node_config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let bootstrap = build_bootstrap_manager_like_prod(&node_config).await;

        assert!(bootstrap.diversity_config().is_relaxed());
        assert_eq!(bootstrap.diversity_config().max_nodes_per_asn, 5000);
    }
}
2460
2461/// Standalone function to handle received messages without borrowing self
2462#[allow(dead_code)] // Deprecated during ant-quic migration
2463async fn handle_received_message_standalone(
2464    message_data: Vec<u8>,
2465    peer_id: &PeerId,
2466    _protocol: &str,
2467    event_tx: &broadcast::Sender<P2PEvent>,
2468) -> Result<()> {
2469    // Parse the message format
2470    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2471        Ok(message) => {
2472            if let (Some(protocol), Some(data), Some(from)) = (
2473                message.get("protocol").and_then(|v| v.as_str()),
2474                message.get("data").and_then(|v| v.as_array()),
2475                message.get("from").and_then(|v| v.as_str()),
2476            ) {
2477                // Convert data array back to bytes
2478                let data_bytes: Vec<u8> = data
2479                    .iter()
2480                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2481                    .collect();
2482
2483                // Generate message event
2484                let event = P2PEvent::Message {
2485                    topic: protocol.to_string(),
2486                    source: from.to_string(),
2487                    data: data_bytes,
2488                };
2489
2490                let _ = event_tx.send(event);
2491                debug!("Generated message event from peer: {}", peer_id);
2492            }
2493        }
2494        Err(e) => {
2495            warn!("Failed to parse received message from {}: {}", peer_id, e);
2496        }
2497    }
2498
2499    Ok(())
2500}
2501
2502// MCP removed: standalone MCP handler deleted
2503
2504/// Helper function to handle protocol message creation
2505#[allow(dead_code)]
2506fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2507    match create_protocol_message_static(protocol, data) {
2508        Ok(msg) => Some(msg),
2509        Err(e) => {
2510            warn!("Failed to create protocol message: {}", e);
2511            None
2512        }
2513    }
2514}
2515
2516/// Helper function to handle message send result
2517#[allow(dead_code)]
2518async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2519    match result {
2520        Ok(_) => {
2521            debug!("Message sent to peer {} via transport layer", peer_id);
2522        }
2523        Err(e) => {
2524            warn!("Failed to send message to peer {}: {}", peer_id, e);
2525        }
2526    }
2527}
2528
2529/// Helper function to check rate limit
2530#[allow(dead_code)] // Deprecated during ant-quic migration
2531fn check_rate_limit(
2532    rate_limiter: &RateLimiter,
2533    socket_addr: &std::net::SocketAddr,
2534    remote_addr: &NetworkAddress,
2535) -> Result<()> {
2536    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2537        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2538        e
2539    })
2540}
2541
2542/// Helper function to register a new peer
2543#[allow(dead_code)] // Deprecated during ant-quic migration
2544async fn register_new_peer(
2545    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2546    peer_id: &PeerId,
2547    remote_addr: &NetworkAddress,
2548) {
2549    let mut peers_guard = peers.write().await;
2550    let peer_info = PeerInfo {
2551        peer_id: peer_id.clone(),
2552        addresses: vec![remote_addr.to_string()],
2553        connected_at: tokio::time::Instant::now(),
2554        last_seen: tokio::time::Instant::now(),
2555        status: ConnectionStatus::Connected,
2556        protocols: vec!["p2p-chat/1.0.0".to_string()],
2557        heartbeat_count: 0,
2558    };
2559    peers_guard.insert(peer_id.clone(), peer_info);
2560}
2561
2562/// Helper function to spawn connection handler
2563#[allow(dead_code)] // Deprecated during ant-quic migration
2564fn spawn_connection_handler(
2565    connection: Box<dyn crate::transport::Connection>,
2566    peer_id: PeerId,
2567    event_tx: broadcast::Sender<P2PEvent>,
2568    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2569) {
2570    tokio::spawn(async move {
2571        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2572    });
2573}
2574
2575/// Helper function to handle peer connection
2576#[allow(dead_code)] // Deprecated during ant-quic migration
2577async fn handle_peer_connection(
2578    mut connection: Box<dyn crate::transport::Connection>,
2579    peer_id: PeerId,
2580    event_tx: broadcast::Sender<P2PEvent>,
2581    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2582) {
2583    loop {
2584        match connection.receive().await {
2585            Ok(message_data) => {
2586                debug!(
2587                    "Received {} bytes from peer: {}",
2588                    message_data.len(),
2589                    peer_id
2590                );
2591
2592                // Handle the received message
2593                if let Err(e) = handle_received_message_standalone(
2594                    message_data,
2595                    &peer_id,
2596                    "unknown", // TODO: Extract protocol from message
2597                    &event_tx,
2598                )
2599                .await
2600                {
2601                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2602                }
2603            }
2604            Err(e) => {
2605                warn!("Failed to receive message from {}: {}", peer_id, e);
2606
2607                // Check if connection is still alive
2608                if !connection.is_alive().await {
2609                    info!("Connection to {} is dead, removing peer", peer_id);
2610
2611                    // Remove dead peer
2612                    remove_peer(&peers, &peer_id).await;
2613
2614                    // Generate peer disconnected event
2615                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2616
2617                    break; // Exit the message receiving loop
2618                }
2619
2620                // Brief pause before retrying
2621                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2622            }
2623        }
2624    }
2625}
2626
2627/// Helper function to remove a peer
2628#[allow(dead_code)] // Deprecated during ant-quic migration
2629async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2630    let mut peers_guard = peers.write().await;
2631    peers_guard.remove(peer_id);
2632}
2633
2634/// Helper function to update peer heartbeat
2635#[allow(dead_code)]
2636async fn update_peer_heartbeat(
2637    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2638    peer_id: &PeerId,
2639) -> Result<()> {
2640    let mut peers_guard = peers.write().await;
2641    match peers_guard.get_mut(peer_id) {
2642        Some(peer_info) => {
2643            peer_info.last_seen = Instant::now();
2644            peer_info.heartbeat_count += 1;
2645            Ok(())
2646        }
2647        None => {
2648            warn!("Received heartbeat from unknown peer: {}", peer_id);
2649            Err(P2PError::Network(NetworkError::PeerNotFound(
2650                format!("Peer {} not found", peer_id).into(),
2651            )))
2652        }
2653    }
2654}
2655
2656/// Helper function to get resource metrics
2657#[allow(dead_code)]
2658async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2659    if let Some(manager) = resource_manager {
2660        let metrics = manager.get_metrics().await;
2661        (metrics.memory_used, metrics.cpu_usage)
2662    } else {
2663        (0, 0.0)
2664    }
2665}
2666
2667#[cfg(test)]
2668mod tests {
2669    use super::*;
2670    // MCP removed from tests
2671    use std::time::Duration;
2672    use tokio::time::timeout;
2673
2674    // Test tool handler for network tests
2675
2676    // MCP removed
2677
2678    /// Helper function to create a test node configuration
2679    fn create_test_node_config() -> NodeConfig {
2680        NodeConfig {
2681            peer_id: Some("test_peer_123".to_string()),
2682            listen_addrs: vec![
2683                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2684                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2685            ],
2686            listen_addr: std::net::SocketAddr::new(
2687                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2688                0,
2689            ),
2690            bootstrap_peers: vec![],
2691            bootstrap_peers_str: vec![],
2692            enable_ipv6: true,
2693
2694            connection_timeout: Duration::from_millis(300),
2695            keep_alive_interval: Duration::from_secs(30),
2696            max_connections: 100,
2697            max_incoming_connections: 50,
2698            dht_config: DHTConfig::default(),
2699            security_config: SecurityConfig::default(),
2700            production_config: None,
2701            bootstrap_cache_config: None,
2702            diversity_config: None,
2703            // identity_config: None,
2704        }
2705    }
2706
    // MCP removed: the helper function that created a test tool was deleted.
2709
2710    #[tokio::test]
2711    async fn test_node_config_default() {
2712        let config = NodeConfig::default();
2713
2714        assert!(config.peer_id.is_none());
2715        assert_eq!(config.listen_addrs.len(), 2);
2716        assert!(config.enable_ipv6);
2717        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2718        assert_eq!(config.max_incoming_connections, 100);
2719        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2720    }
2721
2722    #[tokio::test]
2723    async fn test_dht_config_default() {
2724        let config = DHTConfig::default();
2725
2726        assert_eq!(config.k_value, 20);
2727        assert_eq!(config.alpha_value, 5);
2728        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2729        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2730    }
2731
2732    #[tokio::test]
2733    async fn test_security_config_default() {
2734        let config = SecurityConfig::default();
2735
2736        assert!(config.enable_noise);
2737        assert!(config.enable_tls);
2738        assert_eq!(config.trust_level, TrustLevel::Basic);
2739    }
2740
2741    #[test]
2742    fn test_trust_level_variants() {
2743        // Test that all trust level variants can be created
2744        let _none = TrustLevel::None;
2745        let _basic = TrustLevel::Basic;
2746        let _full = TrustLevel::Full;
2747
2748        // Test equality
2749        assert_eq!(TrustLevel::None, TrustLevel::None);
2750        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2751        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2752        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2753    }
2754
2755    #[test]
2756    fn test_connection_status_variants() {
2757        let connecting = ConnectionStatus::Connecting;
2758        let connected = ConnectionStatus::Connected;
2759        let disconnecting = ConnectionStatus::Disconnecting;
2760        let disconnected = ConnectionStatus::Disconnected;
2761        let failed = ConnectionStatus::Failed("test error".to_string());
2762
2763        assert_eq!(connecting, ConnectionStatus::Connecting);
2764        assert_eq!(connected, ConnectionStatus::Connected);
2765        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2766        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2767        assert_ne!(connecting, connected);
2768
2769        if let ConnectionStatus::Failed(msg) = failed {
2770            assert_eq!(msg, "test error");
2771        } else {
2772            panic!("Expected Failed status");
2773        }
2774    }
2775
2776    #[tokio::test]
2777    async fn test_node_creation() -> Result<()> {
2778        let config = create_test_node_config();
2779        let node = P2PNode::new(config).await?;
2780
2781        assert_eq!(node.peer_id(), "test_peer_123");
2782        assert!(!node.is_running().await);
2783        assert_eq!(node.peer_count().await, 0);
2784        assert!(node.connected_peers().await.is_empty());
2785
2786        Ok(())
2787    }
2788
2789    #[tokio::test]
2790    async fn test_node_creation_without_peer_id() -> Result<()> {
2791        let mut config = create_test_node_config();
2792        config.peer_id = None;
2793
2794        let node = P2PNode::new(config).await?;
2795
2796        // Should have generated a peer ID
2797        assert!(node.peer_id().starts_with("peer_"));
2798        assert!(!node.is_running().await);
2799
2800        Ok(())
2801    }
2802
2803    #[tokio::test]
2804    async fn test_node_lifecycle() -> Result<()> {
2805        let config = create_test_node_config();
2806        let node = P2PNode::new(config).await?;
2807
2808        // Initially not running
2809        assert!(!node.is_running().await);
2810
2811        // Start the node
2812        node.start().await?;
2813        assert!(node.is_running().await);
2814
2815        // Check listen addresses were set (at least one)
2816        let listen_addrs = node.listen_addrs().await;
2817        assert!(
2818            !listen_addrs.is_empty(),
2819            "Expected at least one listening address"
2820        );
2821
2822        // Stop the node
2823        node.stop().await?;
2824        assert!(!node.is_running().await);
2825
2826        Ok(())
2827    }
2828
2829    #[tokio::test]
2830    async fn test_peer_connection() -> Result<()> {
2831        let config = create_test_node_config();
2832        let node = P2PNode::new(config).await?;
2833
2834        let peer_addr = "127.0.0.1:0";
2835
2836        // Connect to a peer
2837        let peer_id = node.connect_peer(peer_addr).await?;
2838        assert!(peer_id.starts_with("peer_from_"));
2839
2840        // Check peer count
2841        assert_eq!(node.peer_count().await, 1);
2842
2843        // Check connected peers
2844        let connected_peers = node.connected_peers().await;
2845        assert_eq!(connected_peers.len(), 1);
2846        assert_eq!(connected_peers[0], peer_id);
2847
2848        // Get peer info
2849        let peer_info = node.peer_info(&peer_id).await;
2850        assert!(peer_info.is_some());
2851        let info = peer_info.expect("Peer info should exist after adding peer");
2852        assert_eq!(info.peer_id, peer_id);
2853        assert_eq!(info.status, ConnectionStatus::Connected);
2854        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2855
2856        // Disconnect from peer
2857        node.disconnect_peer(&peer_id).await?;
2858        assert_eq!(node.peer_count().await, 0);
2859
2860        Ok(())
2861    }
2862
2863    #[tokio::test]
2864    async fn test_event_subscription() -> Result<()> {
2865        let config = create_test_node_config();
2866        let node = P2PNode::new(config).await?;
2867
2868        let mut events = node.subscribe_events();
2869        let peer_addr = "127.0.0.1:0";
2870
2871        // Connect to a peer (this should emit an event)
2872        let peer_id = node.connect_peer(peer_addr).await?;
2873
2874        // Check for PeerConnected event
2875        let event = timeout(Duration::from_millis(100), events.recv()).await;
2876        assert!(event.is_ok());
2877
2878        let event_result = event
2879            .expect("Should receive event")
2880            .expect("Event should not be error");
2881        match event_result {
2882            P2PEvent::PeerConnected(event_peer_id) => {
2883                assert_eq!(event_peer_id, peer_id);
2884            }
2885            _ => panic!("Expected PeerConnected event"),
2886        }
2887
2888        // Disconnect from peer (this should emit another event)
2889        node.disconnect_peer(&peer_id).await?;
2890
2891        // Check for PeerDisconnected event
2892        let event = timeout(Duration::from_millis(100), events.recv()).await;
2893        assert!(event.is_ok());
2894
2895        let event_result = event
2896            .expect("Should receive event")
2897            .expect("Event should not be error");
2898        match event_result {
2899            P2PEvent::PeerDisconnected(event_peer_id) => {
2900                assert_eq!(event_peer_id, peer_id);
2901            }
2902            _ => panic!("Expected PeerDisconnected event"),
2903        }
2904
2905        Ok(())
2906    }
2907
2908    #[tokio::test]
2909    async fn test_message_sending() -> Result<()> {
2910        // Create two nodes
2911        let mut config1 = create_test_node_config();
2912        config1.listen_addr =
2913            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2914        let node1 = P2PNode::new(config1).await?;
2915        node1.start().await?;
2916
2917        let mut config2 = create_test_node_config();
2918        config2.listen_addr =
2919            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2920        let node2 = P2PNode::new(config2).await?;
2921        node2.start().await?;
2922
2923        // Wait a bit for nodes to start listening
2924        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2925
2926        // Get actual listening address of node2
2927        let node2_addr = node2.local_addr().ok_or_else(|| {
2928            P2PError::Network(crate::error::NetworkError::ProtocolError(
2929                "No listening address".to_string().into(),
2930            ))
2931        })?;
2932
2933        // Connect node1 to node2
2934        let peer_id =
2935            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2936                Ok(res) => res?,
2937                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2938            };
2939
2940        // Wait a bit for connection to establish
2941        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2942
2943        // Send a message
2944        let message_data = b"Hello, peer!".to_vec();
2945        let result = match timeout(
2946            Duration::from_millis(500),
2947            node1.send_message(&peer_id, "test-protocol", message_data),
2948        )
2949        .await
2950        {
2951            Ok(res) => res,
2952            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2953        };
2954        // For now, we'll just check that we don't get a "not connected" error
2955        // The actual send might fail due to no handler on the other side
2956        if let Err(e) = &result {
2957            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2958        }
2959
2960        // Try to send to non-existent peer
2961        let non_existent_peer = "non_existent_peer".to_string();
2962        let result = node1
2963            .send_message(&non_existent_peer, "test-protocol", vec![])
2964            .await;
2965        assert!(result.is_err(), "Sending to non-existent peer should fail");
2966
2967        Ok(())
2968    }
2969
2970    #[tokio::test]
2971    async fn test_remote_mcp_operations() -> Result<()> {
2972        let config = create_test_node_config();
2973        let node = P2PNode::new(config).await?;
2974
2975        // MCP removed; test reduced to simple start/stop
2976        node.start().await?;
2977        node.stop().await?;
2978        Ok(())
2979    }
2980
2981    #[tokio::test]
2982    async fn test_health_check() -> Result<()> {
2983        let config = create_test_node_config();
2984        let node = P2PNode::new(config).await?;
2985
2986        // Health check should pass with no connections
2987        let result = node.health_check().await;
2988        assert!(result.is_ok());
2989
2990        // Note: We're not actually connecting to real peers here
2991        // since that would require running bootstrap nodes.
2992        // The health check should still pass with no connections.
2993
2994        Ok(())
2995    }
2996
2997    #[tokio::test]
2998    async fn test_node_uptime() -> Result<()> {
2999        let config = create_test_node_config();
3000        let node = P2PNode::new(config).await?;
3001
3002        let uptime1 = node.uptime();
3003        assert!(uptime1 >= Duration::from_secs(0));
3004
3005        // Wait a bit
3006        tokio::time::sleep(Duration::from_millis(10)).await;
3007
3008        let uptime2 = node.uptime();
3009        assert!(uptime2 > uptime1);
3010
3011        Ok(())
3012    }
3013
3014    #[tokio::test]
3015    async fn test_node_config_access() -> Result<()> {
3016        let config = create_test_node_config();
3017        let expected_peer_id = config.peer_id.clone();
3018        let node = P2PNode::new(config).await?;
3019
3020        let node_config = node.config();
3021        assert_eq!(node_config.peer_id, expected_peer_id);
3022        assert_eq!(node_config.max_connections, 100);
3023        // MCP removed
3024
3025        Ok(())
3026    }
3027
3028    #[tokio::test]
3029    async fn test_mcp_server_access() -> Result<()> {
3030        let config = create_test_node_config();
3031        let _node = P2PNode::new(config).await?;
3032
3033        // MCP removed
3034        Ok(())
3035    }
3036
3037    #[tokio::test]
3038    async fn test_dht_access() -> Result<()> {
3039        let config = create_test_node_config();
3040        let node = P2PNode::new(config).await?;
3041
3042        // Should have DHT
3043        assert!(node.dht().is_some());
3044
3045        Ok(())
3046    }
3047
3048    #[tokio::test]
3049    async fn test_node_builder() -> Result<()> {
3050        // Create a config using the builder but don't actually build a real node
3051        let builder = P2PNode::builder()
3052            .with_peer_id("builder_test_peer".to_string())
3053            .listen_on("/ip4/127.0.0.1/tcp/0")
3054            .listen_on("/ip6/::1/tcp/0")
3055            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3056            .with_ipv6(true)
3057            .with_connection_timeout(Duration::from_secs(15))
3058            .with_max_connections(200);
3059
3060        // Test the configuration that was built
3061        let config = builder.config;
3062        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3063        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3064        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3065        assert!(config.enable_ipv6);
3066        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3067        assert_eq!(config.max_connections, 200);
3068
3069        Ok(())
3070    }
3071
3072    #[tokio::test]
3073    async fn test_bootstrap_peers() -> Result<()> {
3074        let mut config = create_test_node_config();
3075        config.bootstrap_peers = vec![
3076            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3077            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3078        ];
3079
3080        let node = P2PNode::new(config).await?;
3081
3082        // Start node (which attempts to connect to bootstrap peers)
3083        node.start().await?;
3084
3085        // In a test environment, bootstrap peers may not be available
3086        // The test verifies the node starts correctly with bootstrap configuration
3087        // Peer count may include local/internal tracking, so we just verify it's reasonable
3088        let _peer_count = node.peer_count().await;
3089
3090        node.stop().await?;
3091        Ok(())
3092    }
3093
3094    #[tokio::test]
3095    async fn test_production_mode_disabled() -> Result<()> {
3096        let config = create_test_node_config();
3097        let node = P2PNode::new(config).await?;
3098
3099        assert!(!node.is_production_mode());
3100        assert!(node.production_config().is_none());
3101
3102        // Resource metrics should fail when production mode is disabled
3103        let result = node.resource_metrics().await;
3104        assert!(result.is_err());
3105        assert!(result.unwrap_err().to_string().contains("not enabled"));
3106
3107        Ok(())
3108    }
3109
3110    #[tokio::test]
3111    async fn test_network_event_variants() {
3112        // Test that all network event variants can be created
3113        let peer_id = "test_peer".to_string();
3114        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3115
3116        let _peer_connected = NetworkEvent::PeerConnected {
3117            peer_id: peer_id.clone(),
3118            addresses: vec![address.clone()],
3119        };
3120
3121        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3122            peer_id: peer_id.clone(),
3123            reason: "test disconnect".to_string(),
3124        };
3125
3126        let _message_received = NetworkEvent::MessageReceived {
3127            peer_id: peer_id.clone(),
3128            protocol: "test-protocol".to_string(),
3129            data: vec![1, 2, 3],
3130        };
3131
3132        let _connection_failed = NetworkEvent::ConnectionFailed {
3133            peer_id: Some(peer_id.clone()),
3134            address: address.clone(),
3135            error: "connection refused".to_string(),
3136        };
3137
3138        let _dht_stored = NetworkEvent::DHTRecordStored {
3139            key: vec![1, 2, 3],
3140            value: vec![4, 5, 6],
3141        };
3142
3143        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3144            key: vec![1, 2, 3],
3145            value: Some(vec![4, 5, 6]),
3146        };
3147    }
3148
3149    #[tokio::test]
3150    async fn test_peer_info_structure() {
3151        let peer_info = PeerInfo {
3152            peer_id: "test_peer".to_string(),
3153            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3154            connected_at: Instant::now(),
3155            last_seen: Instant::now(),
3156            status: ConnectionStatus::Connected,
3157            protocols: vec!["test-protocol".to_string()],
3158            heartbeat_count: 0,
3159        };
3160
3161        assert_eq!(peer_info.peer_id, "test_peer");
3162        assert_eq!(peer_info.addresses.len(), 1);
3163        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3164        assert_eq!(peer_info.protocols.len(), 1);
3165    }
3166
3167    #[tokio::test]
3168    async fn test_serialization() -> Result<()> {
3169        // Test that configs can be serialized/deserialized
3170        let config = create_test_node_config();
3171        let serialized = serde_json::to_string(&config)?;
3172        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3173
3174        assert_eq!(config.peer_id, deserialized.peer_id);
3175        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3176        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3177
3178        Ok(())
3179    }
3180
3181    #[tokio::test]
3182    async fn test_get_peer_id_by_address_found() -> Result<()> {
3183        let config = create_test_node_config();
3184        let node = P2PNode::new(config).await?;
3185
3186        // Manually insert a peer for testing
3187        let test_peer_id = "peer_test_123".to_string();
3188        let test_address = "192.168.1.100:9000".to_string();
3189
3190        let peer_info = PeerInfo {
3191            peer_id: test_peer_id.clone(),
3192            addresses: vec![test_address.clone()],
3193            connected_at: Instant::now(),
3194            last_seen: Instant::now(),
3195            status: ConnectionStatus::Connected,
3196            protocols: vec!["test-protocol".to_string()],
3197            heartbeat_count: 0,
3198        };
3199
3200        node.peers
3201            .write()
3202            .await
3203            .insert(test_peer_id.clone(), peer_info);
3204
3205        // Test: Find peer by address
3206        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3207        assert_eq!(found_peer_id, Some(test_peer_id));
3208
3209        Ok(())
3210    }
3211
3212    #[tokio::test]
3213    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3214        let config = create_test_node_config();
3215        let node = P2PNode::new(config).await?;
3216
3217        // Test: Try to find a peer that doesn't exist
3218        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3219        assert_eq!(result, None);
3220
3221        Ok(())
3222    }
3223
3224    #[tokio::test]
3225    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3226        let config = create_test_node_config();
3227        let node = P2PNode::new(config).await?;
3228
3229        // Test: Invalid address format should return None
3230        let result = node.get_peer_id_by_address("invalid-address").await;
3231        assert_eq!(result, None);
3232
3233        Ok(())
3234    }
3235
3236    #[tokio::test]
3237    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3238        let config = create_test_node_config();
3239        let node = P2PNode::new(config).await?;
3240
3241        // Add multiple peers with different addresses
3242        let peer1_id = "peer_1".to_string();
3243        let peer1_addr = "192.168.1.101:9001".to_string();
3244
3245        let peer2_id = "peer_2".to_string();
3246        let peer2_addr = "192.168.1.102:9002".to_string();
3247
3248        let peer1_info = PeerInfo {
3249            peer_id: peer1_id.clone(),
3250            addresses: vec![peer1_addr.clone()],
3251            connected_at: Instant::now(),
3252            last_seen: Instant::now(),
3253            status: ConnectionStatus::Connected,
3254            protocols: vec!["test-protocol".to_string()],
3255            heartbeat_count: 0,
3256        };
3257
3258        let peer2_info = PeerInfo {
3259            peer_id: peer2_id.clone(),
3260            addresses: vec![peer2_addr.clone()],
3261            connected_at: Instant::now(),
3262            last_seen: Instant::now(),
3263            status: ConnectionStatus::Connected,
3264            protocols: vec!["test-protocol".to_string()],
3265            heartbeat_count: 0,
3266        };
3267
3268        node.peers
3269            .write()
3270            .await
3271            .insert(peer1_id.clone(), peer1_info);
3272        node.peers
3273            .write()
3274            .await
3275            .insert(peer2_id.clone(), peer2_info);
3276
3277        // Test: Find each peer by their unique address
3278        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3279        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3280
3281        assert_eq!(found_peer1, Some(peer1_id));
3282        assert_eq!(found_peer2, Some(peer2_id));
3283
3284        Ok(())
3285    }
3286
3287    #[tokio::test]
3288    async fn test_list_active_connections_empty() -> Result<()> {
3289        let config = create_test_node_config();
3290        let node = P2PNode::new(config).await?;
3291
3292        // Test: No connections initially
3293        let connections = node.list_active_connections().await;
3294        assert!(connections.is_empty());
3295
3296        Ok(())
3297    }
3298
3299    #[tokio::test]
3300    async fn test_list_active_connections_with_peers() -> Result<()> {
3301        let config = create_test_node_config();
3302        let node = P2PNode::new(config).await?;
3303
3304        // Add multiple peers
3305        let peer1_id = "peer_1".to_string();
3306        let peer1_addrs = vec![
3307            "192.168.1.101:9001".to_string(),
3308            "192.168.1.101:9002".to_string(),
3309        ];
3310
3311        let peer2_id = "peer_2".to_string();
3312        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3313
3314        let peer1_info = PeerInfo {
3315            peer_id: peer1_id.clone(),
3316            addresses: peer1_addrs.clone(),
3317            connected_at: Instant::now(),
3318            last_seen: Instant::now(),
3319            status: ConnectionStatus::Connected,
3320            protocols: vec!["test-protocol".to_string()],
3321            heartbeat_count: 0,
3322        };
3323
3324        let peer2_info = PeerInfo {
3325            peer_id: peer2_id.clone(),
3326            addresses: peer2_addrs.clone(),
3327            connected_at: Instant::now(),
3328            last_seen: Instant::now(),
3329            status: ConnectionStatus::Connected,
3330            protocols: vec!["test-protocol".to_string()],
3331            heartbeat_count: 0,
3332        };
3333
3334        node.peers
3335            .write()
3336            .await
3337            .insert(peer1_id.clone(), peer1_info);
3338        node.peers
3339            .write()
3340            .await
3341            .insert(peer2_id.clone(), peer2_info);
3342
3343        // Test: List all active connections
3344        let connections = node.list_active_connections().await;
3345        assert_eq!(connections.len(), 2);
3346
3347        // Verify peer1 and peer2 are in the list
3348        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3349        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3350
3351        assert!(peer1_conn.is_some());
3352        assert!(peer2_conn.is_some());
3353
3354        // Verify addresses match
3355        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3356        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3357
3358        Ok(())
3359    }
3360
3361    #[tokio::test]
3362    async fn test_remove_peer_success() -> Result<()> {
3363        let config = create_test_node_config();
3364        let node = P2PNode::new(config).await?;
3365
3366        // Add a peer
3367        let peer_id = "peer_to_remove".to_string();
3368        let peer_info = PeerInfo {
3369            peer_id: peer_id.clone(),
3370            addresses: vec!["192.168.1.100:9000".to_string()],
3371            connected_at: Instant::now(),
3372            last_seen: Instant::now(),
3373            status: ConnectionStatus::Connected,
3374            protocols: vec!["test-protocol".to_string()],
3375            heartbeat_count: 0,
3376        };
3377
3378        node.peers.write().await.insert(peer_id.clone(), peer_info);
3379
3380        // Verify peer exists
3381        assert!(node.is_peer_connected(&peer_id).await);
3382
3383        // Remove the peer
3384        let removed = node.remove_peer(&peer_id).await;
3385        assert!(removed);
3386
3387        // Verify peer no longer exists
3388        assert!(!node.is_peer_connected(&peer_id).await);
3389
3390        Ok(())
3391    }
3392
3393    #[tokio::test]
3394    async fn test_remove_peer_nonexistent() -> Result<()> {
3395        let config = create_test_node_config();
3396        let node = P2PNode::new(config).await?;
3397
3398        // Try to remove a peer that doesn't exist
3399        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3400        assert!(!removed);
3401
3402        Ok(())
3403    }
3404
3405    #[tokio::test]
3406    async fn test_is_peer_connected() -> Result<()> {
3407        let config = create_test_node_config();
3408        let node = P2PNode::new(config).await?;
3409
3410        let peer_id = "test_peer".to_string();
3411
3412        // Initially not connected
3413        assert!(!node.is_peer_connected(&peer_id).await);
3414
3415        // Add peer
3416        let peer_info = PeerInfo {
3417            peer_id: peer_id.clone(),
3418            addresses: vec!["192.168.1.100:9000".to_string()],
3419            connected_at: Instant::now(),
3420            last_seen: Instant::now(),
3421            status: ConnectionStatus::Connected,
3422            protocols: vec!["test-protocol".to_string()],
3423            heartbeat_count: 0,
3424        };
3425
3426        node.peers.write().await.insert(peer_id.clone(), peer_info);
3427
3428        // Now connected
3429        assert!(node.is_peer_connected(&peer_id).await);
3430
3431        // Remove peer
3432        node.remove_peer(&peer_id).await;
3433
3434        // No longer connected
3435        assert!(!node.is_peer_connected(&peer_id).await);
3436
3437        Ok(())
3438    }
3439
3440    #[test]
3441    fn test_normalize_ipv6_wildcard() {
3442        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3443
3444        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3445        let normalized = normalize_wildcard_to_loopback(wildcard);
3446
3447        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3448        assert_eq!(normalized.port(), 8080);
3449    }
3450
3451    #[test]
3452    fn test_normalize_ipv4_wildcard() {
3453        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3454
3455        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3456        let normalized = normalize_wildcard_to_loopback(wildcard);
3457
3458        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3459        assert_eq!(normalized.port(), 9000);
3460    }
3461
3462    #[test]
3463    fn test_normalize_specific_address_unchanged() {
3464        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3465        let normalized = normalize_wildcard_to_loopback(specific);
3466
3467        assert_eq!(normalized, specific);
3468    }
3469
3470    #[test]
3471    fn test_normalize_loopback_unchanged() {
3472        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3473
3474        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3475        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3476        assert_eq!(normalized_v6, loopback_v6);
3477
3478        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3479        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3480        assert_eq!(normalized_v4, loopback_v4);
3481    }
3482}