saorsa_core/
network.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Network module
//!
//! This module provides core networking functionality for the P2P Foundation.
//! It handles peer connections, network events, and node lifecycle management.

19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45/// Configuration for a P2P node
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48    /// Local peer ID for this node
49    pub peer_id: Option<PeerId>,
50
51    /// Addresses to listen on for incoming connections
52    pub listen_addrs: Vec<std::net::SocketAddr>,
53
54    /// Primary listen address (for compatibility)
55    pub listen_addr: std::net::SocketAddr,
56
57    /// Bootstrap peers to connect to on startup (legacy)
58    pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60    /// Bootstrap peers as strings (for integration tests)
61    pub bootstrap_peers_str: Vec<String>,
62
63    /// Enable IPv6 support
64    pub enable_ipv6: bool,
65
66    // MCP removed; will be redesigned later
67    /// Connection timeout duration
68    pub connection_timeout: Duration,
69
70    /// Keep-alive interval for connections
71    pub keep_alive_interval: Duration,
72
73    /// Maximum number of concurrent connections
74    pub max_connections: usize,
75
76    /// Maximum number of incoming connections
77    pub max_incoming_connections: usize,
78
79    /// DHT configuration
80    pub dht_config: DHTConfig,
81
82    /// Security configuration
83    pub security_config: SecurityConfig,
84
85    /// Production hardening configuration
86    pub production_config: Option<ProductionConfig>,
87
88    /// Bootstrap cache configuration
89    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90
91    /// Optional IP diversity configuration for Sybil protection tuning.
92    ///
93    /// When set, this configuration is used by bootstrap peer discovery and
94    /// other diversity-enforcing subsystems. If `None`, defaults are used.
95    pub diversity_config: Option<crate::security::IPDiversityConfig>,
96}
97
98/// DHT-specific configuration
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct DHTConfig {
101    /// Kademlia K parameter (bucket size)
102    pub k_value: usize,
103
104    /// Kademlia alpha parameter (parallelism)
105    pub alpha_value: usize,
106
107    /// DHT record TTL
108    pub record_ttl: Duration,
109
110    /// DHT refresh interval
111    pub refresh_interval: Duration,
112}
113
114/// Security configuration
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct SecurityConfig {
117    /// Enable noise protocol for encryption
118    pub enable_noise: bool,
119
120    /// Enable TLS for secure transport
121    pub enable_tls: bool,
122
123    /// Trust level for peer verification
124    pub trust_level: TrustLevel,
125}
126
127/// Trust level for peer verification
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
129pub enum TrustLevel {
130    /// No verification required
131    None,
132    /// Basic peer ID verification
133    Basic,
134    /// Full cryptographic verification
135    Full,
136}
137
138impl NodeConfig {
139    /// Create a new NodeConfig with default values
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if default addresses cannot be parsed
144    pub fn new() -> Result<Self> {
145        // Load config and use its defaults
146        let config = Config::default();
147
148        // Parse the default listen address
149        let listen_addr = config.listen_socket_addr()?;
150
151        // Create listen addresses based on config
152        let mut listen_addrs = vec![];
153
154        // Add IPv6 address if enabled
155        if config.network.ipv6_enabled {
156            let ipv6_addr = std::net::SocketAddr::new(
157                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
158                listen_addr.port(),
159            );
160            listen_addrs.push(ipv6_addr);
161        }
162
163        // Always add IPv4
164        let ipv4_addr = std::net::SocketAddr::new(
165            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
166            listen_addr.port(),
167        );
168        listen_addrs.push(ipv4_addr);
169
170        Ok(Self {
171            peer_id: None,
172            listen_addrs,
173            listen_addr,
174            bootstrap_peers: Vec::new(),
175            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
176            enable_ipv6: config.network.ipv6_enabled,
177
178            connection_timeout: Duration::from_secs(config.network.connection_timeout),
179            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
180            max_connections: config.network.max_connections,
181            max_incoming_connections: config.security.connection_limit as usize,
182            dht_config: DHTConfig::default(),
183            security_config: SecurityConfig::default(),
184            production_config: None,
185            bootstrap_cache_config: None,
186            diversity_config: None,
187            // identity_config: None,
188        })
189    }
190}
191
192impl Default for NodeConfig {
193    fn default() -> Self {
194        // Use config defaults for network settings
195        let config = Config::default();
196
197        // Parse the default listen address - use safe fallback if parsing fails
198        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
199            std::net::SocketAddr::new(
200                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
201                9000,
202            )
203        });
204
205        Self {
206            peer_id: None,
207            listen_addrs: vec![
208                std::net::SocketAddr::new(
209                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
210                    listen_addr.port(),
211                ),
212                std::net::SocketAddr::new(
213                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
214                    listen_addr.port(),
215                ),
216            ],
217            listen_addr,
218            bootstrap_peers: Vec::new(),
219            bootstrap_peers_str: Vec::new(),
220            enable_ipv6: config.network.ipv6_enabled,
221
222            connection_timeout: Duration::from_secs(config.network.connection_timeout),
223            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
224            max_connections: config.network.max_connections,
225            max_incoming_connections: config.security.connection_limit as usize,
226            dht_config: DHTConfig::default(),
227            security_config: SecurityConfig::default(),
228            production_config: None, // Use default production config if enabled
229            bootstrap_cache_config: None,
230            diversity_config: None,
231            // identity_config: None, // Use default identity config if enabled
232        }
233    }
234}
235
236impl NodeConfig {
237    /// Create NodeConfig from Config
238    pub fn from_config(config: &Config) -> Result<Self> {
239        let listen_addr = config.listen_socket_addr()?;
240        let bootstrap_addrs = config.bootstrap_addrs()?;
241
242        let mut node_config = Self {
243            peer_id: None,
244            listen_addrs: vec![listen_addr],
245            listen_addr,
246            bootstrap_peers: bootstrap_addrs
247                .iter()
248                .map(|addr| addr.socket_addr())
249                .collect(),
250            bootstrap_peers_str: config
251                .network
252                .bootstrap_nodes
253                .iter()
254                .map(|addr| addr.to_string())
255                .collect(),
256            enable_ipv6: config.network.ipv6_enabled,
257
258            connection_timeout: Duration::from_secs(config.network.connection_timeout),
259            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
260            max_connections: config.network.max_connections,
261            max_incoming_connections: config.security.connection_limit as usize,
262            dht_config: DHTConfig {
263                k_value: 20,
264                alpha_value: 3,
265                record_ttl: Duration::from_secs(3600),
266                refresh_interval: Duration::from_secs(900),
267            },
268            security_config: SecurityConfig {
269                enable_noise: true,
270                enable_tls: true,
271                trust_level: TrustLevel::Basic,
272            },
273            production_config: Some(ProductionConfig {
274                max_connections: config.network.max_connections,
275                max_memory_bytes: 0,  // unlimited
276                max_bandwidth_bps: 0, // unlimited
277                connection_timeout: Duration::from_secs(config.network.connection_timeout),
278                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
279                health_check_interval: Duration::from_secs(30),
280                metrics_interval: Duration::from_secs(60),
281                enable_performance_tracking: true,
282                enable_auto_cleanup: true,
283                shutdown_timeout: Duration::from_secs(30),
284                rate_limits: crate::production::RateLimitConfig::default(),
285            }),
286            bootstrap_cache_config: None,
287            diversity_config: None,
288            // identity_config: Some(IdentityManagerConfig {
289            //     cache_ttl: Duration::from_secs(3600),
290            //     challenge_timeout: Duration::from_secs(30),
291            // }),
292        };
293
294        // Add IPv6 listen address if enabled
295        if config.network.ipv6_enabled {
296            node_config.listen_addrs.push(std::net::SocketAddr::new(
297                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
298                listen_addr.port(),
299            ));
300        }
301
302        Ok(node_config)
303    }
304
305    /// Try to build a NodeConfig from a listen address string
306    pub fn with_listen_addr(addr: &str) -> Result<Self> {
307        let listen_addr: std::net::SocketAddr = addr
308            .parse()
309            .map_err(|e: std::net::AddrParseError| {
310                NetworkError::InvalidAddress(e.to_string().into())
311            })
312            .map_err(P2PError::Network)?;
313        let cfg = NodeConfig {
314            listen_addr,
315            listen_addrs: vec![listen_addr],
316            diversity_config: None,
317            ..Default::default()
318        };
319        Ok(cfg)
320    }
321}
322
323impl Default for DHTConfig {
324    fn default() -> Self {
325        Self {
326            k_value: 20,
327            alpha_value: 5,
328            record_ttl: Duration::from_secs(3600), // 1 hour
329            refresh_interval: Duration::from_secs(600), // 10 minutes
330        }
331    }
332}
333
334impl Default for SecurityConfig {
335    fn default() -> Self {
336        Self {
337            enable_noise: true,
338            enable_tls: true,
339            trust_level: TrustLevel::Basic,
340        }
341    }
342}
343
344/// Information about a connected peer
345#[derive(Debug, Clone)]
346pub struct PeerInfo {
347    /// Peer identifier
348    pub peer_id: PeerId,
349
350    /// Peer's addresses
351    pub addresses: Vec<String>,
352
353    /// Connection timestamp
354    pub connected_at: Instant,
355
356    /// Last seen timestamp
357    pub last_seen: Instant,
358
359    /// Connection status
360    pub status: ConnectionStatus,
361
362    /// Supported protocols
363    pub protocols: Vec<String>,
364
365    /// Number of heartbeats received
366    pub heartbeat_count: u64,
367}
368
/// Connection status for a peer.
///
/// Every variant payload (`String`) has total equality, so `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the payload is the failure description
    Failed(String),
}
383
384/// Network events that can occur
385#[derive(Debug, Clone)]
386pub enum NetworkEvent {
387    /// A new peer has connected
388    PeerConnected {
389        /// The identifier of the newly connected peer
390        peer_id: PeerId,
391        /// The network addresses where the peer can be reached
392        addresses: Vec<String>,
393    },
394
395    /// A peer has disconnected
396    PeerDisconnected {
397        /// The identifier of the disconnected peer
398        peer_id: PeerId,
399        /// The reason for the disconnection
400        reason: String,
401    },
402
403    /// A message was received from a peer
404    MessageReceived {
405        /// The identifier of the sending peer
406        peer_id: PeerId,
407        /// The protocol used for the message
408        protocol: String,
409        /// The raw message data
410        data: Vec<u8>,
411    },
412
413    /// A connection attempt failed
414    ConnectionFailed {
415        /// The identifier of the peer (if known)
416        peer_id: Option<PeerId>,
417        /// The address where connection was attempted
418        address: String,
419        /// The error message describing the failure
420        error: String,
421    },
422
423    /// DHT record was stored
424    DHTRecordStored {
425        /// The DHT key where the record was stored
426        key: Vec<u8>,
427        /// The value that was stored
428        value: Vec<u8>,
429    },
430
431    /// DHT record was retrieved
432    DHTRecordRetrieved {
433        /// The DHT key that was queried
434        key: Vec<u8>,
435        /// The retrieved value, if found
436        value: Option<Vec<u8>>,
437    },
438}
439
440/// Network events that can occur in the P2P system
441///
442/// Events are broadcast to all listeners and provide real-time
443/// notifications of network state changes and message arrivals.
444#[derive(Debug, Clone)]
445pub enum P2PEvent {
446    /// Message received from a peer on a specific topic
447    Message {
448        /// Topic or channel the message was sent on
449        topic: String,
450        /// Peer ID of the message sender
451        source: PeerId,
452        /// Raw message data payload
453        data: Vec<u8>,
454    },
455    /// A new peer has connected to the network
456    PeerConnected(PeerId),
457    /// A peer has disconnected from the network
458    PeerDisconnected(PeerId),
459}
460
461/// Main P2P node structure
462/// Main P2P network node that manages connections, routing, and communication
463///
464/// This struct represents a complete P2P network participant that can:
465/// - Connect to other peers via QUIC transport
466/// - Participate in distributed hash table (DHT) operations
467/// - Send and receive messages through various protocols
468/// - Handle network events and peer lifecycle
469/// - Provide MCP (Model Context Protocol) services
470pub struct P2PNode {
471    /// Node configuration
472    config: NodeConfig,
473
474    /// Our peer ID
475    peer_id: PeerId,
476
477    /// Connected peers
478    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
479
480    /// Network event broadcaster
481    event_tx: broadcast::Sender<P2PEvent>,
482
483    /// Listen addresses
484    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
485
486    /// Node start time
487    start_time: Instant,
488
489    /// Running state
490    running: RwLock<bool>,
491
492    /// DHT instance (optional)
493    dht: Option<Arc<RwLock<DHT>>>,
494
495    /// Production resource manager (optional)
496    resource_manager: Option<Arc<ResourceManager>>,
497
498    /// Bootstrap cache manager for peer discovery
499    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
500
501    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
502    dual_node: Arc<DualStackNetworkNode>,
503
504    /// Rate limiter for connection and request throttling
505    #[allow(dead_code)]
506    rate_limiter: Arc<RateLimiter>,
507
508    /// Active connections (tracked by peer_id)
509    /// This set is synchronized with ant-quic's connection state via event monitoring
510    active_connections: Arc<RwLock<HashSet<PeerId>>>,
511
512    /// Security dashboard for monitoring
513    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
514
515    /// Connection lifecycle monitor task handle
516    #[allow(dead_code)]
517    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
518
519    /// Keepalive task handle
520    #[allow(dead_code)]
521    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
522
523    /// Shutdown flag for background tasks
524    #[allow(dead_code)]
525    shutdown: Arc<AtomicBool>,
526
527    /// GeoIP provider for connection validation
528    #[allow(dead_code)]
529    geo_provider: Arc<BgpGeoProvider>,
530}
531
/// Map a wildcard ("unspecified") bind address to the matching loopback address.
///
/// `0.0.0.0` and `[::]` are only meaningful for binding. ant-quic rejects them
/// as remote connection targets, because nothing can connect *to* an
/// unspecified address. For local connections they are rewritten as follows:
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - any other address is returned unchanged
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address suitable as a connection target, with the port preserved
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopback = match addr.ip() {
        IpAddr::V4(v4) if v4.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(v6) if v6.is_unspecified() => IpAddr::V6(Ipv6Addr::LOCALHOST),
        // Already a concrete address: nothing to rewrite.
        _ => return addr,
    };

    SocketAddr::new(loopback, addr.port())
}
562
563impl P2PNode {
564    /// Minimal constructor for tests that avoids real networking
565    pub fn new_for_tests() -> Result<Self> {
566        let (event_tx, _) = broadcast::channel(16);
567        Ok(Self {
568            config: NodeConfig::default(),
569            peer_id: "test_peer".to_string(),
570            peers: Arc::new(RwLock::new(HashMap::new())),
571            event_tx,
572            listen_addrs: RwLock::new(Vec::new()),
573            start_time: Instant::now(),
574            running: RwLock::new(false),
575            dht: None,
576            resource_manager: None,
577            bootstrap_manager: None,
578            dual_node: {
579                // Bind dual-stack nodes on ephemeral ports for tests
580                let v6: Option<std::net::SocketAddr> = "[::1]:0"
581                    .parse()
582                    .ok()
583                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
584                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
585                let handle = tokio::runtime::Handle::current();
586                let dual_attempt = handle.block_on(
587                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
588                );
589                let dual = match dual_attempt {
590                    Ok(d) => d,
591                    Err(_e1) => {
592                        // Fallback to IPv4-only ephemeral bind
593                        let fallback = handle.block_on(
594                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
595                                None,
596                                "127.0.0.1:0".parse().ok(),
597                            ),
598                        );
599                        match fallback {
600                            Ok(d) => d,
601                            Err(e2) => {
602                                return Err(P2PError::Network(NetworkError::BindError(
603                                    format!("Failed to create dual-stack network node: {}", e2)
604                                        .into(),
605                                )));
606                            }
607                        }
608                    }
609                };
610                Arc::new(dual)
611            },
612            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
613                max_requests: 100,
614                burst_size: 100,
615                window: std::time::Duration::from_secs(1),
616                ..Default::default()
617            })),
618            active_connections: Arc::new(RwLock::new(HashSet::new())),
619            connection_monitor_handle: Arc::new(RwLock::new(None)),
620            keepalive_handle: Arc::new(RwLock::new(None)),
621            shutdown: Arc::new(AtomicBool::new(false)),
622            geo_provider: Arc::new(BgpGeoProvider::new()),
623            security_dashboard: None,
624        })
625    }
626    /// Create a new P2P node with the given configuration
627    pub async fn new(config: NodeConfig) -> Result<Self> {
628        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
629            // Generate a random peer ID for now
630            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
631        });
632
633        let (event_tx, _) = broadcast::channel(1000);
634
635        // Initialize and register a TrustWeightedKademlia DHT for the global API
636        // Use a deterministic local NodeId derived from the peer_id
637        {
638            use blake3::Hasher;
639            let mut hasher = Hasher::new();
640            hasher.update(peer_id.as_bytes());
641            let digest = hasher.finalize();
642            let mut nid = [0u8; 32];
643            nid.copy_from_slice(digest.as_bytes());
644            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
645                crate::identity::node_identity::NodeId::from_bytes(nid),
646            ));
647            // TODO: Update to use new clean API
648            // let _ = crate::api::set_dht_instance(twdht);
649        }
650
651        // Initialize DHT if needed
652        let (dht, security_dashboard) = if true {
653            // Assuming DHT is always enabled for now, or check config
654            let _dht_config = crate::dht::DHTConfig {
655                replication_factor: config.dht_config.k_value,
656                bucket_size: config.dht_config.k_value,
657                alpha: config.dht_config.alpha_value,
658                record_ttl: config.dht_config.record_ttl,
659                bucket_refresh_interval: config.dht_config.refresh_interval,
660                republish_interval: config.dht_config.refresh_interval,
661                max_distance: 160,
662            };
663            // Convert peer_id String to NodeId
664            let peer_bytes = peer_id.as_bytes();
665            let mut node_id_bytes = [0u8; 32];
666            let len = peer_bytes.len().min(32);
667            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
668            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
669            let dht_instance = DHT::new(node_id).map_err(|e| {
670                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
671                    e.to_string().into(),
672                ))
673            })?;
674            dht_instance.start_maintenance_tasks();
675
676            // Create Security Dashboard
677            let security_metrics = dht_instance.security_metrics();
678            let dashboard = crate::dht::metrics::SecurityDashboard::new(
679                security_metrics,
680                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
681                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
682                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
683            );
684
685            (
686                Some(Arc::new(RwLock::new(dht_instance))),
687                Some(Arc::new(dashboard)),
688            )
689        } else {
690            (None, None)
691        };
692
693        // MCP removed
694
695        // Initialize production resource manager if configured
696        let resource_manager = config
697            .production_config
698            .clone()
699            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
700
701        // Initialize bootstrap cache manager
702        let diversity_config = config.diversity_config.clone().unwrap_or_default();
703        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
704            match BootstrapManager::with_full_config(
705                cache_config.clone(),
706                crate::rate_limit::JoinRateLimiterConfig::default(),
707                diversity_config.clone(),
708            )
709            .await
710            {
711                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
712                Err(e) => {
713                    warn!(
714                        "Failed to initialize bootstrap manager: {}, continuing without cache",
715                        e
716                    );
717                    None
718                }
719            }
720        } else {
721            match BootstrapManager::with_full_config(
722                crate::bootstrap::CacheConfig::default(),
723                crate::rate_limit::JoinRateLimiterConfig::default(),
724                diversity_config,
725            )
726            .await
727            {
728                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
729                Err(e) => {
730                    warn!(
731                        "Failed to initialize bootstrap manager: {}, continuing without cache",
732                        e
733                    );
734                    None
735                }
736            }
737        };
738
739        // Initialize dual-stack ant-quic nodes
740        // Determine bind addresses
741        let (v6_opt, v4_opt) = {
742            let port = config.listen_addr.port();
743            let ip = config.listen_addr.ip();
744
745            let v4_addr = if ip.is_ipv4() {
746                Some(std::net::SocketAddr::new(ip, port))
747            } else {
748                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
749                // But for now let's just stick to defaults if not specified
750                Some(std::net::SocketAddr::new(
751                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
752                    port,
753                ))
754            };
755
756            let v6_addr = if config.enable_ipv6 {
757                if ip.is_ipv6() {
758                    Some(std::net::SocketAddr::new(ip, port))
759                } else {
760                    Some(std::net::SocketAddr::new(
761                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
762                        port,
763                    ))
764                }
765            } else {
766                None
767            };
768            (v6_addr, v4_addr)
769        };
770
771        let dual_node = Arc::new(
772            DualStackNetworkNode::new(v6_opt, v4_opt)
773                .await
774                .map_err(|e| {
775                    P2PError::Transport(crate::error::TransportError::SetupFailed(
776                        format!("Failed to create dual-stack network nodes: {}", e).into(),
777                    ))
778                })?,
779        );
780
781        // Initialize rate limiter with default config
782        let rate_limiter = Arc::new(RateLimiter::new(
783            crate::validation::RateLimitConfig::default(),
784        ));
785
786        // Create active connections tracker
787        let active_connections = Arc::new(RwLock::new(HashSet::new()));
788
789        // Initialize GeoIP provider
790        let geo_provider = Arc::new(BgpGeoProvider::new());
791
792        // Create peers map
793        let peers = Arc::new(RwLock::new(HashMap::new()));
794
795        // Start connection lifecycle monitor
796        let connection_monitor_handle = {
797            let active_conns = Arc::clone(&active_connections);
798            let peers_map = Arc::clone(&peers);
799            let event_tx_clone = event_tx.clone();
800            let dual_node_clone = Arc::clone(&dual_node);
801            let geo_provider_clone = Arc::clone(&geo_provider);
802            let peer_id_clone = peer_id.clone();
803
804            let handle = tokio::spawn(async move {
805                Self::connection_lifecycle_monitor(
806                    dual_node_clone,
807                    active_conns,
808                    peers_map,
809                    event_tx_clone,
810                    geo_provider_clone,
811                    peer_id_clone,
812                )
813                .await;
814            });
815
816            Arc::new(RwLock::new(Some(handle)))
817        };
818
819        // Spawn keepalive task
820        let shutdown = Arc::new(AtomicBool::new(false));
821        let keepalive_handle = {
822            let active_conns = Arc::clone(&active_connections);
823            let dual_node_clone = Arc::clone(&dual_node);
824            let shutdown_clone = Arc::clone(&shutdown);
825
826            let handle = tokio::spawn(async move {
827                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
828            });
829
830            Arc::new(RwLock::new(Some(handle)))
831        };
832
833        let node = Self {
834            config,
835            peer_id,
836            peers,
837            event_tx,
838            listen_addrs: RwLock::new(Vec::new()),
839            start_time: Instant::now(),
840            running: RwLock::new(false),
841            dht,
842            resource_manager,
843            bootstrap_manager,
844            dual_node,
845            rate_limiter,
846            active_connections,
847            security_dashboard,
848            connection_monitor_handle,
849            keepalive_handle,
850            shutdown,
851            geo_provider,
852        };
853        info!("Created P2P node with peer ID: {}", node.peer_id);
854
855        // Start the network listeners to populate listen addresses
856        node.start_network_listeners().await?;
857
858        // Update the connection monitor with actual peers reference
859        node.start_connection_monitor().await;
860
861        Ok(node)
862    }
863
864    /// Create a new node builder
865    pub fn builder() -> NodeBuilder {
866        NodeBuilder::new()
867    }
868
869    /// Get the peer ID of this node
870    pub fn peer_id(&self) -> &PeerId {
871        &self.peer_id
872    }
873
874    pub fn local_addr(&self) -> Option<String> {
875        self.listen_addrs
876            .try_read()
877            .ok()
878            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
879    }
880
881    pub async fn subscribe(&self, topic: &str) -> Result<()> {
882        // In a real implementation, this would register the topic with the pubsub mechanism.
883        // For now, we just log it.
884        info!("Subscribed to topic: {}", topic);
885        Ok(())
886    }
887
888    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
889        info!(
890            "Publishing message to topic: {} ({} bytes)",
891            topic,
892            data.len()
893        );
894
895        // Get list of connected peers
896        let peer_list: Vec<PeerId> = {
897            let peers_guard = self.peers.read().await;
898            peers_guard.keys().cloned().collect()
899        };
900
901        if peer_list.is_empty() {
902            debug!("No peers connected, message will only be sent to local subscribers");
903        } else {
904            // Send message to all connected peers
905            let mut send_count = 0;
906            for peer_id in &peer_list {
907                match self.send_message(peer_id, topic, data.to_vec()).await {
908                    Ok(_) => {
909                        send_count += 1;
910                        debug!("Sent message to peer: {}", peer_id);
911                    }
912                    Err(e) => {
913                        warn!("Failed to send message to peer {}: {}", peer_id, e);
914                    }
915                }
916            }
917            info!(
918                "Published message to {}/{} connected peers",
919                send_count,
920                peer_list.len()
921            );
922        }
923
924        // Also send to local subscribers (for local echo and testing)
925        let event = P2PEvent::Message {
926            topic: topic.to_string(),
927            source: self.peer_id.clone(),
928            data: data.to_vec(),
929        };
930        let _ = self.event_tx.send(event);
931
932        Ok(())
933    }
934
935    /// Get the node configuration
936    pub fn config(&self) -> &NodeConfig {
937        &self.config
938    }
939
940    /// Start the P2P node
941    pub async fn start(&self) -> Result<()> {
942        info!("Starting P2P node...");
943
944        // Start production resource manager if configured
945        if let Some(ref resource_manager) = self.resource_manager {
946            resource_manager.start().await.map_err(|e| {
947                P2PError::Network(crate::error::NetworkError::ProtocolError(
948                    format!("Failed to start resource manager: {e}").into(),
949                ))
950            })?;
951            info!("Production resource manager started");
952        }
953
954        // Start bootstrap manager background tasks
955        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
956            let mut manager = bootstrap_manager.write().await;
957            manager.start_background_tasks().await.map_err(|e| {
958                P2PError::Network(crate::error::NetworkError::ProtocolError(
959                    format!("Failed to start bootstrap manager: {e}").into(),
960                ))
961            })?;
962            info!("Bootstrap cache manager started");
963        }
964
965        // Set running state
966        *self.running.write().await = true;
967
968        // Start listening on configured addresses using transport layer
969        self.start_network_listeners().await?;
970
971        // Log current listen addresses
972        let listen_addrs = self.listen_addrs.read().await;
973        info!("P2P node started on addresses: {:?}", *listen_addrs);
974
975        // MCP removed
976
977        // Start message receiving system
978        self.start_message_receiving_system().await?;
979
980        // Connect to bootstrap peers
981        self.connect_bootstrap_peers().await?;
982
983        Ok(())
984    }
985
986    /// Start network listeners on configured addresses
987    async fn start_network_listeners(&self) -> Result<()> {
988        info!("Starting dual-stack listeners (ant-quic)...");
989        // Update our listen_addrs from the dual node bindings
990        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
991            P2PError::Transport(crate::error::TransportError::SetupFailed(
992                format!("Failed to get local addresses: {}", e).into(),
993            ))
994        })?;
995        {
996            let mut la = self.listen_addrs.write().await;
997            *la = addrs.clone();
998        }
999
1000        // Spawn a background accept loop that handles incoming connections from either stack
1001        let event_tx = self.event_tx.clone();
1002        let peers = self.peers.clone();
1003        let active_connections = self.active_connections.clone();
1004        let rate_limiter = self.rate_limiter.clone();
1005        let dual = self.dual_node.clone();
1006        tokio::spawn(async move {
1007            loop {
1008                match dual.accept_any().await {
1009                    Ok((ant_peer_id, remote_sock)) => {
1010                        let peer_id =
1011                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1012                        let remote_addr = NetworkAddress::from(remote_sock);
1013                        // Optional: basic IP rate limiting
1014                        let _ = rate_limiter.check_ip(&remote_sock.ip());
1015                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1016                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1017                        active_connections.write().await.insert(peer_id);
1018                    }
1019                    Err(e) => {
1020                        warn!("Accept failed: {}", e);
1021                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1022                    }
1023                }
1024            }
1025        });
1026
1027        info!("Dual-stack listeners active on: {:?}", addrs);
1028        Ok(())
1029    }
1030
1031    /// Start a listener on a specific socket address
1032    #[allow(dead_code)]
1033    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1034        // use crate::transport::{Transport}; // Unused during migration
1035
1036        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1037        /*
1038        // Try QUIC first (preferred transport)
1039        match crate::transport::QuicTransport::new(Default::default()) {
1040            Ok(quic_transport) => {
1041                match quic_transport.listen(NetworkAddress::new(addr)).await {
1042                    Ok(listen_addrs) => {
1043                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1044
1045                        // Store the actual listening addresses in the node
1046                        {
1047                            let mut node_listen_addrs = self.listen_addrs.write().await;
1048                            // Don't clear - accumulate addresses from multiple listeners
1049                            node_listen_addrs.push(listen_addrs.socket_addr());
1050                        }
1051
1052                        // Start accepting connections in background
1053                        self.start_connection_acceptor(
1054                            Arc::new(quic_transport),
1055                            addr,
1056                            crate::transport::TransportType::QUIC
1057                        ).await?;
1058
1059                        return Ok(());
1060                    }
1061                    Err(e) => {
1062                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1063                    }
1064                }
1065            }
1066            Err(e) => {
1067                warn!("Failed to create QUIC transport for listening: {}", e);
1068            }
1069        }
1070        */
1071
1072        warn!("QUIC transport temporarily disabled during ant-quic migration");
1073        // No TCP fallback - QUIC only
1074        Err(crate::P2PError::Transport(
1075            crate::error::TransportError::SetupFailed(
1076                format!(
1077                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1078                )
1079                .into(),
1080            ),
1081        ))
1082    }
1083
1084    /// Start connection acceptor background task
1085    #[allow(dead_code)] // Deprecated during ant-quic migration
1086    async fn start_connection_acceptor(
1087        &self,
1088        transport: Arc<dyn crate::transport::Transport>,
1089        addr: std::net::SocketAddr,
1090        transport_type: crate::transport::TransportType,
1091    ) -> Result<()> {
1092        info!(
1093            "Starting connection acceptor for {:?} on {}",
1094            transport_type, addr
1095        );
1096
1097        // Clone necessary data for the background task
1098        let event_tx = self.event_tx.clone();
1099        let _peer_id = self.peer_id.clone();
1100        let peers = Arc::clone(&self.peers);
1101        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1102
1103        let rate_limiter = Arc::clone(&self.rate_limiter);
1104
1105        // Spawn background task to accept incoming connections
1106        tokio::spawn(async move {
1107            loop {
1108                match transport.accept().await {
1109                    Ok(connection) => {
1110                        let remote_addr = connection.remote_addr();
1111                        let connection_peer_id =
1112                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1113
1114                        // Apply rate limiting for incoming connections
1115                        let socket_addr = remote_addr.socket_addr();
1116                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1117                            // Connection dropped automatically when it goes out of scope
1118                            continue;
1119                        }
1120
1121                        info!(
1122                            "Accepted {:?} connection from {} (peer: {})",
1123                            transport_type, remote_addr, connection_peer_id
1124                        );
1125
1126                        // Generate peer connected event
1127                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1128
1129                        // Store the peer connection
1130                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1131
1132                        // Spawn task to handle this specific connection's messages
1133                        spawn_connection_handler(
1134                            connection,
1135                            connection_peer_id,
1136                            event_tx.clone(),
1137                            Arc::clone(&peers),
1138                        );
1139                    }
1140                    Err(e) => {
1141                        warn!(
1142                            "Failed to accept {:?} connection on {}: {}",
1143                            transport_type, addr, e
1144                        );
1145
1146                        // Brief pause before retrying to avoid busy loop
1147                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1148                    }
1149                }
1150            }
1151        });
1152
1153        info!(
1154            "Connection acceptor background task started for {:?} on {}",
1155            transport_type, addr
1156        );
1157        Ok(())
1158    }
1159
1160    /// Start the message receiving system with background tasks
1161    async fn start_message_receiving_system(&self) -> Result<()> {
1162        info!("Starting message receiving system");
1163        let dual = self.dual_node.clone();
1164        let event_tx = self.event_tx.clone();
1165
1166        tokio::spawn(async move {
1167            loop {
1168                match dual.receive_any().await {
1169                    Ok((_peer_id, bytes)) => {
1170                        // Expect the JSON message wrapper from create_protocol_message
1171                        #[allow(clippy::collapsible_if)]
1172                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1173                            if let (Some(protocol), Some(data), Some(from)) = (
1174                                value.get("protocol").and_then(|v| v.as_str()),
1175                                value.get("data").and_then(|v| v.as_array()),
1176                                value.get("from").and_then(|v| v.as_str()),
1177                            ) {
1178                                let payload: Vec<u8> = data
1179                                    .iter()
1180                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1181                                    .collect();
1182                                let _ = event_tx.send(P2PEvent::Message {
1183                                    topic: protocol.to_string(),
1184                                    source: from.to_string(),
1185                                    data: payload,
1186                                });
1187                            }
1188                        }
1189                    }
1190                    Err(e) => {
1191                        warn!("Receive error: {}", e);
1192                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1193                    }
1194                }
1195            }
1196        });
1197
1198        Ok(())
1199    }
1200
1201    /// Handle a received message and generate appropriate events
1202    #[allow(dead_code)]
1203    async fn handle_received_message(
1204        &self,
1205        message_data: Vec<u8>,
1206        peer_id: &PeerId,
1207        _protocol: &str,
1208        event_tx: &broadcast::Sender<P2PEvent>,
1209    ) -> Result<()> {
1210        // MCP removed: no special protocol handling
1211
1212        // Parse the message format we created in create_protocol_message
1213        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1214            Ok(message) => {
1215                if let (Some(protocol), Some(data), Some(from)) = (
1216                    message.get("protocol").and_then(|v| v.as_str()),
1217                    message.get("data").and_then(|v| v.as_array()),
1218                    message.get("from").and_then(|v| v.as_str()),
1219                ) {
1220                    // Convert data array back to bytes
1221                    let data_bytes: Vec<u8> = data
1222                        .iter()
1223                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1224                        .collect();
1225
1226                    // Generate message event
1227                    let event = P2PEvent::Message {
1228                        topic: protocol.to_string(),
1229                        source: from.to_string(),
1230                        data: data_bytes,
1231                    };
1232
1233                    let _ = event_tx.send(event);
1234                    debug!("Generated message event from peer: {}", peer_id);
1235                }
1236            }
1237            Err(e) => {
1238                warn!("Failed to parse received message from {}: {}", peer_id, e);
1239            }
1240        }
1241
1242        Ok(())
1243    }
1244
1245    // MCP removed
1246
1247    // MCP removed
1248
1249    /// Run the P2P node (blocks until shutdown)
1250    pub async fn run(&self) -> Result<()> {
1251        if !*self.running.read().await {
1252            self.start().await?;
1253        }
1254
1255        info!("P2P node running...");
1256
1257        // Main event loop
1258        loop {
1259            if !*self.running.read().await {
1260                break;
1261            }
1262
1263            // Perform periodic tasks
1264            self.periodic_tasks().await?;
1265
1266            // Sleep for a short interval
1267            tokio::time::sleep(Duration::from_millis(100)).await;
1268        }
1269
1270        info!("P2P node stopped");
1271        Ok(())
1272    }
1273
1274    /// Stop the P2P node
1275    pub async fn stop(&self) -> Result<()> {
1276        info!("Stopping P2P node...");
1277
1278        // Set running state to false
1279        *self.running.write().await = false;
1280
1281        // Disconnect all peers
1282        self.disconnect_all_peers().await?;
1283
1284        // Shutdown production resource manager if configured
1285        if let Some(ref resource_manager) = self.resource_manager {
1286            resource_manager.shutdown().await.map_err(|e| {
1287                P2PError::Network(crate::error::NetworkError::ProtocolError(
1288                    format!("Failed to shutdown resource manager: {e}").into(),
1289                ))
1290            })?;
1291            info!("Production resource manager stopped");
1292        }
1293
1294        info!("P2P node stopped");
1295        Ok(())
1296    }
1297
1298    /// Graceful shutdown alias for tests
1299    pub async fn shutdown(&self) -> Result<()> {
1300        self.stop().await
1301    }
1302
1303    /// Check if the node is running
1304    pub async fn is_running(&self) -> bool {
1305        *self.running.read().await
1306    }
1307
1308    /// Get the current listen addresses
1309    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1310        self.listen_addrs.read().await.clone()
1311    }
1312
1313    /// Get connected peers
1314    pub async fn connected_peers(&self) -> Vec<PeerId> {
1315        self.peers.read().await.keys().cloned().collect()
1316    }
1317
1318    /// Get peer count
1319    pub async fn peer_count(&self) -> usize {
1320        self.peers.read().await.len()
1321    }
1322
1323    /// Get peer info
1324    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1325        self.peers.read().await.get(peer_id).cloned()
1326    }
1327
1328    /// Get the peer ID for a given socket address, if connected
1329    ///
1330    /// This method searches through all connected peers to find one that has
1331    /// the specified address in its address list.
1332    ///
1333    /// # Arguments
1334    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1335    ///
1336    /// # Returns
1337    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1338    /// * `None` - If no peer with this address is currently connected
1339    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1340        // Parse the address to a SocketAddr for comparison
1341        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1342
1343        let peers = self.peers.read().await;
1344
1345        // Search through all connected peers
1346        for (peer_id, peer_info) in peers.iter() {
1347            // Check if this peer has a matching address
1348            for peer_addr in &peer_info.addresses {
1349                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1350                    && peer_socket == socket_addr
1351                {
1352                    return Some(peer_id.clone());
1353                }
1354            }
1355        }
1356
1357        None
1358    }
1359
1360    /// List all active connections with their peer IDs and addresses
1361    ///
1362    /// # Returns
1363    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1364    /// contains all known addresses for that peer.
1365    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1366        let peers = self.peers.read().await;
1367
1368        peers
1369            .iter()
1370            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1371            .collect()
1372    }
1373
1374    /// Remove a peer from the peers map
1375    ///
1376    /// This method removes a peer from the internal peers map. It should be used
1377    /// when a connection is no longer valid (e.g., after detecting that the underlying
1378    /// ant-quic connection has closed).
1379    ///
1380    /// # Arguments
1381    /// * `peer_id` - The ID of the peer to remove
1382    ///
1383    /// # Returns
1384    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1385    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1386        // Remove from active connections tracking
1387        self.active_connections.write().await.remove(peer_id);
1388        // Remove from peers map and return whether it existed
1389        self.peers.write().await.remove(peer_id).is_some()
1390    }
1391
1392    /// Check if a peer is connected
1393    ///
1394    /// This method checks if the peer ID exists in the peers map. Note that this
1395    /// only verifies the peer is registered - it does not guarantee the underlying
1396    /// ant-quic connection is still active. For connection validation, use `send_message`
1397    /// which will fail if the connection is closed.
1398    ///
1399    /// # Arguments
1400    /// * `peer_id` - The ID of the peer to check
1401    ///
1402    /// # Returns
1403    /// `true` if the peer exists in the peers map, `false` otherwise
1404    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1405        self.peers.read().await.contains_key(peer_id)
1406    }
1407
1408    /// Connect to a peer
1409    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1410        info!("Connecting to peer at: {}", address);
1411
1412        // Check production limits if resource manager is enabled
1413        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1414            Some(resource_manager.acquire_connection().await?)
1415        } else {
1416            None
1417        };
1418
1419        // Parse the address to SocketAddr format
1420        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1421            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1422                format!("{}: {}", address, e).into(),
1423            ))
1424        })?;
1425
1426        // Normalize wildcard addresses to loopback for local connections
1427        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1428        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1429        if normalized_addr != socket_addr {
1430            info!(
1431                "Normalized wildcard address {} to loopback {}",
1432                socket_addr, normalized_addr
1433            );
1434        }
1435
1436        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1437        let addr_list = vec![normalized_addr];
1438        let peer_id = match tokio::time::timeout(
1439            self.config.connection_timeout,
1440            self.dual_node.connect_happy_eyeballs(&addr_list),
1441        )
1442        .await
1443        {
1444            Ok(Ok(peer)) => {
1445                let connected_peer_id =
1446                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1447                info!("Successfully connected to peer: {}", connected_peer_id);
1448                connected_peer_id
1449            }
1450            Ok(Err(e)) => {
1451                warn!("Failed to connect to peer at {}: {}", address, e);
1452                let sanitized_address = address.replace(['/', ':'], "_");
1453                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1454                warn!(
1455                    "Using demo peer ID: {} (transport connection failed)",
1456                    demo_peer_id
1457                );
1458                demo_peer_id
1459            }
1460            Err(_) => {
1461                warn!(
1462                    "Timed out connecting to peer at {} after {:?}",
1463                    address, self.config.connection_timeout
1464                );
1465                let sanitized_address = address.replace(['/', ':'], "_");
1466                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1467                demo_peer_id
1468            }
1469        };
1470
1471        // Create peer info with connection details
1472        let peer_info = PeerInfo {
1473            peer_id: peer_id.clone(),
1474            addresses: vec![address.to_string()],
1475            connected_at: Instant::now(),
1476            last_seen: Instant::now(),
1477            status: ConnectionStatus::Connected,
1478            protocols: vec!["p2p-foundation/1.0".to_string()],
1479            heartbeat_count: 0,
1480        };
1481
1482        // Store peer information
1483        self.peers.write().await.insert(peer_id.clone(), peer_info);
1484
1485        // Add to active connections tracking
1486        // This is critical for is_connection_active() to work correctly
1487        self.active_connections
1488            .write()
1489            .await
1490            .insert(peer_id.clone());
1491
1492        // Record bandwidth usage if resource manager is enabled
1493        if let Some(ref resource_manager) = self.resource_manager {
1494            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1495        }
1496
1497        // Emit connection event
1498        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1499
1500        info!("Connected to peer: {}", peer_id);
1501        Ok(peer_id)
1502    }
1503
1504    /// Disconnect from a peer
1505    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1506        info!("Disconnecting from peer: {}", peer_id);
1507
1508        // Remove from active connections
1509        self.active_connections.write().await.remove(peer_id);
1510
1511        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1512            peer_info.status = ConnectionStatus::Disconnected;
1513
1514            // Emit event
1515            let _ = self
1516                .event_tx
1517                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1518
1519            info!("Disconnected from peer: {}", peer_id);
1520        }
1521
1522        Ok(())
1523    }
1524
1525    /// Check if a connection to a peer is active
1526    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1527        self.active_connections.read().await.contains(peer_id)
1528    }
1529
1530    /// Send a message to a peer
1531    pub async fn send_message(
1532        &self,
1533        peer_id: &PeerId,
1534        protocol: &str,
1535        data: Vec<u8>,
1536    ) -> Result<()> {
1537        debug!(
1538            "Sending message to peer {} on protocol {}",
1539            peer_id, protocol
1540        );
1541
1542        // Check rate limits if resource manager is enabled
1543        if let Some(ref resource_manager) = self.resource_manager
1544            && !resource_manager
1545                .check_rate_limit(peer_id, "message")
1546                .await?
1547        {
1548            return Err(P2PError::ResourceExhausted(
1549                format!("Rate limit exceeded for peer {}", peer_id).into(),
1550            ));
1551        }
1552
1553        // Check if peer exists in peers map
1554        if !self.peers.read().await.contains_key(peer_id) {
1555            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1556                peer_id.to_string().into(),
1557            )));
1558        }
1559
1560        // **NEW**: Check if the ant-quic connection is actually active
1561        // This is the critical fix for the connection state synchronization issue
1562        if !self.is_connection_active(peer_id).await {
1563            debug!(
1564                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1565                peer_id
1566            );
1567
1568            // Clean up stale peer entry
1569            self.remove_peer(peer_id).await;
1570
1571            return Err(P2PError::Network(
1572                crate::error::NetworkError::ConnectionClosed {
1573                    peer_id: peer_id.to_string().into(),
1574                },
1575            ));
1576        }
1577
1578        // MCP removed: no special-case protocol validation
1579
1580        // Record bandwidth usage if resource manager is enabled
1581        if let Some(ref resource_manager) = self.resource_manager {
1582            resource_manager.record_bandwidth(data.len() as u64, 0);
1583        }
1584
1585        // Create protocol message wrapper
1586        let _message_data = self.create_protocol_message(protocol, data)?;
1587
1588        // Send via ant-quic dual-node
1589        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1590        tokio::time::timeout(self.config.connection_timeout, send_fut)
1591            .await
1592            .map_err(|_| {
1593                P2PError::Transport(crate::error::TransportError::StreamError(
1594                    "Timed out sending message".into(),
1595                ))
1596            })?
1597            .map_err(|e| {
1598                P2PError::Transport(crate::error::TransportError::StreamError(
1599                    e.to_string().into(),
1600                ))
1601            })
1602    }
1603
1604    /// Create a protocol message wrapper
1605    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1606        use serde_json::json;
1607
1608        let timestamp = std::time::SystemTime::now()
1609            .duration_since(std::time::UNIX_EPOCH)
1610            .map_err(|e| {
1611                P2PError::Network(NetworkError::ProtocolError(
1612                    format!("System time error: {}", e).into(),
1613                ))
1614            })?
1615            .as_secs();
1616
1617        // Create a simple message format for P2P communication
1618        let message = json!({
1619            "protocol": protocol,
1620            "data": data,
1621            "from": self.peer_id,
1622            "timestamp": timestamp
1623        });
1624
1625        serde_json::to_vec(&message).map_err(|e| {
1626            P2PError::Transport(crate::error::TransportError::StreamError(
1627                format!("Failed to serialize message: {e}").into(),
1628            ))
1629        })
1630    }
1631
1632    // Note: async listen_addrs() already exists above for fetching listen addresses
1633}
1634
1635/// Create a protocol message wrapper (static version for background tasks)
1636#[allow(dead_code)]
1637fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1638    use serde_json::json;
1639
1640    let timestamp = std::time::SystemTime::now()
1641        .duration_since(std::time::UNIX_EPOCH)
1642        .map_err(|e| {
1643            P2PError::Network(NetworkError::ProtocolError(
1644                format!("System time error: {}", e).into(),
1645            ))
1646        })?
1647        .as_secs();
1648
1649    // Create a simple message format for P2P communication
1650    let message = json!({
1651        "protocol": protocol,
1652        "data": data,
1653        "timestamp": timestamp
1654    });
1655
1656    serde_json::to_vec(&message).map_err(|e| {
1657        P2PError::Transport(crate::error::TransportError::StreamError(
1658            format!("Failed to serialize message: {e}").into(),
1659        ))
1660    })
1661}
1662
1663impl P2PNode {
1664    /// Subscribe to network events
1665    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1666        self.event_tx.subscribe()
1667    }
1668
1669    /// Backwards-compat event stream accessor for tests
1670    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1671        self.subscribe_events()
1672    }
1673
1674    /// Get node uptime
1675    pub fn uptime(&self) -> Duration {
1676        self.start_time.elapsed()
1677    }
1678
    // MCP removed: all MCP tool/service methods that used to live here have
    // been deleted, namely:
    //   - handling MCP remote tool calls with network integration
    //   - listing tools available on a specific remote peer
    //   - getting MCP server statistics
1686
1687    /// Get production resource metrics
1688    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1689        if let Some(ref resource_manager) = self.resource_manager {
1690            Ok(resource_manager.get_metrics().await)
1691        } else {
1692            Err(P2PError::Network(
1693                crate::error::NetworkError::ProtocolError(
1694                    "Production resource manager not enabled".to_string().into(),
1695                ),
1696            ))
1697        }
1698    }
1699
    /// Connection lifecycle monitor task - processes ant-quic connection events
    /// and updates active_connections HashSet and peers map
    ///
    /// Subscribes to `dual_node`'s connection events and loops until the event
    /// channel reports `Closed`. For each received event:
    ///
    /// - `Established`: the remote IP is checked against `geo_provider`. Peers
    ///   flagged as hosting or VPN providers are sent a `"control"` rejection
    ///   message and are not registered. Otherwise the peer ID is inserted into
    ///   `active_connections`, its entry in `peers` is updated or created, and
    ///   `P2PEvent::PeerConnected` is broadcast on `event_tx`.
    /// - `Lost`: the peer is removed from `active_connections`, marked
    ///   `Disconnected` in `peers` (if present), and
    ///   `P2PEvent::PeerDisconnected` is broadcast.
    /// - `Failed`: the peer is removed from `active_connections`, marked
    ///   `Failed(reason)` in `peers` (if present), and
    ///   `P2PEvent::PeerDisconnected` is broadcast.
    ///
    /// A `Lagged` receive error is logged and the loop continues.
    ///
    /// `local_peer_id` is used only as the `"from"` field of rejection messages.
    async fn connection_lifecycle_monitor(
        dual_node: Arc<DualStackNetworkNode>,
        active_connections: Arc<RwLock<HashSet<String>>>,
        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
        event_tx: broadcast::Sender<P2PEvent>,
        geo_provider: Arc<BgpGeoProvider>,
        local_peer_id: String,
    ) {
        use crate::transport::ant_quic_adapter::ConnectionEvent;

        let mut event_rx = dual_node.subscribe_connection_events();

        info!("Connection lifecycle monitor started");

        loop {
            match event_rx.recv().await {
                Ok(event) => {
                    match event {
                        ConnectionEvent::Established {
                            peer_id,
                            remote_address,
                        } => {
                            let peer_id_str =
                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
                            debug!(
                                "Connection established: peer={}, addr={}",
                                peer_id_str, remote_address
                            );

                            // **GeoIP Validation**
                            // Check if the peer's IP is allowed
                            let ip = remote_address.ip();
                            let is_rejected = match ip {
                                std::net::IpAddr::V4(v4) => {
                                    // Check if it's a hosting provider or VPN
                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
                                        geo_provider.is_hosting_asn(asn)
                                            || geo_provider.is_vpn_asn(asn)
                                    } else {
                                        // No ASN found for this address: the peer is allowed
                                        false
                                    }
                                }
                                std::net::IpAddr::V6(v6) => {
                                    // IPv6 goes through the general lookup, which
                                    // exposes the hosting/VPN flags directly
                                    let info = geo_provider.lookup(v6);
                                    info.is_hosting_provider || info.is_vpn_provider
                                }
                            };

                            if is_rejected {
                                info!(
                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
                                    peer_id_str, remote_address
                                );

                                // Create rejection message
                                let rejection = RejectionMessage {
                                    reason: RejectionReason::GeoIpPolicy,
                                    message:
                                        "Connection rejected: Hosting/VPN providers not allowed"
                                            .to_string(),
                                    suggested_target: None, // Could suggest a different region if we knew more
                                };

                                // Serialize message; the rejection is best-effort, so a
                                // serialization failure just skips the notification
                                if let Ok(data) = serde_json::to_vec(&rejection) {
                                    // Create protocol message
                                    // (a clock before the Unix epoch yields timestamp 0
                                    // via `unwrap_or_default`)
                                    let timestamp = std::time::SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .unwrap_or_default()
                                        .as_secs();

                                    // Same envelope keys as `create_protocol_message`
                                    let message = serde_json::json!({
                                        "protocol": "control",
                                        "data": data,
                                        "from": local_peer_id,
                                        "timestamp": timestamp
                                    });

                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
                                        // Send rejection message
                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
                                        // which might fail if we haven't fully registered the peer yet
                                        // The send result is deliberately discarded.
                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;

                                        // Give it a moment to send before disconnecting?
                                        // ant-quic might handle this, but a small yield is safe
                                        tokio::task::yield_now().await;
                                    }
                                }

                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
                                // For now, we just don't add it to active connections, effectively ignoring it
                                // Ideally we should actively close the connection
                                // NOTE(review): the connection is not closed here, so a later
                                // `Lost`/`Failed` event for this peer will still broadcast
                                // `PeerDisconnected` without a preceding `PeerConnected` - confirm
                                // subscribers tolerate that.
                                continue;
                            }

                            // Add to active connections
                            // (the write guard is a temporary and is released at the end
                            // of this statement, before `peers` is locked)
                            active_connections.write().await.insert(peer_id_str.clone());

                            // Update peer info or insert new
                            let mut peers_lock = peers.write().await;
                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
                                // Known peer: mark as connected again.
                                // NOTE(review): `last_seen` and `addresses` are left unchanged
                                // in this branch - confirm that is intended for reconnects.
                                peer_info.status = ConnectionStatus::Connected;
                                peer_info.connected_at = Instant::now();
                            } else {
                                // New incoming peer
                                debug!("Registering new incoming peer: {}", peer_id_str);
                                peers_lock.insert(
                                    peer_id_str.clone(),
                                    PeerInfo {
                                        peer_id: peer_id_str.clone(),
                                        addresses: vec![remote_address.to_string()],
                                        status: ConnectionStatus::Connected,
                                        last_seen: Instant::now(),
                                        connected_at: Instant::now(),
                                        protocols: Vec::new(),
                                        heartbeat_count: 0,
                                    },
                                );
                            }

                            // Broadcast connection event (send result discarded)
                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
                        }
                        ConnectionEvent::Lost { peer_id, reason } => {
                            let peer_id_str =
                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);

                            // Remove from active connections
                            active_connections.write().await.remove(&peer_id_str);

                            // Update peer info status (no-op if the peer is not in the map)
                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
                                peer_info.status = ConnectionStatus::Disconnected;
                                peer_info.last_seen = Instant::now();
                            }

                            // Broadcast disconnection event (send result discarded)
                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
                        }
                        ConnectionEvent::Failed { peer_id, reason } => {
                            let peer_id_str =
                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);

                            // Remove from active connections
                            active_connections.write().await.remove(&peer_id_str);

                            // Update peer info status (no-op if the peer is not in the map)
                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
                                peer_info.status = ConnectionStatus::Failed(reason.clone());
                            }

                            // Broadcast disconnection event (send result discarded)
                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
                        }
                    }
                }
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    // The skipped events are not replayed, so `active_connections` and
                    // `peers` may be out of date until later events arrive.
                    warn!(
                        "Connection event monitor lagged, skipped {} events",
                        skipped
                    );
                    continue;
                }
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Connection event channel closed, stopping monitor");
                    break;
                }
            }
        }

        info!("Connection lifecycle monitor stopped");
    }
1877
1878    /// Start connection monitor (called after node initialization)
1879    async fn start_connection_monitor(&self) {
1880        // The monitor task is already spawned in new() with a temporary peers map
1881        // This method is a placeholder for future enhancements where we might
1882        // need to restart the monitor or provide it with updated references
1883        debug!("Connection monitor already running from initialization");
1884    }
1885
1886    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1887    ///
1888    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1889    /// message every 15 seconds (half the timeout) to all active connections to prevent
1890    /// them from timing out during periods of inactivity.
1891    async fn keepalive_task(
1892        active_connections: Arc<RwLock<HashSet<String>>>,
1893        dual_node: Arc<DualStackNetworkNode>,
1894        shutdown: Arc<AtomicBool>,
1895    ) {
1896        use tokio::time::{Duration, interval};
1897
1898        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1899        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1900
1901        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1902        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1903
1904        info!(
1905            "Keepalive task started (interval: {}s)",
1906            KEEPALIVE_INTERVAL_SECS
1907        );
1908
1909        loop {
1910            // Check shutdown flag first
1911            if shutdown.load(Ordering::Relaxed) {
1912                info!("Keepalive task shutting down");
1913                break;
1914            }
1915
1916            interval.tick().await;
1917
1918            // Get snapshot of active connections
1919            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1920
1921            if peers.is_empty() {
1922                trace!("Keepalive: no active connections");
1923                continue;
1924            }
1925
1926            debug!("Sending keepalive to {} active connections", peers.len());
1927
1928            // Send keepalive to each peer
1929            for peer_id in peers {
1930                match dual_node
1931                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1932                    .await
1933                {
1934                    Ok(_) => {
1935                        trace!("Keepalive sent to peer: {}", peer_id);
1936                    }
1937                    Err(e) => {
1938                        debug!(
1939                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
1940                            peer_id, e
1941                        );
1942                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1943                    }
1944                }
1945            }
1946        }
1947
1948        info!("Keepalive task stopped");
1949    }
1950
1951    /// Check system health
1952    pub async fn health_check(&self) -> Result<()> {
1953        if let Some(ref resource_manager) = self.resource_manager {
1954            resource_manager.health_check().await
1955        } else {
1956            // Basic health check without resource manager
1957            let peer_count = self.peer_count().await;
1958            if peer_count > self.config.max_connections {
1959                Err(P2PError::Network(
1960                    crate::error::NetworkError::ProtocolError(
1961                        format!("Too many connections: {peer_count}").into(),
1962                    ),
1963                ))
1964            } else {
1965                Ok(())
1966            }
1967        }
1968    }
1969
1970    /// Get production configuration (if enabled)
1971    pub fn production_config(&self) -> Option<&ProductionConfig> {
1972        self.config.production_config.as_ref()
1973    }
1974
1975    /// Check if production hardening is enabled
1976    pub fn is_production_mode(&self) -> bool {
1977        self.resource_manager.is_some()
1978    }
1979
1980    /// Get DHT reference
1981    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1982        self.dht.as_ref()
1983    }
1984
1985    /// Store a value in the DHT
1986    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1987        if let Some(ref dht) = self.dht {
1988            let mut dht_instance = dht.write().await;
1989            let dht_key = crate::dht::DhtKey::from_bytes(key);
1990            dht_instance
1991                .store(&dht_key, value.clone())
1992                .await
1993                .map_err(|e| {
1994                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1995                        format!("{:?}: {e}", key).into(),
1996                    ))
1997                })?;
1998
1999            Ok(())
2000        } else {
2001            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2002                "DHT not enabled".to_string().into(),
2003            )))
2004        }
2005    }
2006
2007    /// Retrieve a value from the DHT
2008    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2009        if let Some(ref dht) = self.dht {
2010            let dht_instance = dht.read().await;
2011            let dht_key = crate::dht::DhtKey::from_bytes(key);
2012            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2013                P2PError::Dht(crate::error::DhtError::StoreFailed(
2014                    format!("Retrieve failed: {e}").into(),
2015                ))
2016            })?;
2017
2018            Ok(record_result)
2019        } else {
2020            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2021                "DHT not enabled".to_string().into(),
2022            )))
2023        }
2024    }
2025
2026    /// Add a discovered peer to the bootstrap cache
2027    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2028        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2029            let mut manager = bootstrap_manager.write().await;
2030            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2031                .iter()
2032                .filter_map(|addr| addr.parse().ok())
2033                .collect();
2034            let contact = ContactEntry::new(peer_id, socket_addresses);
2035            manager.add_contact(contact).await.map_err(|e| {
2036                P2PError::Network(crate::error::NetworkError::ProtocolError(
2037                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2038                ))
2039            })?;
2040        }
2041        Ok(())
2042    }
2043
2044    /// Update connection metrics for a peer in the bootstrap cache
2045    pub async fn update_peer_metrics(
2046        &self,
2047        peer_id: &PeerId,
2048        success: bool,
2049        latency_ms: Option<u64>,
2050        _error: Option<String>,
2051    ) -> Result<()> {
2052        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2053            let mut manager = bootstrap_manager.write().await;
2054
2055            // Create quality metrics based on the connection result
2056            let metrics = QualityMetrics {
2057                success_rate: if success { 1.0 } else { 0.0 },
2058                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2059                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2060                last_connection_attempt: chrono::Utc::now(),
2061                last_successful_connection: if success {
2062                    chrono::Utc::now()
2063                } else {
2064                    chrono::Utc::now() - chrono::Duration::hours(1)
2065                },
2066                uptime_score: 0.5,
2067            };
2068
2069            manager
2070                .update_contact_metrics(peer_id, metrics)
2071                .await
2072                .map_err(|e| {
2073                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2074                        format!("Failed to update peer metrics: {e}").into(),
2075                    ))
2076                })?;
2077        }
2078        Ok(())
2079    }
2080
2081    /// Get bootstrap cache statistics
2082    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2083        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2084            let manager = bootstrap_manager.read().await;
2085            let stats = manager.get_stats().await.map_err(|e| {
2086                P2PError::Network(crate::error::NetworkError::ProtocolError(
2087                    format!("Failed to get bootstrap stats: {e}").into(),
2088                ))
2089            })?;
2090            Ok(Some(stats))
2091        } else {
2092            Ok(None)
2093        }
2094    }
2095
2096    /// Get the number of cached bootstrap peers
2097    pub async fn cached_peer_count(&self) -> usize {
2098        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2099            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2100        {
2101            return stats.total_contacts;
2102        }
2103        0
2104    }
2105
    /// Connect to bootstrap peers
    ///
    /// Builds a list of bootstrap contacts from two sources, in priority order:
    ///
    /// 1. Configured peers: `config.bootstrap_peers_str` when non-empty, otherwise
    ///    the string form of each entry in `config.bootstrap_peers`. Entries that
    ///    do not parse as a `SocketAddr` are logged and skipped.
    /// 2. Up to 20 peers from the bootstrap manager's cache (when one is
    ///    configured), with addresses already seen in step 1 filtered out.
    ///
    /// Each contact's addresses are then tried in order until one connects. The
    /// bootstrap cache is updated with the outcome of attempts (see the inline
    /// notes for the exact conditions).
    ///
    /// Returns `Ok(())` without connecting when there are no contacts at all.
    ///
    /// # Errors
    ///
    /// Returns `NetworkError::ConnectionFailed` (with a placeholder `0.0.0.0:0`
    /// address) when contacts exist but none could be connected.
    async fn connect_bootstrap_peers(&self) -> Result<()> {
        let mut bootstrap_contacts = Vec::new();
        // Set once at least one contact has been taken from the cache.
        let mut used_cache = false;
        // Addresses already queued, used to de-duplicate cache entries against CLI ones.
        let mut seen_addresses = std::collections::HashSet::new();

        // CLI-provided bootstrap peers take priority - always include them first
        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
            self.config.bootstrap_peers_str.clone()
        } else {
            // Convert Multiaddr to strings
            self.config
                .bootstrap_peers
                .iter()
                .map(|addr| addr.to_string())
                .collect::<Vec<_>>()
        };

        if !cli_bootstrap_peers.is_empty() {
            info!(
                "Using {} CLI-provided bootstrap peers (priority)",
                cli_bootstrap_peers.len()
            );
            for addr in &cli_bootstrap_peers {
                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                    seen_addresses.insert(socket_addr);
                    // The real peer ID is not known yet, so a synthetic ID is built
                    // from the first 8 characters of the address string.
                    let contact = ContactEntry::new(
                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
                        vec![socket_addr],
                    );
                    bootstrap_contacts.push(contact);
                } else {
                    warn!("Invalid bootstrap address format: {}", addr);
                }
            }
        }

        // Supplement with cached bootstrap peers (after CLI peers)
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let manager = bootstrap_manager.read().await;
            match manager.get_bootstrap_peers(20).await {
                // Try to get top 20 quality peers
                Ok(contacts) => {
                    if !contacts.is_empty() {
                        let mut added_from_cache = 0;
                        for contact in contacts {
                            // Only add if we haven't already added this address from CLI
                            let new_addresses: Vec<_> = contact
                                .addresses
                                .iter()
                                .filter(|addr| !seen_addresses.contains(addr))
                                .copied()
                                .collect();

                            if !new_addresses.is_empty() {
                                for addr in &new_addresses {
                                    seen_addresses.insert(*addr);
                                }
                                // Keep the cached contact's data but restrict it to
                                // the addresses not already queued.
                                let mut contact = contact.clone();
                                contact.addresses = new_addresses;
                                bootstrap_contacts.push(contact);
                                added_from_cache += 1;
                            }
                        }
                        if added_from_cache > 0 {
                            info!(
                                "Added {} cached bootstrap peers (supplementing CLI peers)",
                                added_from_cache
                            );
                            used_cache = true;
                        }
                    }
                }
                Err(e) => {
                    // A cache failure is not fatal; CLI peers (if any) are still used.
                    warn!("Failed to get cached bootstrap peers: {}", e);
                }
            }
        }

        if bootstrap_contacts.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }

        // Connect to bootstrap peers
        let mut successful_connections = 0;
        for contact in bootstrap_contacts {
            for addr in &contact.addresses {
                match self.connect_peer(&addr.to_string()).await {
                    Ok(peer_id) => {
                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                        successful_connections += 1;

                        // Update bootstrap cache with successful connection
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            // Replace the (possibly synthetic) ID with the one
                            // returned by the connection.
                            updated_contact.peer_id = peer_id.clone();
                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                        break; // Successfully connected, move to next contact
                    }
                    Err(e) => {
                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                        // Update bootstrap cache with failed connection
                        // NOTE(review): `used_cache` is a global flag, so once any cached
                        // contact was queued, failed CLI contacts (with their synthetic
                        // `cli_peer_` IDs) are written to the cache too - confirm intended.
                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(
                                false,
                                None,
                                Some(e.to_string()),
                            );

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }

        if successful_connections == 0 {
            // The summary warning is only logged when no cached peers were used;
            // per-address failures were already logged above in either case.
            if !used_cache {
                warn!("Failed to connect to any bootstrap peers");
            }
            return Err(P2PError::Network(NetworkError::ConnectionFailed {
                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
                reason: "Failed to connect to any bootstrap peers".into(),
            }));
        }
        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        Ok(())
    }
2250
2251    /// Disconnect from all peers
2252    async fn disconnect_all_peers(&self) -> Result<()> {
2253        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2254
2255        for peer_id in peer_ids {
2256            self.disconnect_peer(&peer_id).await?;
2257        }
2258
2259        Ok(())
2260    }
2261
2262    /// Perform periodic maintenance tasks
2263    async fn periodic_tasks(&self) -> Result<()> {
2264        // Update peer last seen timestamps
2265        // Remove stale connections
2266        // Perform DHT maintenance
2267        // This is a placeholder for now
2268
2269        Ok(())
2270    }
2271}
2272
/// Network sender trait for sending messages
///
/// Abstracts "send bytes to a peer under a protocol name" so that callers do
/// not need a full node handle. Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send a message to a specific peer
    ///
    /// `protocol` names the protocol the payload belongs to and `data` is the
    /// raw payload. Returns an error if the message could not be handed off.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID
    fn local_peer_id(&self) -> &PeerId;
}
2282
/// Lightweight wrapper for P2PNode to implement NetworkSender
///
/// Holds only the local peer ID and the sending half of an unbounded channel;
/// outgoing messages are queued on the channel as
/// `(target peer, protocol, payload)` tuples.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Local peer ID returned by `local_peer_id`.
    peer_id: PeerId,
    // Use channels for async communication with the P2P node
    // NOTE(review): the channel is unbounded, so there is no backpressure on
    // senders - confirm the receiving side drains it promptly.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2290
2291impl P2PNetworkSender {
2292    pub fn new(
2293        peer_id: PeerId,
2294        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2295    ) -> Self {
2296        Self { peer_id, send_tx }
2297    }
2298}
2299
2300/// Implementation of NetworkSender trait for P2PNetworkSender
2301#[async_trait::async_trait]
2302impl NetworkSender for P2PNetworkSender {
2303    /// Send a message to a specific peer via the P2P network
2304    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2305        self.send_tx
2306            .send((peer_id.clone(), protocol.to_string(), data))
2307            .map_err(|_| {
2308                P2PError::Network(crate::error::NetworkError::ProtocolError(
2309                    "Failed to send message via channel".to_string().into(),
2310                ))
2311            })?;
2312        Ok(())
2313    }
2314
2315    /// Get our local peer ID
2316    fn local_peer_id(&self) -> &PeerId {
2317        &self.peer_id
2318    }
2319}
2320
/// Builder pattern for creating P2P nodes
///
/// Wraps a `NodeConfig` that starts from `NodeConfig::default()` and is
/// adjusted through the chained `with_*` / `listen_on` methods, each of which
/// consumes and returns the builder.
pub struct NodeBuilder {
    /// Configuration being assembled.
    config: NodeConfig,
}
2325
2326impl Default for NodeBuilder {
2327    fn default() -> Self {
2328        Self::new()
2329    }
2330}
2331
2332impl NodeBuilder {
2333    /// Create a new node builder
2334    pub fn new() -> Self {
2335        Self {
2336            config: NodeConfig::default(),
2337        }
2338    }
2339
2340    /// Set the peer ID
2341    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2342        self.config.peer_id = Some(peer_id);
2343        self
2344    }
2345
2346    /// Add a listen address
2347    pub fn listen_on(mut self, addr: &str) -> Self {
2348        if let Ok(multiaddr) = addr.parse() {
2349            self.config.listen_addrs.push(multiaddr);
2350        }
2351        self
2352    }
2353
2354    /// Add a bootstrap peer
2355    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2356        if let Ok(multiaddr) = addr.parse() {
2357            self.config.bootstrap_peers.push(multiaddr);
2358        }
2359        self.config.bootstrap_peers_str.push(addr.to_string());
2360        self
2361    }
2362
2363    /// Enable IPv6 support
2364    pub fn with_ipv6(mut self, enable: bool) -> Self {
2365        self.config.enable_ipv6 = enable;
2366        self
2367    }
2368
2369    // MCP removed: builder methods deleted
2370
2371    /// Set connection timeout
2372    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2373        self.config.connection_timeout = timeout;
2374        self
2375    }
2376
2377    /// Set maximum connections
2378    pub fn with_max_connections(mut self, max: usize) -> Self {
2379        self.config.max_connections = max;
2380        self
2381    }
2382
2383    /// Enable production mode with default configuration
2384    pub fn with_production_mode(mut self) -> Self {
2385        self.config.production_config = Some(ProductionConfig::default());
2386        self
2387    }
2388
2389    /// Configure production settings
2390    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2391        self.config.production_config = Some(production_config);
2392        self
2393    }
2394
2395    /// Configure IP diversity limits for Sybil protection.
2396    pub fn with_diversity_config(
2397        mut self,
2398        diversity_config: crate::security::IPDiversityConfig,
2399    ) -> Self {
2400        self.config.diversity_config = Some(diversity_config);
2401        self
2402    }
2403
2404    /// Configure DHT settings
2405    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2406        self.config.dht_config = dht_config;
2407        self
2408    }
2409
2410    /// Enable DHT with default configuration
2411    pub fn with_default_dht(mut self) -> Self {
2412        self.config.dht_config = DHTConfig::default();
2413        self
2414    }
2415
2416    /// Build the P2P node
2417    pub async fn build(self) -> Result<P2PNode> {
2418        P2PNode::new(self.config).await
2419    }
2420}
2421
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a `BootstrapManager` from the cache and diversity settings in
    /// `config`, using the default for whichever of the two is unset.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let cache_config = match config.bootstrap_cache_config {
            Some(ref custom) => custom.clone(),
            None => crate::bootstrap::CacheConfig::default(),
        };

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// A diversity config set on `NodeConfig` must be the one the bootstrap
    /// manager ends up with.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let mut config = NodeConfig::default();
        config.diversity_config = Some(IPDiversityConfig::testnet());

        let manager = build_bootstrap_manager_like_prod(&config).await;

        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
    }
}
2461
2462/// Standalone function to handle received messages without borrowing self
2463#[allow(dead_code)] // Deprecated during ant-quic migration
2464async fn handle_received_message_standalone(
2465    message_data: Vec<u8>,
2466    peer_id: &PeerId,
2467    _protocol: &str,
2468    event_tx: &broadcast::Sender<P2PEvent>,
2469) -> Result<()> {
2470    // Parse the message format
2471    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2472        Ok(message) => {
2473            if let (Some(protocol), Some(data), Some(from)) = (
2474                message.get("protocol").and_then(|v| v.as_str()),
2475                message.get("data").and_then(|v| v.as_array()),
2476                message.get("from").and_then(|v| v.as_str()),
2477            ) {
2478                // Convert data array back to bytes
2479                let data_bytes: Vec<u8> = data
2480                    .iter()
2481                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2482                    .collect();
2483
2484                // Generate message event
2485                let event = P2PEvent::Message {
2486                    topic: protocol.to_string(),
2487                    source: from.to_string(),
2488                    data: data_bytes,
2489                };
2490
2491                let _ = event_tx.send(event);
2492                debug!("Generated message event from peer: {}", peer_id);
2493            }
2494        }
2495        Err(e) => {
2496            warn!("Failed to parse received message from {}: {}", peer_id, e);
2497        }
2498    }
2499
2500    Ok(())
2501}
2502
2503// MCP removed: standalone MCP handler deleted
2504
2505/// Helper function to handle protocol message creation
2506#[allow(dead_code)]
2507fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2508    match create_protocol_message_static(protocol, data) {
2509        Ok(msg) => Some(msg),
2510        Err(e) => {
2511            warn!("Failed to create protocol message: {}", e);
2512            None
2513        }
2514    }
2515}
2516
2517/// Helper function to handle message send result
2518#[allow(dead_code)]
2519async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2520    match result {
2521        Ok(_) => {
2522            debug!("Message sent to peer {} via transport layer", peer_id);
2523        }
2524        Err(e) => {
2525            warn!("Failed to send message to peer {}: {}", peer_id, e);
2526        }
2527    }
2528}
2529
2530/// Helper function to check rate limit
2531#[allow(dead_code)] // Deprecated during ant-quic migration
2532fn check_rate_limit(
2533    rate_limiter: &RateLimiter,
2534    socket_addr: &std::net::SocketAddr,
2535    remote_addr: &NetworkAddress,
2536) -> Result<()> {
2537    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2538        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2539        e
2540    })
2541}
2542
2543/// Helper function to register a new peer
2544#[allow(dead_code)] // Deprecated during ant-quic migration
2545async fn register_new_peer(
2546    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2547    peer_id: &PeerId,
2548    remote_addr: &NetworkAddress,
2549) {
2550    let mut peers_guard = peers.write().await;
2551    let peer_info = PeerInfo {
2552        peer_id: peer_id.clone(),
2553        addresses: vec![remote_addr.to_string()],
2554        connected_at: tokio::time::Instant::now(),
2555        last_seen: tokio::time::Instant::now(),
2556        status: ConnectionStatus::Connected,
2557        protocols: vec!["p2p-chat/1.0.0".to_string()],
2558        heartbeat_count: 0,
2559    };
2560    peers_guard.insert(peer_id.clone(), peer_info);
2561}
2562
2563/// Helper function to spawn connection handler
2564#[allow(dead_code)] // Deprecated during ant-quic migration
2565fn spawn_connection_handler(
2566    connection: Box<dyn crate::transport::Connection>,
2567    peer_id: PeerId,
2568    event_tx: broadcast::Sender<P2PEvent>,
2569    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2570) {
2571    tokio::spawn(async move {
2572        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2573    });
2574}
2575
2576/// Helper function to handle peer connection
2577#[allow(dead_code)] // Deprecated during ant-quic migration
2578async fn handle_peer_connection(
2579    mut connection: Box<dyn crate::transport::Connection>,
2580    peer_id: PeerId,
2581    event_tx: broadcast::Sender<P2PEvent>,
2582    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2583) {
2584    loop {
2585        match connection.receive().await {
2586            Ok(message_data) => {
2587                debug!(
2588                    "Received {} bytes from peer: {}",
2589                    message_data.len(),
2590                    peer_id
2591                );
2592
2593                // Handle the received message
2594                if let Err(e) = handle_received_message_standalone(
2595                    message_data,
2596                    &peer_id,
2597                    "unknown", // TODO: Extract protocol from message
2598                    &event_tx,
2599                )
2600                .await
2601                {
2602                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2603                }
2604            }
2605            Err(e) => {
2606                warn!("Failed to receive message from {}: {}", peer_id, e);
2607
2608                // Check if connection is still alive
2609                if !connection.is_alive().await {
2610                    info!("Connection to {} is dead, removing peer", peer_id);
2611
2612                    // Remove dead peer
2613                    remove_peer(&peers, &peer_id).await;
2614
2615                    // Generate peer disconnected event
2616                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2617
2618                    break; // Exit the message receiving loop
2619                }
2620
2621                // Brief pause before retrying
2622                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2623            }
2624        }
2625    }
2626}
2627
2628/// Helper function to remove a peer
2629#[allow(dead_code)] // Deprecated during ant-quic migration
2630async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2631    let mut peers_guard = peers.write().await;
2632    peers_guard.remove(peer_id);
2633}
2634
2635/// Helper function to update peer heartbeat
2636#[allow(dead_code)]
2637async fn update_peer_heartbeat(
2638    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2639    peer_id: &PeerId,
2640) -> Result<()> {
2641    let mut peers_guard = peers.write().await;
2642    match peers_guard.get_mut(peer_id) {
2643        Some(peer_info) => {
2644            peer_info.last_seen = Instant::now();
2645            peer_info.heartbeat_count += 1;
2646            Ok(())
2647        }
2648        None => {
2649            warn!("Received heartbeat from unknown peer: {}", peer_id);
2650            Err(P2PError::Network(NetworkError::PeerNotFound(
2651                format!("Peer {} not found", peer_id).into(),
2652            )))
2653        }
2654    }
2655}
2656
2657/// Helper function to get resource metrics
2658#[allow(dead_code)]
2659async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2660    if let Some(manager) = resource_manager {
2661        let metrics = manager.get_metrics().await;
2662        (metrics.memory_used, metrics.cpu_usage)
2663    } else {
2664        (0, 0.0)
2665    }
2666}
2667
2668#[cfg(test)]
2669mod tests {
2670    use super::*;
2671    // MCP removed from tests
2672    use std::time::Duration;
2673    use tokio::time::timeout;
2674
2675    // Test tool handler for network tests
2676
2677    // MCP removed
2678
2679    /// Helper function to create a test node configuration
2680    fn create_test_node_config() -> NodeConfig {
2681        NodeConfig {
2682            peer_id: Some("test_peer_123".to_string()),
2683            listen_addrs: vec![
2684                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2685                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2686            ],
2687            listen_addr: std::net::SocketAddr::new(
2688                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2689                0,
2690            ),
2691            bootstrap_peers: vec![],
2692            bootstrap_peers_str: vec![],
2693            enable_ipv6: true,
2694
2695            connection_timeout: Duration::from_millis(300),
2696            keep_alive_interval: Duration::from_secs(30),
2697            max_connections: 100,
2698            max_incoming_connections: 50,
2699            dht_config: DHTConfig::default(),
2700            security_config: SecurityConfig::default(),
2701            production_config: None,
2702            bootstrap_cache_config: None,
2703            diversity_config: None,
2704            // identity_config: None,
2705        }
2706    }
2707
    // MCP removed: the test-tool helper that used to live here was deleted.
2710
2711    #[tokio::test]
2712    async fn test_node_config_default() {
2713        let config = NodeConfig::default();
2714
2715        assert!(config.peer_id.is_none());
2716        assert_eq!(config.listen_addrs.len(), 2);
2717        assert!(config.enable_ipv6);
2718        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2719        assert_eq!(config.max_incoming_connections, 100);
2720        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2721    }
2722
2723    #[tokio::test]
2724    async fn test_dht_config_default() {
2725        let config = DHTConfig::default();
2726
2727        assert_eq!(config.k_value, 20);
2728        assert_eq!(config.alpha_value, 5);
2729        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2730        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2731    }
2732
2733    #[tokio::test]
2734    async fn test_security_config_default() {
2735        let config = SecurityConfig::default();
2736
2737        assert!(config.enable_noise);
2738        assert!(config.enable_tls);
2739        assert_eq!(config.trust_level, TrustLevel::Basic);
2740    }
2741
2742    #[test]
2743    fn test_trust_level_variants() {
2744        // Test that all trust level variants can be created
2745        let _none = TrustLevel::None;
2746        let _basic = TrustLevel::Basic;
2747        let _full = TrustLevel::Full;
2748
2749        // Test equality
2750        assert_eq!(TrustLevel::None, TrustLevel::None);
2751        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2752        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2753        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2754    }
2755
2756    #[test]
2757    fn test_connection_status_variants() {
2758        let connecting = ConnectionStatus::Connecting;
2759        let connected = ConnectionStatus::Connected;
2760        let disconnecting = ConnectionStatus::Disconnecting;
2761        let disconnected = ConnectionStatus::Disconnected;
2762        let failed = ConnectionStatus::Failed("test error".to_string());
2763
2764        assert_eq!(connecting, ConnectionStatus::Connecting);
2765        assert_eq!(connected, ConnectionStatus::Connected);
2766        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2767        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2768        assert_ne!(connecting, connected);
2769
2770        if let ConnectionStatus::Failed(msg) = failed {
2771            assert_eq!(msg, "test error");
2772        } else {
2773            panic!("Expected Failed status");
2774        }
2775    }
2776
2777    #[tokio::test]
2778    async fn test_node_creation() -> Result<()> {
2779        let config = create_test_node_config();
2780        let node = P2PNode::new(config).await?;
2781
2782        assert_eq!(node.peer_id(), "test_peer_123");
2783        assert!(!node.is_running().await);
2784        assert_eq!(node.peer_count().await, 0);
2785        assert!(node.connected_peers().await.is_empty());
2786
2787        Ok(())
2788    }
2789
2790    #[tokio::test]
2791    async fn test_node_creation_without_peer_id() -> Result<()> {
2792        let mut config = create_test_node_config();
2793        config.peer_id = None;
2794
2795        let node = P2PNode::new(config).await?;
2796
2797        // Should have generated a peer ID
2798        assert!(node.peer_id().starts_with("peer_"));
2799        assert!(!node.is_running().await);
2800
2801        Ok(())
2802    }
2803
2804    #[tokio::test]
2805    async fn test_node_lifecycle() -> Result<()> {
2806        let config = create_test_node_config();
2807        let node = P2PNode::new(config).await?;
2808
2809        // Initially not running
2810        assert!(!node.is_running().await);
2811
2812        // Start the node
2813        node.start().await?;
2814        assert!(node.is_running().await);
2815
2816        // Check listen addresses were set (at least one)
2817        let listen_addrs = node.listen_addrs().await;
2818        assert!(
2819            !listen_addrs.is_empty(),
2820            "Expected at least one listening address"
2821        );
2822
2823        // Stop the node
2824        node.stop().await?;
2825        assert!(!node.is_running().await);
2826
2827        Ok(())
2828    }
2829
    /// Connects to a single address, checks the peer bookkeeping
    /// (count, connected list, `PeerInfo` contents), then disconnects and
    /// checks the peer is gone.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // NOTE(review): port 0 is not a dialable port; this presumably relies on
        // `connect_peer` accepting the address without a live remote; confirm.
        let peer_addr = "127.0.0.1:0";

        // Connect to a peer
        let peer_id = node.connect_peer(peer_addr).await?;
        assert!(peer_id.starts_with("peer_from_"));

        // Check peer count
        assert_eq!(node.peer_count().await, 1);

        // Check connected peers
        let connected_peers = node.connected_peers().await;
        assert_eq!(connected_peers.len(), 1);
        assert_eq!(connected_peers[0], peer_id);

        // Get peer info
        let peer_info = node.peer_info(&peer_id).await;
        assert!(peer_info.is_some());
        let info = peer_info.expect("Peer info should exist after adding peer");
        assert_eq!(info.peer_id, peer_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        // Disconnect from peer
        node.disconnect_peer(&peer_id).await?;
        assert_eq!(node.peer_count().await, 0);

        Ok(())
    }
2863
    /// Subscribes to node events, then checks that connecting emits
    /// `PeerConnected` and disconnecting emits `PeerDisconnected`, each for
    /// the same peer ID and each within 100 ms.
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Subscribe before connecting so the first event is not missed.
        let mut events = node.subscribe_events();
        let peer_addr = "127.0.0.1:0";

        // Connect to a peer (this should emit an event)
        let peer_id = node.connect_peer(peer_addr).await?;

        // Check for PeerConnected event
        let event = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(event.is_ok());

        // Outer `expect` covers the timeout, inner `expect` the receive error.
        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        // Disconnect from peer (this should emit another event)
        node.disconnect_peer(&peer_id).await?;

        // Check for PeerDisconnected event
        let event = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        Ok(())
    }
2908
    /// Starts two nodes on loopback, connects the first to the second and
    /// sends a message. The send may fail, but not with a "not connected"
    /// error. Sending to an unknown peer must fail.
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        // Create two nodes
        let mut config1 = create_test_node_config();
        config1.listen_addr =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let node1 = P2PNode::new(config1).await?;
        node1.start().await?;

        let mut config2 = create_test_node_config();
        config2.listen_addr =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let node2 = P2PNode::new(config2).await?;
        node2.start().await?;

        // Wait a bit for nodes to start listening
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Get actual listening address of node2
        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Connect node1 to node2; a 500 ms timeout is reported as
        // `NetworkError::Timeout` rather than hanging the test.
        let peer_id =
            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
                Ok(res) => res?,
                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
            };

        // Wait a bit for connection to establish
        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;

        // Send a message (the send result itself is inspected below, only a
        // timeout aborts the test here)
        let message_data = b"Hello, peer!".to_vec();
        let result = match timeout(
            Duration::from_millis(500),
            node1.send_message(&peer_id, "test-protocol", message_data),
        )
        .await
        {
            Ok(res) => res,
            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
        };
        // For now, we'll just check that we don't get a "not connected" error
        // The actual send might fail due to no handler on the other side
        if let Err(e) = &result {
            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
        }

        // Try to send to non-existent peer
        let non_existent_peer = "non_existent_peer".to_string();
        let result = node1
            .send_message(&non_existent_peer, "test-protocol", vec![])
            .await;
        assert!(result.is_err(), "Sending to non-existent peer should fail");

        Ok(())
    }
2970
2971    #[tokio::test]
2972    async fn test_remote_mcp_operations() -> Result<()> {
2973        let config = create_test_node_config();
2974        let node = P2PNode::new(config).await?;
2975
2976        // MCP removed; test reduced to simple start/stop
2977        node.start().await?;
2978        node.stop().await?;
2979        Ok(())
2980    }
2981
2982    #[tokio::test]
2983    async fn test_health_check() -> Result<()> {
2984        let config = create_test_node_config();
2985        let node = P2PNode::new(config).await?;
2986
2987        // Health check should pass with no connections
2988        let result = node.health_check().await;
2989        assert!(result.is_ok());
2990
2991        // Note: We're not actually connecting to real peers here
2992        // since that would require running bootstrap nodes.
2993        // The health check should still pass with no connections.
2994
2995        Ok(())
2996    }
2997
2998    #[tokio::test]
2999    async fn test_node_uptime() -> Result<()> {
3000        let config = create_test_node_config();
3001        let node = P2PNode::new(config).await?;
3002
3003        let uptime1 = node.uptime();
3004        assert!(uptime1 >= Duration::from_secs(0));
3005
3006        // Wait a bit
3007        tokio::time::sleep(Duration::from_millis(10)).await;
3008
3009        let uptime2 = node.uptime();
3010        assert!(uptime2 > uptime1);
3011
3012        Ok(())
3013    }
3014
3015    #[tokio::test]
3016    async fn test_node_config_access() -> Result<()> {
3017        let config = create_test_node_config();
3018        let expected_peer_id = config.peer_id.clone();
3019        let node = P2PNode::new(config).await?;
3020
3021        let node_config = node.config();
3022        assert_eq!(node_config.peer_id, expected_peer_id);
3023        assert_eq!(node_config.max_connections, 100);
3024        // MCP removed
3025
3026        Ok(())
3027    }
3028
3029    #[tokio::test]
3030    async fn test_mcp_server_access() -> Result<()> {
3031        let config = create_test_node_config();
3032        let _node = P2PNode::new(config).await?;
3033
3034        // MCP removed
3035        Ok(())
3036    }
3037
3038    #[tokio::test]
3039    async fn test_dht_access() -> Result<()> {
3040        let config = create_test_node_config();
3041        let node = P2PNode::new(config).await?;
3042
3043        // Should have DHT
3044        assert!(node.dht().is_some());
3045
3046        Ok(())
3047    }
3048
3049    #[tokio::test]
3050    async fn test_node_builder() -> Result<()> {
3051        // Create a config using the builder but don't actually build a real node
3052        let builder = P2PNode::builder()
3053            .with_peer_id("builder_test_peer".to_string())
3054            .listen_on("/ip4/127.0.0.1/tcp/0")
3055            .listen_on("/ip6/::1/tcp/0")
3056            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3057            .with_ipv6(true)
3058            .with_connection_timeout(Duration::from_secs(15))
3059            .with_max_connections(200);
3060
3061        // Test the configuration that was built
3062        let config = builder.config;
3063        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3064        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3065        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3066        assert!(config.enable_ipv6);
3067        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3068        assert_eq!(config.max_connections, 200);
3069
3070        Ok(())
3071    }
3072
3073    #[tokio::test]
3074    async fn test_bootstrap_peers() -> Result<()> {
3075        let mut config = create_test_node_config();
3076        config.bootstrap_peers = vec![
3077            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3078            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3079        ];
3080
3081        let node = P2PNode::new(config).await?;
3082
3083        // Start node (which attempts to connect to bootstrap peers)
3084        node.start().await?;
3085
3086        // In a test environment, bootstrap peers may not be available
3087        // The test verifies the node starts correctly with bootstrap configuration
3088        // Peer count may include local/internal tracking, so we just verify it's reasonable
3089        let _peer_count = node.peer_count().await;
3090
3091        node.stop().await?;
3092        Ok(())
3093    }
3094
3095    #[tokio::test]
3096    async fn test_production_mode_disabled() -> Result<()> {
3097        let config = create_test_node_config();
3098        let node = P2PNode::new(config).await?;
3099
3100        assert!(!node.is_production_mode());
3101        assert!(node.production_config().is_none());
3102
3103        // Resource metrics should fail when production mode is disabled
3104        let result = node.resource_metrics().await;
3105        assert!(result.is_err());
3106        assert!(result.unwrap_err().to_string().contains("not enabled"));
3107
3108        Ok(())
3109    }
3110
3111    #[tokio::test]
3112    async fn test_network_event_variants() {
3113        // Test that all network event variants can be created
3114        let peer_id = "test_peer".to_string();
3115        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3116
3117        let _peer_connected = NetworkEvent::PeerConnected {
3118            peer_id: peer_id.clone(),
3119            addresses: vec![address.clone()],
3120        };
3121
3122        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3123            peer_id: peer_id.clone(),
3124            reason: "test disconnect".to_string(),
3125        };
3126
3127        let _message_received = NetworkEvent::MessageReceived {
3128            peer_id: peer_id.clone(),
3129            protocol: "test-protocol".to_string(),
3130            data: vec![1, 2, 3],
3131        };
3132
3133        let _connection_failed = NetworkEvent::ConnectionFailed {
3134            peer_id: Some(peer_id.clone()),
3135            address: address.clone(),
3136            error: "connection refused".to_string(),
3137        };
3138
3139        let _dht_stored = NetworkEvent::DHTRecordStored {
3140            key: vec![1, 2, 3],
3141            value: vec![4, 5, 6],
3142        };
3143
3144        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3145            key: vec![1, 2, 3],
3146            value: Some(vec![4, 5, 6]),
3147        };
3148    }
3149
3150    #[tokio::test]
3151    async fn test_peer_info_structure() {
3152        let peer_info = PeerInfo {
3153            peer_id: "test_peer".to_string(),
3154            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3155            connected_at: Instant::now(),
3156            last_seen: Instant::now(),
3157            status: ConnectionStatus::Connected,
3158            protocols: vec!["test-protocol".to_string()],
3159            heartbeat_count: 0,
3160        };
3161
3162        assert_eq!(peer_info.peer_id, "test_peer");
3163        assert_eq!(peer_info.addresses.len(), 1);
3164        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3165        assert_eq!(peer_info.protocols.len(), 1);
3166    }
3167
3168    #[tokio::test]
3169    async fn test_serialization() -> Result<()> {
3170        // Test that configs can be serialized/deserialized
3171        let config = create_test_node_config();
3172        let serialized = serde_json::to_string(&config)?;
3173        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3174
3175        assert_eq!(config.peer_id, deserialized.peer_id);
3176        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3177        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3178
3179        Ok(())
3180    }
3181
3182    #[tokio::test]
3183    async fn test_get_peer_id_by_address_found() -> Result<()> {
3184        let config = create_test_node_config();
3185        let node = P2PNode::new(config).await?;
3186
3187        // Manually insert a peer for testing
3188        let test_peer_id = "peer_test_123".to_string();
3189        let test_address = "192.168.1.100:9000".to_string();
3190
3191        let peer_info = PeerInfo {
3192            peer_id: test_peer_id.clone(),
3193            addresses: vec![test_address.clone()],
3194            connected_at: Instant::now(),
3195            last_seen: Instant::now(),
3196            status: ConnectionStatus::Connected,
3197            protocols: vec!["test-protocol".to_string()],
3198            heartbeat_count: 0,
3199        };
3200
3201        node.peers
3202            .write()
3203            .await
3204            .insert(test_peer_id.clone(), peer_info);
3205
3206        // Test: Find peer by address
3207        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3208        assert_eq!(found_peer_id, Some(test_peer_id));
3209
3210        Ok(())
3211    }
3212
3213    #[tokio::test]
3214    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3215        let config = create_test_node_config();
3216        let node = P2PNode::new(config).await?;
3217
3218        // Test: Try to find a peer that doesn't exist
3219        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3220        assert_eq!(result, None);
3221
3222        Ok(())
3223    }
3224
3225    #[tokio::test]
3226    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3227        let config = create_test_node_config();
3228        let node = P2PNode::new(config).await?;
3229
3230        // Test: Invalid address format should return None
3231        let result = node.get_peer_id_by_address("invalid-address").await;
3232        assert_eq!(result, None);
3233
3234        Ok(())
3235    }
3236
3237    #[tokio::test]
3238    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3239        let config = create_test_node_config();
3240        let node = P2PNode::new(config).await?;
3241
3242        // Add multiple peers with different addresses
3243        let peer1_id = "peer_1".to_string();
3244        let peer1_addr = "192.168.1.101:9001".to_string();
3245
3246        let peer2_id = "peer_2".to_string();
3247        let peer2_addr = "192.168.1.102:9002".to_string();
3248
3249        let peer1_info = PeerInfo {
3250            peer_id: peer1_id.clone(),
3251            addresses: vec![peer1_addr.clone()],
3252            connected_at: Instant::now(),
3253            last_seen: Instant::now(),
3254            status: ConnectionStatus::Connected,
3255            protocols: vec!["test-protocol".to_string()],
3256            heartbeat_count: 0,
3257        };
3258
3259        let peer2_info = PeerInfo {
3260            peer_id: peer2_id.clone(),
3261            addresses: vec![peer2_addr.clone()],
3262            connected_at: Instant::now(),
3263            last_seen: Instant::now(),
3264            status: ConnectionStatus::Connected,
3265            protocols: vec!["test-protocol".to_string()],
3266            heartbeat_count: 0,
3267        };
3268
3269        node.peers
3270            .write()
3271            .await
3272            .insert(peer1_id.clone(), peer1_info);
3273        node.peers
3274            .write()
3275            .await
3276            .insert(peer2_id.clone(), peer2_info);
3277
3278        // Test: Find each peer by their unique address
3279        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3280        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3281
3282        assert_eq!(found_peer1, Some(peer1_id));
3283        assert_eq!(found_peer2, Some(peer2_id));
3284
3285        Ok(())
3286    }
3287
3288    #[tokio::test]
3289    async fn test_list_active_connections_empty() -> Result<()> {
3290        let config = create_test_node_config();
3291        let node = P2PNode::new(config).await?;
3292
3293        // Test: No connections initially
3294        let connections = node.list_active_connections().await;
3295        assert!(connections.is_empty());
3296
3297        Ok(())
3298    }
3299
3300    #[tokio::test]
3301    async fn test_list_active_connections_with_peers() -> Result<()> {
3302        let config = create_test_node_config();
3303        let node = P2PNode::new(config).await?;
3304
3305        // Add multiple peers
3306        let peer1_id = "peer_1".to_string();
3307        let peer1_addrs = vec![
3308            "192.168.1.101:9001".to_string(),
3309            "192.168.1.101:9002".to_string(),
3310        ];
3311
3312        let peer2_id = "peer_2".to_string();
3313        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3314
3315        let peer1_info = PeerInfo {
3316            peer_id: peer1_id.clone(),
3317            addresses: peer1_addrs.clone(),
3318            connected_at: Instant::now(),
3319            last_seen: Instant::now(),
3320            status: ConnectionStatus::Connected,
3321            protocols: vec!["test-protocol".to_string()],
3322            heartbeat_count: 0,
3323        };
3324
3325        let peer2_info = PeerInfo {
3326            peer_id: peer2_id.clone(),
3327            addresses: peer2_addrs.clone(),
3328            connected_at: Instant::now(),
3329            last_seen: Instant::now(),
3330            status: ConnectionStatus::Connected,
3331            protocols: vec!["test-protocol".to_string()],
3332            heartbeat_count: 0,
3333        };
3334
3335        node.peers
3336            .write()
3337            .await
3338            .insert(peer1_id.clone(), peer1_info);
3339        node.peers
3340            .write()
3341            .await
3342            .insert(peer2_id.clone(), peer2_info);
3343
3344        // Test: List all active connections
3345        let connections = node.list_active_connections().await;
3346        assert_eq!(connections.len(), 2);
3347
3348        // Verify peer1 and peer2 are in the list
3349        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3350        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3351
3352        assert!(peer1_conn.is_some());
3353        assert!(peer2_conn.is_some());
3354
3355        // Verify addresses match
3356        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3357        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3358
3359        Ok(())
3360    }
3361
3362    #[tokio::test]
3363    async fn test_remove_peer_success() -> Result<()> {
3364        let config = create_test_node_config();
3365        let node = P2PNode::new(config).await?;
3366
3367        // Add a peer
3368        let peer_id = "peer_to_remove".to_string();
3369        let peer_info = PeerInfo {
3370            peer_id: peer_id.clone(),
3371            addresses: vec!["192.168.1.100:9000".to_string()],
3372            connected_at: Instant::now(),
3373            last_seen: Instant::now(),
3374            status: ConnectionStatus::Connected,
3375            protocols: vec!["test-protocol".to_string()],
3376            heartbeat_count: 0,
3377        };
3378
3379        node.peers.write().await.insert(peer_id.clone(), peer_info);
3380
3381        // Verify peer exists
3382        assert!(node.is_peer_connected(&peer_id).await);
3383
3384        // Remove the peer
3385        let removed = node.remove_peer(&peer_id).await;
3386        assert!(removed);
3387
3388        // Verify peer no longer exists
3389        assert!(!node.is_peer_connected(&peer_id).await);
3390
3391        Ok(())
3392    }
3393
3394    #[tokio::test]
3395    async fn test_remove_peer_nonexistent() -> Result<()> {
3396        let config = create_test_node_config();
3397        let node = P2PNode::new(config).await?;
3398
3399        // Try to remove a peer that doesn't exist
3400        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3401        assert!(!removed);
3402
3403        Ok(())
3404    }
3405
3406    #[tokio::test]
3407    async fn test_is_peer_connected() -> Result<()> {
3408        let config = create_test_node_config();
3409        let node = P2PNode::new(config).await?;
3410
3411        let peer_id = "test_peer".to_string();
3412
3413        // Initially not connected
3414        assert!(!node.is_peer_connected(&peer_id).await);
3415
3416        // Add peer
3417        let peer_info = PeerInfo {
3418            peer_id: peer_id.clone(),
3419            addresses: vec!["192.168.1.100:9000".to_string()],
3420            connected_at: Instant::now(),
3421            last_seen: Instant::now(),
3422            status: ConnectionStatus::Connected,
3423            protocols: vec!["test-protocol".to_string()],
3424            heartbeat_count: 0,
3425        };
3426
3427        node.peers.write().await.insert(peer_id.clone(), peer_info);
3428
3429        // Now connected
3430        assert!(node.is_peer_connected(&peer_id).await);
3431
3432        // Remove peer
3433        node.remove_peer(&peer_id).await;
3434
3435        // No longer connected
3436        assert!(!node.is_peer_connected(&peer_id).await);
3437
3438        Ok(())
3439    }
3440
3441    #[test]
3442    fn test_normalize_ipv6_wildcard() {
3443        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3444
3445        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3446        let normalized = normalize_wildcard_to_loopback(wildcard);
3447
3448        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3449        assert_eq!(normalized.port(), 8080);
3450    }
3451
3452    #[test]
3453    fn test_normalize_ipv4_wildcard() {
3454        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3455
3456        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3457        let normalized = normalize_wildcard_to_loopback(wildcard);
3458
3459        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3460        assert_eq!(normalized.port(), 9000);
3461    }
3462
3463    #[test]
3464    fn test_normalize_specific_address_unchanged() {
3465        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3466        let normalized = normalize_wildcard_to_loopback(specific);
3467
3468        assert_eq!(normalized, specific);
3469    }
3470
3471    #[test]
3472    fn test_normalize_loopback_unchanged() {
3473        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3474
3475        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3476        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3477        assert_eq!(normalized_v6, loopback_v6);
3478
3479        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3480        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3481        assert_eq!(normalized_v4, loopback_v4);
3482    }
3483}