saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45/// Configuration for a P2P node
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48    /// Local peer ID for this node
49    pub peer_id: Option<PeerId>,
50
51    /// Addresses to listen on for incoming connections
52    pub listen_addrs: Vec<std::net::SocketAddr>,
53
54    /// Primary listen address (for compatibility)
55    pub listen_addr: std::net::SocketAddr,
56
57    /// Bootstrap peers to connect to on startup (legacy)
58    pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60    /// Bootstrap peers as strings (for integration tests)
61    pub bootstrap_peers_str: Vec<String>,
62
63    /// Enable IPv6 support
64    pub enable_ipv6: bool,
65
66    // MCP removed; will be redesigned later
67    /// Connection timeout duration
68    pub connection_timeout: Duration,
69
70    /// Keep-alive interval for connections
71    pub keep_alive_interval: Duration,
72
73    /// Maximum number of concurrent connections
74    pub max_connections: usize,
75
76    /// Maximum number of incoming connections
77    pub max_incoming_connections: usize,
78
79    /// DHT configuration
80    pub dht_config: DHTConfig,
81
82    /// Security configuration
83    pub security_config: SecurityConfig,
84
85    /// Production hardening configuration
86    pub production_config: Option<ProductionConfig>,
87
88    /// Bootstrap cache configuration
89    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90}
91
92/// DHT-specific configuration
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DHTConfig {
95    /// Kademlia K parameter (bucket size)
96    pub k_value: usize,
97
98    /// Kademlia alpha parameter (parallelism)
99    pub alpha_value: usize,
100
101    /// DHT record TTL
102    pub record_ttl: Duration,
103
104    /// DHT refresh interval
105    pub refresh_interval: Duration,
106}
107
108/// Security configuration
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SecurityConfig {
111    /// Enable noise protocol for encryption
112    pub enable_noise: bool,
113
114    /// Enable TLS for secure transport
115    pub enable_tls: bool,
116
117    /// Trust level for peer verification
118    pub trust_level: TrustLevel,
119}
120
121/// Trust level for peer verification
122#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
123pub enum TrustLevel {
124    /// No verification required
125    None,
126    /// Basic peer ID verification
127    Basic,
128    /// Full cryptographic verification
129    Full,
130}
131
132impl NodeConfig {
133    /// Create a new NodeConfig with default values
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if default addresses cannot be parsed
138    pub fn new() -> Result<Self> {
139        // Load config and use its defaults
140        let config = Config::default();
141
142        // Parse the default listen address
143        let listen_addr = config.listen_socket_addr()?;
144
145        // Create listen addresses based on config
146        let mut listen_addrs = vec![];
147
148        // Add IPv6 address if enabled
149        if config.network.ipv6_enabled {
150            let ipv6_addr = std::net::SocketAddr::new(
151                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
152                listen_addr.port(),
153            );
154            listen_addrs.push(ipv6_addr);
155        }
156
157        // Always add IPv4
158        let ipv4_addr = std::net::SocketAddr::new(
159            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
160            listen_addr.port(),
161        );
162        listen_addrs.push(ipv4_addr);
163
164        Ok(Self {
165            peer_id: None,
166            listen_addrs,
167            listen_addr,
168            bootstrap_peers: Vec::new(),
169            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
170            enable_ipv6: config.network.ipv6_enabled,
171
172            connection_timeout: Duration::from_secs(config.network.connection_timeout),
173            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
174            max_connections: config.network.max_connections,
175            max_incoming_connections: config.security.connection_limit as usize,
176            dht_config: DHTConfig::default(),
177            security_config: SecurityConfig::default(),
178            production_config: None,
179            bootstrap_cache_config: None,
180            // identity_config: None,
181        })
182    }
183}
184
185impl Default for NodeConfig {
186    fn default() -> Self {
187        // Use config defaults for network settings
188        let config = Config::default();
189
190        // Parse the default listen address - use safe fallback if parsing fails
191        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
192            std::net::SocketAddr::new(
193                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
194                9000,
195            )
196        });
197
198        Self {
199            peer_id: None,
200            listen_addrs: vec![
201                std::net::SocketAddr::new(
202                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
203                    listen_addr.port(),
204                ),
205                std::net::SocketAddr::new(
206                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
207                    listen_addr.port(),
208                ),
209            ],
210            listen_addr,
211            bootstrap_peers: Vec::new(),
212            bootstrap_peers_str: Vec::new(),
213            enable_ipv6: config.network.ipv6_enabled,
214
215            connection_timeout: Duration::from_secs(config.network.connection_timeout),
216            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
217            max_connections: config.network.max_connections,
218            max_incoming_connections: config.security.connection_limit as usize,
219            dht_config: DHTConfig::default(),
220            security_config: SecurityConfig::default(),
221            production_config: None, // Use default production config if enabled
222            bootstrap_cache_config: None,
223            // identity_config: None, // Use default identity config if enabled
224        }
225    }
226}
227
228impl NodeConfig {
229    /// Create NodeConfig from Config
230    pub fn from_config(config: &Config) -> Result<Self> {
231        let listen_addr = config.listen_socket_addr()?;
232        let bootstrap_addrs = config.bootstrap_addrs()?;
233
234        let mut node_config = Self {
235            peer_id: None,
236            listen_addrs: vec![listen_addr],
237            listen_addr,
238            bootstrap_peers: bootstrap_addrs
239                .iter()
240                .map(|addr| addr.socket_addr())
241                .collect(),
242            bootstrap_peers_str: config
243                .network
244                .bootstrap_nodes
245                .iter()
246                .map(|addr| addr.to_string())
247                .collect(),
248            enable_ipv6: config.network.ipv6_enabled,
249
250            connection_timeout: Duration::from_secs(config.network.connection_timeout),
251            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
252            max_connections: config.network.max_connections,
253            max_incoming_connections: config.security.connection_limit as usize,
254            dht_config: DHTConfig {
255                k_value: 20,
256                alpha_value: 3,
257                record_ttl: Duration::from_secs(3600),
258                refresh_interval: Duration::from_secs(900),
259            },
260            security_config: SecurityConfig {
261                enable_noise: true,
262                enable_tls: true,
263                trust_level: TrustLevel::Basic,
264            },
265            production_config: Some(ProductionConfig {
266                max_connections: config.network.max_connections,
267                max_memory_bytes: 0,  // unlimited
268                max_bandwidth_bps: 0, // unlimited
269                connection_timeout: Duration::from_secs(config.network.connection_timeout),
270                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
271                health_check_interval: Duration::from_secs(30),
272                metrics_interval: Duration::from_secs(60),
273                enable_performance_tracking: true,
274                enable_auto_cleanup: true,
275                shutdown_timeout: Duration::from_secs(30),
276                rate_limits: crate::production::RateLimitConfig::default(),
277            }),
278            bootstrap_cache_config: None,
279            // identity_config: Some(IdentityManagerConfig {
280            //     cache_ttl: Duration::from_secs(3600),
281            //     challenge_timeout: Duration::from_secs(30),
282            // }),
283        };
284
285        // Add IPv6 listen address if enabled
286        if config.network.ipv6_enabled {
287            node_config.listen_addrs.push(std::net::SocketAddr::new(
288                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
289                listen_addr.port(),
290            ));
291        }
292
293        Ok(node_config)
294    }
295
296    /// Try to build a NodeConfig from a listen address string
297    pub fn with_listen_addr(addr: &str) -> Result<Self> {
298        let listen_addr: std::net::SocketAddr = addr
299            .parse()
300            .map_err(|e: std::net::AddrParseError| {
301                NetworkError::InvalidAddress(e.to_string().into())
302            })
303            .map_err(P2PError::Network)?;
304        let cfg = NodeConfig {
305            listen_addr,
306            listen_addrs: vec![listen_addr],
307            ..Default::default()
308        };
309        Ok(cfg)
310    }
311}
312
313impl Default for DHTConfig {
314    fn default() -> Self {
315        Self {
316            k_value: 20,
317            alpha_value: 5,
318            record_ttl: Duration::from_secs(3600), // 1 hour
319            refresh_interval: Duration::from_secs(600), // 10 minutes
320        }
321    }
322}
323
324impl Default for SecurityConfig {
325    fn default() -> Self {
326        Self {
327            enable_noise: true,
328            enable_tls: true,
329            trust_level: TrustLevel::Basic,
330        }
331    }
332}
333
334/// Information about a connected peer
335#[derive(Debug, Clone)]
336pub struct PeerInfo {
337    /// Peer identifier
338    pub peer_id: PeerId,
339
340    /// Peer's addresses
341    pub addresses: Vec<String>,
342
343    /// Connection timestamp
344    pub connected_at: Instant,
345
346    /// Last seen timestamp
347    pub last_seen: Instant,
348
349    /// Connection status
350    pub status: ConnectionStatus,
351
352    /// Supported protocols
353    pub protocols: Vec<String>,
354
355    /// Number of heartbeats received
356    pub heartbeat_count: u64,
357}
358
359/// Connection status for a peer
360#[derive(Debug, Clone, PartialEq)]
361pub enum ConnectionStatus {
362    /// Connection is being established
363    Connecting,
364    /// Connection is established and active
365    Connected,
366    /// Connection is being closed
367    Disconnecting,
368    /// Connection is closed
369    Disconnected,
370    /// Connection failed
371    Failed(String),
372}
373
374/// Network events that can occur
375#[derive(Debug, Clone)]
376pub enum NetworkEvent {
377    /// A new peer has connected
378    PeerConnected {
379        /// The identifier of the newly connected peer
380        peer_id: PeerId,
381        /// The network addresses where the peer can be reached
382        addresses: Vec<String>,
383    },
384
385    /// A peer has disconnected
386    PeerDisconnected {
387        /// The identifier of the disconnected peer
388        peer_id: PeerId,
389        /// The reason for the disconnection
390        reason: String,
391    },
392
393    /// A message was received from a peer
394    MessageReceived {
395        /// The identifier of the sending peer
396        peer_id: PeerId,
397        /// The protocol used for the message
398        protocol: String,
399        /// The raw message data
400        data: Vec<u8>,
401    },
402
403    /// A connection attempt failed
404    ConnectionFailed {
405        /// The identifier of the peer (if known)
406        peer_id: Option<PeerId>,
407        /// The address where connection was attempted
408        address: String,
409        /// The error message describing the failure
410        error: String,
411    },
412
413    /// DHT record was stored
414    DHTRecordStored {
415        /// The DHT key where the record was stored
416        key: Vec<u8>,
417        /// The value that was stored
418        value: Vec<u8>,
419    },
420
421    /// DHT record was retrieved
422    DHTRecordRetrieved {
423        /// The DHT key that was queried
424        key: Vec<u8>,
425        /// The retrieved value, if found
426        value: Option<Vec<u8>>,
427    },
428}
429
430/// Network events that can occur in the P2P system
431///
432/// Events are broadcast to all listeners and provide real-time
433/// notifications of network state changes and message arrivals.
434#[derive(Debug, Clone)]
435pub enum P2PEvent {
436    /// Message received from a peer on a specific topic
437    Message {
438        /// Topic or channel the message was sent on
439        topic: String,
440        /// Peer ID of the message sender
441        source: PeerId,
442        /// Raw message data payload
443        data: Vec<u8>,
444    },
445    /// A new peer has connected to the network
446    PeerConnected(PeerId),
447    /// A peer has disconnected from the network
448    PeerDisconnected(PeerId),
449}
450
451/// Main P2P node structure
452/// Main P2P network node that manages connections, routing, and communication
453///
454/// This struct represents a complete P2P network participant that can:
455/// - Connect to other peers via QUIC transport
456/// - Participate in distributed hash table (DHT) operations
457/// - Send and receive messages through various protocols
458/// - Handle network events and peer lifecycle
459/// - Provide MCP (Model Context Protocol) services
460pub struct P2PNode {
461    /// Node configuration
462    config: NodeConfig,
463
464    /// Our peer ID
465    peer_id: PeerId,
466
467    /// Connected peers
468    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
469
470    /// Network event broadcaster
471    event_tx: broadcast::Sender<P2PEvent>,
472
473    /// Listen addresses
474    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
475
476    /// Node start time
477    start_time: Instant,
478
479    /// Running state
480    running: RwLock<bool>,
481
482    /// DHT instance (optional)
483    dht: Option<Arc<RwLock<DHT>>>,
484
485    /// Production resource manager (optional)
486    resource_manager: Option<Arc<ResourceManager>>,
487
488    /// Bootstrap cache manager for peer discovery
489    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
490
491    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
492    dual_node: Arc<DualStackNetworkNode>,
493
494    /// Rate limiter for connection and request throttling
495    #[allow(dead_code)]
496    rate_limiter: Arc<RateLimiter>,
497
498    /// Active connections (tracked by peer_id)
499    /// This set is synchronized with ant-quic's connection state via event monitoring
500    active_connections: Arc<RwLock<HashSet<PeerId>>>,
501
502    /// Security dashboard for monitoring
503    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
504
505    /// Connection lifecycle monitor task handle
506    #[allow(dead_code)]
507    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
508
509    /// Keepalive task handle
510    #[allow(dead_code)]
511    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
512
513    /// Shutdown flag for background tasks
514    #[allow(dead_code)]
515    shutdown: Arc<AtomicBool>,
516
517    /// GeoIP provider for connection validation
518    #[allow(dead_code)]
519    geo_provider: Arc<BgpGeoProvider>,
520}
521
522/// Normalize wildcard bind addresses to localhost loopback addresses
523///
524/// ant-quic correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
525/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
526///
527/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
528/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
529/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
530/// - All other addresses pass through unchanged
531///
532/// # Arguments
533/// * `addr` - The SocketAddr to normalize
534///
535/// # Returns
536/// * Normalized SocketAddr suitable for remote connections
537fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
538    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
539
540    if addr.ip().is_unspecified() {
541        // Convert unspecified addresses to loopback
542        let loopback_ip = match addr {
543            std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
544            std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
545        };
546        std::net::SocketAddr::new(loopback_ip, addr.port())
547    } else {
548        // Not a wildcard address, pass through unchanged
549        addr
550    }
551}
552
553impl P2PNode {
554    /// Minimal constructor for tests that avoids real networking
555    pub fn new_for_tests() -> Result<Self> {
556        let (event_tx, _) = broadcast::channel(16);
557        Ok(Self {
558            config: NodeConfig::default(),
559            peer_id: "test_peer".to_string(),
560            peers: Arc::new(RwLock::new(HashMap::new())),
561            event_tx,
562            listen_addrs: RwLock::new(Vec::new()),
563            start_time: Instant::now(),
564            running: RwLock::new(false),
565            dht: None,
566            resource_manager: None,
567            bootstrap_manager: None,
568            dual_node: {
569                // Bind dual-stack nodes on ephemeral ports for tests
570                let v6: Option<std::net::SocketAddr> = "[::1]:0"
571                    .parse()
572                    .ok()
573                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
574                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
575                let handle = tokio::runtime::Handle::current();
576                let dual_attempt = handle.block_on(
577                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
578                );
579                let dual = match dual_attempt {
580                    Ok(d) => d,
581                    Err(_e1) => {
582                        // Fallback to IPv4-only ephemeral bind
583                        let fallback = handle.block_on(
584                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
585                                None,
586                                "127.0.0.1:0".parse().ok(),
587                            ),
588                        );
589                        match fallback {
590                            Ok(d) => d,
591                            Err(e2) => {
592                                return Err(P2PError::Network(NetworkError::BindError(
593                                    format!("Failed to create dual-stack network node: {}", e2)
594                                        .into(),
595                                )));
596                            }
597                        }
598                    }
599                };
600                Arc::new(dual)
601            },
602            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
603                max_requests: 100,
604                burst_size: 100,
605                window: std::time::Duration::from_secs(1),
606                ..Default::default()
607            })),
608            active_connections: Arc::new(RwLock::new(HashSet::new())),
609            connection_monitor_handle: Arc::new(RwLock::new(None)),
610            keepalive_handle: Arc::new(RwLock::new(None)),
611            shutdown: Arc::new(AtomicBool::new(false)),
612            geo_provider: Arc::new(BgpGeoProvider::new()),
613            security_dashboard: None,
614        })
615    }
616    /// Create a new P2P node with the given configuration
617    pub async fn new(config: NodeConfig) -> Result<Self> {
618        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
619            // Generate a random peer ID for now
620            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
621        });
622
623        let (event_tx, _) = broadcast::channel(1000);
624
625        // Initialize and register a TrustWeightedKademlia DHT for the global API
626        // Use a deterministic local NodeId derived from the peer_id
627        {
628            use blake3::Hasher;
629            let mut hasher = Hasher::new();
630            hasher.update(peer_id.as_bytes());
631            let digest = hasher.finalize();
632            let mut nid = [0u8; 32];
633            nid.copy_from_slice(digest.as_bytes());
634            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
635                crate::identity::node_identity::NodeId::from_bytes(nid),
636            ));
637            // TODO: Update to use new clean API
638            // let _ = crate::api::set_dht_instance(twdht);
639        }
640
641        // Initialize DHT if needed
642        let (dht, security_dashboard) = if true { // Assuming DHT is always enabled for now, or check config
643            let _dht_config = crate::dht::DHTConfig {
644                replication_factor: config.dht_config.k_value,
645                bucket_size: config.dht_config.k_value,
646                alpha: config.dht_config.alpha_value,
647                record_ttl: config.dht_config.record_ttl,
648                bucket_refresh_interval: config.dht_config.refresh_interval,
649                republish_interval: config.dht_config.refresh_interval,
650                max_distance: 160,
651            };
652            // Convert peer_id String to NodeId
653            let peer_bytes = peer_id.as_bytes();
654            let mut node_id_bytes = [0u8; 32];
655            let len = peer_bytes.len().min(32);
656            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
657            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
658            let dht_instance = DHT::new(node_id).map_err(|e| {
659                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
660                    e.to_string().into(),
661                ))
662            })?;
663            dht_instance.start_maintenance_tasks();
664            
665            // Create Security Dashboard
666            let security_metrics = dht_instance.security_metrics();
667             let dashboard = crate::dht::metrics::SecurityDashboard::new(
668                security_metrics,
669                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
670                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
671                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
672            );
673            
674            (Some(Arc::new(RwLock::new(dht_instance))), Some(Arc::new(dashboard)))
675        } else {
676            (None, None)
677        };
678
679        // MCP removed
680
681        // Initialize production resource manager if configured
682        let resource_manager = config
683            .production_config
684            .clone()
685            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
686
687        // Initialize bootstrap cache manager
688        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
689            match BootstrapManager::with_config(cache_config.clone()).await {
690                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
691                Err(e) => {
692                    warn!(
693                        "Failed to initialize bootstrap manager: {}, continuing without cache",
694                        e
695                    );
696                    None
697                }
698            }
699        } else {
700            match BootstrapManager::new().await {
701                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
702                Err(e) => {
703                    warn!(
704                        "Failed to initialize bootstrap manager: {}, continuing without cache",
705                        e
706                    );
707                    None
708                }
709            }
710        };
711
712        // Initialize dual-stack ant-quic nodes
713        // Determine bind addresses
714        let (v6_opt, v4_opt) = {
715            let port = config.listen_addr.port();
716            let ip = config.listen_addr.ip();
717
718            let v4_addr = if ip.is_ipv4() {
719                Some(std::net::SocketAddr::new(ip, port))
720            } else {
721                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
722                // But for now let's just stick to defaults if not specified
723                Some(std::net::SocketAddr::new(
724                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
725                    port,
726                ))
727            };
728
729            let v6_addr = if config.enable_ipv6 {
730                if ip.is_ipv6() {
731                    Some(std::net::SocketAddr::new(ip, port))
732                } else {
733                    Some(std::net::SocketAddr::new(
734                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
735                        port,
736                    ))
737                }
738            } else {
739                None
740            };
741            (v6_addr, v4_addr)
742        };
743
744        let dual_node = Arc::new(
745            DualStackNetworkNode::new(v6_opt, v4_opt)
746                .await
747                .map_err(|e| {
748                    P2PError::Transport(crate::error::TransportError::SetupFailed(
749                        format!("Failed to create dual-stack network nodes: {}", e).into(),
750                    ))
751                })?,
752        );
753
754        // Initialize rate limiter with default config
755        let rate_limiter = Arc::new(RateLimiter::new(
756            crate::validation::RateLimitConfig::default(),
757        ));
758
759        // Create active connections tracker
760        let active_connections = Arc::new(RwLock::new(HashSet::new()));
761
762        // Initialize GeoIP provider
763        let geo_provider = Arc::new(BgpGeoProvider::new());
764
765        // Create peers map
766        let peers = Arc::new(RwLock::new(HashMap::new()));
767
768        // Start connection lifecycle monitor
769        let connection_monitor_handle = {
770            let active_conns = Arc::clone(&active_connections);
771            let peers_map = Arc::clone(&peers);
772            let event_tx_clone = event_tx.clone();
773            let dual_node_clone = Arc::clone(&dual_node);
774            let geo_provider_clone = Arc::clone(&geo_provider);
775            let peer_id_clone = peer_id.clone();
776
777            let handle = tokio::spawn(async move {
778                Self::connection_lifecycle_monitor(
779                    dual_node_clone,
780                    active_conns,
781                    peers_map,
782                    event_tx_clone,
783                    geo_provider_clone,
784                    peer_id_clone,
785                )
786                .await;
787            });
788
789            Arc::new(RwLock::new(Some(handle)))
790        };
791
792        // Spawn keepalive task
793        let shutdown = Arc::new(AtomicBool::new(false));
794        let keepalive_handle = {
795            let active_conns = Arc::clone(&active_connections);
796            let dual_node_clone = Arc::clone(&dual_node);
797            let shutdown_clone = Arc::clone(&shutdown);
798
799            let handle = tokio::spawn(async move {
800                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
801            });
802
803            Arc::new(RwLock::new(Some(handle)))
804        };
805
806        let node = Self {
807            config,
808            peer_id,
809            peers,
810            event_tx,
811            listen_addrs: RwLock::new(Vec::new()),
812            start_time: Instant::now(),
813            running: RwLock::new(false),
814            dht,
815            resource_manager,
816            bootstrap_manager,
817            dual_node,
818            rate_limiter,
819            active_connections,
820            security_dashboard, 
821            connection_monitor_handle,
822            keepalive_handle,
823            shutdown,
824            geo_provider,
825        };
826        info!("Created P2P node with peer ID: {}", node.peer_id);
827
828        // Start the network listeners to populate listen addresses
829        node.start_network_listeners().await?;
830
831        // Update the connection monitor with actual peers reference
832        node.start_connection_monitor().await;
833
834        Ok(node)
835    }
836
837    /// Create a new node builder
838    pub fn builder() -> NodeBuilder {
839        NodeBuilder::new()
840    }
841
842    /// Get the peer ID of this node
843    pub fn peer_id(&self) -> &PeerId {
844        &self.peer_id
845    }
846
847    pub fn local_addr(&self) -> Option<String> {
848        self.listen_addrs
849            .try_read()
850            .ok()
851            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
852    }
853
854    pub async fn subscribe(&self, topic: &str) -> Result<()> {
855        // In a real implementation, this would register the topic with the pubsub mechanism.
856        // For now, we just log it.
857        info!("Subscribed to topic: {}", topic);
858        Ok(())
859    }
860
861    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
862        info!(
863            "Publishing message to topic: {} ({} bytes)",
864            topic,
865            data.len()
866        );
867
868        // Get list of connected peers
869        let peer_list: Vec<PeerId> = {
870            let peers_guard = self.peers.read().await;
871            peers_guard.keys().cloned().collect()
872        };
873
874        if peer_list.is_empty() {
875            debug!("No peers connected, message will only be sent to local subscribers");
876        } else {
877            // Send message to all connected peers
878            let mut send_count = 0;
879            for peer_id in &peer_list {
880                match self.send_message(peer_id, topic, data.to_vec()).await {
881                    Ok(_) => {
882                        send_count += 1;
883                        debug!("Sent message to peer: {}", peer_id);
884                    }
885                    Err(e) => {
886                        warn!("Failed to send message to peer {}: {}", peer_id, e);
887                    }
888                }
889            }
890            info!(
891                "Published message to {}/{} connected peers",
892                send_count,
893                peer_list.len()
894            );
895        }
896
897        // Also send to local subscribers (for local echo and testing)
898        let event = P2PEvent::Message {
899            topic: topic.to_string(),
900            source: self.peer_id.clone(),
901            data: data.to_vec(),
902        };
903        let _ = self.event_tx.send(event);
904
905        Ok(())
906    }
907
908    /// Get the node configuration
909    pub fn config(&self) -> &NodeConfig {
910        &self.config
911    }
912
913    /// Start the P2P node
914    pub async fn start(&self) -> Result<()> {
915        info!("Starting P2P node...");
916
917        // Start production resource manager if configured
918        if let Some(ref resource_manager) = self.resource_manager {
919            resource_manager.start().await.map_err(|e| {
920                P2PError::Network(crate::error::NetworkError::ProtocolError(
921                    format!("Failed to start resource manager: {e}").into(),
922                ))
923            })?;
924            info!("Production resource manager started");
925        }
926
927        // Start bootstrap manager background tasks
928        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
929            let mut manager = bootstrap_manager.write().await;
930            manager.start_background_tasks().await.map_err(|e| {
931                P2PError::Network(crate::error::NetworkError::ProtocolError(
932                    format!("Failed to start bootstrap manager: {e}").into(),
933                ))
934            })?;
935            info!("Bootstrap cache manager started");
936        }
937
938        // Set running state
939        *self.running.write().await = true;
940
941        // Start listening on configured addresses using transport layer
942        self.start_network_listeners().await?;
943
944        // Log current listen addresses
945        let listen_addrs = self.listen_addrs.read().await;
946        info!("P2P node started on addresses: {:?}", *listen_addrs);
947
948        // MCP removed
949
950        // Start message receiving system
951        self.start_message_receiving_system().await?;
952
953        // Connect to bootstrap peers
954        self.connect_bootstrap_peers().await?;
955
956        Ok(())
957    }
958
959    /// Start network listeners on configured addresses
960    async fn start_network_listeners(&self) -> Result<()> {
961        info!("Starting dual-stack listeners (ant-quic)...");
962        // Update our listen_addrs from the dual node bindings
963        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
964            P2PError::Transport(crate::error::TransportError::SetupFailed(
965                format!("Failed to get local addresses: {}", e).into(),
966            ))
967        })?;
968        {
969            let mut la = self.listen_addrs.write().await;
970            *la = addrs.clone();
971        }
972
973        // Spawn a background accept loop that handles incoming connections from either stack
974        let event_tx = self.event_tx.clone();
975        let peers = self.peers.clone();
976        let active_connections = self.active_connections.clone();
977        let rate_limiter = self.rate_limiter.clone();
978        let dual = self.dual_node.clone();
979        tokio::spawn(async move {
980            loop {
981                match dual.accept_any().await {
982                    Ok((ant_peer_id, remote_sock)) => {
983                        let peer_id =
984                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
985                        let remote_addr = NetworkAddress::from(remote_sock);
986                        // Optional: basic IP rate limiting
987                        let _ = rate_limiter.check_ip(&remote_sock.ip());
988                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
989                        register_new_peer(&peers, &peer_id, &remote_addr).await;
990                        active_connections.write().await.insert(peer_id);
991                    }
992                    Err(e) => {
993                        warn!("Accept failed: {}", e);
994                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
995                    }
996                }
997            }
998        });
999
1000        info!("Dual-stack listeners active on: {:?}", addrs);
1001        Ok(())
1002    }
1003
1004    /// Start a listener on a specific socket address
1005    #[allow(dead_code)]
1006    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1007        // use crate::transport::{Transport}; // Unused during migration
1008
1009        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1010        /*
1011        // Try QUIC first (preferred transport)
1012        match crate::transport::QuicTransport::new(Default::default()) {
1013            Ok(quic_transport) => {
1014                match quic_transport.listen(NetworkAddress::new(addr)).await {
1015                    Ok(listen_addrs) => {
1016                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1017
1018                        // Store the actual listening addresses in the node
1019                        {
1020                            let mut node_listen_addrs = self.listen_addrs.write().await;
1021                            // Don't clear - accumulate addresses from multiple listeners
1022                            node_listen_addrs.push(listen_addrs.socket_addr());
1023                        }
1024
1025                        // Start accepting connections in background
1026                        self.start_connection_acceptor(
1027                            Arc::new(quic_transport),
1028                            addr,
1029                            crate::transport::TransportType::QUIC
1030                        ).await?;
1031
1032                        return Ok(());
1033                    }
1034                    Err(e) => {
1035                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1036                    }
1037                }
1038            }
1039            Err(e) => {
1040                warn!("Failed to create QUIC transport for listening: {}", e);
1041            }
1042        }
1043        */
1044
1045        warn!("QUIC transport temporarily disabled during ant-quic migration");
1046        // No TCP fallback - QUIC only
1047        Err(crate::P2PError::Transport(
1048            crate::error::TransportError::SetupFailed(
1049                format!(
1050                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1051                )
1052                .into(),
1053            ),
1054        ))
1055    }
1056
1057    /// Start connection acceptor background task
1058    #[allow(dead_code)] // Deprecated during ant-quic migration
1059    async fn start_connection_acceptor(
1060        &self,
1061        transport: Arc<dyn crate::transport::Transport>,
1062        addr: std::net::SocketAddr,
1063        transport_type: crate::transport::TransportType,
1064    ) -> Result<()> {
1065        info!(
1066            "Starting connection acceptor for {:?} on {}",
1067            transport_type, addr
1068        );
1069
1070        // Clone necessary data for the background task
1071        let event_tx = self.event_tx.clone();
1072        let _peer_id = self.peer_id.clone();
1073        let peers = Arc::clone(&self.peers);
1074        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1075
1076        let rate_limiter = Arc::clone(&self.rate_limiter);
1077
1078        // Spawn background task to accept incoming connections
1079        tokio::spawn(async move {
1080            loop {
1081                match transport.accept().await {
1082                    Ok(connection) => {
1083                        let remote_addr = connection.remote_addr();
1084                        let connection_peer_id =
1085                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1086
1087                        // Apply rate limiting for incoming connections
1088                        let socket_addr = remote_addr.socket_addr();
1089                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1090                            // Connection dropped automatically when it goes out of scope
1091                            continue;
1092                        }
1093
1094                        info!(
1095                            "Accepted {:?} connection from {} (peer: {})",
1096                            transport_type, remote_addr, connection_peer_id
1097                        );
1098
1099                        // Generate peer connected event
1100                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1101
1102                        // Store the peer connection
1103                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1104
1105                        // Spawn task to handle this specific connection's messages
1106                        spawn_connection_handler(
1107                            connection,
1108                            connection_peer_id,
1109                            event_tx.clone(),
1110                            Arc::clone(&peers),
1111                        );
1112                    }
1113                    Err(e) => {
1114                        warn!(
1115                            "Failed to accept {:?} connection on {}: {}",
1116                            transport_type, addr, e
1117                        );
1118
1119                        // Brief pause before retrying to avoid busy loop
1120                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1121                    }
1122                }
1123            }
1124        });
1125
1126        info!(
1127            "Connection acceptor background task started for {:?} on {}",
1128            transport_type, addr
1129        );
1130        Ok(())
1131    }
1132
1133    /// Start the message receiving system with background tasks
1134    async fn start_message_receiving_system(&self) -> Result<()> {
1135        info!("Starting message receiving system");
1136        let dual = self.dual_node.clone();
1137        let event_tx = self.event_tx.clone();
1138
1139        tokio::spawn(async move {
1140            loop {
1141                match dual.receive_any().await {
1142                    Ok((_peer_id, bytes)) => {
1143                        // Expect the JSON message wrapper from create_protocol_message
1144                        #[allow(clippy::collapsible_if)]
1145                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1146                            if let (Some(protocol), Some(data), Some(from)) = (
1147                                value.get("protocol").and_then(|v| v.as_str()),
1148                                value.get("data").and_then(|v| v.as_array()),
1149                                value.get("from").and_then(|v| v.as_str()),
1150                            ) {
1151                                let payload: Vec<u8> = data
1152                                    .iter()
1153                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1154                                    .collect();
1155                                let _ = event_tx.send(P2PEvent::Message {
1156                                    topic: protocol.to_string(),
1157                                    source: from.to_string(),
1158                                    data: payload,
1159                                });
1160                            }
1161                        }
1162                    }
1163                    Err(e) => {
1164                        warn!("Receive error: {}", e);
1165                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1166                    }
1167                }
1168            }
1169        });
1170
1171        Ok(())
1172    }
1173
1174    /// Handle a received message and generate appropriate events
1175    #[allow(dead_code)]
1176    async fn handle_received_message(
1177        &self,
1178        message_data: Vec<u8>,
1179        peer_id: &PeerId,
1180        _protocol: &str,
1181        event_tx: &broadcast::Sender<P2PEvent>,
1182    ) -> Result<()> {
1183        // MCP removed: no special protocol handling
1184
1185        // Parse the message format we created in create_protocol_message
1186        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1187            Ok(message) => {
1188                if let (Some(protocol), Some(data), Some(from)) = (
1189                    message.get("protocol").and_then(|v| v.as_str()),
1190                    message.get("data").and_then(|v| v.as_array()),
1191                    message.get("from").and_then(|v| v.as_str()),
1192                ) {
1193                    // Convert data array back to bytes
1194                    let data_bytes: Vec<u8> = data
1195                        .iter()
1196                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1197                        .collect();
1198
1199                    // Generate message event
1200                    let event = P2PEvent::Message {
1201                        topic: protocol.to_string(),
1202                        source: from.to_string(),
1203                        data: data_bytes,
1204                    };
1205
1206                    let _ = event_tx.send(event);
1207                    debug!("Generated message event from peer: {}", peer_id);
1208                }
1209            }
1210            Err(e) => {
1211                warn!("Failed to parse received message from {}: {}", peer_id, e);
1212            }
1213        }
1214
1215        Ok(())
1216    }
1217
1218    // MCP removed
1219
1220    // MCP removed
1221
1222    /// Run the P2P node (blocks until shutdown)
1223    pub async fn run(&self) -> Result<()> {
1224        if !*self.running.read().await {
1225            self.start().await?;
1226        }
1227
1228        info!("P2P node running...");
1229
1230        // Main event loop
1231        loop {
1232            if !*self.running.read().await {
1233                break;
1234            }
1235
1236            // Perform periodic tasks
1237            self.periodic_tasks().await?;
1238
1239            // Sleep for a short interval
1240            tokio::time::sleep(Duration::from_millis(100)).await;
1241        }
1242
1243        info!("P2P node stopped");
1244        Ok(())
1245    }
1246
1247    /// Stop the P2P node
1248    pub async fn stop(&self) -> Result<()> {
1249        info!("Stopping P2P node...");
1250
1251        // Set running state to false
1252        *self.running.write().await = false;
1253
1254        // Disconnect all peers
1255        self.disconnect_all_peers().await?;
1256
1257        // Shutdown production resource manager if configured
1258        if let Some(ref resource_manager) = self.resource_manager {
1259            resource_manager.shutdown().await.map_err(|e| {
1260                P2PError::Network(crate::error::NetworkError::ProtocolError(
1261                    format!("Failed to shutdown resource manager: {e}").into(),
1262                ))
1263            })?;
1264            info!("Production resource manager stopped");
1265        }
1266
1267        info!("P2P node stopped");
1268        Ok(())
1269    }
1270
1271    /// Graceful shutdown alias for tests
1272    pub async fn shutdown(&self) -> Result<()> {
1273        self.stop().await
1274    }
1275
1276    /// Check if the node is running
1277    pub async fn is_running(&self) -> bool {
1278        *self.running.read().await
1279    }
1280
1281    /// Get the current listen addresses
1282    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1283        self.listen_addrs.read().await.clone()
1284    }
1285
1286    /// Get connected peers
1287    pub async fn connected_peers(&self) -> Vec<PeerId> {
1288        self.peers.read().await.keys().cloned().collect()
1289    }
1290
1291    /// Get peer count
1292    pub async fn peer_count(&self) -> usize {
1293        self.peers.read().await.len()
1294    }
1295
1296    /// Get peer info
1297    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1298        self.peers.read().await.get(peer_id).cloned()
1299    }
1300
1301    /// Get the peer ID for a given socket address, if connected
1302    ///
1303    /// This method searches through all connected peers to find one that has
1304    /// the specified address in its address list.
1305    ///
1306    /// # Arguments
1307    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1308    ///
1309    /// # Returns
1310    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1311    /// * `None` - If no peer with this address is currently connected
1312    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1313        // Parse the address to a SocketAddr for comparison
1314        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1315
1316        let peers = self.peers.read().await;
1317
1318        // Search through all connected peers
1319        for (peer_id, peer_info) in peers.iter() {
1320            // Check if this peer has a matching address
1321            for peer_addr in &peer_info.addresses {
1322                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1323                    && peer_socket == socket_addr
1324                {
1325                    return Some(peer_id.clone());
1326                }
1327            }
1328        }
1329
1330        None
1331    }
1332
1333    /// List all active connections with their peer IDs and addresses
1334    ///
1335    /// # Returns
1336    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1337    /// contains all known addresses for that peer.
1338    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1339        let peers = self.peers.read().await;
1340
1341        peers
1342            .iter()
1343            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1344            .collect()
1345    }
1346
1347    /// Remove a peer from the peers map
1348    ///
1349    /// This method removes a peer from the internal peers map. It should be used
1350    /// when a connection is no longer valid (e.g., after detecting that the underlying
1351    /// ant-quic connection has closed).
1352    ///
1353    /// # Arguments
1354    /// * `peer_id` - The ID of the peer to remove
1355    ///
1356    /// # Returns
1357    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1358    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1359        // Remove from active connections tracking
1360        self.active_connections.write().await.remove(peer_id);
1361        // Remove from peers map and return whether it existed
1362        self.peers.write().await.remove(peer_id).is_some()
1363    }
1364
1365    /// Check if a peer is connected
1366    ///
1367    /// This method checks if the peer ID exists in the peers map. Note that this
1368    /// only verifies the peer is registered - it does not guarantee the underlying
1369    /// ant-quic connection is still active. For connection validation, use `send_message`
1370    /// which will fail if the connection is closed.
1371    ///
1372    /// # Arguments
1373    /// * `peer_id` - The ID of the peer to check
1374    ///
1375    /// # Returns
1376    /// `true` if the peer exists in the peers map, `false` otherwise
1377    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1378        self.peers.read().await.contains_key(peer_id)
1379    }
1380
1381    /// Connect to a peer
1382    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1383        info!("Connecting to peer at: {}", address);
1384
1385        // Check production limits if resource manager is enabled
1386        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1387            Some(resource_manager.acquire_connection().await?)
1388        } else {
1389            None
1390        };
1391
1392        // Parse the address to SocketAddr format
1393        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1394            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1395                format!("{}: {}", address, e).into(),
1396            ))
1397        })?;
1398
1399        // Normalize wildcard addresses to loopback for local connections
1400        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1401        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1402        if normalized_addr != socket_addr {
1403            info!(
1404                "Normalized wildcard address {} to loopback {}",
1405                socket_addr, normalized_addr
1406            );
1407        }
1408
1409        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1410        let addr_list = vec![normalized_addr];
1411        let peer_id = match tokio::time::timeout(
1412            self.config.connection_timeout,
1413            self.dual_node.connect_happy_eyeballs(&addr_list),
1414        )
1415        .await
1416        {
1417            Ok(Ok(peer)) => {
1418                let connected_peer_id =
1419                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1420                info!("Successfully connected to peer: {}", connected_peer_id);
1421                connected_peer_id
1422            }
1423            Ok(Err(e)) => {
1424                warn!("Failed to connect to peer at {}: {}", address, e);
1425                let sanitized_address = address.replace(['/', ':'], "_");
1426                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1427                warn!(
1428                    "Using demo peer ID: {} (transport connection failed)",
1429                    demo_peer_id
1430                );
1431                demo_peer_id
1432            }
1433            Err(_) => {
1434                warn!(
1435                    "Timed out connecting to peer at {} after {:?}",
1436                    address, self.config.connection_timeout
1437                );
1438                let sanitized_address = address.replace(['/', ':'], "_");
1439                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1440                demo_peer_id
1441            }
1442        };
1443
1444        // Create peer info with connection details
1445        let peer_info = PeerInfo {
1446            peer_id: peer_id.clone(),
1447            addresses: vec![address.to_string()],
1448            connected_at: Instant::now(),
1449            last_seen: Instant::now(),
1450            status: ConnectionStatus::Connected,
1451            protocols: vec!["p2p-foundation/1.0".to_string()],
1452            heartbeat_count: 0,
1453        };
1454
1455        // Store peer information
1456        self.peers.write().await.insert(peer_id.clone(), peer_info);
1457
1458        // Add to active connections tracking
1459        // This is critical for is_connection_active() to work correctly
1460        self.active_connections
1461            .write()
1462            .await
1463            .insert(peer_id.clone());
1464
1465        // Record bandwidth usage if resource manager is enabled
1466        if let Some(ref resource_manager) = self.resource_manager {
1467            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1468        }
1469
1470        // Emit connection event
1471        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1472
1473        info!("Connected to peer: {}", peer_id);
1474        Ok(peer_id)
1475    }
1476
1477    /// Disconnect from a peer
1478    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1479        info!("Disconnecting from peer: {}", peer_id);
1480
1481        // Remove from active connections
1482        self.active_connections.write().await.remove(peer_id);
1483
1484        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1485            peer_info.status = ConnectionStatus::Disconnected;
1486
1487            // Emit event
1488            let _ = self
1489                .event_tx
1490                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1491
1492            info!("Disconnected from peer: {}", peer_id);
1493        }
1494
1495        Ok(())
1496    }
1497
1498    /// Check if a connection to a peer is active
1499    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1500        self.active_connections.read().await.contains(peer_id)
1501    }
1502
1503    /// Send a message to a peer
1504    pub async fn send_message(
1505        &self,
1506        peer_id: &PeerId,
1507        protocol: &str,
1508        data: Vec<u8>,
1509    ) -> Result<()> {
1510        debug!(
1511            "Sending message to peer {} on protocol {}",
1512            peer_id, protocol
1513        );
1514
1515        // Check rate limits if resource manager is enabled
1516        if let Some(ref resource_manager) = self.resource_manager
1517            && !resource_manager
1518                .check_rate_limit(peer_id, "message")
1519                .await?
1520        {
1521            return Err(P2PError::ResourceExhausted(
1522                format!("Rate limit exceeded for peer {}", peer_id).into(),
1523            ));
1524        }
1525
1526        // Check if peer exists in peers map
1527        if !self.peers.read().await.contains_key(peer_id) {
1528            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1529                peer_id.to_string().into(),
1530            )));
1531        }
1532
1533        // **NEW**: Check if the ant-quic connection is actually active
1534        // This is the critical fix for the connection state synchronization issue
1535        if !self.is_connection_active(peer_id).await {
1536            debug!(
1537                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1538                peer_id
1539            );
1540
1541            // Clean up stale peer entry
1542            self.remove_peer(peer_id).await;
1543
1544            return Err(P2PError::Network(
1545                crate::error::NetworkError::ConnectionClosed {
1546                    peer_id: peer_id.to_string().into(),
1547                },
1548            ));
1549        }
1550
1551        // MCP removed: no special-case protocol validation
1552
1553        // Record bandwidth usage if resource manager is enabled
1554        if let Some(ref resource_manager) = self.resource_manager {
1555            resource_manager.record_bandwidth(data.len() as u64, 0);
1556        }
1557
1558        // Create protocol message wrapper
1559        let _message_data = self.create_protocol_message(protocol, data)?;
1560
1561        // Send via ant-quic dual-node
1562        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1563        tokio::time::timeout(self.config.connection_timeout, send_fut)
1564            .await
1565            .map_err(|_| {
1566                P2PError::Transport(crate::error::TransportError::StreamError(
1567                    "Timed out sending message".into(),
1568                ))
1569            })?
1570            .map_err(|e| {
1571                P2PError::Transport(crate::error::TransportError::StreamError(
1572                    e.to_string().into(),
1573                ))
1574            })
1575    }
1576
1577    /// Create a protocol message wrapper
1578    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1579        use serde_json::json;
1580
1581        let timestamp = std::time::SystemTime::now()
1582            .duration_since(std::time::UNIX_EPOCH)
1583            .map_err(|e| {
1584                P2PError::Network(NetworkError::ProtocolError(
1585                    format!("System time error: {}", e).into(),
1586                ))
1587            })?
1588            .as_secs();
1589
1590        // Create a simple message format for P2P communication
1591        let message = json!({
1592            "protocol": protocol,
1593            "data": data,
1594            "from": self.peer_id,
1595            "timestamp": timestamp
1596        });
1597
1598        serde_json::to_vec(&message).map_err(|e| {
1599            P2PError::Transport(crate::error::TransportError::StreamError(
1600                format!("Failed to serialize message: {e}").into(),
1601            ))
1602        })
1603    }
1604
1605    // Note: async listen_addrs() already exists above for fetching listen addresses
1606}
1607
1608/// Create a protocol message wrapper (static version for background tasks)
1609#[allow(dead_code)]
1610fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1611    use serde_json::json;
1612
1613    let timestamp = std::time::SystemTime::now()
1614        .duration_since(std::time::UNIX_EPOCH)
1615        .map_err(|e| {
1616            P2PError::Network(NetworkError::ProtocolError(
1617                format!("System time error: {}", e).into(),
1618            ))
1619        })?
1620        .as_secs();
1621
1622    // Create a simple message format for P2P communication
1623    let message = json!({
1624        "protocol": protocol,
1625        "data": data,
1626        "timestamp": timestamp
1627    });
1628
1629    serde_json::to_vec(&message).map_err(|e| {
1630        P2PError::Transport(crate::error::TransportError::StreamError(
1631            format!("Failed to serialize message: {e}").into(),
1632        ))
1633    })
1634}
1635
1636impl P2PNode {
1637    /// Subscribe to network events
1638    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1639        self.event_tx.subscribe()
1640    }
1641
1642    /// Backwards-compat event stream accessor for tests
1643    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1644        self.subscribe_events()
1645    }
1646
1647    /// Get node uptime
1648    pub fn uptime(&self) -> Duration {
1649        self.start_time.elapsed()
1650    }
1651
1652    // MCP removed: all MCP tool/service methods removed
1653
1654    // /// Handle MCP remote tool call with network integration
1655
1656    // /// List tools available on a specific remote peer
1657
1658    // /// Get MCP server statistics
1659
1660    /// Get production resource metrics
1661    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1662        if let Some(ref resource_manager) = self.resource_manager {
1663            Ok(resource_manager.get_metrics().await)
1664        } else {
1665            Err(P2PError::Network(
1666                crate::error::NetworkError::ProtocolError(
1667                    "Production resource manager not enabled".to_string().into(),
1668                ),
1669            ))
1670        }
1671    }
1672
1673    /// Connection lifecycle monitor task - processes ant-quic connection events
1674    /// and updates active_connections HashSet and peers map
1675    async fn connection_lifecycle_monitor(
1676        dual_node: Arc<DualStackNetworkNode>,
1677        active_connections: Arc<RwLock<HashSet<String>>>,
1678        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1679        event_tx: broadcast::Sender<P2PEvent>,
1680        geo_provider: Arc<BgpGeoProvider>,
1681        local_peer_id: String,
1682    ) {
1683        use crate::transport::ant_quic_adapter::ConnectionEvent;
1684
1685        let mut event_rx = dual_node.subscribe_connection_events();
1686
1687        info!("Connection lifecycle monitor started");
1688
1689        loop {
1690            match event_rx.recv().await {
1691                Ok(event) => {
1692                    match event {
1693                        ConnectionEvent::Established {
1694                            peer_id,
1695                            remote_address,
1696                        } => {
1697                            let peer_id_str =
1698                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1699                            debug!(
1700                                "Connection established: peer={}, addr={}",
1701                                peer_id_str, remote_address
1702                            );
1703
1704                            // **GeoIP Validation**
1705                            // Check if the peer's IP is allowed
1706                            let ip = remote_address.ip();
1707                            let is_rejected = match ip {
1708                                std::net::IpAddr::V4(v4) => {
1709                                    // Check if it's a hosting provider or VPN
1710                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1711                                        geo_provider.is_hosting_asn(asn)
1712                                            || geo_provider.is_vpn_asn(asn)
1713                                    } else {
1714                                        false
1715                                    }
1716                                }
1717                                std::net::IpAddr::V6(v6) => {
1718                                    let info = geo_provider.lookup(v6);
1719                                    info.is_hosting_provider || info.is_vpn_provider
1720                                }
1721                            };
1722
1723                            if is_rejected {
1724                                info!(
1725                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1726                                    peer_id_str, remote_address
1727                                );
1728
1729                                // Create rejection message
1730                                let rejection = RejectionMessage {
1731                                    reason: RejectionReason::GeoIpPolicy,
1732                                    message:
1733                                        "Connection rejected: Hosting/VPN providers not allowed"
1734                                            .to_string(),
1735                                    suggested_target: None, // Could suggest a different region if we knew more
1736                                };
1737
1738                                // Serialize message
1739                                if let Ok(data) = serde_json::to_vec(&rejection) {
1740                                    // Create protocol message
1741                                    let timestamp = std::time::SystemTime::now()
1742                                        .duration_since(std::time::UNIX_EPOCH)
1743                                        .unwrap_or_default()
1744                                        .as_secs();
1745
1746                                    let message = serde_json::json!({
1747                                        "protocol": "control",
1748                                        "data": data,
1749                                        "from": local_peer_id,
1750                                        "timestamp": timestamp
1751                                    });
1752
1753                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1754                                        // Send rejection message
1755                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
1756                                        // which might fail if we haven't fully registered the peer yet
1757                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1758
1759                                        // Give it a moment to send before disconnecting?
1760                                        // ant-quic might handle this, but a small yield is safe
1761                                        tokio::task::yield_now().await;
1762                                    }
1763                                }
1764
1765                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
1766                                // For now, we just don't add it to active connections, effectively ignoring it
1767                                // Ideally we should actively close the connection
1768                                continue;
1769                            }
1770
1771                            // Add to active connections
1772                            active_connections.write().await.insert(peer_id_str.clone());
1773
1774                            // Update peer info or insert new
1775                            let mut peers_lock = peers.write().await;
1776                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1777                                peer_info.status = ConnectionStatus::Connected;
1778                                peer_info.connected_at = Instant::now();
1779                            } else {
1780                                // New incoming peer
1781                                debug!("Registering new incoming peer: {}", peer_id_str);
1782                                peers_lock.insert(
1783                                    peer_id_str.clone(),
1784                                    PeerInfo {
1785                                        peer_id: peer_id_str.clone(),
1786                                        addresses: vec![remote_address.to_string()],
1787                                        status: ConnectionStatus::Connected,
1788                                        last_seen: Instant::now(),
1789                                        connected_at: Instant::now(),
1790                                        protocols: Vec::new(),
1791                                        heartbeat_count: 0,
1792                                    },
1793                                );
1794                            }
1795
1796                            // Broadcast connection event
1797                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1798                        }
1799                        ConnectionEvent::Lost { peer_id, reason } => {
1800                            let peer_id_str =
1801                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1802                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1803
1804                            // Remove from active connections
1805                            active_connections.write().await.remove(&peer_id_str);
1806
1807                            // Update peer info status
1808                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1809                                peer_info.status = ConnectionStatus::Disconnected;
1810                                peer_info.last_seen = Instant::now();
1811                            }
1812
1813                            // Broadcast disconnection event
1814                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1815                        }
1816                        ConnectionEvent::Failed { peer_id, reason } => {
1817                            let peer_id_str =
1818                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1819                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1820
1821                            // Remove from active connections
1822                            active_connections.write().await.remove(&peer_id_str);
1823
1824                            // Update peer info status
1825                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1826                                peer_info.status = ConnectionStatus::Failed(reason.clone());
1827                            }
1828
1829                            // Broadcast disconnection event
1830                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1831                        }
1832                    }
1833                }
1834                Err(broadcast::error::RecvError::Lagged(skipped)) => {
1835                    warn!(
1836                        "Connection event monitor lagged, skipped {} events",
1837                        skipped
1838                    );
1839                    continue;
1840                }
1841                Err(broadcast::error::RecvError::Closed) => {
1842                    info!("Connection event channel closed, stopping monitor");
1843                    break;
1844                }
1845            }
1846        }
1847
1848        info!("Connection lifecycle monitor stopped");
1849    }
1850
1851    /// Start connection monitor (called after node initialization)
1852    async fn start_connection_monitor(&self) {
1853        // The monitor task is already spawned in new() with a temporary peers map
1854        // This method is a placeholder for future enhancements where we might
1855        // need to restart the monitor or provide it with updated references
1856        debug!("Connection monitor already running from initialization");
1857    }
1858
1859    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1860    ///
1861    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1862    /// message every 15 seconds (half the timeout) to all active connections to prevent
1863    /// them from timing out during periods of inactivity.
1864    async fn keepalive_task(
1865        active_connections: Arc<RwLock<HashSet<String>>>,
1866        dual_node: Arc<DualStackNetworkNode>,
1867        shutdown: Arc<AtomicBool>,
1868    ) {
1869        use tokio::time::{Duration, interval};
1870
1871        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1872        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1873
1874        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1875        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1876
1877        info!(
1878            "Keepalive task started (interval: {}s)",
1879            KEEPALIVE_INTERVAL_SECS
1880        );
1881
1882        loop {
1883            // Check shutdown flag first
1884            if shutdown.load(Ordering::Relaxed) {
1885                info!("Keepalive task shutting down");
1886                break;
1887            }
1888
1889            interval.tick().await;
1890
1891            // Get snapshot of active connections
1892            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1893
1894            if peers.is_empty() {
1895                trace!("Keepalive: no active connections");
1896                continue;
1897            }
1898
1899            debug!("Sending keepalive to {} active connections", peers.len());
1900
1901            // Send keepalive to each peer
1902            for peer_id in peers {
1903                match dual_node
1904                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1905                    .await
1906                {
1907                    Ok(_) => {
1908                        trace!("Keepalive sent to peer: {}", peer_id);
1909                    }
1910                    Err(e) => {
1911                        debug!(
1912                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
1913                            peer_id, e
1914                        );
1915                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1916                    }
1917                }
1918            }
1919        }
1920
1921        info!("Keepalive task stopped");
1922    }
1923
1924    /// Check system health
1925    pub async fn health_check(&self) -> Result<()> {
1926        if let Some(ref resource_manager) = self.resource_manager {
1927            resource_manager.health_check().await
1928        } else {
1929            // Basic health check without resource manager
1930            let peer_count = self.peer_count().await;
1931            if peer_count > self.config.max_connections {
1932                Err(P2PError::Network(
1933                    crate::error::NetworkError::ProtocolError(
1934                        format!("Too many connections: {peer_count}").into(),
1935                    ),
1936                ))
1937            } else {
1938                Ok(())
1939            }
1940        }
1941    }
1942
1943    /// Get production configuration (if enabled)
1944    pub fn production_config(&self) -> Option<&ProductionConfig> {
1945        self.config.production_config.as_ref()
1946    }
1947
1948    /// Check if production hardening is enabled
1949    pub fn is_production_mode(&self) -> bool {
1950        self.resource_manager.is_some()
1951    }
1952
1953    /// Get DHT reference
1954    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1955        self.dht.as_ref()
1956    }
1957
1958    /// Store a value in the DHT
1959    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1960        if let Some(ref dht) = self.dht {
1961            let mut dht_instance = dht.write().await;
1962            let dht_key = crate::dht::DhtKey::from_bytes(key);
1963            dht_instance
1964                .store(&dht_key, value.clone())
1965                .await
1966                .map_err(|e| {
1967                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1968                        format!("{:?}: {e}", key).into(),
1969                    ))
1970                })?;
1971
1972            Ok(())
1973        } else {
1974            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1975                "DHT not enabled".to_string().into(),
1976            )))
1977        }
1978    }
1979
1980    /// Retrieve a value from the DHT
1981    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1982        if let Some(ref dht) = self.dht {
1983            let dht_instance = dht.read().await;
1984            let dht_key = crate::dht::DhtKey::from_bytes(key);
1985            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1986                P2PError::Dht(crate::error::DhtError::StoreFailed(
1987                    format!("Retrieve failed: {e}").into(),
1988                ))
1989            })?;
1990
1991            Ok(record_result)
1992        } else {
1993            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1994                "DHT not enabled".to_string().into(),
1995            )))
1996        }
1997    }
1998
1999    /// Add a discovered peer to the bootstrap cache
2000    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2001        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2002            let mut manager = bootstrap_manager.write().await;
2003            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2004                .iter()
2005                .filter_map(|addr| addr.parse().ok())
2006                .collect();
2007            let contact = ContactEntry::new(peer_id, socket_addresses);
2008            manager.add_contact(contact).await.map_err(|e| {
2009                P2PError::Network(crate::error::NetworkError::ProtocolError(
2010                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2011                ))
2012            })?;
2013        }
2014        Ok(())
2015    }
2016
2017    /// Update connection metrics for a peer in the bootstrap cache
2018    pub async fn update_peer_metrics(
2019        &self,
2020        peer_id: &PeerId,
2021        success: bool,
2022        latency_ms: Option<u64>,
2023        _error: Option<String>,
2024    ) -> Result<()> {
2025        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2026            let mut manager = bootstrap_manager.write().await;
2027
2028            // Create quality metrics based on the connection result
2029            let metrics = QualityMetrics {
2030                success_rate: if success { 1.0 } else { 0.0 },
2031                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2032                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2033                last_connection_attempt: chrono::Utc::now(),
2034                last_successful_connection: if success {
2035                    chrono::Utc::now()
2036                } else {
2037                    chrono::Utc::now() - chrono::Duration::hours(1)
2038                },
2039                uptime_score: 0.5,
2040            };
2041
2042            manager
2043                .update_contact_metrics(peer_id, metrics)
2044                .await
2045                .map_err(|e| {
2046                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2047                        format!("Failed to update peer metrics: {e}").into(),
2048                    ))
2049                })?;
2050        }
2051        Ok(())
2052    }
2053
2054    /// Get bootstrap cache statistics
2055    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2056        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2057            let manager = bootstrap_manager.read().await;
2058            let stats = manager.get_stats().await.map_err(|e| {
2059                P2PError::Network(crate::error::NetworkError::ProtocolError(
2060                    format!("Failed to get bootstrap stats: {e}").into(),
2061                ))
2062            })?;
2063            Ok(Some(stats))
2064        } else {
2065            Ok(None)
2066        }
2067    }
2068
2069    /// Get the number of cached bootstrap peers
2070    pub async fn cached_peer_count(&self) -> usize {
2071        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2072            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2073        {
2074            return stats.total_contacts;
2075        }
2076        0
2077    }
2078
2079    /// Connect to bootstrap peers
2080    async fn connect_bootstrap_peers(&self) -> Result<()> {
2081        let mut bootstrap_contacts = Vec::new();
2082        let mut used_cache = false;
2083        let mut seen_addresses = std::collections::HashSet::new();
2084
2085        // CLI-provided bootstrap peers take priority - always include them first
2086        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2087            self.config.bootstrap_peers_str.clone()
2088        } else {
2089            // Convert Multiaddr to strings
2090            self.config
2091                .bootstrap_peers
2092                .iter()
2093                .map(|addr| addr.to_string())
2094                .collect::<Vec<_>>()
2095        };
2096
2097        if !cli_bootstrap_peers.is_empty() {
2098            info!(
2099                "Using {} CLI-provided bootstrap peers (priority)",
2100                cli_bootstrap_peers.len()
2101            );
2102            for addr in &cli_bootstrap_peers {
2103                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2104                    seen_addresses.insert(socket_addr);
2105                    let contact = ContactEntry::new(
2106                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2107                        vec![socket_addr],
2108                    );
2109                    bootstrap_contacts.push(contact);
2110                } else {
2111                    warn!("Invalid bootstrap address format: {}", addr);
2112                }
2113            }
2114        }
2115
2116        // Supplement with cached bootstrap peers (after CLI peers)
2117        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2118            let manager = bootstrap_manager.read().await;
2119            match manager.get_bootstrap_peers(20).await {
2120                // Try to get top 20 quality peers
2121                Ok(contacts) => {
2122                    if !contacts.is_empty() {
2123                        let mut added_from_cache = 0;
2124                        for contact in contacts {
2125                            // Only add if we haven't already added this address from CLI
2126                            let new_addresses: Vec<_> = contact
2127                                .addresses
2128                                .iter()
2129                                .filter(|addr| !seen_addresses.contains(addr))
2130                                .copied()
2131                                .collect();
2132
2133                            if !new_addresses.is_empty() {
2134                                for addr in &new_addresses {
2135                                    seen_addresses.insert(*addr);
2136                                }
2137                                let mut contact = contact.clone();
2138                                contact.addresses = new_addresses;
2139                                bootstrap_contacts.push(contact);
2140                                added_from_cache += 1;
2141                            }
2142                        }
2143                        if added_from_cache > 0 {
2144                            info!(
2145                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2146                                added_from_cache
2147                            );
2148                            used_cache = true;
2149                        }
2150                    }
2151                }
2152                Err(e) => {
2153                    warn!("Failed to get cached bootstrap peers: {}", e);
2154                }
2155            }
2156        }
2157
2158        if bootstrap_contacts.is_empty() {
2159            info!("No bootstrap peers configured and no cached peers available");
2160            return Ok(());
2161        }
2162
2163        // Connect to bootstrap peers
2164        let mut successful_connections = 0;
2165        for contact in bootstrap_contacts {
2166            for addr in &contact.addresses {
2167                match self.connect_peer(&addr.to_string()).await {
2168                    Ok(peer_id) => {
2169                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2170                        successful_connections += 1;
2171
2172                        // Update bootstrap cache with successful connection
2173                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2174                            let mut manager = bootstrap_manager.write().await;
2175                            let mut updated_contact = contact.clone();
2176                            updated_contact.peer_id = peer_id.clone();
2177                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2178
2179                            if let Err(e) = manager.add_contact(updated_contact).await {
2180                                warn!("Failed to update bootstrap cache: {}", e);
2181                            }
2182                        }
2183                        break; // Successfully connected, move to next contact
2184                    }
2185                    Err(e) => {
2186                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2187
2188                        // Update bootstrap cache with failed connection
2189                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2190                            let mut manager = bootstrap_manager.write().await;
2191                            let mut updated_contact = contact.clone();
2192                            updated_contact.update_connection_result(
2193                                false,
2194                                None,
2195                                Some(e.to_string()),
2196                            );
2197
2198                            if let Err(e) = manager.add_contact(updated_contact).await {
2199                                warn!("Failed to update bootstrap cache: {}", e);
2200                            }
2201                        }
2202                    }
2203                }
2204            }
2205        }
2206
2207        if successful_connections == 0 {
2208            if !used_cache {
2209                warn!("Failed to connect to any bootstrap peers");
2210            }
2211            return Err(P2PError::Network(NetworkError::ConnectionFailed {
2212                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
2213                reason: "Failed to connect to any bootstrap peers".into(),
2214            }));
2215        }
2216        info!(
2217            "Successfully connected to {} bootstrap peers",
2218            successful_connections
2219        );
2220
2221        Ok(())
2222    }
2223
2224    /// Disconnect from all peers
2225    async fn disconnect_all_peers(&self) -> Result<()> {
2226        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2227
2228        for peer_id in peer_ids {
2229            self.disconnect_peer(&peer_id).await?;
2230        }
2231
2232        Ok(())
2233    }
2234
2235    /// Perform periodic maintenance tasks
2236    async fn periodic_tasks(&self) -> Result<()> {
2237        // Update peer last seen timestamps
2238        // Remove stale connections
2239        // Perform DHT maintenance
2240        // This is a placeholder for now
2241
2242        Ok(())
2243    }
2244}
2245
2246/// Network sender trait for sending messages
2247#[async_trait::async_trait]
2248pub trait NetworkSender: Send + Sync {
2249    /// Send a message to a specific peer
2250    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2251
2252    /// Get our local peer ID
2253    fn local_peer_id(&self) -> &PeerId;
2254}
2255
2256/// Lightweight wrapper for P2PNode to implement NetworkSender
2257#[derive(Clone)]
2258pub struct P2PNetworkSender {
2259    peer_id: PeerId,
2260    // Use channels for async communication with the P2P node
2261    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2262}
2263
2264impl P2PNetworkSender {
2265    pub fn new(
2266        peer_id: PeerId,
2267        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2268    ) -> Self {
2269        Self { peer_id, send_tx }
2270    }
2271}
2272
2273/// Implementation of NetworkSender trait for P2PNetworkSender
2274#[async_trait::async_trait]
2275impl NetworkSender for P2PNetworkSender {
2276    /// Send a message to a specific peer via the P2P network
2277    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2278        self.send_tx
2279            .send((peer_id.clone(), protocol.to_string(), data))
2280            .map_err(|_| {
2281                P2PError::Network(crate::error::NetworkError::ProtocolError(
2282                    "Failed to send message via channel".to_string().into(),
2283                ))
2284            })?;
2285        Ok(())
2286    }
2287
2288    /// Get our local peer ID
2289    fn local_peer_id(&self) -> &PeerId {
2290        &self.peer_id
2291    }
2292}
2293
2294/// Builder pattern for creating P2P nodes
2295pub struct NodeBuilder {
2296    config: NodeConfig,
2297}
2298
2299impl Default for NodeBuilder {
2300    fn default() -> Self {
2301        Self::new()
2302    }
2303}
2304
2305impl NodeBuilder {
2306    /// Create a new node builder
2307    pub fn new() -> Self {
2308        Self {
2309            config: NodeConfig::default(),
2310        }
2311    }
2312
2313    /// Set the peer ID
2314    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2315        self.config.peer_id = Some(peer_id);
2316        self
2317    }
2318
2319    /// Add a listen address
2320    pub fn listen_on(mut self, addr: &str) -> Self {
2321        if let Ok(multiaddr) = addr.parse() {
2322            self.config.listen_addrs.push(multiaddr);
2323        }
2324        self
2325    }
2326
2327    /// Add a bootstrap peer
2328    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2329        if let Ok(multiaddr) = addr.parse() {
2330            self.config.bootstrap_peers.push(multiaddr);
2331        }
2332        self.config.bootstrap_peers_str.push(addr.to_string());
2333        self
2334    }
2335
2336    /// Enable IPv6 support
2337    pub fn with_ipv6(mut self, enable: bool) -> Self {
2338        self.config.enable_ipv6 = enable;
2339        self
2340    }
2341
2342    // MCP removed: builder methods deleted
2343
2344    /// Set connection timeout
2345    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2346        self.config.connection_timeout = timeout;
2347        self
2348    }
2349
2350    /// Set maximum connections
2351    pub fn with_max_connections(mut self, max: usize) -> Self {
2352        self.config.max_connections = max;
2353        self
2354    }
2355
2356    /// Enable production mode with default configuration
2357    pub fn with_production_mode(mut self) -> Self {
2358        self.config.production_config = Some(ProductionConfig::default());
2359        self
2360    }
2361
2362    /// Configure production settings
2363    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2364        self.config.production_config = Some(production_config);
2365        self
2366    }
2367
2368    /// Configure DHT settings
2369    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2370        self.config.dht_config = dht_config;
2371        self
2372    }
2373
2374    /// Enable DHT with default configuration
2375    pub fn with_default_dht(mut self) -> Self {
2376        self.config.dht_config = DHTConfig::default();
2377        self
2378    }
2379
2380    /// Build the P2P node
2381    pub async fn build(self) -> Result<P2PNode> {
2382        P2PNode::new(self.config).await
2383    }
2384}
2385
2386/// Standalone function to handle received messages without borrowing self
2387#[allow(dead_code)] // Deprecated during ant-quic migration
2388async fn handle_received_message_standalone(
2389    message_data: Vec<u8>,
2390    peer_id: &PeerId,
2391    _protocol: &str,
2392    event_tx: &broadcast::Sender<P2PEvent>,
2393) -> Result<()> {
2394    // Parse the message format
2395    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2396        Ok(message) => {
2397            if let (Some(protocol), Some(data), Some(from)) = (
2398                message.get("protocol").and_then(|v| v.as_str()),
2399                message.get("data").and_then(|v| v.as_array()),
2400                message.get("from").and_then(|v| v.as_str()),
2401            ) {
2402                // Convert data array back to bytes
2403                let data_bytes: Vec<u8> = data
2404                    .iter()
2405                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2406                    .collect();
2407
2408                // Generate message event
2409                let event = P2PEvent::Message {
2410                    topic: protocol.to_string(),
2411                    source: from.to_string(),
2412                    data: data_bytes,
2413                };
2414
2415                let _ = event_tx.send(event);
2416                debug!("Generated message event from peer: {}", peer_id);
2417            }
2418        }
2419        Err(e) => {
2420            warn!("Failed to parse received message from {}: {}", peer_id, e);
2421        }
2422    }
2423
2424    Ok(())
2425}
2426
2427// MCP removed: standalone MCP handler deleted
2428
2429/// Helper function to handle protocol message creation
2430#[allow(dead_code)]
2431fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2432    match create_protocol_message_static(protocol, data) {
2433        Ok(msg) => Some(msg),
2434        Err(e) => {
2435            warn!("Failed to create protocol message: {}", e);
2436            None
2437        }
2438    }
2439}
2440
2441/// Helper function to handle message send result
2442#[allow(dead_code)]
2443async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2444    match result {
2445        Ok(_) => {
2446            debug!("Message sent to peer {} via transport layer", peer_id);
2447        }
2448        Err(e) => {
2449            warn!("Failed to send message to peer {}: {}", peer_id, e);
2450        }
2451    }
2452}
2453
2454/// Helper function to check rate limit
2455#[allow(dead_code)] // Deprecated during ant-quic migration
2456fn check_rate_limit(
2457    rate_limiter: &RateLimiter,
2458    socket_addr: &std::net::SocketAddr,
2459    remote_addr: &NetworkAddress,
2460) -> Result<()> {
2461    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2462        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2463        e
2464    })
2465}
2466
2467/// Helper function to register a new peer
2468#[allow(dead_code)] // Deprecated during ant-quic migration
2469async fn register_new_peer(
2470    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2471    peer_id: &PeerId,
2472    remote_addr: &NetworkAddress,
2473) {
2474    let mut peers_guard = peers.write().await;
2475    let peer_info = PeerInfo {
2476        peer_id: peer_id.clone(),
2477        addresses: vec![remote_addr.to_string()],
2478        connected_at: tokio::time::Instant::now(),
2479        last_seen: tokio::time::Instant::now(),
2480        status: ConnectionStatus::Connected,
2481        protocols: vec!["p2p-chat/1.0.0".to_string()],
2482        heartbeat_count: 0,
2483    };
2484    peers_guard.insert(peer_id.clone(), peer_info);
2485}
2486
2487/// Helper function to spawn connection handler
2488#[allow(dead_code)] // Deprecated during ant-quic migration
2489fn spawn_connection_handler(
2490    connection: Box<dyn crate::transport::Connection>,
2491    peer_id: PeerId,
2492    event_tx: broadcast::Sender<P2PEvent>,
2493    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2494) {
2495    tokio::spawn(async move {
2496        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2497    });
2498}
2499
2500/// Helper function to handle peer connection
2501#[allow(dead_code)] // Deprecated during ant-quic migration
2502async fn handle_peer_connection(
2503    mut connection: Box<dyn crate::transport::Connection>,
2504    peer_id: PeerId,
2505    event_tx: broadcast::Sender<P2PEvent>,
2506    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2507) {
2508    loop {
2509        match connection.receive().await {
2510            Ok(message_data) => {
2511                debug!(
2512                    "Received {} bytes from peer: {}",
2513                    message_data.len(),
2514                    peer_id
2515                );
2516
2517                // Handle the received message
2518                if let Err(e) = handle_received_message_standalone(
2519                    message_data,
2520                    &peer_id,
2521                    "unknown", // TODO: Extract protocol from message
2522                    &event_tx,
2523                )
2524                .await
2525                {
2526                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2527                }
2528            }
2529            Err(e) => {
2530                warn!("Failed to receive message from {}: {}", peer_id, e);
2531
2532                // Check if connection is still alive
2533                if !connection.is_alive().await {
2534                    info!("Connection to {} is dead, removing peer", peer_id);
2535
2536                    // Remove dead peer
2537                    remove_peer(&peers, &peer_id).await;
2538
2539                    // Generate peer disconnected event
2540                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2541
2542                    break; // Exit the message receiving loop
2543                }
2544
2545                // Brief pause before retrying
2546                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2547            }
2548        }
2549    }
2550}
2551
2552/// Helper function to remove a peer
2553#[allow(dead_code)] // Deprecated during ant-quic migration
2554async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2555    let mut peers_guard = peers.write().await;
2556    peers_guard.remove(peer_id);
2557}
2558
2559/// Helper function to update peer heartbeat
2560#[allow(dead_code)]
2561async fn update_peer_heartbeat(
2562    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2563    peer_id: &PeerId,
2564) -> Result<()> {
2565    let mut peers_guard = peers.write().await;
2566    match peers_guard.get_mut(peer_id) {
2567        Some(peer_info) => {
2568            peer_info.last_seen = Instant::now();
2569            peer_info.heartbeat_count += 1;
2570            Ok(())
2571        }
2572        None => {
2573            warn!("Received heartbeat from unknown peer: {}", peer_id);
2574            Err(P2PError::Network(NetworkError::PeerNotFound(
2575                format!("Peer {} not found", peer_id).into(),
2576            )))
2577        }
2578    }
2579}
2580
2581/// Helper function to get resource metrics
2582#[allow(dead_code)]
2583async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2584    if let Some(manager) = resource_manager {
2585        let metrics = manager.get_metrics().await;
2586        (metrics.memory_used, metrics.cpu_usage)
2587    } else {
2588        (0, 0.0)
2589    }
2590}
2591
2592#[cfg(test)]
2593mod tests {
2594    use super::*;
2595    // MCP removed from tests
2596    use std::time::Duration;
2597    use tokio::time::timeout;
2598
2599    // Test tool handler for network tests
2600
2601    // MCP removed
2602
2603    /// Helper function to create a test node configuration
2604    fn create_test_node_config() -> NodeConfig {
2605        NodeConfig {
2606            peer_id: Some("test_peer_123".to_string()),
2607            listen_addrs: vec![
2608                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2609                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2610            ],
2611            listen_addr: std::net::SocketAddr::new(
2612                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2613                0,
2614            ),
2615            bootstrap_peers: vec![],
2616            bootstrap_peers_str: vec![],
2617            enable_ipv6: true,
2618
2619            connection_timeout: Duration::from_millis(300),
2620            keep_alive_interval: Duration::from_secs(30),
2621            max_connections: 100,
2622            max_incoming_connections: 50,
2623            dht_config: DHTConfig::default(),
2624            security_config: SecurityConfig::default(),
2625            production_config: None,
2626            bootstrap_cache_config: None,
2627            // identity_config: None,
2628        }
2629    }
2630
2631    /// Helper function to create a test tool
2632    // MCP removed: test tool helper deleted
2633
2634    #[tokio::test]
2635    async fn test_node_config_default() {
2636        let config = NodeConfig::default();
2637
2638        assert!(config.peer_id.is_none());
2639        assert_eq!(config.listen_addrs.len(), 2);
2640        assert!(config.enable_ipv6);
2641        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2642        assert_eq!(config.max_incoming_connections, 100);
2643        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2644    }
2645
2646    #[tokio::test]
2647    async fn test_dht_config_default() {
2648        let config = DHTConfig::default();
2649
2650        assert_eq!(config.k_value, 20);
2651        assert_eq!(config.alpha_value, 5);
2652        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2653        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2654    }
2655
2656    #[tokio::test]
2657    async fn test_security_config_default() {
2658        let config = SecurityConfig::default();
2659
2660        assert!(config.enable_noise);
2661        assert!(config.enable_tls);
2662        assert_eq!(config.trust_level, TrustLevel::Basic);
2663    }
2664
2665    #[test]
2666    fn test_trust_level_variants() {
2667        // Test that all trust level variants can be created
2668        let _none = TrustLevel::None;
2669        let _basic = TrustLevel::Basic;
2670        let _full = TrustLevel::Full;
2671
2672        // Test equality
2673        assert_eq!(TrustLevel::None, TrustLevel::None);
2674        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2675        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2676        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2677    }
2678
2679    #[test]
2680    fn test_connection_status_variants() {
2681        let connecting = ConnectionStatus::Connecting;
2682        let connected = ConnectionStatus::Connected;
2683        let disconnecting = ConnectionStatus::Disconnecting;
2684        let disconnected = ConnectionStatus::Disconnected;
2685        let failed = ConnectionStatus::Failed("test error".to_string());
2686
2687        assert_eq!(connecting, ConnectionStatus::Connecting);
2688        assert_eq!(connected, ConnectionStatus::Connected);
2689        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2690        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2691        assert_ne!(connecting, connected);
2692
2693        if let ConnectionStatus::Failed(msg) = failed {
2694            assert_eq!(msg, "test error");
2695        } else {
2696            panic!("Expected Failed status");
2697        }
2698    }
2699
2700    #[tokio::test]
2701    async fn test_node_creation() -> Result<()> {
2702        let config = create_test_node_config();
2703        let node = P2PNode::new(config).await?;
2704
2705        assert_eq!(node.peer_id(), "test_peer_123");
2706        assert!(!node.is_running().await);
2707        assert_eq!(node.peer_count().await, 0);
2708        assert!(node.connected_peers().await.is_empty());
2709
2710        Ok(())
2711    }
2712
2713    #[tokio::test]
2714    async fn test_node_creation_without_peer_id() -> Result<()> {
2715        let mut config = create_test_node_config();
2716        config.peer_id = None;
2717
2718        let node = P2PNode::new(config).await?;
2719
2720        // Should have generated a peer ID
2721        assert!(node.peer_id().starts_with("peer_"));
2722        assert!(!node.is_running().await);
2723
2724        Ok(())
2725    }
2726
2727    #[tokio::test]
2728    async fn test_node_lifecycle() -> Result<()> {
2729        let config = create_test_node_config();
2730        let node = P2PNode::new(config).await?;
2731
2732        // Initially not running
2733        assert!(!node.is_running().await);
2734
2735        // Start the node
2736        node.start().await?;
2737        assert!(node.is_running().await);
2738
2739        // Check listen addresses were set (at least one)
2740        let listen_addrs = node.listen_addrs().await;
2741        assert!(
2742            !listen_addrs.is_empty(),
2743            "Expected at least one listening address"
2744        );
2745
2746        // Stop the node
2747        node.stop().await?;
2748        assert!(!node.is_running().await);
2749
2750        Ok(())
2751    }
2752
2753    #[tokio::test]
2754    async fn test_peer_connection() -> Result<()> {
2755        let config = create_test_node_config();
2756        let node = P2PNode::new(config).await?;
2757
2758        let peer_addr = "127.0.0.1:0";
2759
2760        // Connect to a peer
2761        let peer_id = node.connect_peer(peer_addr).await?;
2762        assert!(peer_id.starts_with("peer_from_"));
2763
2764        // Check peer count
2765        assert_eq!(node.peer_count().await, 1);
2766
2767        // Check connected peers
2768        let connected_peers = node.connected_peers().await;
2769        assert_eq!(connected_peers.len(), 1);
2770        assert_eq!(connected_peers[0], peer_id);
2771
2772        // Get peer info
2773        let peer_info = node.peer_info(&peer_id).await;
2774        assert!(peer_info.is_some());
2775        let info = peer_info.expect("Peer info should exist after adding peer");
2776        assert_eq!(info.peer_id, peer_id);
2777        assert_eq!(info.status, ConnectionStatus::Connected);
2778        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2779
2780        // Disconnect from peer
2781        node.disconnect_peer(&peer_id).await?;
2782        assert_eq!(node.peer_count().await, 0);
2783
2784        Ok(())
2785    }
2786
2787    #[tokio::test]
2788    async fn test_event_subscription() -> Result<()> {
2789        let config = create_test_node_config();
2790        let node = P2PNode::new(config).await?;
2791
2792        let mut events = node.subscribe_events();
2793        let peer_addr = "127.0.0.1:0";
2794
2795        // Connect to a peer (this should emit an event)
2796        let peer_id = node.connect_peer(peer_addr).await?;
2797
2798        // Check for PeerConnected event
2799        let event = timeout(Duration::from_millis(100), events.recv()).await;
2800        assert!(event.is_ok());
2801
2802        let event_result = event
2803            .expect("Should receive event")
2804            .expect("Event should not be error");
2805        match event_result {
2806            P2PEvent::PeerConnected(event_peer_id) => {
2807                assert_eq!(event_peer_id, peer_id);
2808            }
2809            _ => panic!("Expected PeerConnected event"),
2810        }
2811
2812        // Disconnect from peer (this should emit another event)
2813        node.disconnect_peer(&peer_id).await?;
2814
2815        // Check for PeerDisconnected event
2816        let event = timeout(Duration::from_millis(100), events.recv()).await;
2817        assert!(event.is_ok());
2818
2819        let event_result = event
2820            .expect("Should receive event")
2821            .expect("Event should not be error");
2822        match event_result {
2823            P2PEvent::PeerDisconnected(event_peer_id) => {
2824                assert_eq!(event_peer_id, peer_id);
2825            }
2826            _ => panic!("Expected PeerDisconnected event"),
2827        }
2828
2829        Ok(())
2830    }
2831
2832    #[tokio::test]
2833    async fn test_message_sending() -> Result<()> {
2834        // Create two nodes
2835        let mut config1 = create_test_node_config();
2836        config1.listen_addr =
2837            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2838        let node1 = P2PNode::new(config1).await?;
2839        node1.start().await?;
2840
2841        let mut config2 = create_test_node_config();
2842        config2.listen_addr =
2843            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2844        let node2 = P2PNode::new(config2).await?;
2845        node2.start().await?;
2846
2847        // Wait a bit for nodes to start listening
2848        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2849
2850        // Get actual listening address of node2
2851        let node2_addr = node2.local_addr().ok_or_else(|| {
2852            P2PError::Network(crate::error::NetworkError::ProtocolError(
2853                "No listening address".to_string().into(),
2854            ))
2855        })?;
2856
2857        // Connect node1 to node2
2858        let peer_id =
2859            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2860                Ok(res) => res?,
2861                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2862            };
2863
2864        // Wait a bit for connection to establish
2865        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2866
2867        // Send a message
2868        let message_data = b"Hello, peer!".to_vec();
2869        let result = match timeout(
2870            Duration::from_millis(500),
2871            node1.send_message(&peer_id, "test-protocol", message_data),
2872        )
2873        .await
2874        {
2875            Ok(res) => res,
2876            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2877        };
2878        // For now, we'll just check that we don't get a "not connected" error
2879        // The actual send might fail due to no handler on the other side
2880        if let Err(e) = &result {
2881            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2882        }
2883
2884        // Try to send to non-existent peer
2885        let non_existent_peer = "non_existent_peer".to_string();
2886        let result = node1
2887            .send_message(&non_existent_peer, "test-protocol", vec![])
2888            .await;
2889        assert!(result.is_err(), "Sending to non-existent peer should fail");
2890
2891        Ok(())
2892    }
2893
2894    #[tokio::test]
2895    async fn test_remote_mcp_operations() -> Result<()> {
2896        let config = create_test_node_config();
2897        let node = P2PNode::new(config).await?;
2898
2899        // MCP removed; test reduced to simple start/stop
2900        node.start().await?;
2901        node.stop().await?;
2902        Ok(())
2903    }
2904
2905    #[tokio::test]
2906    async fn test_health_check() -> Result<()> {
2907        let config = create_test_node_config();
2908        let node = P2PNode::new(config).await?;
2909
2910        // Health check should pass with no connections
2911        let result = node.health_check().await;
2912        assert!(result.is_ok());
2913
2914        // Note: We're not actually connecting to real peers here
2915        // since that would require running bootstrap nodes.
2916        // The health check should still pass with no connections.
2917
2918        Ok(())
2919    }
2920
2921    #[tokio::test]
2922    async fn test_node_uptime() -> Result<()> {
2923        let config = create_test_node_config();
2924        let node = P2PNode::new(config).await?;
2925
2926        let uptime1 = node.uptime();
2927        assert!(uptime1 >= Duration::from_secs(0));
2928
2929        // Wait a bit
2930        tokio::time::sleep(Duration::from_millis(10)).await;
2931
2932        let uptime2 = node.uptime();
2933        assert!(uptime2 > uptime1);
2934
2935        Ok(())
2936    }
2937
2938    #[tokio::test]
2939    async fn test_node_config_access() -> Result<()> {
2940        let config = create_test_node_config();
2941        let expected_peer_id = config.peer_id.clone();
2942        let node = P2PNode::new(config).await?;
2943
2944        let node_config = node.config();
2945        assert_eq!(node_config.peer_id, expected_peer_id);
2946        assert_eq!(node_config.max_connections, 100);
2947        // MCP removed
2948
2949        Ok(())
2950    }
2951
2952    #[tokio::test]
2953    async fn test_mcp_server_access() -> Result<()> {
2954        let config = create_test_node_config();
2955        let _node = P2PNode::new(config).await?;
2956
2957        // MCP removed
2958        Ok(())
2959    }
2960
2961    #[tokio::test]
2962    async fn test_dht_access() -> Result<()> {
2963        let config = create_test_node_config();
2964        let node = P2PNode::new(config).await?;
2965
2966        // Should have DHT
2967        assert!(node.dht().is_some());
2968
2969        Ok(())
2970    }
2971
2972    #[tokio::test]
2973    async fn test_node_builder() -> Result<()> {
2974        // Create a config using the builder but don't actually build a real node
2975        let builder = P2PNode::builder()
2976            .with_peer_id("builder_test_peer".to_string())
2977            .listen_on("/ip4/127.0.0.1/tcp/0")
2978            .listen_on("/ip6/::1/tcp/0")
2979            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2980            .with_ipv6(true)
2981            .with_connection_timeout(Duration::from_secs(15))
2982            .with_max_connections(200);
2983
2984        // Test the configuration that was built
2985        let config = builder.config;
2986        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2987        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2988        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2989        assert!(config.enable_ipv6);
2990        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2991        assert_eq!(config.max_connections, 200);
2992
2993        Ok(())
2994    }
2995
2996    #[tokio::test]
2997    async fn test_bootstrap_peers() -> Result<()> {
2998        let mut config = create_test_node_config();
2999        config.bootstrap_peers = vec![
3000            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3001            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3002        ];
3003
3004        let node = P2PNode::new(config).await?;
3005
3006        // Start node (which attempts to connect to bootstrap peers)
3007        node.start().await?;
3008
3009        // In a test environment, bootstrap peers may not be available
3010        // The test verifies the node starts correctly with bootstrap configuration
3011        // Peer count may include local/internal tracking, so we just verify it's reasonable
3012        let _peer_count = node.peer_count().await;
3013
3014        node.stop().await?;
3015        Ok(())
3016    }
3017
3018    #[tokio::test]
3019    async fn test_production_mode_disabled() -> Result<()> {
3020        let config = create_test_node_config();
3021        let node = P2PNode::new(config).await?;
3022
3023        assert!(!node.is_production_mode());
3024        assert!(node.production_config().is_none());
3025
3026        // Resource metrics should fail when production mode is disabled
3027        let result = node.resource_metrics().await;
3028        assert!(result.is_err());
3029        assert!(result.unwrap_err().to_string().contains("not enabled"));
3030
3031        Ok(())
3032    }
3033
3034    #[tokio::test]
3035    async fn test_network_event_variants() {
3036        // Test that all network event variants can be created
3037        let peer_id = "test_peer".to_string();
3038        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3039
3040        let _peer_connected = NetworkEvent::PeerConnected {
3041            peer_id: peer_id.clone(),
3042            addresses: vec![address.clone()],
3043        };
3044
3045        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3046            peer_id: peer_id.clone(),
3047            reason: "test disconnect".to_string(),
3048        };
3049
3050        let _message_received = NetworkEvent::MessageReceived {
3051            peer_id: peer_id.clone(),
3052            protocol: "test-protocol".to_string(),
3053            data: vec![1, 2, 3],
3054        };
3055
3056        let _connection_failed = NetworkEvent::ConnectionFailed {
3057            peer_id: Some(peer_id.clone()),
3058            address: address.clone(),
3059            error: "connection refused".to_string(),
3060        };
3061
3062        let _dht_stored = NetworkEvent::DHTRecordStored {
3063            key: vec![1, 2, 3],
3064            value: vec![4, 5, 6],
3065        };
3066
3067        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3068            key: vec![1, 2, 3],
3069            value: Some(vec![4, 5, 6]),
3070        };
3071    }
3072
3073    #[tokio::test]
3074    async fn test_peer_info_structure() {
3075        let peer_info = PeerInfo {
3076            peer_id: "test_peer".to_string(),
3077            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3078            connected_at: Instant::now(),
3079            last_seen: Instant::now(),
3080            status: ConnectionStatus::Connected,
3081            protocols: vec!["test-protocol".to_string()],
3082            heartbeat_count: 0,
3083        };
3084
3085        assert_eq!(peer_info.peer_id, "test_peer");
3086        assert_eq!(peer_info.addresses.len(), 1);
3087        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3088        assert_eq!(peer_info.protocols.len(), 1);
3089    }
3090
3091    #[tokio::test]
3092    async fn test_serialization() -> Result<()> {
3093        // Test that configs can be serialized/deserialized
3094        let config = create_test_node_config();
3095        let serialized = serde_json::to_string(&config)?;
3096        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3097
3098        assert_eq!(config.peer_id, deserialized.peer_id);
3099        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3100        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3101
3102        Ok(())
3103    }
3104
3105    #[tokio::test]
3106    async fn test_get_peer_id_by_address_found() -> Result<()> {
3107        let config = create_test_node_config();
3108        let node = P2PNode::new(config).await?;
3109
3110        // Manually insert a peer for testing
3111        let test_peer_id = "peer_test_123".to_string();
3112        let test_address = "192.168.1.100:9000".to_string();
3113
3114        let peer_info = PeerInfo {
3115            peer_id: test_peer_id.clone(),
3116            addresses: vec![test_address.clone()],
3117            connected_at: Instant::now(),
3118            last_seen: Instant::now(),
3119            status: ConnectionStatus::Connected,
3120            protocols: vec!["test-protocol".to_string()],
3121            heartbeat_count: 0,
3122        };
3123
3124        node.peers
3125            .write()
3126            .await
3127            .insert(test_peer_id.clone(), peer_info);
3128
3129        // Test: Find peer by address
3130        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3131        assert_eq!(found_peer_id, Some(test_peer_id));
3132
3133        Ok(())
3134    }
3135
3136    #[tokio::test]
3137    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3138        let config = create_test_node_config();
3139        let node = P2PNode::new(config).await?;
3140
3141        // Test: Try to find a peer that doesn't exist
3142        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3143        assert_eq!(result, None);
3144
3145        Ok(())
3146    }
3147
3148    #[tokio::test]
3149    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3150        let config = create_test_node_config();
3151        let node = P2PNode::new(config).await?;
3152
3153        // Test: Invalid address format should return None
3154        let result = node.get_peer_id_by_address("invalid-address").await;
3155        assert_eq!(result, None);
3156
3157        Ok(())
3158    }
3159
3160    #[tokio::test]
3161    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3162        let config = create_test_node_config();
3163        let node = P2PNode::new(config).await?;
3164
3165        // Add multiple peers with different addresses
3166        let peer1_id = "peer_1".to_string();
3167        let peer1_addr = "192.168.1.101:9001".to_string();
3168
3169        let peer2_id = "peer_2".to_string();
3170        let peer2_addr = "192.168.1.102:9002".to_string();
3171
3172        let peer1_info = PeerInfo {
3173            peer_id: peer1_id.clone(),
3174            addresses: vec![peer1_addr.clone()],
3175            connected_at: Instant::now(),
3176            last_seen: Instant::now(),
3177            status: ConnectionStatus::Connected,
3178            protocols: vec!["test-protocol".to_string()],
3179            heartbeat_count: 0,
3180        };
3181
3182        let peer2_info = PeerInfo {
3183            peer_id: peer2_id.clone(),
3184            addresses: vec![peer2_addr.clone()],
3185            connected_at: Instant::now(),
3186            last_seen: Instant::now(),
3187            status: ConnectionStatus::Connected,
3188            protocols: vec!["test-protocol".to_string()],
3189            heartbeat_count: 0,
3190        };
3191
3192        node.peers
3193            .write()
3194            .await
3195            .insert(peer1_id.clone(), peer1_info);
3196        node.peers
3197            .write()
3198            .await
3199            .insert(peer2_id.clone(), peer2_info);
3200
3201        // Test: Find each peer by their unique address
3202        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3203        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3204
3205        assert_eq!(found_peer1, Some(peer1_id));
3206        assert_eq!(found_peer2, Some(peer2_id));
3207
3208        Ok(())
3209    }
3210
3211    #[tokio::test]
3212    async fn test_list_active_connections_empty() -> Result<()> {
3213        let config = create_test_node_config();
3214        let node = P2PNode::new(config).await?;
3215
3216        // Test: No connections initially
3217        let connections = node.list_active_connections().await;
3218        assert!(connections.is_empty());
3219
3220        Ok(())
3221    }
3222
3223    #[tokio::test]
3224    async fn test_list_active_connections_with_peers() -> Result<()> {
3225        let config = create_test_node_config();
3226        let node = P2PNode::new(config).await?;
3227
3228        // Add multiple peers
3229        let peer1_id = "peer_1".to_string();
3230        let peer1_addrs = vec![
3231            "192.168.1.101:9001".to_string(),
3232            "192.168.1.101:9002".to_string(),
3233        ];
3234
3235        let peer2_id = "peer_2".to_string();
3236        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3237
3238        let peer1_info = PeerInfo {
3239            peer_id: peer1_id.clone(),
3240            addresses: peer1_addrs.clone(),
3241            connected_at: Instant::now(),
3242            last_seen: Instant::now(),
3243            status: ConnectionStatus::Connected,
3244            protocols: vec!["test-protocol".to_string()],
3245            heartbeat_count: 0,
3246        };
3247
3248        let peer2_info = PeerInfo {
3249            peer_id: peer2_id.clone(),
3250            addresses: peer2_addrs.clone(),
3251            connected_at: Instant::now(),
3252            last_seen: Instant::now(),
3253            status: ConnectionStatus::Connected,
3254            protocols: vec!["test-protocol".to_string()],
3255            heartbeat_count: 0,
3256        };
3257
3258        node.peers
3259            .write()
3260            .await
3261            .insert(peer1_id.clone(), peer1_info);
3262        node.peers
3263            .write()
3264            .await
3265            .insert(peer2_id.clone(), peer2_info);
3266
3267        // Test: List all active connections
3268        let connections = node.list_active_connections().await;
3269        assert_eq!(connections.len(), 2);
3270
3271        // Verify peer1 and peer2 are in the list
3272        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3273        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3274
3275        assert!(peer1_conn.is_some());
3276        assert!(peer2_conn.is_some());
3277
3278        // Verify addresses match
3279        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3280        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3281
3282        Ok(())
3283    }
3284
3285    #[tokio::test]
3286    async fn test_remove_peer_success() -> Result<()> {
3287        let config = create_test_node_config();
3288        let node = P2PNode::new(config).await?;
3289
3290        // Add a peer
3291        let peer_id = "peer_to_remove".to_string();
3292        let peer_info = PeerInfo {
3293            peer_id: peer_id.clone(),
3294            addresses: vec!["192.168.1.100:9000".to_string()],
3295            connected_at: Instant::now(),
3296            last_seen: Instant::now(),
3297            status: ConnectionStatus::Connected,
3298            protocols: vec!["test-protocol".to_string()],
3299            heartbeat_count: 0,
3300        };
3301
3302        node.peers.write().await.insert(peer_id.clone(), peer_info);
3303
3304        // Verify peer exists
3305        assert!(node.is_peer_connected(&peer_id).await);
3306
3307        // Remove the peer
3308        let removed = node.remove_peer(&peer_id).await;
3309        assert!(removed);
3310
3311        // Verify peer no longer exists
3312        assert!(!node.is_peer_connected(&peer_id).await);
3313
3314        Ok(())
3315    }
3316
3317    #[tokio::test]
3318    async fn test_remove_peer_nonexistent() -> Result<()> {
3319        let config = create_test_node_config();
3320        let node = P2PNode::new(config).await?;
3321
3322        // Try to remove a peer that doesn't exist
3323        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3324        assert!(!removed);
3325
3326        Ok(())
3327    }
3328
3329    #[tokio::test]
3330    async fn test_is_peer_connected() -> Result<()> {
3331        let config = create_test_node_config();
3332        let node = P2PNode::new(config).await?;
3333
3334        let peer_id = "test_peer".to_string();
3335
3336        // Initially not connected
3337        assert!(!node.is_peer_connected(&peer_id).await);
3338
3339        // Add peer
3340        let peer_info = PeerInfo {
3341            peer_id: peer_id.clone(),
3342            addresses: vec!["192.168.1.100:9000".to_string()],
3343            connected_at: Instant::now(),
3344            last_seen: Instant::now(),
3345            status: ConnectionStatus::Connected,
3346            protocols: vec!["test-protocol".to_string()],
3347            heartbeat_count: 0,
3348        };
3349
3350        node.peers.write().await.insert(peer_id.clone(), peer_info);
3351
3352        // Now connected
3353        assert!(node.is_peer_connected(&peer_id).await);
3354
3355        // Remove peer
3356        node.remove_peer(&peer_id).await;
3357
3358        // No longer connected
3359        assert!(!node.is_peer_connected(&peer_id).await);
3360
3361        Ok(())
3362    }
3363
3364    #[test]
3365    fn test_normalize_ipv6_wildcard() {
3366        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3367
3368        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3369        let normalized = normalize_wildcard_to_loopback(wildcard);
3370
3371        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3372        assert_eq!(normalized.port(), 8080);
3373    }
3374
3375    #[test]
3376    fn test_normalize_ipv4_wildcard() {
3377        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3378
3379        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3380        let normalized = normalize_wildcard_to_loopback(wildcard);
3381
3382        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3383        assert_eq!(normalized.port(), 9000);
3384    }
3385
3386    #[test]
3387    fn test_normalize_specific_address_unchanged() {
3388        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3389        let normalized = normalize_wildcard_to_loopback(specific);
3390
3391        assert_eq!(normalized, specific);
3392    }
3393
3394    #[test]
3395    fn test_normalize_loopback_unchanged() {
3396        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3397
3398        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3399        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3400        assert_eq!(normalized_v6, loopback_v6);
3401
3402        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3403        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3404        assert_eq!(normalized_v4, loopback_v4);
3405    }
3406}