saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45/// Configuration for a P2P node
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48    /// Local peer ID for this node
49    pub peer_id: Option<PeerId>,
50
51    /// Addresses to listen on for incoming connections
52    pub listen_addrs: Vec<std::net::SocketAddr>,
53
54    /// Primary listen address (for compatibility)
55    pub listen_addr: std::net::SocketAddr,
56
57    /// Bootstrap peers to connect to on startup (legacy)
58    pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60    /// Bootstrap peers as strings (for integration tests)
61    pub bootstrap_peers_str: Vec<String>,
62
63    /// Enable IPv6 support
64    pub enable_ipv6: bool,
65
66    // MCP removed; will be redesigned later
67    /// Connection timeout duration
68    pub connection_timeout: Duration,
69
70    /// Keep-alive interval for connections
71    pub keep_alive_interval: Duration,
72
73    /// Maximum number of concurrent connections
74    pub max_connections: usize,
75
76    /// Maximum number of incoming connections
77    pub max_incoming_connections: usize,
78
79    /// DHT configuration
80    pub dht_config: DHTConfig,
81
82    /// Security configuration
83    pub security_config: SecurityConfig,
84
85    /// Production hardening configuration
86    pub production_config: Option<ProductionConfig>,
87
88    /// Bootstrap cache configuration
89    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90}
91
92/// DHT-specific configuration
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DHTConfig {
95    /// Kademlia K parameter (bucket size)
96    pub k_value: usize,
97
98    /// Kademlia alpha parameter (parallelism)
99    pub alpha_value: usize,
100
101    /// DHT record TTL
102    pub record_ttl: Duration,
103
104    /// DHT refresh interval
105    pub refresh_interval: Duration,
106}
107
108/// Security configuration
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SecurityConfig {
111    /// Enable noise protocol for encryption
112    pub enable_noise: bool,
113
114    /// Enable TLS for secure transport
115    pub enable_tls: bool,
116
117    /// Trust level for peer verification
118    pub trust_level: TrustLevel,
119}
120
121/// Trust level for peer verification
122#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
123pub enum TrustLevel {
124    /// No verification required
125    None,
126    /// Basic peer ID verification
127    Basic,
128    /// Full cryptographic verification
129    Full,
130}
131
132impl NodeConfig {
133    /// Create a new NodeConfig with default values
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if default addresses cannot be parsed
138    pub fn new() -> Result<Self> {
139        // Load config and use its defaults
140        let config = Config::default();
141
142        // Parse the default listen address
143        let listen_addr = config.listen_socket_addr()?;
144
145        // Create listen addresses based on config
146        let mut listen_addrs = vec![];
147
148        // Add IPv6 address if enabled
149        if config.network.ipv6_enabled {
150            let ipv6_addr = std::net::SocketAddr::new(
151                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
152                listen_addr.port(),
153            );
154            listen_addrs.push(ipv6_addr);
155        }
156
157        // Always add IPv4
158        let ipv4_addr = std::net::SocketAddr::new(
159            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
160            listen_addr.port(),
161        );
162        listen_addrs.push(ipv4_addr);
163
164        Ok(Self {
165            peer_id: None,
166            listen_addrs,
167            listen_addr,
168            bootstrap_peers: Vec::new(),
169            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
170            enable_ipv6: config.network.ipv6_enabled,
171
172            connection_timeout: Duration::from_secs(config.network.connection_timeout),
173            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
174            max_connections: config.network.max_connections,
175            max_incoming_connections: config.security.connection_limit as usize,
176            dht_config: DHTConfig::default(),
177            security_config: SecurityConfig::default(),
178            production_config: None,
179            bootstrap_cache_config: None,
180            // identity_config: None,
181        })
182    }
183}
184
185impl Default for NodeConfig {
186    fn default() -> Self {
187        // Use config defaults for network settings
188        let config = Config::default();
189
190        // Parse the default listen address - use safe fallback if parsing fails
191        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
192            std::net::SocketAddr::new(
193                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
194                9000,
195            )
196        });
197
198        Self {
199            peer_id: None,
200            listen_addrs: vec![
201                std::net::SocketAddr::new(
202                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
203                    listen_addr.port(),
204                ),
205                std::net::SocketAddr::new(
206                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
207                    listen_addr.port(),
208                ),
209            ],
210            listen_addr,
211            bootstrap_peers: Vec::new(),
212            bootstrap_peers_str: Vec::new(),
213            enable_ipv6: config.network.ipv6_enabled,
214
215            connection_timeout: Duration::from_secs(config.network.connection_timeout),
216            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
217            max_connections: config.network.max_connections,
218            max_incoming_connections: config.security.connection_limit as usize,
219            dht_config: DHTConfig::default(),
220            security_config: SecurityConfig::default(),
221            production_config: None, // Use default production config if enabled
222            bootstrap_cache_config: None,
223            // identity_config: None, // Use default identity config if enabled
224        }
225    }
226}
227
228impl NodeConfig {
229    /// Create NodeConfig from Config
230    pub fn from_config(config: &Config) -> Result<Self> {
231        let listen_addr = config.listen_socket_addr()?;
232        let bootstrap_addrs = config.bootstrap_addrs()?;
233
234        let mut node_config = Self {
235            peer_id: None,
236            listen_addrs: vec![listen_addr],
237            listen_addr,
238            bootstrap_peers: bootstrap_addrs
239                .iter()
240                .map(|addr| addr.socket_addr())
241                .collect(),
242            bootstrap_peers_str: config
243                .network
244                .bootstrap_nodes
245                .iter()
246                .map(|addr| addr.to_string())
247                .collect(),
248            enable_ipv6: config.network.ipv6_enabled,
249
250            connection_timeout: Duration::from_secs(config.network.connection_timeout),
251            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
252            max_connections: config.network.max_connections,
253            max_incoming_connections: config.security.connection_limit as usize,
254            dht_config: DHTConfig {
255                k_value: 20,
256                alpha_value: 3,
257                record_ttl: Duration::from_secs(3600),
258                refresh_interval: Duration::from_secs(900),
259            },
260            security_config: SecurityConfig {
261                enable_noise: true,
262                enable_tls: true,
263                trust_level: TrustLevel::Basic,
264            },
265            production_config: Some(ProductionConfig {
266                max_connections: config.network.max_connections,
267                max_memory_bytes: 0,  // unlimited
268                max_bandwidth_bps: 0, // unlimited
269                connection_timeout: Duration::from_secs(config.network.connection_timeout),
270                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
271                health_check_interval: Duration::from_secs(30),
272                metrics_interval: Duration::from_secs(60),
273                enable_performance_tracking: true,
274                enable_auto_cleanup: true,
275                shutdown_timeout: Duration::from_secs(30),
276                rate_limits: crate::production::RateLimitConfig::default(),
277            }),
278            bootstrap_cache_config: None,
279            // identity_config: Some(IdentityManagerConfig {
280            //     cache_ttl: Duration::from_secs(3600),
281            //     challenge_timeout: Duration::from_secs(30),
282            // }),
283        };
284
285        // Add IPv6 listen address if enabled
286        if config.network.ipv6_enabled {
287            node_config.listen_addrs.push(std::net::SocketAddr::new(
288                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
289                listen_addr.port(),
290            ));
291        }
292
293        Ok(node_config)
294    }
295
296    /// Try to build a NodeConfig from a listen address string
297    pub fn with_listen_addr(addr: &str) -> Result<Self> {
298        let listen_addr: std::net::SocketAddr = addr
299            .parse()
300            .map_err(|e: std::net::AddrParseError| {
301                NetworkError::InvalidAddress(e.to_string().into())
302            })
303            .map_err(P2PError::Network)?;
304        let cfg = NodeConfig {
305            listen_addr,
306            listen_addrs: vec![listen_addr],
307            ..Default::default()
308        };
309        Ok(cfg)
310    }
311}
312
313impl Default for DHTConfig {
314    fn default() -> Self {
315        Self {
316            k_value: 20,
317            alpha_value: 5,
318            record_ttl: Duration::from_secs(3600), // 1 hour
319            refresh_interval: Duration::from_secs(600), // 10 minutes
320        }
321    }
322}
323
324impl Default for SecurityConfig {
325    fn default() -> Self {
326        Self {
327            enable_noise: true,
328            enable_tls: true,
329            trust_level: TrustLevel::Basic,
330        }
331    }
332}
333
334/// Information about a connected peer
335#[derive(Debug, Clone)]
336pub struct PeerInfo {
337    /// Peer identifier
338    pub peer_id: PeerId,
339
340    /// Peer's addresses
341    pub addresses: Vec<String>,
342
343    /// Connection timestamp
344    pub connected_at: Instant,
345
346    /// Last seen timestamp
347    pub last_seen: Instant,
348
349    /// Connection status
350    pub status: ConnectionStatus,
351
352    /// Supported protocols
353    pub protocols: Vec<String>,
354
355    /// Number of heartbeats received
356    pub heartbeat_count: u64,
357}
358
359/// Connection status for a peer
360#[derive(Debug, Clone, PartialEq)]
361pub enum ConnectionStatus {
362    /// Connection is being established
363    Connecting,
364    /// Connection is established and active
365    Connected,
366    /// Connection is being closed
367    Disconnecting,
368    /// Connection is closed
369    Disconnected,
370    /// Connection failed
371    Failed(String),
372}
373
374/// Network events that can occur
375#[derive(Debug, Clone)]
376pub enum NetworkEvent {
377    /// A new peer has connected
378    PeerConnected {
379        /// The identifier of the newly connected peer
380        peer_id: PeerId,
381        /// The network addresses where the peer can be reached
382        addresses: Vec<String>,
383    },
384
385    /// A peer has disconnected
386    PeerDisconnected {
387        /// The identifier of the disconnected peer
388        peer_id: PeerId,
389        /// The reason for the disconnection
390        reason: String,
391    },
392
393    /// A message was received from a peer
394    MessageReceived {
395        /// The identifier of the sending peer
396        peer_id: PeerId,
397        /// The protocol used for the message
398        protocol: String,
399        /// The raw message data
400        data: Vec<u8>,
401    },
402
403    /// A connection attempt failed
404    ConnectionFailed {
405        /// The identifier of the peer (if known)
406        peer_id: Option<PeerId>,
407        /// The address where connection was attempted
408        address: String,
409        /// The error message describing the failure
410        error: String,
411    },
412
413    /// DHT record was stored
414    DHTRecordStored {
415        /// The DHT key where the record was stored
416        key: Vec<u8>,
417        /// The value that was stored
418        value: Vec<u8>,
419    },
420
421    /// DHT record was retrieved
422    DHTRecordRetrieved {
423        /// The DHT key that was queried
424        key: Vec<u8>,
425        /// The retrieved value, if found
426        value: Option<Vec<u8>>,
427    },
428}
429
430/// Network events that can occur in the P2P system
431///
432/// Events are broadcast to all listeners and provide real-time
433/// notifications of network state changes and message arrivals.
434#[derive(Debug, Clone)]
435pub enum P2PEvent {
436    /// Message received from a peer on a specific topic
437    Message {
438        /// Topic or channel the message was sent on
439        topic: String,
440        /// Peer ID of the message sender
441        source: PeerId,
442        /// Raw message data payload
443        data: Vec<u8>,
444    },
445    /// A new peer has connected to the network
446    PeerConnected(PeerId),
447    /// A peer has disconnected from the network
448    PeerDisconnected(PeerId),
449}
450
451/// Main P2P node structure
452/// Main P2P network node that manages connections, routing, and communication
453///
454/// This struct represents a complete P2P network participant that can:
455/// - Connect to other peers via QUIC transport
456/// - Participate in distributed hash table (DHT) operations
457/// - Send and receive messages through various protocols
458/// - Handle network events and peer lifecycle
459/// - Provide MCP (Model Context Protocol) services
460pub struct P2PNode {
461    /// Node configuration
462    config: NodeConfig,
463
464    /// Our peer ID
465    peer_id: PeerId,
466
467    /// Connected peers
468    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
469
470    /// Network event broadcaster
471    event_tx: broadcast::Sender<P2PEvent>,
472
473    /// Listen addresses
474    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
475
476    /// Node start time
477    start_time: Instant,
478
479    /// Running state
480    running: RwLock<bool>,
481
482    /// DHT instance (optional)
483    dht: Option<Arc<RwLock<DHT>>>,
484
485    /// Production resource manager (optional)
486    resource_manager: Option<Arc<ResourceManager>>,
487
488    /// Bootstrap cache manager for peer discovery
489    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
490
491    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
492    dual_node: Arc<DualStackNetworkNode>,
493
494    /// Rate limiter for connection and request throttling
495    #[allow(dead_code)]
496    rate_limiter: Arc<RateLimiter>,
497
498    /// Active connections (tracked by peer_id)
499    /// This set is synchronized with ant-quic's connection state via event monitoring
500    active_connections: Arc<RwLock<HashSet<PeerId>>>,
501
502    /// Security dashboard for monitoring
503    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
504
505    /// Connection lifecycle monitor task handle
506    #[allow(dead_code)]
507    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
508
509    /// Keepalive task handle
510    #[allow(dead_code)]
511    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
512
513    /// Shutdown flag for background tasks
514    #[allow(dead_code)]
515    shutdown: Arc<AtomicBool>,
516
517    /// GeoIP provider for connection validation
518    #[allow(dead_code)]
519    geo_provider: Arc<BgpGeoProvider>,
520}
521
522/// Normalize wildcard bind addresses to localhost loopback addresses
523///
524/// ant-quic correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
525/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
526///
527/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
528/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
529/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
530/// - All other addresses pass through unchanged
531///
532/// # Arguments
533/// * `addr` - The SocketAddr to normalize
534///
535/// # Returns
536/// * Normalized SocketAddr suitable for remote connections
537fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
538    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
539
540    if addr.ip().is_unspecified() {
541        // Convert unspecified addresses to loopback
542        let loopback_ip = match addr {
543            std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
544            std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
545        };
546        std::net::SocketAddr::new(loopback_ip, addr.port())
547    } else {
548        // Not a wildcard address, pass through unchanged
549        addr
550    }
551}
552
553impl P2PNode {
554    /// Minimal constructor for tests that avoids real networking
555    pub fn new_for_tests() -> Result<Self> {
556        let (event_tx, _) = broadcast::channel(16);
557        Ok(Self {
558            config: NodeConfig::default(),
559            peer_id: "test_peer".to_string(),
560            peers: Arc::new(RwLock::new(HashMap::new())),
561            event_tx,
562            listen_addrs: RwLock::new(Vec::new()),
563            start_time: Instant::now(),
564            running: RwLock::new(false),
565            dht: None,
566            resource_manager: None,
567            bootstrap_manager: None,
568            dual_node: {
569                // Bind dual-stack nodes on ephemeral ports for tests
570                let v6: Option<std::net::SocketAddr> = "[::1]:0"
571                    .parse()
572                    .ok()
573                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
574                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
575                let handle = tokio::runtime::Handle::current();
576                let dual_attempt = handle.block_on(
577                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
578                );
579                let dual = match dual_attempt {
580                    Ok(d) => d,
581                    Err(_e1) => {
582                        // Fallback to IPv4-only ephemeral bind
583                        let fallback = handle.block_on(
584                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
585                                None,
586                                "127.0.0.1:0".parse().ok(),
587                            ),
588                        );
589                        match fallback {
590                            Ok(d) => d,
591                            Err(e2) => {
592                                return Err(P2PError::Network(NetworkError::BindError(
593                                    format!("Failed to create dual-stack network node: {}", e2)
594                                        .into(),
595                                )));
596                            }
597                        }
598                    }
599                };
600                Arc::new(dual)
601            },
602            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
603                max_requests: 100,
604                burst_size: 100,
605                window: std::time::Duration::from_secs(1),
606                ..Default::default()
607            })),
608            active_connections: Arc::new(RwLock::new(HashSet::new())),
609            connection_monitor_handle: Arc::new(RwLock::new(None)),
610            keepalive_handle: Arc::new(RwLock::new(None)),
611            shutdown: Arc::new(AtomicBool::new(false)),
612            geo_provider: Arc::new(BgpGeoProvider::new()),
613            security_dashboard: None,
614        })
615    }
616    /// Create a new P2P node with the given configuration
617    pub async fn new(config: NodeConfig) -> Result<Self> {
618        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
619            // Generate a random peer ID for now
620            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
621        });
622
623        let (event_tx, _) = broadcast::channel(1000);
624
625        // Initialize and register a TrustWeightedKademlia DHT for the global API
626        // Use a deterministic local NodeId derived from the peer_id
627        {
628            use blake3::Hasher;
629            let mut hasher = Hasher::new();
630            hasher.update(peer_id.as_bytes());
631            let digest = hasher.finalize();
632            let mut nid = [0u8; 32];
633            nid.copy_from_slice(digest.as_bytes());
634            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
635                crate::identity::node_identity::NodeId::from_bytes(nid),
636            ));
637            // TODO: Update to use new clean API
638            // let _ = crate::api::set_dht_instance(twdht);
639        }
640
641        // Initialize DHT if needed
642        let (dht, security_dashboard) = if true {
643            // Assuming DHT is always enabled for now, or check config
644            let _dht_config = crate::dht::DHTConfig {
645                replication_factor: config.dht_config.k_value,
646                bucket_size: config.dht_config.k_value,
647                alpha: config.dht_config.alpha_value,
648                record_ttl: config.dht_config.record_ttl,
649                bucket_refresh_interval: config.dht_config.refresh_interval,
650                republish_interval: config.dht_config.refresh_interval,
651                max_distance: 160,
652            };
653            // Convert peer_id String to NodeId
654            let peer_bytes = peer_id.as_bytes();
655            let mut node_id_bytes = [0u8; 32];
656            let len = peer_bytes.len().min(32);
657            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
658            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
659            let dht_instance = DHT::new(node_id).map_err(|e| {
660                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
661                    e.to_string().into(),
662                ))
663            })?;
664            dht_instance.start_maintenance_tasks();
665
666            // Create Security Dashboard
667            let security_metrics = dht_instance.security_metrics();
668            let dashboard = crate::dht::metrics::SecurityDashboard::new(
669                security_metrics,
670                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
671                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
672                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
673            );
674
675            (
676                Some(Arc::new(RwLock::new(dht_instance))),
677                Some(Arc::new(dashboard)),
678            )
679        } else {
680            (None, None)
681        };
682
683        // MCP removed
684
685        // Initialize production resource manager if configured
686        let resource_manager = config
687            .production_config
688            .clone()
689            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
690
691        // Initialize bootstrap cache manager
692        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
693            match BootstrapManager::with_config(cache_config.clone()).await {
694                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
695                Err(e) => {
696                    warn!(
697                        "Failed to initialize bootstrap manager: {}, continuing without cache",
698                        e
699                    );
700                    None
701                }
702            }
703        } else {
704            match BootstrapManager::new().await {
705                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
706                Err(e) => {
707                    warn!(
708                        "Failed to initialize bootstrap manager: {}, continuing without cache",
709                        e
710                    );
711                    None
712                }
713            }
714        };
715
716        // Initialize dual-stack ant-quic nodes
717        // Determine bind addresses
718        let (v6_opt, v4_opt) = {
719            let port = config.listen_addr.port();
720            let ip = config.listen_addr.ip();
721
722            let v4_addr = if ip.is_ipv4() {
723                Some(std::net::SocketAddr::new(ip, port))
724            } else {
725                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
726                // But for now let's just stick to defaults if not specified
727                Some(std::net::SocketAddr::new(
728                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
729                    port,
730                ))
731            };
732
733            let v6_addr = if config.enable_ipv6 {
734                if ip.is_ipv6() {
735                    Some(std::net::SocketAddr::new(ip, port))
736                } else {
737                    Some(std::net::SocketAddr::new(
738                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
739                        port,
740                    ))
741                }
742            } else {
743                None
744            };
745            (v6_addr, v4_addr)
746        };
747
748        let dual_node = Arc::new(
749            DualStackNetworkNode::new(v6_opt, v4_opt)
750                .await
751                .map_err(|e| {
752                    P2PError::Transport(crate::error::TransportError::SetupFailed(
753                        format!("Failed to create dual-stack network nodes: {}", e).into(),
754                    ))
755                })?,
756        );
757
758        // Initialize rate limiter with default config
759        let rate_limiter = Arc::new(RateLimiter::new(
760            crate::validation::RateLimitConfig::default(),
761        ));
762
763        // Create active connections tracker
764        let active_connections = Arc::new(RwLock::new(HashSet::new()));
765
766        // Initialize GeoIP provider
767        let geo_provider = Arc::new(BgpGeoProvider::new());
768
769        // Create peers map
770        let peers = Arc::new(RwLock::new(HashMap::new()));
771
772        // Start connection lifecycle monitor
773        let connection_monitor_handle = {
774            let active_conns = Arc::clone(&active_connections);
775            let peers_map = Arc::clone(&peers);
776            let event_tx_clone = event_tx.clone();
777            let dual_node_clone = Arc::clone(&dual_node);
778            let geo_provider_clone = Arc::clone(&geo_provider);
779            let peer_id_clone = peer_id.clone();
780
781            let handle = tokio::spawn(async move {
782                Self::connection_lifecycle_monitor(
783                    dual_node_clone,
784                    active_conns,
785                    peers_map,
786                    event_tx_clone,
787                    geo_provider_clone,
788                    peer_id_clone,
789                )
790                .await;
791            });
792
793            Arc::new(RwLock::new(Some(handle)))
794        };
795
796        // Spawn keepalive task
797        let shutdown = Arc::new(AtomicBool::new(false));
798        let keepalive_handle = {
799            let active_conns = Arc::clone(&active_connections);
800            let dual_node_clone = Arc::clone(&dual_node);
801            let shutdown_clone = Arc::clone(&shutdown);
802
803            let handle = tokio::spawn(async move {
804                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
805            });
806
807            Arc::new(RwLock::new(Some(handle)))
808        };
809
810        let node = Self {
811            config,
812            peer_id,
813            peers,
814            event_tx,
815            listen_addrs: RwLock::new(Vec::new()),
816            start_time: Instant::now(),
817            running: RwLock::new(false),
818            dht,
819            resource_manager,
820            bootstrap_manager,
821            dual_node,
822            rate_limiter,
823            active_connections,
824            security_dashboard,
825            connection_monitor_handle,
826            keepalive_handle,
827            shutdown,
828            geo_provider,
829        };
830        info!("Created P2P node with peer ID: {}", node.peer_id);
831
832        // Start the network listeners to populate listen addresses
833        node.start_network_listeners().await?;
834
835        // Update the connection monitor with actual peers reference
836        node.start_connection_monitor().await;
837
838        Ok(node)
839    }
840
841    /// Create a new node builder
842    pub fn builder() -> NodeBuilder {
843        NodeBuilder::new()
844    }
845
846    /// Get the peer ID of this node
847    pub fn peer_id(&self) -> &PeerId {
848        &self.peer_id
849    }
850
851    pub fn local_addr(&self) -> Option<String> {
852        self.listen_addrs
853            .try_read()
854            .ok()
855            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
856    }
857
858    pub async fn subscribe(&self, topic: &str) -> Result<()> {
859        // In a real implementation, this would register the topic with the pubsub mechanism.
860        // For now, we just log it.
861        info!("Subscribed to topic: {}", topic);
862        Ok(())
863    }
864
865    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
866        info!(
867            "Publishing message to topic: {} ({} bytes)",
868            topic,
869            data.len()
870        );
871
872        // Get list of connected peers
873        let peer_list: Vec<PeerId> = {
874            let peers_guard = self.peers.read().await;
875            peers_guard.keys().cloned().collect()
876        };
877
878        if peer_list.is_empty() {
879            debug!("No peers connected, message will only be sent to local subscribers");
880        } else {
881            // Send message to all connected peers
882            let mut send_count = 0;
883            for peer_id in &peer_list {
884                match self.send_message(peer_id, topic, data.to_vec()).await {
885                    Ok(_) => {
886                        send_count += 1;
887                        debug!("Sent message to peer: {}", peer_id);
888                    }
889                    Err(e) => {
890                        warn!("Failed to send message to peer {}: {}", peer_id, e);
891                    }
892                }
893            }
894            info!(
895                "Published message to {}/{} connected peers",
896                send_count,
897                peer_list.len()
898            );
899        }
900
901        // Also send to local subscribers (for local echo and testing)
902        let event = P2PEvent::Message {
903            topic: topic.to_string(),
904            source: self.peer_id.clone(),
905            data: data.to_vec(),
906        };
907        let _ = self.event_tx.send(event);
908
909        Ok(())
910    }
911
912    /// Get the node configuration
913    pub fn config(&self) -> &NodeConfig {
914        &self.config
915    }
916
917    /// Start the P2P node
918    pub async fn start(&self) -> Result<()> {
919        info!("Starting P2P node...");
920
921        // Start production resource manager if configured
922        if let Some(ref resource_manager) = self.resource_manager {
923            resource_manager.start().await.map_err(|e| {
924                P2PError::Network(crate::error::NetworkError::ProtocolError(
925                    format!("Failed to start resource manager: {e}").into(),
926                ))
927            })?;
928            info!("Production resource manager started");
929        }
930
931        // Start bootstrap manager background tasks
932        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
933            let mut manager = bootstrap_manager.write().await;
934            manager.start_background_tasks().await.map_err(|e| {
935                P2PError::Network(crate::error::NetworkError::ProtocolError(
936                    format!("Failed to start bootstrap manager: {e}").into(),
937                ))
938            })?;
939            info!("Bootstrap cache manager started");
940        }
941
942        // Set running state
943        *self.running.write().await = true;
944
945        // Start listening on configured addresses using transport layer
946        self.start_network_listeners().await?;
947
948        // Log current listen addresses
949        let listen_addrs = self.listen_addrs.read().await;
950        info!("P2P node started on addresses: {:?}", *listen_addrs);
951
952        // MCP removed
953
954        // Start message receiving system
955        self.start_message_receiving_system().await?;
956
957        // Connect to bootstrap peers
958        self.connect_bootstrap_peers().await?;
959
960        Ok(())
961    }
962
963    /// Start network listeners on configured addresses
964    async fn start_network_listeners(&self) -> Result<()> {
965        info!("Starting dual-stack listeners (ant-quic)...");
966        // Update our listen_addrs from the dual node bindings
967        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
968            P2PError::Transport(crate::error::TransportError::SetupFailed(
969                format!("Failed to get local addresses: {}", e).into(),
970            ))
971        })?;
972        {
973            let mut la = self.listen_addrs.write().await;
974            *la = addrs.clone();
975        }
976
977        // Spawn a background accept loop that handles incoming connections from either stack
978        let event_tx = self.event_tx.clone();
979        let peers = self.peers.clone();
980        let active_connections = self.active_connections.clone();
981        let rate_limiter = self.rate_limiter.clone();
982        let dual = self.dual_node.clone();
983        tokio::spawn(async move {
984            loop {
985                match dual.accept_any().await {
986                    Ok((ant_peer_id, remote_sock)) => {
987                        let peer_id =
988                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
989                        let remote_addr = NetworkAddress::from(remote_sock);
990                        // Optional: basic IP rate limiting
991                        let _ = rate_limiter.check_ip(&remote_sock.ip());
992                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
993                        register_new_peer(&peers, &peer_id, &remote_addr).await;
994                        active_connections.write().await.insert(peer_id);
995                    }
996                    Err(e) => {
997                        warn!("Accept failed: {}", e);
998                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
999                    }
1000                }
1001            }
1002        });
1003
1004        info!("Dual-stack listeners active on: {:?}", addrs);
1005        Ok(())
1006    }
1007
1008    /// Start a listener on a specific socket address
1009    #[allow(dead_code)]
1010    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1011        // use crate::transport::{Transport}; // Unused during migration
1012
1013        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1014        /*
1015        // Try QUIC first (preferred transport)
1016        match crate::transport::QuicTransport::new(Default::default()) {
1017            Ok(quic_transport) => {
1018                match quic_transport.listen(NetworkAddress::new(addr)).await {
1019                    Ok(listen_addrs) => {
1020                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1021
1022                        // Store the actual listening addresses in the node
1023                        {
1024                            let mut node_listen_addrs = self.listen_addrs.write().await;
1025                            // Don't clear - accumulate addresses from multiple listeners
1026                            node_listen_addrs.push(listen_addrs.socket_addr());
1027                        }
1028
1029                        // Start accepting connections in background
1030                        self.start_connection_acceptor(
1031                            Arc::new(quic_transport),
1032                            addr,
1033                            crate::transport::TransportType::QUIC
1034                        ).await?;
1035
1036                        return Ok(());
1037                    }
1038                    Err(e) => {
1039                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1040                    }
1041                }
1042            }
1043            Err(e) => {
1044                warn!("Failed to create QUIC transport for listening: {}", e);
1045            }
1046        }
1047        */
1048
1049        warn!("QUIC transport temporarily disabled during ant-quic migration");
1050        // No TCP fallback - QUIC only
1051        Err(crate::P2PError::Transport(
1052            crate::error::TransportError::SetupFailed(
1053                format!(
1054                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1055                )
1056                .into(),
1057            ),
1058        ))
1059    }
1060
1061    /// Start connection acceptor background task
1062    #[allow(dead_code)] // Deprecated during ant-quic migration
1063    async fn start_connection_acceptor(
1064        &self,
1065        transport: Arc<dyn crate::transport::Transport>,
1066        addr: std::net::SocketAddr,
1067        transport_type: crate::transport::TransportType,
1068    ) -> Result<()> {
1069        info!(
1070            "Starting connection acceptor for {:?} on {}",
1071            transport_type, addr
1072        );
1073
1074        // Clone necessary data for the background task
1075        let event_tx = self.event_tx.clone();
1076        let _peer_id = self.peer_id.clone();
1077        let peers = Arc::clone(&self.peers);
1078        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1079
1080        let rate_limiter = Arc::clone(&self.rate_limiter);
1081
1082        // Spawn background task to accept incoming connections
1083        tokio::spawn(async move {
1084            loop {
1085                match transport.accept().await {
1086                    Ok(connection) => {
1087                        let remote_addr = connection.remote_addr();
1088                        let connection_peer_id =
1089                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1090
1091                        // Apply rate limiting for incoming connections
1092                        let socket_addr = remote_addr.socket_addr();
1093                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1094                            // Connection dropped automatically when it goes out of scope
1095                            continue;
1096                        }
1097
1098                        info!(
1099                            "Accepted {:?} connection from {} (peer: {})",
1100                            transport_type, remote_addr, connection_peer_id
1101                        );
1102
1103                        // Generate peer connected event
1104                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1105
1106                        // Store the peer connection
1107                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1108
1109                        // Spawn task to handle this specific connection's messages
1110                        spawn_connection_handler(
1111                            connection,
1112                            connection_peer_id,
1113                            event_tx.clone(),
1114                            Arc::clone(&peers),
1115                        );
1116                    }
1117                    Err(e) => {
1118                        warn!(
1119                            "Failed to accept {:?} connection on {}: {}",
1120                            transport_type, addr, e
1121                        );
1122
1123                        // Brief pause before retrying to avoid busy loop
1124                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1125                    }
1126                }
1127            }
1128        });
1129
1130        info!(
1131            "Connection acceptor background task started for {:?} on {}",
1132            transport_type, addr
1133        );
1134        Ok(())
1135    }
1136
1137    /// Start the message receiving system with background tasks
1138    async fn start_message_receiving_system(&self) -> Result<()> {
1139        info!("Starting message receiving system");
1140        let dual = self.dual_node.clone();
1141        let event_tx = self.event_tx.clone();
1142
1143        tokio::spawn(async move {
1144            loop {
1145                match dual.receive_any().await {
1146                    Ok((_peer_id, bytes)) => {
1147                        // Expect the JSON message wrapper from create_protocol_message
1148                        #[allow(clippy::collapsible_if)]
1149                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1150                            if let (Some(protocol), Some(data), Some(from)) = (
1151                                value.get("protocol").and_then(|v| v.as_str()),
1152                                value.get("data").and_then(|v| v.as_array()),
1153                                value.get("from").and_then(|v| v.as_str()),
1154                            ) {
1155                                let payload: Vec<u8> = data
1156                                    .iter()
1157                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1158                                    .collect();
1159                                let _ = event_tx.send(P2PEvent::Message {
1160                                    topic: protocol.to_string(),
1161                                    source: from.to_string(),
1162                                    data: payload,
1163                                });
1164                            }
1165                        }
1166                    }
1167                    Err(e) => {
1168                        warn!("Receive error: {}", e);
1169                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1170                    }
1171                }
1172            }
1173        });
1174
1175        Ok(())
1176    }
1177
1178    /// Handle a received message and generate appropriate events
1179    #[allow(dead_code)]
1180    async fn handle_received_message(
1181        &self,
1182        message_data: Vec<u8>,
1183        peer_id: &PeerId,
1184        _protocol: &str,
1185        event_tx: &broadcast::Sender<P2PEvent>,
1186    ) -> Result<()> {
1187        // MCP removed: no special protocol handling
1188
1189        // Parse the message format we created in create_protocol_message
1190        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1191            Ok(message) => {
1192                if let (Some(protocol), Some(data), Some(from)) = (
1193                    message.get("protocol").and_then(|v| v.as_str()),
1194                    message.get("data").and_then(|v| v.as_array()),
1195                    message.get("from").and_then(|v| v.as_str()),
1196                ) {
1197                    // Convert data array back to bytes
1198                    let data_bytes: Vec<u8> = data
1199                        .iter()
1200                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1201                        .collect();
1202
1203                    // Generate message event
1204                    let event = P2PEvent::Message {
1205                        topic: protocol.to_string(),
1206                        source: from.to_string(),
1207                        data: data_bytes,
1208                    };
1209
1210                    let _ = event_tx.send(event);
1211                    debug!("Generated message event from peer: {}", peer_id);
1212                }
1213            }
1214            Err(e) => {
1215                warn!("Failed to parse received message from {}: {}", peer_id, e);
1216            }
1217        }
1218
1219        Ok(())
1220    }
1221
1222    // MCP removed
1223
1224    // MCP removed
1225
1226    /// Run the P2P node (blocks until shutdown)
1227    pub async fn run(&self) -> Result<()> {
1228        if !*self.running.read().await {
1229            self.start().await?;
1230        }
1231
1232        info!("P2P node running...");
1233
1234        // Main event loop
1235        loop {
1236            if !*self.running.read().await {
1237                break;
1238            }
1239
1240            // Perform periodic tasks
1241            self.periodic_tasks().await?;
1242
1243            // Sleep for a short interval
1244            tokio::time::sleep(Duration::from_millis(100)).await;
1245        }
1246
1247        info!("P2P node stopped");
1248        Ok(())
1249    }
1250
1251    /// Stop the P2P node
1252    pub async fn stop(&self) -> Result<()> {
1253        info!("Stopping P2P node...");
1254
1255        // Set running state to false
1256        *self.running.write().await = false;
1257
1258        // Disconnect all peers
1259        self.disconnect_all_peers().await?;
1260
1261        // Shutdown production resource manager if configured
1262        if let Some(ref resource_manager) = self.resource_manager {
1263            resource_manager.shutdown().await.map_err(|e| {
1264                P2PError::Network(crate::error::NetworkError::ProtocolError(
1265                    format!("Failed to shutdown resource manager: {e}").into(),
1266                ))
1267            })?;
1268            info!("Production resource manager stopped");
1269        }
1270
1271        info!("P2P node stopped");
1272        Ok(())
1273    }
1274
1275    /// Graceful shutdown alias for tests
1276    pub async fn shutdown(&self) -> Result<()> {
1277        self.stop().await
1278    }
1279
1280    /// Check if the node is running
1281    pub async fn is_running(&self) -> bool {
1282        *self.running.read().await
1283    }
1284
1285    /// Get the current listen addresses
1286    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1287        self.listen_addrs.read().await.clone()
1288    }
1289
1290    /// Get connected peers
1291    pub async fn connected_peers(&self) -> Vec<PeerId> {
1292        self.peers.read().await.keys().cloned().collect()
1293    }
1294
1295    /// Get peer count
1296    pub async fn peer_count(&self) -> usize {
1297        self.peers.read().await.len()
1298    }
1299
1300    /// Get peer info
1301    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1302        self.peers.read().await.get(peer_id).cloned()
1303    }
1304
1305    /// Get the peer ID for a given socket address, if connected
1306    ///
1307    /// This method searches through all connected peers to find one that has
1308    /// the specified address in its address list.
1309    ///
1310    /// # Arguments
1311    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1312    ///
1313    /// # Returns
1314    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1315    /// * `None` - If no peer with this address is currently connected
1316    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1317        // Parse the address to a SocketAddr for comparison
1318        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1319
1320        let peers = self.peers.read().await;
1321
1322        // Search through all connected peers
1323        for (peer_id, peer_info) in peers.iter() {
1324            // Check if this peer has a matching address
1325            for peer_addr in &peer_info.addresses {
1326                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1327                    && peer_socket == socket_addr
1328                {
1329                    return Some(peer_id.clone());
1330                }
1331            }
1332        }
1333
1334        None
1335    }
1336
1337    /// List all active connections with their peer IDs and addresses
1338    ///
1339    /// # Returns
1340    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1341    /// contains all known addresses for that peer.
1342    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1343        let peers = self.peers.read().await;
1344
1345        peers
1346            .iter()
1347            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1348            .collect()
1349    }
1350
1351    /// Remove a peer from the peers map
1352    ///
1353    /// This method removes a peer from the internal peers map. It should be used
1354    /// when a connection is no longer valid (e.g., after detecting that the underlying
1355    /// ant-quic connection has closed).
1356    ///
1357    /// # Arguments
1358    /// * `peer_id` - The ID of the peer to remove
1359    ///
1360    /// # Returns
1361    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1362    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1363        // Remove from active connections tracking
1364        self.active_connections.write().await.remove(peer_id);
1365        // Remove from peers map and return whether it existed
1366        self.peers.write().await.remove(peer_id).is_some()
1367    }
1368
1369    /// Check if a peer is connected
1370    ///
1371    /// This method checks if the peer ID exists in the peers map. Note that this
1372    /// only verifies the peer is registered - it does not guarantee the underlying
1373    /// ant-quic connection is still active. For connection validation, use `send_message`
1374    /// which will fail if the connection is closed.
1375    ///
1376    /// # Arguments
1377    /// * `peer_id` - The ID of the peer to check
1378    ///
1379    /// # Returns
1380    /// `true` if the peer exists in the peers map, `false` otherwise
1381    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1382        self.peers.read().await.contains_key(peer_id)
1383    }
1384
1385    /// Connect to a peer
1386    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1387        info!("Connecting to peer at: {}", address);
1388
1389        // Check production limits if resource manager is enabled
1390        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1391            Some(resource_manager.acquire_connection().await?)
1392        } else {
1393            None
1394        };
1395
1396        // Parse the address to SocketAddr format
1397        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1398            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1399                format!("{}: {}", address, e).into(),
1400            ))
1401        })?;
1402
1403        // Normalize wildcard addresses to loopback for local connections
1404        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1405        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1406        if normalized_addr != socket_addr {
1407            info!(
1408                "Normalized wildcard address {} to loopback {}",
1409                socket_addr, normalized_addr
1410            );
1411        }
1412
1413        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1414        let addr_list = vec![normalized_addr];
1415        let peer_id = match tokio::time::timeout(
1416            self.config.connection_timeout,
1417            self.dual_node.connect_happy_eyeballs(&addr_list),
1418        )
1419        .await
1420        {
1421            Ok(Ok(peer)) => {
1422                let connected_peer_id =
1423                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1424                info!("Successfully connected to peer: {}", connected_peer_id);
1425                connected_peer_id
1426            }
1427            Ok(Err(e)) => {
1428                warn!("Failed to connect to peer at {}: {}", address, e);
1429                let sanitized_address = address.replace(['/', ':'], "_");
1430                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1431                warn!(
1432                    "Using demo peer ID: {} (transport connection failed)",
1433                    demo_peer_id
1434                );
1435                demo_peer_id
1436            }
1437            Err(_) => {
1438                warn!(
1439                    "Timed out connecting to peer at {} after {:?}",
1440                    address, self.config.connection_timeout
1441                );
1442                let sanitized_address = address.replace(['/', ':'], "_");
1443                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1444                demo_peer_id
1445            }
1446        };
1447
1448        // Create peer info with connection details
1449        let peer_info = PeerInfo {
1450            peer_id: peer_id.clone(),
1451            addresses: vec![address.to_string()],
1452            connected_at: Instant::now(),
1453            last_seen: Instant::now(),
1454            status: ConnectionStatus::Connected,
1455            protocols: vec!["p2p-foundation/1.0".to_string()],
1456            heartbeat_count: 0,
1457        };
1458
1459        // Store peer information
1460        self.peers.write().await.insert(peer_id.clone(), peer_info);
1461
1462        // Add to active connections tracking
1463        // This is critical for is_connection_active() to work correctly
1464        self.active_connections
1465            .write()
1466            .await
1467            .insert(peer_id.clone());
1468
1469        // Record bandwidth usage if resource manager is enabled
1470        if let Some(ref resource_manager) = self.resource_manager {
1471            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1472        }
1473
1474        // Emit connection event
1475        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1476
1477        info!("Connected to peer: {}", peer_id);
1478        Ok(peer_id)
1479    }
1480
1481    /// Disconnect from a peer
1482    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1483        info!("Disconnecting from peer: {}", peer_id);
1484
1485        // Remove from active connections
1486        self.active_connections.write().await.remove(peer_id);
1487
1488        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1489            peer_info.status = ConnectionStatus::Disconnected;
1490
1491            // Emit event
1492            let _ = self
1493                .event_tx
1494                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1495
1496            info!("Disconnected from peer: {}", peer_id);
1497        }
1498
1499        Ok(())
1500    }
1501
1502    /// Check if a connection to a peer is active
1503    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1504        self.active_connections.read().await.contains(peer_id)
1505    }
1506
1507    /// Send a message to a peer
1508    pub async fn send_message(
1509        &self,
1510        peer_id: &PeerId,
1511        protocol: &str,
1512        data: Vec<u8>,
1513    ) -> Result<()> {
1514        debug!(
1515            "Sending message to peer {} on protocol {}",
1516            peer_id, protocol
1517        );
1518
1519        // Check rate limits if resource manager is enabled
1520        if let Some(ref resource_manager) = self.resource_manager
1521            && !resource_manager
1522                .check_rate_limit(peer_id, "message")
1523                .await?
1524        {
1525            return Err(P2PError::ResourceExhausted(
1526                format!("Rate limit exceeded for peer {}", peer_id).into(),
1527            ));
1528        }
1529
1530        // Check if peer exists in peers map
1531        if !self.peers.read().await.contains_key(peer_id) {
1532            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1533                peer_id.to_string().into(),
1534            )));
1535        }
1536
1537        // **NEW**: Check if the ant-quic connection is actually active
1538        // This is the critical fix for the connection state synchronization issue
1539        if !self.is_connection_active(peer_id).await {
1540            debug!(
1541                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1542                peer_id
1543            );
1544
1545            // Clean up stale peer entry
1546            self.remove_peer(peer_id).await;
1547
1548            return Err(P2PError::Network(
1549                crate::error::NetworkError::ConnectionClosed {
1550                    peer_id: peer_id.to_string().into(),
1551                },
1552            ));
1553        }
1554
1555        // MCP removed: no special-case protocol validation
1556
1557        // Record bandwidth usage if resource manager is enabled
1558        if let Some(ref resource_manager) = self.resource_manager {
1559            resource_manager.record_bandwidth(data.len() as u64, 0);
1560        }
1561
1562        // Create protocol message wrapper
1563        let _message_data = self.create_protocol_message(protocol, data)?;
1564
1565        // Send via ant-quic dual-node
1566        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1567        tokio::time::timeout(self.config.connection_timeout, send_fut)
1568            .await
1569            .map_err(|_| {
1570                P2PError::Transport(crate::error::TransportError::StreamError(
1571                    "Timed out sending message".into(),
1572                ))
1573            })?
1574            .map_err(|e| {
1575                P2PError::Transport(crate::error::TransportError::StreamError(
1576                    e.to_string().into(),
1577                ))
1578            })
1579    }
1580
1581    /// Create a protocol message wrapper
1582    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1583        use serde_json::json;
1584
1585        let timestamp = std::time::SystemTime::now()
1586            .duration_since(std::time::UNIX_EPOCH)
1587            .map_err(|e| {
1588                P2PError::Network(NetworkError::ProtocolError(
1589                    format!("System time error: {}", e).into(),
1590                ))
1591            })?
1592            .as_secs();
1593
1594        // Create a simple message format for P2P communication
1595        let message = json!({
1596            "protocol": protocol,
1597            "data": data,
1598            "from": self.peer_id,
1599            "timestamp": timestamp
1600        });
1601
1602        serde_json::to_vec(&message).map_err(|e| {
1603            P2PError::Transport(crate::error::TransportError::StreamError(
1604                format!("Failed to serialize message: {e}").into(),
1605            ))
1606        })
1607    }
1608
1609    // Note: async listen_addrs() already exists above for fetching listen addresses
1610}
1611
1612/// Create a protocol message wrapper (static version for background tasks)
1613#[allow(dead_code)]
1614fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1615    use serde_json::json;
1616
1617    let timestamp = std::time::SystemTime::now()
1618        .duration_since(std::time::UNIX_EPOCH)
1619        .map_err(|e| {
1620            P2PError::Network(NetworkError::ProtocolError(
1621                format!("System time error: {}", e).into(),
1622            ))
1623        })?
1624        .as_secs();
1625
1626    // Create a simple message format for P2P communication
1627    let message = json!({
1628        "protocol": protocol,
1629        "data": data,
1630        "timestamp": timestamp
1631    });
1632
1633    serde_json::to_vec(&message).map_err(|e| {
1634        P2PError::Transport(crate::error::TransportError::StreamError(
1635            format!("Failed to serialize message: {e}").into(),
1636        ))
1637    })
1638}
1639
1640impl P2PNode {
1641    /// Subscribe to network events
1642    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1643        self.event_tx.subscribe()
1644    }
1645
1646    /// Backwards-compat event stream accessor for tests
1647    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1648        self.subscribe_events()
1649    }
1650
1651    /// Get node uptime
1652    pub fn uptime(&self) -> Duration {
1653        self.start_time.elapsed()
1654    }
1655
1656    // MCP removed: all MCP tool/service methods removed
1657
1658    // /// Handle MCP remote tool call with network integration
1659
1660    // /// List tools available on a specific remote peer
1661
1662    // /// Get MCP server statistics
1663
1664    /// Get production resource metrics
1665    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1666        if let Some(ref resource_manager) = self.resource_manager {
1667            Ok(resource_manager.get_metrics().await)
1668        } else {
1669            Err(P2PError::Network(
1670                crate::error::NetworkError::ProtocolError(
1671                    "Production resource manager not enabled".to_string().into(),
1672                ),
1673            ))
1674        }
1675    }
1676
1677    /// Connection lifecycle monitor task - processes ant-quic connection events
1678    /// and updates active_connections HashSet and peers map
1679    async fn connection_lifecycle_monitor(
1680        dual_node: Arc<DualStackNetworkNode>,
1681        active_connections: Arc<RwLock<HashSet<String>>>,
1682        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1683        event_tx: broadcast::Sender<P2PEvent>,
1684        geo_provider: Arc<BgpGeoProvider>,
1685        local_peer_id: String,
1686    ) {
1687        use crate::transport::ant_quic_adapter::ConnectionEvent;
1688
1689        let mut event_rx = dual_node.subscribe_connection_events();
1690
1691        info!("Connection lifecycle monitor started");
1692
1693        loop {
1694            match event_rx.recv().await {
1695                Ok(event) => {
1696                    match event {
1697                        ConnectionEvent::Established {
1698                            peer_id,
1699                            remote_address,
1700                        } => {
1701                            let peer_id_str =
1702                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1703                            debug!(
1704                                "Connection established: peer={}, addr={}",
1705                                peer_id_str, remote_address
1706                            );
1707
1708                            // **GeoIP Validation**
1709                            // Check if the peer's IP is allowed
1710                            let ip = remote_address.ip();
1711                            let is_rejected = match ip {
1712                                std::net::IpAddr::V4(v4) => {
1713                                    // Check if it's a hosting provider or VPN
1714                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1715                                        geo_provider.is_hosting_asn(asn)
1716                                            || geo_provider.is_vpn_asn(asn)
1717                                    } else {
1718                                        false
1719                                    }
1720                                }
1721                                std::net::IpAddr::V6(v6) => {
1722                                    let info = geo_provider.lookup(v6);
1723                                    info.is_hosting_provider || info.is_vpn_provider
1724                                }
1725                            };
1726
1727                            if is_rejected {
1728                                info!(
1729                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1730                                    peer_id_str, remote_address
1731                                );
1732
1733                                // Create rejection message
1734                                let rejection = RejectionMessage {
1735                                    reason: RejectionReason::GeoIpPolicy,
1736                                    message:
1737                                        "Connection rejected: Hosting/VPN providers not allowed"
1738                                            .to_string(),
1739                                    suggested_target: None, // Could suggest a different region if we knew more
1740                                };
1741
1742                                // Serialize message
1743                                if let Ok(data) = serde_json::to_vec(&rejection) {
1744                                    // Create protocol message
1745                                    let timestamp = std::time::SystemTime::now()
1746                                        .duration_since(std::time::UNIX_EPOCH)
1747                                        .unwrap_or_default()
1748                                        .as_secs();
1749
1750                                    let message = serde_json::json!({
1751                                        "protocol": "control",
1752                                        "data": data,
1753                                        "from": local_peer_id,
1754                                        "timestamp": timestamp
1755                                    });
1756
1757                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1758                                        // Send rejection message
1759                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
1760                                        // which might fail if we haven't fully registered the peer yet
1761                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1762
1763                                        // Give it a moment to send before disconnecting?
1764                                        // ant-quic might handle this, but a small yield is safe
1765                                        tokio::task::yield_now().await;
1766                                    }
1767                                }
1768
1769                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
1770                                // For now, we just don't add it to active connections, effectively ignoring it
1771                                // Ideally we should actively close the connection
1772                                continue;
1773                            }
1774
1775                            // Add to active connections
1776                            active_connections.write().await.insert(peer_id_str.clone());
1777
1778                            // Update peer info or insert new
1779                            let mut peers_lock = peers.write().await;
1780                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1781                                peer_info.status = ConnectionStatus::Connected;
1782                                peer_info.connected_at = Instant::now();
1783                            } else {
1784                                // New incoming peer
1785                                debug!("Registering new incoming peer: {}", peer_id_str);
1786                                peers_lock.insert(
1787                                    peer_id_str.clone(),
1788                                    PeerInfo {
1789                                        peer_id: peer_id_str.clone(),
1790                                        addresses: vec![remote_address.to_string()],
1791                                        status: ConnectionStatus::Connected,
1792                                        last_seen: Instant::now(),
1793                                        connected_at: Instant::now(),
1794                                        protocols: Vec::new(),
1795                                        heartbeat_count: 0,
1796                                    },
1797                                );
1798                            }
1799
1800                            // Broadcast connection event
1801                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1802                        }
1803                        ConnectionEvent::Lost { peer_id, reason } => {
1804                            let peer_id_str =
1805                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1806                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1807
1808                            // Remove from active connections
1809                            active_connections.write().await.remove(&peer_id_str);
1810
1811                            // Update peer info status
1812                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1813                                peer_info.status = ConnectionStatus::Disconnected;
1814                                peer_info.last_seen = Instant::now();
1815                            }
1816
1817                            // Broadcast disconnection event
1818                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1819                        }
1820                        ConnectionEvent::Failed { peer_id, reason } => {
1821                            let peer_id_str =
1822                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1823                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1824
1825                            // Remove from active connections
1826                            active_connections.write().await.remove(&peer_id_str);
1827
1828                            // Update peer info status
1829                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1830                                peer_info.status = ConnectionStatus::Failed(reason.clone());
1831                            }
1832
1833                            // Broadcast disconnection event
1834                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1835                        }
1836                    }
1837                }
1838                Err(broadcast::error::RecvError::Lagged(skipped)) => {
1839                    warn!(
1840                        "Connection event monitor lagged, skipped {} events",
1841                        skipped
1842                    );
1843                    continue;
1844                }
1845                Err(broadcast::error::RecvError::Closed) => {
1846                    info!("Connection event channel closed, stopping monitor");
1847                    break;
1848                }
1849            }
1850        }
1851
1852        info!("Connection lifecycle monitor stopped");
1853    }
1854
1855    /// Start connection monitor (called after node initialization)
1856    async fn start_connection_monitor(&self) {
1857        // The monitor task is already spawned in new() with a temporary peers map
1858        // This method is a placeholder for future enhancements where we might
1859        // need to restart the monitor or provide it with updated references
1860        debug!("Connection monitor already running from initialization");
1861    }
1862
1863    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1864    ///
1865    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1866    /// message every 15 seconds (half the timeout) to all active connections to prevent
1867    /// them from timing out during periods of inactivity.
1868    async fn keepalive_task(
1869        active_connections: Arc<RwLock<HashSet<String>>>,
1870        dual_node: Arc<DualStackNetworkNode>,
1871        shutdown: Arc<AtomicBool>,
1872    ) {
1873        use tokio::time::{Duration, interval};
1874
1875        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1876        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1877
1878        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1879        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1880
1881        info!(
1882            "Keepalive task started (interval: {}s)",
1883            KEEPALIVE_INTERVAL_SECS
1884        );
1885
1886        loop {
1887            // Check shutdown flag first
1888            if shutdown.load(Ordering::Relaxed) {
1889                info!("Keepalive task shutting down");
1890                break;
1891            }
1892
1893            interval.tick().await;
1894
1895            // Get snapshot of active connections
1896            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1897
1898            if peers.is_empty() {
1899                trace!("Keepalive: no active connections");
1900                continue;
1901            }
1902
1903            debug!("Sending keepalive to {} active connections", peers.len());
1904
1905            // Send keepalive to each peer
1906            for peer_id in peers {
1907                match dual_node
1908                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1909                    .await
1910                {
1911                    Ok(_) => {
1912                        trace!("Keepalive sent to peer: {}", peer_id);
1913                    }
1914                    Err(e) => {
1915                        debug!(
1916                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
1917                            peer_id, e
1918                        );
1919                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1920                    }
1921                }
1922            }
1923        }
1924
1925        info!("Keepalive task stopped");
1926    }
1927
1928    /// Check system health
1929    pub async fn health_check(&self) -> Result<()> {
1930        if let Some(ref resource_manager) = self.resource_manager {
1931            resource_manager.health_check().await
1932        } else {
1933            // Basic health check without resource manager
1934            let peer_count = self.peer_count().await;
1935            if peer_count > self.config.max_connections {
1936                Err(P2PError::Network(
1937                    crate::error::NetworkError::ProtocolError(
1938                        format!("Too many connections: {peer_count}").into(),
1939                    ),
1940                ))
1941            } else {
1942                Ok(())
1943            }
1944        }
1945    }
1946
1947    /// Get production configuration (if enabled)
1948    pub fn production_config(&self) -> Option<&ProductionConfig> {
1949        self.config.production_config.as_ref()
1950    }
1951
1952    /// Check if production hardening is enabled
1953    pub fn is_production_mode(&self) -> bool {
1954        self.resource_manager.is_some()
1955    }
1956
1957    /// Get DHT reference
1958    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1959        self.dht.as_ref()
1960    }
1961
1962    /// Store a value in the DHT
1963    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1964        if let Some(ref dht) = self.dht {
1965            let mut dht_instance = dht.write().await;
1966            let dht_key = crate::dht::DhtKey::from_bytes(key);
1967            dht_instance
1968                .store(&dht_key, value.clone())
1969                .await
1970                .map_err(|e| {
1971                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1972                        format!("{:?}: {e}", key).into(),
1973                    ))
1974                })?;
1975
1976            Ok(())
1977        } else {
1978            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1979                "DHT not enabled".to_string().into(),
1980            )))
1981        }
1982    }
1983
1984    /// Retrieve a value from the DHT
1985    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1986        if let Some(ref dht) = self.dht {
1987            let dht_instance = dht.read().await;
1988            let dht_key = crate::dht::DhtKey::from_bytes(key);
1989            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1990                P2PError::Dht(crate::error::DhtError::StoreFailed(
1991                    format!("Retrieve failed: {e}").into(),
1992                ))
1993            })?;
1994
1995            Ok(record_result)
1996        } else {
1997            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1998                "DHT not enabled".to_string().into(),
1999            )))
2000        }
2001    }
2002
2003    /// Add a discovered peer to the bootstrap cache
2004    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2005        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2006            let mut manager = bootstrap_manager.write().await;
2007            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2008                .iter()
2009                .filter_map(|addr| addr.parse().ok())
2010                .collect();
2011            let contact = ContactEntry::new(peer_id, socket_addresses);
2012            manager.add_contact(contact).await.map_err(|e| {
2013                P2PError::Network(crate::error::NetworkError::ProtocolError(
2014                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2015                ))
2016            })?;
2017        }
2018        Ok(())
2019    }
2020
2021    /// Update connection metrics for a peer in the bootstrap cache
2022    pub async fn update_peer_metrics(
2023        &self,
2024        peer_id: &PeerId,
2025        success: bool,
2026        latency_ms: Option<u64>,
2027        _error: Option<String>,
2028    ) -> Result<()> {
2029        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2030            let mut manager = bootstrap_manager.write().await;
2031
2032            // Create quality metrics based on the connection result
2033            let metrics = QualityMetrics {
2034                success_rate: if success { 1.0 } else { 0.0 },
2035                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2036                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2037                last_connection_attempt: chrono::Utc::now(),
2038                last_successful_connection: if success {
2039                    chrono::Utc::now()
2040                } else {
2041                    chrono::Utc::now() - chrono::Duration::hours(1)
2042                },
2043                uptime_score: 0.5,
2044            };
2045
2046            manager
2047                .update_contact_metrics(peer_id, metrics)
2048                .await
2049                .map_err(|e| {
2050                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2051                        format!("Failed to update peer metrics: {e}").into(),
2052                    ))
2053                })?;
2054        }
2055        Ok(())
2056    }
2057
2058    /// Get bootstrap cache statistics
2059    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2060        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2061            let manager = bootstrap_manager.read().await;
2062            let stats = manager.get_stats().await.map_err(|e| {
2063                P2PError::Network(crate::error::NetworkError::ProtocolError(
2064                    format!("Failed to get bootstrap stats: {e}").into(),
2065                ))
2066            })?;
2067            Ok(Some(stats))
2068        } else {
2069            Ok(None)
2070        }
2071    }
2072
2073    /// Get the number of cached bootstrap peers
2074    pub async fn cached_peer_count(&self) -> usize {
2075        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2076            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2077        {
2078            return stats.total_contacts;
2079        }
2080        0
2081    }
2082
2083    /// Connect to bootstrap peers
2084    async fn connect_bootstrap_peers(&self) -> Result<()> {
2085        let mut bootstrap_contacts = Vec::new();
2086        let mut used_cache = false;
2087        let mut seen_addresses = std::collections::HashSet::new();
2088
2089        // CLI-provided bootstrap peers take priority - always include them first
2090        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2091            self.config.bootstrap_peers_str.clone()
2092        } else {
2093            // Convert Multiaddr to strings
2094            self.config
2095                .bootstrap_peers
2096                .iter()
2097                .map(|addr| addr.to_string())
2098                .collect::<Vec<_>>()
2099        };
2100
2101        if !cli_bootstrap_peers.is_empty() {
2102            info!(
2103                "Using {} CLI-provided bootstrap peers (priority)",
2104                cli_bootstrap_peers.len()
2105            );
2106            for addr in &cli_bootstrap_peers {
2107                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2108                    seen_addresses.insert(socket_addr);
2109                    let contact = ContactEntry::new(
2110                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2111                        vec![socket_addr],
2112                    );
2113                    bootstrap_contacts.push(contact);
2114                } else {
2115                    warn!("Invalid bootstrap address format: {}", addr);
2116                }
2117            }
2118        }
2119
2120        // Supplement with cached bootstrap peers (after CLI peers)
2121        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2122            let manager = bootstrap_manager.read().await;
2123            match manager.get_bootstrap_peers(20).await {
2124                // Try to get top 20 quality peers
2125                Ok(contacts) => {
2126                    if !contacts.is_empty() {
2127                        let mut added_from_cache = 0;
2128                        for contact in contacts {
2129                            // Only add if we haven't already added this address from CLI
2130                            let new_addresses: Vec<_> = contact
2131                                .addresses
2132                                .iter()
2133                                .filter(|addr| !seen_addresses.contains(addr))
2134                                .copied()
2135                                .collect();
2136
2137                            if !new_addresses.is_empty() {
2138                                for addr in &new_addresses {
2139                                    seen_addresses.insert(*addr);
2140                                }
2141                                let mut contact = contact.clone();
2142                                contact.addresses = new_addresses;
2143                                bootstrap_contacts.push(contact);
2144                                added_from_cache += 1;
2145                            }
2146                        }
2147                        if added_from_cache > 0 {
2148                            info!(
2149                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2150                                added_from_cache
2151                            );
2152                            used_cache = true;
2153                        }
2154                    }
2155                }
2156                Err(e) => {
2157                    warn!("Failed to get cached bootstrap peers: {}", e);
2158                }
2159            }
2160        }
2161
2162        if bootstrap_contacts.is_empty() {
2163            info!("No bootstrap peers configured and no cached peers available");
2164            return Ok(());
2165        }
2166
2167        // Connect to bootstrap peers
2168        let mut successful_connections = 0;
2169        for contact in bootstrap_contacts {
2170            for addr in &contact.addresses {
2171                match self.connect_peer(&addr.to_string()).await {
2172                    Ok(peer_id) => {
2173                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2174                        successful_connections += 1;
2175
2176                        // Update bootstrap cache with successful connection
2177                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2178                            let mut manager = bootstrap_manager.write().await;
2179                            let mut updated_contact = contact.clone();
2180                            updated_contact.peer_id = peer_id.clone();
2181                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2182
2183                            if let Err(e) = manager.add_contact(updated_contact).await {
2184                                warn!("Failed to update bootstrap cache: {}", e);
2185                            }
2186                        }
2187                        break; // Successfully connected, move to next contact
2188                    }
2189                    Err(e) => {
2190                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2191
2192                        // Update bootstrap cache with failed connection
2193                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2194                            let mut manager = bootstrap_manager.write().await;
2195                            let mut updated_contact = contact.clone();
2196                            updated_contact.update_connection_result(
2197                                false,
2198                                None,
2199                                Some(e.to_string()),
2200                            );
2201
2202                            if let Err(e) = manager.add_contact(updated_contact).await {
2203                                warn!("Failed to update bootstrap cache: {}", e);
2204                            }
2205                        }
2206                    }
2207                }
2208            }
2209        }
2210
2211        if successful_connections == 0 {
2212            if !used_cache {
2213                warn!("Failed to connect to any bootstrap peers");
2214            }
2215            return Err(P2PError::Network(NetworkError::ConnectionFailed {
2216                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
2217                reason: "Failed to connect to any bootstrap peers".into(),
2218            }));
2219        }
2220        info!(
2221            "Successfully connected to {} bootstrap peers",
2222            successful_connections
2223        );
2224
2225        Ok(())
2226    }
2227
2228    /// Disconnect from all peers
2229    async fn disconnect_all_peers(&self) -> Result<()> {
2230        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2231
2232        for peer_id in peer_ids {
2233            self.disconnect_peer(&peer_id).await?;
2234        }
2235
2236        Ok(())
2237    }
2238
2239    /// Perform periodic maintenance tasks
2240    async fn periodic_tasks(&self) -> Result<()> {
2241        // Update peer last seen timestamps
2242        // Remove stale connections
2243        // Perform DHT maintenance
2244        // This is a placeholder for now
2245
2246        Ok(())
2247    }
2248}
2249
2250/// Network sender trait for sending messages
2251#[async_trait::async_trait]
2252pub trait NetworkSender: Send + Sync {
2253    /// Send a message to a specific peer
2254    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2255
2256    /// Get our local peer ID
2257    fn local_peer_id(&self) -> &PeerId;
2258}
2259
2260/// Lightweight wrapper for P2PNode to implement NetworkSender
2261#[derive(Clone)]
2262pub struct P2PNetworkSender {
2263    peer_id: PeerId,
2264    // Use channels for async communication with the P2P node
2265    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2266}
2267
2268impl P2PNetworkSender {
2269    pub fn new(
2270        peer_id: PeerId,
2271        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2272    ) -> Self {
2273        Self { peer_id, send_tx }
2274    }
2275}
2276
2277/// Implementation of NetworkSender trait for P2PNetworkSender
2278#[async_trait::async_trait]
2279impl NetworkSender for P2PNetworkSender {
2280    /// Send a message to a specific peer via the P2P network
2281    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2282        self.send_tx
2283            .send((peer_id.clone(), protocol.to_string(), data))
2284            .map_err(|_| {
2285                P2PError::Network(crate::error::NetworkError::ProtocolError(
2286                    "Failed to send message via channel".to_string().into(),
2287                ))
2288            })?;
2289        Ok(())
2290    }
2291
2292    /// Get our local peer ID
2293    fn local_peer_id(&self) -> &PeerId {
2294        &self.peer_id
2295    }
2296}
2297
2298/// Builder pattern for creating P2P nodes
2299pub struct NodeBuilder {
2300    config: NodeConfig,
2301}
2302
2303impl Default for NodeBuilder {
2304    fn default() -> Self {
2305        Self::new()
2306    }
2307}
2308
2309impl NodeBuilder {
2310    /// Create a new node builder
2311    pub fn new() -> Self {
2312        Self {
2313            config: NodeConfig::default(),
2314        }
2315    }
2316
2317    /// Set the peer ID
2318    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2319        self.config.peer_id = Some(peer_id);
2320        self
2321    }
2322
2323    /// Add a listen address
2324    pub fn listen_on(mut self, addr: &str) -> Self {
2325        if let Ok(multiaddr) = addr.parse() {
2326            self.config.listen_addrs.push(multiaddr);
2327        }
2328        self
2329    }
2330
2331    /// Add a bootstrap peer
2332    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2333        if let Ok(multiaddr) = addr.parse() {
2334            self.config.bootstrap_peers.push(multiaddr);
2335        }
2336        self.config.bootstrap_peers_str.push(addr.to_string());
2337        self
2338    }
2339
2340    /// Enable IPv6 support
2341    pub fn with_ipv6(mut self, enable: bool) -> Self {
2342        self.config.enable_ipv6 = enable;
2343        self
2344    }
2345
2346    // MCP removed: builder methods deleted
2347
2348    /// Set connection timeout
2349    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2350        self.config.connection_timeout = timeout;
2351        self
2352    }
2353
2354    /// Set maximum connections
2355    pub fn with_max_connections(mut self, max: usize) -> Self {
2356        self.config.max_connections = max;
2357        self
2358    }
2359
2360    /// Enable production mode with default configuration
2361    pub fn with_production_mode(mut self) -> Self {
2362        self.config.production_config = Some(ProductionConfig::default());
2363        self
2364    }
2365
2366    /// Configure production settings
2367    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2368        self.config.production_config = Some(production_config);
2369        self
2370    }
2371
2372    /// Configure DHT settings
2373    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2374        self.config.dht_config = dht_config;
2375        self
2376    }
2377
2378    /// Enable DHT with default configuration
2379    pub fn with_default_dht(mut self) -> Self {
2380        self.config.dht_config = DHTConfig::default();
2381        self
2382    }
2383
2384    /// Build the P2P node
2385    pub async fn build(self) -> Result<P2PNode> {
2386        P2PNode::new(self.config).await
2387    }
2388}
2389
2390/// Standalone function to handle received messages without borrowing self
2391#[allow(dead_code)] // Deprecated during ant-quic migration
2392async fn handle_received_message_standalone(
2393    message_data: Vec<u8>,
2394    peer_id: &PeerId,
2395    _protocol: &str,
2396    event_tx: &broadcast::Sender<P2PEvent>,
2397) -> Result<()> {
2398    // Parse the message format
2399    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2400        Ok(message) => {
2401            if let (Some(protocol), Some(data), Some(from)) = (
2402                message.get("protocol").and_then(|v| v.as_str()),
2403                message.get("data").and_then(|v| v.as_array()),
2404                message.get("from").and_then(|v| v.as_str()),
2405            ) {
2406                // Convert data array back to bytes
2407                let data_bytes: Vec<u8> = data
2408                    .iter()
2409                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2410                    .collect();
2411
2412                // Generate message event
2413                let event = P2PEvent::Message {
2414                    topic: protocol.to_string(),
2415                    source: from.to_string(),
2416                    data: data_bytes,
2417                };
2418
2419                let _ = event_tx.send(event);
2420                debug!("Generated message event from peer: {}", peer_id);
2421            }
2422        }
2423        Err(e) => {
2424            warn!("Failed to parse received message from {}: {}", peer_id, e);
2425        }
2426    }
2427
2428    Ok(())
2429}
2430
2431// MCP removed: standalone MCP handler deleted
2432
2433/// Helper function to handle protocol message creation
2434#[allow(dead_code)]
2435fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2436    match create_protocol_message_static(protocol, data) {
2437        Ok(msg) => Some(msg),
2438        Err(e) => {
2439            warn!("Failed to create protocol message: {}", e);
2440            None
2441        }
2442    }
2443}
2444
2445/// Helper function to handle message send result
2446#[allow(dead_code)]
2447async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2448    match result {
2449        Ok(_) => {
2450            debug!("Message sent to peer {} via transport layer", peer_id);
2451        }
2452        Err(e) => {
2453            warn!("Failed to send message to peer {}: {}", peer_id, e);
2454        }
2455    }
2456}
2457
2458/// Helper function to check rate limit
2459#[allow(dead_code)] // Deprecated during ant-quic migration
2460fn check_rate_limit(
2461    rate_limiter: &RateLimiter,
2462    socket_addr: &std::net::SocketAddr,
2463    remote_addr: &NetworkAddress,
2464) -> Result<()> {
2465    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2466        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2467        e
2468    })
2469}
2470
2471/// Helper function to register a new peer
2472#[allow(dead_code)] // Deprecated during ant-quic migration
2473async fn register_new_peer(
2474    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2475    peer_id: &PeerId,
2476    remote_addr: &NetworkAddress,
2477) {
2478    let mut peers_guard = peers.write().await;
2479    let peer_info = PeerInfo {
2480        peer_id: peer_id.clone(),
2481        addresses: vec![remote_addr.to_string()],
2482        connected_at: tokio::time::Instant::now(),
2483        last_seen: tokio::time::Instant::now(),
2484        status: ConnectionStatus::Connected,
2485        protocols: vec!["p2p-chat/1.0.0".to_string()],
2486        heartbeat_count: 0,
2487    };
2488    peers_guard.insert(peer_id.clone(), peer_info);
2489}
2490
2491/// Helper function to spawn connection handler
2492#[allow(dead_code)] // Deprecated during ant-quic migration
2493fn spawn_connection_handler(
2494    connection: Box<dyn crate::transport::Connection>,
2495    peer_id: PeerId,
2496    event_tx: broadcast::Sender<P2PEvent>,
2497    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2498) {
2499    tokio::spawn(async move {
2500        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2501    });
2502}
2503
2504/// Helper function to handle peer connection
2505#[allow(dead_code)] // Deprecated during ant-quic migration
2506async fn handle_peer_connection(
2507    mut connection: Box<dyn crate::transport::Connection>,
2508    peer_id: PeerId,
2509    event_tx: broadcast::Sender<P2PEvent>,
2510    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2511) {
2512    loop {
2513        match connection.receive().await {
2514            Ok(message_data) => {
2515                debug!(
2516                    "Received {} bytes from peer: {}",
2517                    message_data.len(),
2518                    peer_id
2519                );
2520
2521                // Handle the received message
2522                if let Err(e) = handle_received_message_standalone(
2523                    message_data,
2524                    &peer_id,
2525                    "unknown", // TODO: Extract protocol from message
2526                    &event_tx,
2527                )
2528                .await
2529                {
2530                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2531                }
2532            }
2533            Err(e) => {
2534                warn!("Failed to receive message from {}: {}", peer_id, e);
2535
2536                // Check if connection is still alive
2537                if !connection.is_alive().await {
2538                    info!("Connection to {} is dead, removing peer", peer_id);
2539
2540                    // Remove dead peer
2541                    remove_peer(&peers, &peer_id).await;
2542
2543                    // Generate peer disconnected event
2544                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2545
2546                    break; // Exit the message receiving loop
2547                }
2548
2549                // Brief pause before retrying
2550                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2551            }
2552        }
2553    }
2554}
2555
2556/// Helper function to remove a peer
2557#[allow(dead_code)] // Deprecated during ant-quic migration
2558async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2559    let mut peers_guard = peers.write().await;
2560    peers_guard.remove(peer_id);
2561}
2562
2563/// Helper function to update peer heartbeat
2564#[allow(dead_code)]
2565async fn update_peer_heartbeat(
2566    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2567    peer_id: &PeerId,
2568) -> Result<()> {
2569    let mut peers_guard = peers.write().await;
2570    match peers_guard.get_mut(peer_id) {
2571        Some(peer_info) => {
2572            peer_info.last_seen = Instant::now();
2573            peer_info.heartbeat_count += 1;
2574            Ok(())
2575        }
2576        None => {
2577            warn!("Received heartbeat from unknown peer: {}", peer_id);
2578            Err(P2PError::Network(NetworkError::PeerNotFound(
2579                format!("Peer {} not found", peer_id).into(),
2580            )))
2581        }
2582    }
2583}
2584
2585/// Helper function to get resource metrics
2586#[allow(dead_code)]
2587async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2588    if let Some(manager) = resource_manager {
2589        let metrics = manager.get_metrics().await;
2590        (metrics.memory_used, metrics.cpu_usage)
2591    } else {
2592        (0, 0.0)
2593    }
2594}
2595
2596#[cfg(test)]
2597mod tests {
2598    use super::*;
2599    // MCP removed from tests
2600    use std::time::Duration;
2601    use tokio::time::timeout;
2602
2603    // Test tool handler for network tests
2604
2605    // MCP removed
2606
2607    /// Helper function to create a test node configuration
2608    fn create_test_node_config() -> NodeConfig {
2609        NodeConfig {
2610            peer_id: Some("test_peer_123".to_string()),
2611            listen_addrs: vec![
2612                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2613                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2614            ],
2615            listen_addr: std::net::SocketAddr::new(
2616                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2617                0,
2618            ),
2619            bootstrap_peers: vec![],
2620            bootstrap_peers_str: vec![],
2621            enable_ipv6: true,
2622
2623            connection_timeout: Duration::from_millis(300),
2624            keep_alive_interval: Duration::from_secs(30),
2625            max_connections: 100,
2626            max_incoming_connections: 50,
2627            dht_config: DHTConfig::default(),
2628            security_config: SecurityConfig::default(),
2629            production_config: None,
2630            bootstrap_cache_config: None,
2631            // identity_config: None,
2632        }
2633    }
2634
2635    /// Helper function to create a test tool
2636    // MCP removed: test tool helper deleted
2637
2638    #[tokio::test]
2639    async fn test_node_config_default() {
2640        let config = NodeConfig::default();
2641
2642        assert!(config.peer_id.is_none());
2643        assert_eq!(config.listen_addrs.len(), 2);
2644        assert!(config.enable_ipv6);
2645        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2646        assert_eq!(config.max_incoming_connections, 100);
2647        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2648    }
2649
2650    #[tokio::test]
2651    async fn test_dht_config_default() {
2652        let config = DHTConfig::default();
2653
2654        assert_eq!(config.k_value, 20);
2655        assert_eq!(config.alpha_value, 5);
2656        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2657        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2658    }
2659
2660    #[tokio::test]
2661    async fn test_security_config_default() {
2662        let config = SecurityConfig::default();
2663
2664        assert!(config.enable_noise);
2665        assert!(config.enable_tls);
2666        assert_eq!(config.trust_level, TrustLevel::Basic);
2667    }
2668
2669    #[test]
2670    fn test_trust_level_variants() {
2671        // Test that all trust level variants can be created
2672        let _none = TrustLevel::None;
2673        let _basic = TrustLevel::Basic;
2674        let _full = TrustLevel::Full;
2675
2676        // Test equality
2677        assert_eq!(TrustLevel::None, TrustLevel::None);
2678        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2679        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2680        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2681    }
2682
2683    #[test]
2684    fn test_connection_status_variants() {
2685        let connecting = ConnectionStatus::Connecting;
2686        let connected = ConnectionStatus::Connected;
2687        let disconnecting = ConnectionStatus::Disconnecting;
2688        let disconnected = ConnectionStatus::Disconnected;
2689        let failed = ConnectionStatus::Failed("test error".to_string());
2690
2691        assert_eq!(connecting, ConnectionStatus::Connecting);
2692        assert_eq!(connected, ConnectionStatus::Connected);
2693        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2694        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2695        assert_ne!(connecting, connected);
2696
2697        if let ConnectionStatus::Failed(msg) = failed {
2698            assert_eq!(msg, "test error");
2699        } else {
2700            panic!("Expected Failed status");
2701        }
2702    }
2703
2704    #[tokio::test]
2705    async fn test_node_creation() -> Result<()> {
2706        let config = create_test_node_config();
2707        let node = P2PNode::new(config).await?;
2708
2709        assert_eq!(node.peer_id(), "test_peer_123");
2710        assert!(!node.is_running().await);
2711        assert_eq!(node.peer_count().await, 0);
2712        assert!(node.connected_peers().await.is_empty());
2713
2714        Ok(())
2715    }
2716
2717    #[tokio::test]
2718    async fn test_node_creation_without_peer_id() -> Result<()> {
2719        let mut config = create_test_node_config();
2720        config.peer_id = None;
2721
2722        let node = P2PNode::new(config).await?;
2723
2724        // Should have generated a peer ID
2725        assert!(node.peer_id().starts_with("peer_"));
2726        assert!(!node.is_running().await);
2727
2728        Ok(())
2729    }
2730
2731    #[tokio::test]
2732    async fn test_node_lifecycle() -> Result<()> {
2733        let config = create_test_node_config();
2734        let node = P2PNode::new(config).await?;
2735
2736        // Initially not running
2737        assert!(!node.is_running().await);
2738
2739        // Start the node
2740        node.start().await?;
2741        assert!(node.is_running().await);
2742
2743        // Check listen addresses were set (at least one)
2744        let listen_addrs = node.listen_addrs().await;
2745        assert!(
2746            !listen_addrs.is_empty(),
2747            "Expected at least one listening address"
2748        );
2749
2750        // Stop the node
2751        node.stop().await?;
2752        assert!(!node.is_running().await);
2753
2754        Ok(())
2755    }
2756
2757    #[tokio::test]
2758    async fn test_peer_connection() -> Result<()> {
2759        let config = create_test_node_config();
2760        let node = P2PNode::new(config).await?;
2761
2762        let peer_addr = "127.0.0.1:0";
2763
2764        // Connect to a peer
2765        let peer_id = node.connect_peer(peer_addr).await?;
2766        assert!(peer_id.starts_with("peer_from_"));
2767
2768        // Check peer count
2769        assert_eq!(node.peer_count().await, 1);
2770
2771        // Check connected peers
2772        let connected_peers = node.connected_peers().await;
2773        assert_eq!(connected_peers.len(), 1);
2774        assert_eq!(connected_peers[0], peer_id);
2775
2776        // Get peer info
2777        let peer_info = node.peer_info(&peer_id).await;
2778        assert!(peer_info.is_some());
2779        let info = peer_info.expect("Peer info should exist after adding peer");
2780        assert_eq!(info.peer_id, peer_id);
2781        assert_eq!(info.status, ConnectionStatus::Connected);
2782        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2783
2784        // Disconnect from peer
2785        node.disconnect_peer(&peer_id).await?;
2786        assert_eq!(node.peer_count().await, 0);
2787
2788        Ok(())
2789    }
2790
2791    #[tokio::test]
2792    async fn test_event_subscription() -> Result<()> {
2793        let config = create_test_node_config();
2794        let node = P2PNode::new(config).await?;
2795
2796        let mut events = node.subscribe_events();
2797        let peer_addr = "127.0.0.1:0";
2798
2799        // Connect to a peer (this should emit an event)
2800        let peer_id = node.connect_peer(peer_addr).await?;
2801
2802        // Check for PeerConnected event
2803        let event = timeout(Duration::from_millis(100), events.recv()).await;
2804        assert!(event.is_ok());
2805
2806        let event_result = event
2807            .expect("Should receive event")
2808            .expect("Event should not be error");
2809        match event_result {
2810            P2PEvent::PeerConnected(event_peer_id) => {
2811                assert_eq!(event_peer_id, peer_id);
2812            }
2813            _ => panic!("Expected PeerConnected event"),
2814        }
2815
2816        // Disconnect from peer (this should emit another event)
2817        node.disconnect_peer(&peer_id).await?;
2818
2819        // Check for PeerDisconnected event
2820        let event = timeout(Duration::from_millis(100), events.recv()).await;
2821        assert!(event.is_ok());
2822
2823        let event_result = event
2824            .expect("Should receive event")
2825            .expect("Event should not be error");
2826        match event_result {
2827            P2PEvent::PeerDisconnected(event_peer_id) => {
2828                assert_eq!(event_peer_id, peer_id);
2829            }
2830            _ => panic!("Expected PeerDisconnected event"),
2831        }
2832
2833        Ok(())
2834    }
2835
2836    #[tokio::test]
2837    async fn test_message_sending() -> Result<()> {
2838        // Create two nodes
2839        let mut config1 = create_test_node_config();
2840        config1.listen_addr =
2841            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2842        let node1 = P2PNode::new(config1).await?;
2843        node1.start().await?;
2844
2845        let mut config2 = create_test_node_config();
2846        config2.listen_addr =
2847            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2848        let node2 = P2PNode::new(config2).await?;
2849        node2.start().await?;
2850
2851        // Wait a bit for nodes to start listening
2852        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2853
2854        // Get actual listening address of node2
2855        let node2_addr = node2.local_addr().ok_or_else(|| {
2856            P2PError::Network(crate::error::NetworkError::ProtocolError(
2857                "No listening address".to_string().into(),
2858            ))
2859        })?;
2860
2861        // Connect node1 to node2
2862        let peer_id =
2863            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2864                Ok(res) => res?,
2865                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2866            };
2867
2868        // Wait a bit for connection to establish
2869        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2870
2871        // Send a message
2872        let message_data = b"Hello, peer!".to_vec();
2873        let result = match timeout(
2874            Duration::from_millis(500),
2875            node1.send_message(&peer_id, "test-protocol", message_data),
2876        )
2877        .await
2878        {
2879            Ok(res) => res,
2880            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2881        };
2882        // For now, we'll just check that we don't get a "not connected" error
2883        // The actual send might fail due to no handler on the other side
2884        if let Err(e) = &result {
2885            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2886        }
2887
2888        // Try to send to non-existent peer
2889        let non_existent_peer = "non_existent_peer".to_string();
2890        let result = node1
2891            .send_message(&non_existent_peer, "test-protocol", vec![])
2892            .await;
2893        assert!(result.is_err(), "Sending to non-existent peer should fail");
2894
2895        Ok(())
2896    }
2897
2898    #[tokio::test]
2899    async fn test_remote_mcp_operations() -> Result<()> {
2900        let config = create_test_node_config();
2901        let node = P2PNode::new(config).await?;
2902
2903        // MCP removed; test reduced to simple start/stop
2904        node.start().await?;
2905        node.stop().await?;
2906        Ok(())
2907    }
2908
2909    #[tokio::test]
2910    async fn test_health_check() -> Result<()> {
2911        let config = create_test_node_config();
2912        let node = P2PNode::new(config).await?;
2913
2914        // Health check should pass with no connections
2915        let result = node.health_check().await;
2916        assert!(result.is_ok());
2917
2918        // Note: We're not actually connecting to real peers here
2919        // since that would require running bootstrap nodes.
2920        // The health check should still pass with no connections.
2921
2922        Ok(())
2923    }
2924
2925    #[tokio::test]
2926    async fn test_node_uptime() -> Result<()> {
2927        let config = create_test_node_config();
2928        let node = P2PNode::new(config).await?;
2929
2930        let uptime1 = node.uptime();
2931        assert!(uptime1 >= Duration::from_secs(0));
2932
2933        // Wait a bit
2934        tokio::time::sleep(Duration::from_millis(10)).await;
2935
2936        let uptime2 = node.uptime();
2937        assert!(uptime2 > uptime1);
2938
2939        Ok(())
2940    }
2941
2942    #[tokio::test]
2943    async fn test_node_config_access() -> Result<()> {
2944        let config = create_test_node_config();
2945        let expected_peer_id = config.peer_id.clone();
2946        let node = P2PNode::new(config).await?;
2947
2948        let node_config = node.config();
2949        assert_eq!(node_config.peer_id, expected_peer_id);
2950        assert_eq!(node_config.max_connections, 100);
2951        // MCP removed
2952
2953        Ok(())
2954    }
2955
2956    #[tokio::test]
2957    async fn test_mcp_server_access() -> Result<()> {
2958        let config = create_test_node_config();
2959        let _node = P2PNode::new(config).await?;
2960
2961        // MCP removed
2962        Ok(())
2963    }
2964
2965    #[tokio::test]
2966    async fn test_dht_access() -> Result<()> {
2967        let config = create_test_node_config();
2968        let node = P2PNode::new(config).await?;
2969
2970        // Should have DHT
2971        assert!(node.dht().is_some());
2972
2973        Ok(())
2974    }
2975
2976    #[tokio::test]
2977    async fn test_node_builder() -> Result<()> {
2978        // Create a config using the builder but don't actually build a real node
2979        let builder = P2PNode::builder()
2980            .with_peer_id("builder_test_peer".to_string())
2981            .listen_on("/ip4/127.0.0.1/tcp/0")
2982            .listen_on("/ip6/::1/tcp/0")
2983            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2984            .with_ipv6(true)
2985            .with_connection_timeout(Duration::from_secs(15))
2986            .with_max_connections(200);
2987
2988        // Test the configuration that was built
2989        let config = builder.config;
2990        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2991        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2992        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2993        assert!(config.enable_ipv6);
2994        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2995        assert_eq!(config.max_connections, 200);
2996
2997        Ok(())
2998    }
2999
3000    #[tokio::test]
3001    async fn test_bootstrap_peers() -> Result<()> {
3002        let mut config = create_test_node_config();
3003        config.bootstrap_peers = vec![
3004            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3005            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3006        ];
3007
3008        let node = P2PNode::new(config).await?;
3009
3010        // Start node (which attempts to connect to bootstrap peers)
3011        node.start().await?;
3012
3013        // In a test environment, bootstrap peers may not be available
3014        // The test verifies the node starts correctly with bootstrap configuration
3015        // Peer count may include local/internal tracking, so we just verify it's reasonable
3016        let _peer_count = node.peer_count().await;
3017
3018        node.stop().await?;
3019        Ok(())
3020    }
3021
3022    #[tokio::test]
3023    async fn test_production_mode_disabled() -> Result<()> {
3024        let config = create_test_node_config();
3025        let node = P2PNode::new(config).await?;
3026
3027        assert!(!node.is_production_mode());
3028        assert!(node.production_config().is_none());
3029
3030        // Resource metrics should fail when production mode is disabled
3031        let result = node.resource_metrics().await;
3032        assert!(result.is_err());
3033        assert!(result.unwrap_err().to_string().contains("not enabled"));
3034
3035        Ok(())
3036    }
3037
3038    #[tokio::test]
3039    async fn test_network_event_variants() {
3040        // Test that all network event variants can be created
3041        let peer_id = "test_peer".to_string();
3042        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3043
3044        let _peer_connected = NetworkEvent::PeerConnected {
3045            peer_id: peer_id.clone(),
3046            addresses: vec![address.clone()],
3047        };
3048
3049        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3050            peer_id: peer_id.clone(),
3051            reason: "test disconnect".to_string(),
3052        };
3053
3054        let _message_received = NetworkEvent::MessageReceived {
3055            peer_id: peer_id.clone(),
3056            protocol: "test-protocol".to_string(),
3057            data: vec![1, 2, 3],
3058        };
3059
3060        let _connection_failed = NetworkEvent::ConnectionFailed {
3061            peer_id: Some(peer_id.clone()),
3062            address: address.clone(),
3063            error: "connection refused".to_string(),
3064        };
3065
3066        let _dht_stored = NetworkEvent::DHTRecordStored {
3067            key: vec![1, 2, 3],
3068            value: vec![4, 5, 6],
3069        };
3070
3071        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3072            key: vec![1, 2, 3],
3073            value: Some(vec![4, 5, 6]),
3074        };
3075    }
3076
3077    #[tokio::test]
3078    async fn test_peer_info_structure() {
3079        let peer_info = PeerInfo {
3080            peer_id: "test_peer".to_string(),
3081            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3082            connected_at: Instant::now(),
3083            last_seen: Instant::now(),
3084            status: ConnectionStatus::Connected,
3085            protocols: vec!["test-protocol".to_string()],
3086            heartbeat_count: 0,
3087        };
3088
3089        assert_eq!(peer_info.peer_id, "test_peer");
3090        assert_eq!(peer_info.addresses.len(), 1);
3091        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3092        assert_eq!(peer_info.protocols.len(), 1);
3093    }
3094
3095    #[tokio::test]
3096    async fn test_serialization() -> Result<()> {
3097        // Test that configs can be serialized/deserialized
3098        let config = create_test_node_config();
3099        let serialized = serde_json::to_string(&config)?;
3100        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3101
3102        assert_eq!(config.peer_id, deserialized.peer_id);
3103        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3104        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3105
3106        Ok(())
3107    }
3108
3109    #[tokio::test]
3110    async fn test_get_peer_id_by_address_found() -> Result<()> {
3111        let config = create_test_node_config();
3112        let node = P2PNode::new(config).await?;
3113
3114        // Manually insert a peer for testing
3115        let test_peer_id = "peer_test_123".to_string();
3116        let test_address = "192.168.1.100:9000".to_string();
3117
3118        let peer_info = PeerInfo {
3119            peer_id: test_peer_id.clone(),
3120            addresses: vec![test_address.clone()],
3121            connected_at: Instant::now(),
3122            last_seen: Instant::now(),
3123            status: ConnectionStatus::Connected,
3124            protocols: vec!["test-protocol".to_string()],
3125            heartbeat_count: 0,
3126        };
3127
3128        node.peers
3129            .write()
3130            .await
3131            .insert(test_peer_id.clone(), peer_info);
3132
3133        // Test: Find peer by address
3134        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3135        assert_eq!(found_peer_id, Some(test_peer_id));
3136
3137        Ok(())
3138    }
3139
3140    #[tokio::test]
3141    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3142        let config = create_test_node_config();
3143        let node = P2PNode::new(config).await?;
3144
3145        // Test: Try to find a peer that doesn't exist
3146        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3147        assert_eq!(result, None);
3148
3149        Ok(())
3150    }
3151
3152    #[tokio::test]
3153    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3154        let config = create_test_node_config();
3155        let node = P2PNode::new(config).await?;
3156
3157        // Test: Invalid address format should return None
3158        let result = node.get_peer_id_by_address("invalid-address").await;
3159        assert_eq!(result, None);
3160
3161        Ok(())
3162    }
3163
3164    #[tokio::test]
3165    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3166        let config = create_test_node_config();
3167        let node = P2PNode::new(config).await?;
3168
3169        // Add multiple peers with different addresses
3170        let peer1_id = "peer_1".to_string();
3171        let peer1_addr = "192.168.1.101:9001".to_string();
3172
3173        let peer2_id = "peer_2".to_string();
3174        let peer2_addr = "192.168.1.102:9002".to_string();
3175
3176        let peer1_info = PeerInfo {
3177            peer_id: peer1_id.clone(),
3178            addresses: vec![peer1_addr.clone()],
3179            connected_at: Instant::now(),
3180            last_seen: Instant::now(),
3181            status: ConnectionStatus::Connected,
3182            protocols: vec!["test-protocol".to_string()],
3183            heartbeat_count: 0,
3184        };
3185
3186        let peer2_info = PeerInfo {
3187            peer_id: peer2_id.clone(),
3188            addresses: vec![peer2_addr.clone()],
3189            connected_at: Instant::now(),
3190            last_seen: Instant::now(),
3191            status: ConnectionStatus::Connected,
3192            protocols: vec!["test-protocol".to_string()],
3193            heartbeat_count: 0,
3194        };
3195
3196        node.peers
3197            .write()
3198            .await
3199            .insert(peer1_id.clone(), peer1_info);
3200        node.peers
3201            .write()
3202            .await
3203            .insert(peer2_id.clone(), peer2_info);
3204
3205        // Test: Find each peer by their unique address
3206        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3207        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3208
3209        assert_eq!(found_peer1, Some(peer1_id));
3210        assert_eq!(found_peer2, Some(peer2_id));
3211
3212        Ok(())
3213    }
3214
3215    #[tokio::test]
3216    async fn test_list_active_connections_empty() -> Result<()> {
3217        let config = create_test_node_config();
3218        let node = P2PNode::new(config).await?;
3219
3220        // Test: No connections initially
3221        let connections = node.list_active_connections().await;
3222        assert!(connections.is_empty());
3223
3224        Ok(())
3225    }
3226
3227    #[tokio::test]
3228    async fn test_list_active_connections_with_peers() -> Result<()> {
3229        let config = create_test_node_config();
3230        let node = P2PNode::new(config).await?;
3231
3232        // Add multiple peers
3233        let peer1_id = "peer_1".to_string();
3234        let peer1_addrs = vec![
3235            "192.168.1.101:9001".to_string(),
3236            "192.168.1.101:9002".to_string(),
3237        ];
3238
3239        let peer2_id = "peer_2".to_string();
3240        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3241
3242        let peer1_info = PeerInfo {
3243            peer_id: peer1_id.clone(),
3244            addresses: peer1_addrs.clone(),
3245            connected_at: Instant::now(),
3246            last_seen: Instant::now(),
3247            status: ConnectionStatus::Connected,
3248            protocols: vec!["test-protocol".to_string()],
3249            heartbeat_count: 0,
3250        };
3251
3252        let peer2_info = PeerInfo {
3253            peer_id: peer2_id.clone(),
3254            addresses: peer2_addrs.clone(),
3255            connected_at: Instant::now(),
3256            last_seen: Instant::now(),
3257            status: ConnectionStatus::Connected,
3258            protocols: vec!["test-protocol".to_string()],
3259            heartbeat_count: 0,
3260        };
3261
3262        node.peers
3263            .write()
3264            .await
3265            .insert(peer1_id.clone(), peer1_info);
3266        node.peers
3267            .write()
3268            .await
3269            .insert(peer2_id.clone(), peer2_info);
3270
3271        // Test: List all active connections
3272        let connections = node.list_active_connections().await;
3273        assert_eq!(connections.len(), 2);
3274
3275        // Verify peer1 and peer2 are in the list
3276        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3277        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3278
3279        assert!(peer1_conn.is_some());
3280        assert!(peer2_conn.is_some());
3281
3282        // Verify addresses match
3283        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3284        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3285
3286        Ok(())
3287    }
3288
3289    #[tokio::test]
3290    async fn test_remove_peer_success() -> Result<()> {
3291        let config = create_test_node_config();
3292        let node = P2PNode::new(config).await?;
3293
3294        // Add a peer
3295        let peer_id = "peer_to_remove".to_string();
3296        let peer_info = PeerInfo {
3297            peer_id: peer_id.clone(),
3298            addresses: vec!["192.168.1.100:9000".to_string()],
3299            connected_at: Instant::now(),
3300            last_seen: Instant::now(),
3301            status: ConnectionStatus::Connected,
3302            protocols: vec!["test-protocol".to_string()],
3303            heartbeat_count: 0,
3304        };
3305
3306        node.peers.write().await.insert(peer_id.clone(), peer_info);
3307
3308        // Verify peer exists
3309        assert!(node.is_peer_connected(&peer_id).await);
3310
3311        // Remove the peer
3312        let removed = node.remove_peer(&peer_id).await;
3313        assert!(removed);
3314
3315        // Verify peer no longer exists
3316        assert!(!node.is_peer_connected(&peer_id).await);
3317
3318        Ok(())
3319    }
3320
3321    #[tokio::test]
3322    async fn test_remove_peer_nonexistent() -> Result<()> {
3323        let config = create_test_node_config();
3324        let node = P2PNode::new(config).await?;
3325
3326        // Try to remove a peer that doesn't exist
3327        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3328        assert!(!removed);
3329
3330        Ok(())
3331    }
3332
3333    #[tokio::test]
3334    async fn test_is_peer_connected() -> Result<()> {
3335        let config = create_test_node_config();
3336        let node = P2PNode::new(config).await?;
3337
3338        let peer_id = "test_peer".to_string();
3339
3340        // Initially not connected
3341        assert!(!node.is_peer_connected(&peer_id).await);
3342
3343        // Add peer
3344        let peer_info = PeerInfo {
3345            peer_id: peer_id.clone(),
3346            addresses: vec!["192.168.1.100:9000".to_string()],
3347            connected_at: Instant::now(),
3348            last_seen: Instant::now(),
3349            status: ConnectionStatus::Connected,
3350            protocols: vec!["test-protocol".to_string()],
3351            heartbeat_count: 0,
3352        };
3353
3354        node.peers.write().await.insert(peer_id.clone(), peer_info);
3355
3356        // Now connected
3357        assert!(node.is_peer_connected(&peer_id).await);
3358
3359        // Remove peer
3360        node.remove_peer(&peer_id).await;
3361
3362        // No longer connected
3363        assert!(!node.is_peer_connected(&peer_id).await);
3364
3365        Ok(())
3366    }
3367
3368    #[test]
3369    fn test_normalize_ipv6_wildcard() {
3370        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3371
3372        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3373        let normalized = normalize_wildcard_to_loopback(wildcard);
3374
3375        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3376        assert_eq!(normalized.port(), 8080);
3377    }
3378
3379    #[test]
3380    fn test_normalize_ipv4_wildcard() {
3381        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3382
3383        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3384        let normalized = normalize_wildcard_to_loopback(wildcard);
3385
3386        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3387        assert_eq!(normalized.port(), 9000);
3388    }
3389
3390    #[test]
3391    fn test_normalize_specific_address_unchanged() {
3392        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3393        let normalized = normalize_wildcard_to_loopback(specific);
3394
3395        assert_eq!(normalized, specific);
3396    }
3397
3398    #[test]
3399    fn test_normalize_loopback_unchanged() {
3400        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3401
3402        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3403        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3404        assert_eq!(normalized_v6, loopback_v6);
3405
3406        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3407        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3408        assert_eq!(normalized_v4, loopback_v4);
3409    }
3410}