saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] // Temporarily unused during migration
27use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::{HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::{RwLock, broadcast};
38use tokio::time::Instant;
39use tracing::{debug, info, trace, warn};
40
/// Configuration for a P2P node
///
/// Consumed by `P2PNode::new`. Instances are produced by `NodeConfig::new`,
/// `NodeConfig::from_config`, `NodeConfig::with_listen_addr`, or `Default`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Local peer ID for this node
    ///
    /// When `None`, `P2PNode::new` generates a random `peer_<8 chars>` ID.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on for incoming connections
    ///
    /// When non-empty, `P2PNode::new` binds the first IPv6 and the first IPv4
    /// entry found in this list.
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address (for compatibility)
    ///
    /// `P2PNode::new` only reads its port, and only when `listen_addrs` is empty.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers to connect to on startup (legacy)
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers as strings (for integration tests)
    pub bootstrap_peers_str: Vec<String>,

    /// Enable IPv6 support
    ///
    /// In `P2PNode::new` this flag is consulted only when `listen_addrs` is empty.
    pub enable_ipv6: bool,

    // MCP removed; will be redesigned later
    /// Connection timeout duration
    pub connection_timeout: Duration,

    /// Keep-alive interval for connections
    pub keep_alive_interval: Duration,

    /// Maximum number of concurrent connections
    pub max_connections: usize,

    /// Maximum number of incoming connections
    pub max_incoming_connections: usize,

    /// DHT configuration
    pub dht_config: DHTConfig,

    /// Security configuration
    pub security_config: SecurityConfig,

    /// Production hardening configuration
    ///
    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Bootstrap cache configuration
    ///
    /// When `Some`, `P2PNode::new` builds the bootstrap manager with
    /// `BootstrapManager::with_config`; otherwise it uses `BootstrapManager::new`.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
87
/// DHT-specific configuration
///
/// `DHTConfig::default()` yields K = 20, alpha = 5, a one-hour record TTL and a
/// ten-minute refresh interval. `NodeConfig::from_config` does not use that
/// default: it hard-codes K = 20, alpha = 3, a one-hour TTL and a fifteen-minute
/// refresh interval.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size)
    pub k_value: usize,

    /// Kademlia alpha parameter (parallelism)
    pub alpha_value: usize,

    /// DHT record TTL
    pub record_ttl: Duration,

    /// DHT refresh interval
    pub refresh_interval: Duration,
}
103
/// Security configuration
///
/// `SecurityConfig::default()` enables both noise and TLS and uses
/// `TrustLevel::Basic`; `NodeConfig::from_config` sets the same values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enable noise protocol for encryption
    pub enable_noise: bool,

    /// Enable TLS for secure transport
    pub enable_tls: bool,

    /// Trust level for peer verification
    pub trust_level: TrustLevel,
}
116
/// Trust level for peer verification
///
/// Stored in `SecurityConfig::trust_level`; `SecurityConfig::default()` and
/// `NodeConfig::from_config` both select `Basic`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No verification required
    None,
    /// Basic peer ID verification
    Basic,
    /// Full cryptographic verification
    Full,
}
127
128impl NodeConfig {
129    /// Create a new NodeConfig with default values
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if default addresses cannot be parsed
134    pub fn new() -> Result<Self> {
135        // Load config and use its defaults
136        let config = Config::default();
137
138        // Parse the default listen address
139        let listen_addr = config.listen_socket_addr()?;
140
141        // Create listen addresses based on config
142        let mut listen_addrs = vec![];
143
144        // Add IPv6 address if enabled
145        if config.network.ipv6_enabled {
146            let ipv6_addr = std::net::SocketAddr::new(
147                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148                listen_addr.port(),
149            );
150            listen_addrs.push(ipv6_addr);
151        }
152
153        // Always add IPv4
154        let ipv4_addr = std::net::SocketAddr::new(
155            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
156            listen_addr.port(),
157        );
158        listen_addrs.push(ipv4_addr);
159
160        Ok(Self {
161            peer_id: None,
162            listen_addrs,
163            listen_addr,
164            bootstrap_peers: Vec::new(),
165            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
166            enable_ipv6: config.network.ipv6_enabled,
167
168            connection_timeout: Duration::from_secs(config.network.connection_timeout),
169            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
170            max_connections: config.network.max_connections,
171            max_incoming_connections: config.security.connection_limit as usize,
172            dht_config: DHTConfig::default(),
173            security_config: SecurityConfig::default(),
174            production_config: None,
175            bootstrap_cache_config: None,
176            // identity_config: None,
177        })
178    }
179}
180
181impl Default for NodeConfig {
182    fn default() -> Self {
183        // Use config defaults for network settings
184        let config = Config::default();
185
186        // Parse the default listen address - use safe fallback if parsing fails
187        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
188            std::net::SocketAddr::new(
189                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
190                9000,
191            )
192        });
193
194        Self {
195            peer_id: None,
196            listen_addrs: vec![
197                std::net::SocketAddr::new(
198                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
199                    listen_addr.port(),
200                ),
201                std::net::SocketAddr::new(
202                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
203                    listen_addr.port(),
204                ),
205            ],
206            listen_addr,
207            bootstrap_peers: Vec::new(),
208            bootstrap_peers_str: Vec::new(),
209            enable_ipv6: config.network.ipv6_enabled,
210
211            connection_timeout: Duration::from_secs(config.network.connection_timeout),
212            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213            max_connections: config.network.max_connections,
214            max_incoming_connections: config.security.connection_limit as usize,
215            dht_config: DHTConfig::default(),
216            security_config: SecurityConfig::default(),
217            production_config: None, // Use default production config if enabled
218            bootstrap_cache_config: None,
219            // identity_config: None, // Use default identity config if enabled
220        }
221    }
222}
223
224impl NodeConfig {
225    /// Create NodeConfig from Config
226    pub fn from_config(config: &Config) -> Result<Self> {
227        let listen_addr = config.listen_socket_addr()?;
228        let bootstrap_addrs = config.bootstrap_addrs()?;
229
230        let mut node_config = Self {
231            peer_id: None,
232            listen_addrs: vec![listen_addr],
233            listen_addr,
234            bootstrap_peers: bootstrap_addrs
235                .iter()
236                .map(|addr| addr.socket_addr())
237                .collect(),
238            bootstrap_peers_str: config
239                .network
240                .bootstrap_nodes
241                .iter()
242                .map(|addr| addr.to_string())
243                .collect(),
244            enable_ipv6: config.network.ipv6_enabled,
245
246            connection_timeout: Duration::from_secs(config.network.connection_timeout),
247            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
248            max_connections: config.network.max_connections,
249            max_incoming_connections: config.security.connection_limit as usize,
250            dht_config: DHTConfig {
251                k_value: 20,
252                alpha_value: 3,
253                record_ttl: Duration::from_secs(3600),
254                refresh_interval: Duration::from_secs(900),
255            },
256            security_config: SecurityConfig {
257                enable_noise: true,
258                enable_tls: true,
259                trust_level: TrustLevel::Basic,
260            },
261            production_config: Some(ProductionConfig {
262                max_connections: config.network.max_connections,
263                max_memory_bytes: 0,  // unlimited
264                max_bandwidth_bps: 0, // unlimited
265                connection_timeout: Duration::from_secs(config.network.connection_timeout),
266                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
267                health_check_interval: Duration::from_secs(30),
268                metrics_interval: Duration::from_secs(60),
269                enable_performance_tracking: true,
270                enable_auto_cleanup: true,
271                shutdown_timeout: Duration::from_secs(30),
272                rate_limits: crate::production::RateLimitConfig::default(),
273            }),
274            bootstrap_cache_config: None,
275            // identity_config: Some(IdentityManagerConfig {
276            //     cache_ttl: Duration::from_secs(3600),
277            //     challenge_timeout: Duration::from_secs(30),
278            // }),
279        };
280
281        // Add IPv6 listen address if enabled
282        if config.network.ipv6_enabled {
283            node_config.listen_addrs.push(std::net::SocketAddr::new(
284                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
285                listen_addr.port(),
286            ));
287        }
288
289        Ok(node_config)
290    }
291
292    /// Try to build a NodeConfig from a listen address string
293    pub fn with_listen_addr(addr: &str) -> Result<Self> {
294        let listen_addr: std::net::SocketAddr = addr
295            .parse()
296            .map_err(|e: std::net::AddrParseError| {
297                NetworkError::InvalidAddress(e.to_string().into())
298            })
299            .map_err(P2PError::Network)?;
300        let cfg = NodeConfig {
301            listen_addr,
302            listen_addrs: vec![listen_addr],
303            ..Default::default()
304        };
305        Ok(cfg)
306    }
307}
308
309impl Default for DHTConfig {
310    fn default() -> Self {
311        Self {
312            k_value: 20,
313            alpha_value: 5,
314            record_ttl: Duration::from_secs(3600), // 1 hour
315            refresh_interval: Duration::from_secs(600), // 10 minutes
316        }
317    }
318}
319
320impl Default for SecurityConfig {
321    fn default() -> Self {
322        Self {
323            enable_noise: true,
324            enable_tls: true,
325            trust_level: TrustLevel::Basic,
326        }
327    }
328}
329
/// Information about a connected peer
///
/// Stored per peer in `P2PNode::peers`. Both timestamps are
/// `tokio::time::Instant` values (the `Instant` imported by this module).
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Peer identifier
    pub peer_id: PeerId,

    /// Peer's addresses
    pub addresses: Vec<String>,

    /// Connection timestamp
    pub connected_at: Instant,

    /// Last seen timestamp
    pub last_seen: Instant,

    /// Connection status
    pub status: ConnectionStatus,

    /// Supported protocols
    pub protocols: Vec<String>,

    /// Number of heartbeats received
    pub heartbeat_count: u64,
}
354
/// Connection status for a peer
///
/// Recorded in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed
    ///
    /// NOTE(review): the payload is presumably a human-readable failure
    /// description — confirm against the code that constructs this variant.
    Failed(String),
}
369
/// Network events that can occur
///
/// Richer, lower-level counterpart of `P2PEvent`. Note that the event channel
/// held by `P2PNode` (`event_tx`) carries `P2PEvent`, not this type.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A new peer has connected
    PeerConnected {
        /// The identifier of the newly connected peer
        peer_id: PeerId,
        /// The network addresses where the peer can be reached
        addresses: Vec<String>,
    },

    /// A peer has disconnected
    PeerDisconnected {
        /// The identifier of the disconnected peer
        peer_id: PeerId,
        /// The reason for the disconnection
        reason: String,
    },

    /// A message was received from a peer
    MessageReceived {
        /// The identifier of the sending peer
        peer_id: PeerId,
        /// The protocol used for the message
        protocol: String,
        /// The raw message data
        data: Vec<u8>,
    },

    /// A connection attempt failed
    ConnectionFailed {
        /// The identifier of the peer (if known)
        peer_id: Option<PeerId>,
        /// The address where connection was attempted
        address: String,
        /// The error message describing the failure
        error: String,
    },

    /// DHT record was stored
    DHTRecordStored {
        /// The DHT key where the record was stored
        key: Vec<u8>,
        /// The value that was stored
        value: Vec<u8>,
    },

    /// DHT record was retrieved
    DHTRecordRetrieved {
        /// The DHT key that was queried
        key: Vec<u8>,
        /// The retrieved value, if found
        value: Option<Vec<u8>>,
    },
}
425
/// Network events that can occur in the P2P system
///
/// Events are broadcast to all listeners and provide real-time
/// notifications of network state changes and message arrivals.
///
/// This is the payload type of the `tokio::sync::broadcast` channel owned by
/// `P2PNode` (`event_tx`).
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic
    ///
    /// `P2PNode::publish` also emits this variant locally, with `source` set to
    /// the node's own peer ID.
    Message {
        /// Topic or channel the message was sent on
        topic: String,
        /// Peer ID of the message sender
        source: PeerId,
        /// Raw message data payload
        data: Vec<u8>,
    },
    /// A new peer has connected to the network
    PeerConnected(PeerId),
    /// A peer has disconnected from the network
    PeerDisconnected(PeerId),
}
446
/// Main P2P network node that manages connections, routing, and communication
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// MCP (Model Context Protocol) support has been removed from this module and
/// is to be redesigned later.
pub struct P2PNode {
    /// Node configuration
    config: NodeConfig,

    /// Our peer ID
    peer_id: PeerId,

    /// Connected peers
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Network event broadcaster
    ///
    /// Created with capacity 1000 by `new` and 16 by `new_for_tests`.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Listen addresses
    ///
    /// Starts empty in both constructors; `new` then calls
    /// `start_network_listeners` to populate it.
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Node start time
    start_time: Instant,

    /// Running state
    ///
    /// Initialised to `false`; set to `true` by `start`.
    running: RwLock<bool>,

    /// DHT instance (optional)
    ///
    /// `new` currently always creates one; `new_for_tests` leaves it `None`.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Production resource manager (optional)
    ///
    /// Present only when `NodeConfig::production_config` is `Some`.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager for peer discovery
    ///
    /// `None` when the manager failed to initialise (a warning is logged) or
    /// when the node was built with `new_for_tests`.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter for connection and request throttling
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,

    /// Active connections (tracked by peer_id)
    /// This set is synchronized with ant-quic's connection state via event monitoring
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Connection lifecycle monitor task handle
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Keepalive task handle
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Shutdown flag for background tasks
    ///
    /// A clone of this flag is handed to the keepalive task spawned in `new`.
    shutdown: Arc<AtomicBool>,
}
507
/// Map wildcard ("unspecified") bind addresses to the matching loopback address
///
/// `0.0.0.0` and `[::]` are only meaningful when binding a socket; they cannot
/// be used as the target of an outgoing connection, and ant-quic rejects them
/// for that purpose. For local connections this helper rewrites them:
/// - IPv4 `0.0.0.0:port` becomes `127.0.0.1:port`
/// - IPv6 `[::]:port` becomes `[::1]:port`
/// - every other address is returned untouched
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address that can be dialed; the port is always preserved
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    match addr {
        // 0.0.0.0 -> 127.0.0.1
        SocketAddr::V4(v4) if v4.ip().is_unspecified() => {
            SocketAddr::from((Ipv4Addr::LOCALHOST, v4.port()))
        }
        // :: -> ::1
        SocketAddr::V6(v6) if v6.ip().is_unspecified() => {
            SocketAddr::from((Ipv6Addr::LOCALHOST, v6.port()))
        }
        // Already a concrete address
        concrete => concrete,
    }
}
538
539impl P2PNode {
540    /// Minimal constructor for tests that avoids real networking
541    pub fn new_for_tests() -> Result<Self> {
542        let (event_tx, _) = broadcast::channel(16);
543        Ok(Self {
544            config: NodeConfig::default(),
545            peer_id: "test_peer".to_string(),
546            peers: Arc::new(RwLock::new(HashMap::new())),
547            event_tx,
548            listen_addrs: RwLock::new(Vec::new()),
549            start_time: Instant::now(),
550            running: RwLock::new(false),
551            dht: None,
552            resource_manager: None,
553            bootstrap_manager: None,
554            dual_node: {
555                // Bind dual-stack nodes on ephemeral ports for tests
556                let v6: Option<std::net::SocketAddr> = "[::1]:0"
557                    .parse()
558                    .ok()
559                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
560                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
561                let handle = tokio::runtime::Handle::current();
562                let dual_attempt = handle.block_on(
563                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
564                );
565                let dual = match dual_attempt {
566                    Ok(d) => d,
567                    Err(_e1) => {
568                        // Fallback to IPv4-only ephemeral bind
569                        let fallback = handle.block_on(
570                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
571                                None,
572                                "127.0.0.1:0".parse().ok(),
573                            ),
574                        );
575                        match fallback {
576                            Ok(d) => d,
577                            Err(e2) => {
578                                return Err(P2PError::Network(NetworkError::BindError(
579                                    format!("Failed to create dual-stack network node: {}", e2)
580                                        .into(),
581                                )));
582                            }
583                        }
584                    }
585                };
586                Arc::new(dual)
587            },
588            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
589                max_requests: 100,
590                burst_size: 100,
591                window: std::time::Duration::from_secs(1),
592                ..Default::default()
593            })),
594            active_connections: Arc::new(RwLock::new(HashSet::new())),
595            connection_monitor_handle: Arc::new(RwLock::new(None)),
596            keepalive_handle: Arc::new(RwLock::new(None)),
597            shutdown: Arc::new(AtomicBool::new(false)),
598        })
599    }
    /// Create a new P2P node with the given configuration
    ///
    /// Steps, in order:
    /// 1. Pick the peer ID: `config.peer_id`, or a random `peer_<8 chars>` value.
    /// 2. Create the event broadcast channel (capacity 1000).
    /// 3. Create the DHT, the resource manager (only if `production_config` is
    ///    set) and the bootstrap manager.
    /// 4. Create the dual-stack transport node from `config.listen_addrs`, or
    ///    from wildcard addresses on `config.listen_addr`'s port when that list
    ///    is empty.
    /// 5. Spawn the connection lifecycle monitor and the keepalive task.
    /// 6. Call `start_network_listeners` and `start_connection_monitor`.
    ///
    /// # Errors
    ///
    /// Returns an error if the DHT cannot be created, if the dual-stack transport
    /// node cannot be created, or if `start_network_listeners` fails. A bootstrap
    /// manager failure is only logged; the node then runs without a cache.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
            // Generate a random peer ID for now: "peer_" plus the first 8
            // characters of a v4 UUID string
            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
        });

        let (event_tx, _) = broadcast::channel(1000);

        // Initialize and register a TrustWeightedKademlia DHT for the global API
        // Use a deterministic local NodeId derived from the peer_id
        // (BLAKE3 hash of the peer ID bytes).
        // NOTE: registration is currently commented out below, so `_twdht` is
        // dropped at the end of this block without being used.
        {
            use blake3::Hasher;
            let mut hasher = Hasher::new();
            hasher.update(peer_id.as_bytes());
            let digest = hasher.finalize();
            let mut nid = [0u8; 32];
            nid.copy_from_slice(digest.as_bytes());
            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
                crate::identity::node_identity::NodeId::from_bytes(nid),
            ));
            // TODO: Update to use new clean API
            // let _ = crate::api::set_dht_instance(twdht);
        }

        // Initialize DHT if needed
        let dht = if true {
            // Always enable DHT for now
            // NOTE: `_dht_config` is built from the node config but is not passed
            // to `DHT::new` below, so these settings currently have no effect here.
            let _dht_config = crate::dht::DHTConfig {
                replication_factor: config.dht_config.k_value,
                bucket_size: config.dht_config.k_value,
                alpha: config.dht_config.alpha_value,
                record_ttl: config.dht_config.record_ttl,
                bucket_refresh_interval: config.dht_config.refresh_interval,
                republish_interval: config.dht_config.refresh_interval,
                // NOTE(review): the original remark read "160 bits for SHA-256",
                // but SHA-256 digests are 256 bits wide — confirm the intended value.
                max_distance: 160,
            };
            // Convert peer_id String to NodeId: the first (up to) 32 bytes of the
            // peer ID are copied verbatim and the remainder is zero-padded. This
            // differs from the BLAKE3-derived ID used for `_twdht` above.
            let peer_bytes = peer_id.as_bytes();
            let mut node_id_bytes = [0u8; 32];
            let len = peer_bytes.len().min(32);
            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
            let dht_instance = DHT::new(node_id).map_err(|e| {
                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
                    e.to_string().into(),
                ))
            })?;
            Some(Arc::new(RwLock::new(dht_instance)))
        } else {
            None
        };

        // MCP removed

        // Initialize production resource manager if configured
        let resource_manager = config
            .production_config
            .clone()
            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));

        // Initialize bootstrap cache manager
        // Failure in either branch is non-fatal: it is logged and the node
        // continues with `bootstrap_manager == None`.
        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
            match BootstrapManager::with_config(cache_config.clone()).await {
                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
                Err(e) => {
                    warn!(
                        "Failed to initialize bootstrap manager: {}, continuing without cache",
                        e
                    );
                    None
                }
            }
        } else {
            match BootstrapManager::new().await {
                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
                Err(e) => {
                    warn!(
                        "Failed to initialize bootstrap manager: {}, continuing without cache",
                        e
                    );
                    None
                }
            }
        };

        // Initialize dual-stack ant-quic nodes
        // With explicit listen addresses, the first IPv6 and the first IPv4 entry
        // are used; `config.enable_ipv6` is not consulted in that case.
        let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
            let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
            let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
            (v6_addr, v4_addr)
        } else {
            // Defaults: always listen on IPv4; IPv6 if enabled
            let v4_addr = Some(std::net::SocketAddr::new(
                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
                config.listen_addr.port(),
            ));
            let v6_addr = if config.enable_ipv6 {
                Some(std::net::SocketAddr::new(
                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
                    config.listen_addr.port(),
                ))
            } else {
                None
            };
            (v6_addr, v4_addr)
        };

        let dual_node = Arc::new(
            DualStackNetworkNode::new(v6_opt, v4_opt)
                .await
                .map_err(|e| {
                    P2PError::Transport(crate::error::TransportError::SetupFailed(
                        format!("Failed to create dual-stack network nodes: {}", e).into(),
                    ))
                })?,
        );

        // Initialize rate limiter with default config
        let rate_limiter = Arc::new(RateLimiter::new(
            crate::validation::RateLimitConfig::default(),
        ));

        // Create active connections tracker
        let active_connections = Arc::new(RwLock::new(HashSet::new()));

        // Start connection lifecycle monitor
        let connection_monitor_handle = {
            let active_conns = Arc::clone(&active_connections);
            // Placeholder map: this is a fresh map, not the `peers` map stored in
            // the node below. Will be replaced with actual peers after node creation
            // (see the `start_connection_monitor` call at the end of this function).
            let peers_map = Arc::new(RwLock::new(HashMap::new()));
            let event_tx_clone = event_tx.clone();
            let dual_node_clone = Arc::clone(&dual_node);

            let handle = tokio::spawn(async move {
                Self::connection_lifecycle_monitor(
                    dual_node_clone,
                    active_conns,
                    peers_map,
                    event_tx_clone,
                ).await;
            });

            Arc::new(RwLock::new(Some(handle)))
        };

        // Spawn keepalive task
        // The task receives a clone of the `shutdown` flag stored in the node.
        let shutdown = Arc::new(AtomicBool::new(false));
        let keepalive_handle = {
            let active_conns = Arc::clone(&active_connections);
            let dual_node_clone = Arc::clone(&dual_node);
            let shutdown_clone = Arc::clone(&shutdown);

            let handle = tokio::spawn(async move {
                Self::keepalive_task(
                    active_conns,
                    dual_node_clone,
                    shutdown_clone,
                ).await;
            });

            Arc::new(RwLock::new(Some(handle)))
        };

        let node = Self {
            config,
            peer_id,
            peers: Arc::new(RwLock::new(HashMap::new())),
            event_tx,
            listen_addrs: RwLock::new(Vec::new()),
            start_time: Instant::now(),
            running: RwLock::new(false),
            dht,
            resource_manager,
            bootstrap_manager,
            dual_node,
            rate_limiter,
            active_connections,
            connection_monitor_handle,
            keepalive_handle,
            shutdown,
        };
        info!("Created P2P node with peer ID: {}", node.peer_id);

        // Start the network listeners to populate listen addresses
        // NOTE(review): both background tasks are already spawned at this point;
        // if this call fails, `node` is dropped on the early return — confirm the
        // tasks are stopped in that case.
        node.start_network_listeners().await?;

        // Update the connection monitor with actual peers reference
        node.start_connection_monitor().await;

        Ok(node)
    }
791
    /// Create a new node builder
    ///
    /// Shorthand for `NodeBuilder::new()`.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
796
    /// Get the peer ID of this node
    ///
    /// Returns a reference to the ID stored when the node was constructed.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
801
802    pub fn local_addr(&self) -> Option<String> {
803        self.listen_addrs
804            .try_read()
805            .ok()
806            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
807    }
808
    /// Subscribe to a topic
    ///
    /// Currently a placeholder: no subscription state is recorded. The call only
    /// logs the topic at `info` level and always returns `Ok(())`.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        // In a real implementation, this would register the topic with the pubsub mechanism.
        // For now, we just log it.
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
815
816    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
817        info!(
818            "Publishing message to topic: {} ({} bytes)",
819            topic,
820            data.len()
821        );
822
823        // Get list of connected peers
824        let peer_list: Vec<PeerId> = {
825            let peers_guard = self.peers.read().await;
826            peers_guard.keys().cloned().collect()
827        };
828
829        if peer_list.is_empty() {
830            debug!("No peers connected, message will only be sent to local subscribers");
831        } else {
832            // Send message to all connected peers
833            let mut send_count = 0;
834            for peer_id in &peer_list {
835                match self.send_message(peer_id, topic, data.to_vec()).await {
836                    Ok(_) => {
837                        send_count += 1;
838                        debug!("Sent message to peer: {}", peer_id);
839                    }
840                    Err(e) => {
841                        warn!("Failed to send message to peer {}: {}", peer_id, e);
842                    }
843                }
844            }
845            info!(
846                "Published message to {}/{} connected peers",
847                send_count,
848                peer_list.len()
849            );
850        }
851
852        // Also send to local subscribers (for local echo and testing)
853        let event = P2PEvent::Message {
854            topic: topic.to_string(),
855            source: self.peer_id.clone(),
856            data: data.to_vec(),
857        };
858        let _ = self.event_tx.send(event);
859
860        Ok(())
861    }
862
    /// Returns a reference to the configuration this node holds.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
867
868    /// Start the P2P node
869    pub async fn start(&self) -> Result<()> {
870        info!("Starting P2P node...");
871
872        // Start production resource manager if configured
873        if let Some(ref resource_manager) = self.resource_manager {
874            resource_manager.start().await.map_err(|e| {
875                P2PError::Network(crate::error::NetworkError::ProtocolError(
876                    format!("Failed to start resource manager: {e}").into(),
877                ))
878            })?;
879            info!("Production resource manager started");
880        }
881
882        // Start bootstrap manager background tasks
883        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
884            let mut manager = bootstrap_manager.write().await;
885            manager.start_background_tasks().await.map_err(|e| {
886                P2PError::Network(crate::error::NetworkError::ProtocolError(
887                    format!("Failed to start bootstrap manager: {e}").into(),
888                ))
889            })?;
890            info!("Bootstrap cache manager started");
891        }
892
893        // Set running state
894        *self.running.write().await = true;
895
896        // Start listening on configured addresses using transport layer
897        self.start_network_listeners().await?;
898
899        // Log current listen addresses
900        let listen_addrs = self.listen_addrs.read().await;
901        info!("P2P node started on addresses: {:?}", *listen_addrs);
902
903        // MCP removed
904
905        // Start message receiving system
906        self.start_message_receiving_system().await?;
907
908        // Connect to bootstrap peers
909        self.connect_bootstrap_peers().await?;
910
911        Ok(())
912    }
913
914    /// Start network listeners on configured addresses
915    async fn start_network_listeners(&self) -> Result<()> {
916        info!("Starting dual-stack listeners (ant-quic)...");
917        // Update our listen_addrs from the dual node bindings
918        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
919            P2PError::Transport(crate::error::TransportError::SetupFailed(
920                format!("Failed to get local addresses: {}", e).into(),
921            ))
922        })?;
923        {
924            let mut la = self.listen_addrs.write().await;
925            *la = addrs.clone();
926        }
927
928        // Spawn a background accept loop that handles incoming connections from either stack
929        let event_tx = self.event_tx.clone();
930        let peers = self.peers.clone();
931        let rate_limiter = self.rate_limiter.clone();
932        let dual = self.dual_node.clone();
933        tokio::spawn(async move {
934            loop {
935                match dual.accept_any().await {
936                    Ok((ant_peer_id, remote_sock)) => {
937                        let peer_id =
938                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
939                        let remote_addr = NetworkAddress::from(remote_sock);
940                        // Optional: basic IP rate limiting
941                        let _ = rate_limiter.check_ip(&remote_sock.ip());
942                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
943                        register_new_peer(&peers, &peer_id, &remote_addr).await;
944                    }
945                    Err(e) => {
946                        warn!("Accept failed: {}", e);
947                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
948                    }
949                }
950            }
951        });
952
953        info!("Dual-stack listeners active on: {:?}", addrs);
954        Ok(())
955    }
956
957    /// Start a listener on a specific socket address
958    #[allow(dead_code)]
959    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
960        // use crate::transport::{Transport}; // Unused during migration
961
962        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
963        /*
964        // Try QUIC first (preferred transport)
965        match crate::transport::QuicTransport::new(Default::default()) {
966            Ok(quic_transport) => {
967                match quic_transport.listen(NetworkAddress::new(addr)).await {
968                    Ok(listen_addrs) => {
969                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
970
971                        // Store the actual listening addresses in the node
972                        {
973                            let mut node_listen_addrs = self.listen_addrs.write().await;
974                            // Don't clear - accumulate addresses from multiple listeners
975                            node_listen_addrs.push(listen_addrs.socket_addr());
976                        }
977
978                        // Start accepting connections in background
979                        self.start_connection_acceptor(
980                            Arc::new(quic_transport),
981                            addr,
982                            crate::transport::TransportType::QUIC
983                        ).await?;
984
985                        return Ok(());
986                    }
987                    Err(e) => {
988                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
989                    }
990                }
991            }
992            Err(e) => {
993                warn!("Failed to create QUIC transport for listening: {}", e);
994            }
995        }
996        */
997
998        warn!("QUIC transport temporarily disabled during ant-quic migration");
999        // No TCP fallback - QUIC only
1000        Err(crate::P2PError::Transport(
1001            crate::error::TransportError::SetupFailed(
1002                format!(
1003                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1004                )
1005                .into(),
1006            ),
1007        ))
1008    }
1009
1010    /// Start connection acceptor background task
1011    #[allow(dead_code)] // Deprecated during ant-quic migration
1012    async fn start_connection_acceptor(
1013        &self,
1014        transport: Arc<dyn crate::transport::Transport>,
1015        addr: std::net::SocketAddr,
1016        transport_type: crate::transport::TransportType,
1017    ) -> Result<()> {
1018        info!(
1019            "Starting connection acceptor for {:?} on {}",
1020            transport_type, addr
1021        );
1022
1023        // Clone necessary data for the background task
1024        let event_tx = self.event_tx.clone();
1025        let _peer_id = self.peer_id.clone();
1026        let peers = Arc::clone(&self.peers);
1027        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1028
1029        let rate_limiter = Arc::clone(&self.rate_limiter);
1030
1031        // Spawn background task to accept incoming connections
1032        tokio::spawn(async move {
1033            loop {
1034                match transport.accept().await {
1035                    Ok(connection) => {
1036                        let remote_addr = connection.remote_addr();
1037                        let connection_peer_id =
1038                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1039
1040                        // Apply rate limiting for incoming connections
1041                        let socket_addr = remote_addr.socket_addr();
1042                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1043                            // Connection dropped automatically when it goes out of scope
1044                            continue;
1045                        }
1046
1047                        info!(
1048                            "Accepted {:?} connection from {} (peer: {})",
1049                            transport_type, remote_addr, connection_peer_id
1050                        );
1051
1052                        // Generate peer connected event
1053                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1054
1055                        // Store the peer connection
1056                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1057
1058                        // Spawn task to handle this specific connection's messages
1059                        spawn_connection_handler(
1060                            connection,
1061                            connection_peer_id,
1062                            event_tx.clone(),
1063                            Arc::clone(&peers),
1064                        );
1065                    }
1066                    Err(e) => {
1067                        warn!(
1068                            "Failed to accept {:?} connection on {}: {}",
1069                            transport_type, addr, e
1070                        );
1071
1072                        // Brief pause before retrying to avoid busy loop
1073                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1074                    }
1075                }
1076            }
1077        });
1078
1079        info!(
1080            "Connection acceptor background task started for {:?} on {}",
1081            transport_type, addr
1082        );
1083        Ok(())
1084    }
1085
1086    /// Start the message receiving system with background tasks
1087    async fn start_message_receiving_system(&self) -> Result<()> {
1088        info!("Starting message receiving system");
1089        let dual = self.dual_node.clone();
1090        let event_tx = self.event_tx.clone();
1091
1092        tokio::spawn(async move {
1093            loop {
1094                match dual.receive_any().await {
1095                    Ok((_peer_id, bytes)) => {
1096                        // Expect the JSON message wrapper from create_protocol_message
1097                        #[allow(clippy::collapsible_if)]
1098                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1099                            if let (Some(protocol), Some(data), Some(from)) = (
1100                                value.get("protocol").and_then(|v| v.as_str()),
1101                                value.get("data").and_then(|v| v.as_array()),
1102                                value.get("from").and_then(|v| v.as_str()),
1103                            ) {
1104                                let payload: Vec<u8> = data
1105                                    .iter()
1106                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1107                                    .collect();
1108                                let _ = event_tx.send(P2PEvent::Message {
1109                                    topic: protocol.to_string(),
1110                                    source: from.to_string(),
1111                                    data: payload,
1112                                });
1113                            }
1114                        }
1115                    }
1116                    Err(e) => {
1117                        warn!("Receive error: {}", e);
1118                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1119                    }
1120                }
1121            }
1122        });
1123
1124        Ok(())
1125    }
1126
1127    /// Handle a received message and generate appropriate events
1128    #[allow(dead_code)]
1129    async fn handle_received_message(
1130        &self,
1131        message_data: Vec<u8>,
1132        peer_id: &PeerId,
1133        _protocol: &str,
1134        event_tx: &broadcast::Sender<P2PEvent>,
1135    ) -> Result<()> {
1136        // MCP removed: no special protocol handling
1137
1138        // Parse the message format we created in create_protocol_message
1139        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1140            Ok(message) => {
1141                if let (Some(protocol), Some(data), Some(from)) = (
1142                    message.get("protocol").and_then(|v| v.as_str()),
1143                    message.get("data").and_then(|v| v.as_array()),
1144                    message.get("from").and_then(|v| v.as_str()),
1145                ) {
1146                    // Convert data array back to bytes
1147                    let data_bytes: Vec<u8> = data
1148                        .iter()
1149                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1150                        .collect();
1151
1152                    // Generate message event
1153                    let event = P2PEvent::Message {
1154                        topic: protocol.to_string(),
1155                        source: from.to_string(),
1156                        data: data_bytes,
1157                    };
1158
1159                    let _ = event_tx.send(event);
1160                    debug!("Generated message event from peer: {}", peer_id);
1161                }
1162            }
1163            Err(e) => {
1164                warn!("Failed to parse received message from {}: {}", peer_id, e);
1165            }
1166        }
1167
1168        Ok(())
1169    }
1170
1171    // MCP removed
1172
1173    // MCP removed
1174
1175    /// Run the P2P node (blocks until shutdown)
1176    pub async fn run(&self) -> Result<()> {
1177        if !*self.running.read().await {
1178            self.start().await?;
1179        }
1180
1181        info!("P2P node running...");
1182
1183        // Main event loop
1184        loop {
1185            if !*self.running.read().await {
1186                break;
1187            }
1188
1189            // Perform periodic tasks
1190            self.periodic_tasks().await?;
1191
1192            // Sleep for a short interval
1193            tokio::time::sleep(Duration::from_millis(100)).await;
1194        }
1195
1196        info!("P2P node stopped");
1197        Ok(())
1198    }
1199
1200    /// Stop the P2P node
1201    pub async fn stop(&self) -> Result<()> {
1202        info!("Stopping P2P node...");
1203
1204        // Set running state to false
1205        *self.running.write().await = false;
1206
1207        // Disconnect all peers
1208        self.disconnect_all_peers().await?;
1209
1210        // Shutdown production resource manager if configured
1211        if let Some(ref resource_manager) = self.resource_manager {
1212            resource_manager.shutdown().await.map_err(|e| {
1213                P2PError::Network(crate::error::NetworkError::ProtocolError(
1214                    format!("Failed to shutdown resource manager: {e}").into(),
1215                ))
1216            })?;
1217            info!("Production resource manager stopped");
1218        }
1219
1220        info!("P2P node stopped");
1221        Ok(())
1222    }
1223
    /// Graceful shutdown alias for tests
    ///
    /// Delegates directly to `stop` and returns its result.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1228
1229    /// Check if the node is running
1230    pub async fn is_running(&self) -> bool {
1231        *self.running.read().await
1232    }
1233
1234    /// Get the current listen addresses
1235    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1236        self.listen_addrs.read().await.clone()
1237    }
1238
1239    /// Get connected peers
1240    pub async fn connected_peers(&self) -> Vec<PeerId> {
1241        self.peers.read().await.keys().cloned().collect()
1242    }
1243
1244    /// Get peer count
1245    pub async fn peer_count(&self) -> usize {
1246        self.peers.read().await.len()
1247    }
1248
1249    /// Get peer info
1250    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1251        self.peers.read().await.get(peer_id).cloned()
1252    }
1253
1254    /// Get the peer ID for a given socket address, if connected
1255    ///
1256    /// This method searches through all connected peers to find one that has
1257    /// the specified address in its address list.
1258    ///
1259    /// # Arguments
1260    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1261    ///
1262    /// # Returns
1263    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1264    /// * `None` - If no peer with this address is currently connected
1265    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1266        // Parse the address to a SocketAddr for comparison
1267        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1268
1269        let peers = self.peers.read().await;
1270
1271        // Search through all connected peers
1272        for (peer_id, peer_info) in peers.iter() {
1273            // Check if this peer has a matching address
1274            for peer_addr in &peer_info.addresses {
1275                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1276                    && peer_socket == socket_addr
1277                {
1278                    return Some(peer_id.clone());
1279                }
1280            }
1281        }
1282
1283        None
1284    }
1285
1286    /// List all active connections with their peer IDs and addresses
1287    ///
1288    /// # Returns
1289    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1290    /// contains all known addresses for that peer.
1291    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1292        let peers = self.peers.read().await;
1293
1294        peers
1295            .iter()
1296            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1297            .collect()
1298    }
1299
1300    /// Remove a peer from the peers map
1301    ///
1302    /// This method removes a peer from the internal peers map. It should be used
1303    /// when a connection is no longer valid (e.g., after detecting that the underlying
1304    /// ant-quic connection has closed).
1305    ///
1306    /// # Arguments
1307    /// * `peer_id` - The ID of the peer to remove
1308    ///
1309    /// # Returns
1310    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1311    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1312        // Remove from active connections tracking
1313        self.active_connections.write().await.remove(peer_id);
1314        // Remove from peers map and return whether it existed
1315        self.peers.write().await.remove(peer_id).is_some()
1316    }
1317
1318    /// Check if a peer is connected
1319    ///
1320    /// This method checks if the peer ID exists in the peers map. Note that this
1321    /// only verifies the peer is registered - it does not guarantee the underlying
1322    /// ant-quic connection is still active. For connection validation, use `send_message`
1323    /// which will fail if the connection is closed.
1324    ///
1325    /// # Arguments
1326    /// * `peer_id` - The ID of the peer to check
1327    ///
1328    /// # Returns
1329    /// `true` if the peer exists in the peers map, `false` otherwise
1330    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1331        self.peers.read().await.contains_key(peer_id)
1332    }
1333
1334    /// Connect to a peer
1335    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1336        info!("Connecting to peer at: {}", address);
1337
1338        // Check production limits if resource manager is enabled
1339        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1340            Some(resource_manager.acquire_connection().await?)
1341        } else {
1342            None
1343        };
1344
1345        // Parse the address to SocketAddr format
1346        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1347            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1348                format!("{}: {}", address, e).into(),
1349            ))
1350        })?;
1351
1352        // Normalize wildcard addresses to loopback for local connections
1353        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1354        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1355        if normalized_addr != socket_addr {
1356            info!(
1357                "Normalized wildcard address {} to loopback {}",
1358                socket_addr, normalized_addr
1359            );
1360        }
1361
1362        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1363        let addr_list = vec![normalized_addr];
1364        let peer_id = match tokio::time::timeout(
1365            self.config.connection_timeout,
1366            self.dual_node.connect_happy_eyeballs(&addr_list),
1367        )
1368        .await
1369        {
1370            Ok(Ok(peer)) => {
1371                let connected_peer_id =
1372                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1373                info!("Successfully connected to peer: {}", connected_peer_id);
1374                connected_peer_id
1375            }
1376            Ok(Err(e)) => {
1377                warn!("Failed to connect to peer at {}: {}", address, e);
1378                let sanitized_address = address.replace(['/', ':'], "_");
1379                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1380                warn!(
1381                    "Using demo peer ID: {} (transport connection failed)",
1382                    demo_peer_id
1383                );
1384                demo_peer_id
1385            }
1386            Err(_) => {
1387                warn!(
1388                    "Timed out connecting to peer at {} after {:?}",
1389                    address, self.config.connection_timeout
1390                );
1391                let sanitized_address = address.replace(['/', ':'], "_");
1392                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1393                demo_peer_id
1394            }
1395        };
1396
1397        // Create peer info with connection details
1398        let peer_info = PeerInfo {
1399            peer_id: peer_id.clone(),
1400            addresses: vec![address.to_string()],
1401            connected_at: Instant::now(),
1402            last_seen: Instant::now(),
1403            status: ConnectionStatus::Connected,
1404            protocols: vec!["p2p-foundation/1.0".to_string()],
1405            heartbeat_count: 0,
1406        };
1407
1408        // Store peer information
1409        self.peers.write().await.insert(peer_id.clone(), peer_info);
1410
1411        // Record bandwidth usage if resource manager is enabled
1412        if let Some(ref resource_manager) = self.resource_manager {
1413            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1414        }
1415
1416        // Emit connection event
1417        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1418
1419        info!("Connected to peer: {}", peer_id);
1420        Ok(peer_id)
1421    }
1422
1423    /// Disconnect from a peer
1424    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1425        info!("Disconnecting from peer: {}", peer_id);
1426
1427        // Remove from active connections
1428        self.active_connections.write().await.remove(peer_id);
1429
1430        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1431            peer_info.status = ConnectionStatus::Disconnected;
1432
1433            // Emit event
1434            let _ = self
1435                .event_tx
1436                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1437
1438            info!("Disconnected from peer: {}", peer_id);
1439        }
1440
1441        Ok(())
1442    }
1443
1444    /// Check if a connection to a peer is actually active (ant-quic level)
1445    pub async fn is_connection_active(&self, peer_id: &PeerId) -> bool {
1446        self.active_connections.read().await.contains(peer_id)
1447    }
1448
1449    /// Send a message to a peer
1450    pub async fn send_message(
1451        &self,
1452        peer_id: &PeerId,
1453        protocol: &str,
1454        data: Vec<u8>,
1455    ) -> Result<()> {
1456        debug!(
1457            "Sending message to peer {} on protocol {}",
1458            peer_id, protocol
1459        );
1460
1461        // Check rate limits if resource manager is enabled
1462        if let Some(ref resource_manager) = self.resource_manager
1463            && !resource_manager
1464                .check_rate_limit(peer_id, "message")
1465                .await?
1466        {
1467            return Err(P2PError::ResourceExhausted(
1468                format!("Rate limit exceeded for peer {}", peer_id).into(),
1469            ));
1470        }
1471
1472        // Check if peer exists in peers map
1473        if !self.peers.read().await.contains_key(peer_id) {
1474            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1475                peer_id.to_string().into(),
1476            )));
1477        }
1478
1479        // **NEW**: Check if the ant-quic connection is actually active
1480        // This is the critical fix for the connection state synchronization issue
1481        if !self.is_connection_active(peer_id).await {
1482            debug!(
1483                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1484                peer_id
1485            );
1486
1487            // Clean up stale peer entry
1488            self.remove_peer(peer_id).await;
1489
1490            return Err(P2PError::Network(crate::error::NetworkError::ConnectionClosed {
1491                peer_id: peer_id.to_string().into(),
1492            }));
1493        }
1494
1495        // MCP removed: no special-case protocol validation
1496
1497        // Record bandwidth usage if resource manager is enabled
1498        if let Some(ref resource_manager) = self.resource_manager {
1499            resource_manager.record_bandwidth(data.len() as u64, 0);
1500        }
1501
1502        // Create protocol message wrapper
1503        let _message_data = self.create_protocol_message(protocol, data)?;
1504
1505        // Send via ant-quic dual-node
1506        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1507        tokio::time::timeout(self.config.connection_timeout, send_fut)
1508            .await
1509            .map_err(|_| {
1510                P2PError::Transport(crate::error::TransportError::StreamError(
1511                    "Timed out sending message".into(),
1512                ))
1513            })?
1514            .map_err(|e| {
1515                P2PError::Transport(crate::error::TransportError::StreamError(
1516                    e.to_string().into(),
1517                ))
1518            })
1519    }
1520
1521    /// Create a protocol message wrapper
1522    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1523        use serde_json::json;
1524
1525        let timestamp = std::time::SystemTime::now()
1526            .duration_since(std::time::UNIX_EPOCH)
1527            .map_err(|e| {
1528                P2PError::Network(NetworkError::ProtocolError(
1529                    format!("System time error: {}", e).into(),
1530                ))
1531            })?
1532            .as_secs();
1533
1534        // Create a simple message format for P2P communication
1535        let message = json!({
1536            "protocol": protocol,
1537            "data": data,
1538            "from": self.peer_id,
1539            "timestamp": timestamp
1540        });
1541
1542        serde_json::to_vec(&message).map_err(|e| {
1543            P2PError::Transport(crate::error::TransportError::StreamError(
1544                format!("Failed to serialize message: {e}").into(),
1545            ))
1546        })
1547    }
1548
1549    // Note: async listen_addrs() already exists above for fetching listen addresses
1550}
1551
1552/// Create a protocol message wrapper (static version for background tasks)
1553#[allow(dead_code)]
1554fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1555    use serde_json::json;
1556
1557    let timestamp = std::time::SystemTime::now()
1558        .duration_since(std::time::UNIX_EPOCH)
1559        .map_err(|e| {
1560            P2PError::Network(NetworkError::ProtocolError(
1561                format!("System time error: {}", e).into(),
1562            ))
1563        })?
1564        .as_secs();
1565
1566    // Create a simple message format for P2P communication
1567    let message = json!({
1568        "protocol": protocol,
1569        "data": data,
1570        "timestamp": timestamp
1571    });
1572
1573    serde_json::to_vec(&message).map_err(|e| {
1574        P2PError::Transport(crate::error::TransportError::StreamError(
1575            format!("Failed to serialize message: {e}").into(),
1576        ))
1577    })
1578}
1579
1580impl P2PNode {
    /// Subscribe to network events
    ///
    /// Returns a new receiver obtained from the node's broadcast event sender.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.event_tx.subscribe()
    }
1585
    /// Backwards-compat event stream accessor for tests
    ///
    /// Delegates directly to `subscribe_events`.
    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
        self.subscribe_events()
    }
1590
    /// Get node uptime
    ///
    /// Returns the time elapsed since the instant stored in `start_time`.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1595
1596    // MCP removed: all MCP tool/service methods removed
1597
1598    // /// Handle MCP remote tool call with network integration
1599
1600    // /// List tools available on a specific remote peer
1601
1602    // /// Get MCP server statistics
1603
1604    /// Get production resource metrics
1605    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1606        if let Some(ref resource_manager) = self.resource_manager {
1607            Ok(resource_manager.get_metrics().await)
1608        } else {
1609            Err(P2PError::Network(
1610                crate::error::NetworkError::ProtocolError(
1611                    "Production resource manager not enabled".to_string().into(),
1612                ),
1613            ))
1614        }
1615    }
1616
1617    /// Connection lifecycle monitor task - processes ant-quic connection events
1618    /// and updates active_connections HashSet and peers map
1619    async fn connection_lifecycle_monitor(
1620        dual_node: Arc<DualStackNetworkNode>,
1621        active_connections: Arc<RwLock<HashSet<String>>>,
1622        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1623        event_tx: broadcast::Sender<P2PEvent>,
1624    ) {
1625        use crate::transport::ant_quic_adapter::ConnectionEvent;
1626
1627        let mut event_rx = dual_node.subscribe_connection_events();
1628
1629        info!("Connection lifecycle monitor started");
1630
1631        loop {
1632            match event_rx.recv().await {
1633                Ok(event) => {
1634                    match event {
1635                        ConnectionEvent::Established { peer_id, remote_address } => {
1636                            let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1637                            debug!("Connection established: peer={}, addr={}", peer_id_str, remote_address);
1638
1639                            // Add to active connections
1640                            active_connections.write().await.insert(peer_id_str.clone());
1641
1642                            // Update peer info if exists
1643                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1644                                peer_info.status = ConnectionStatus::Connected;
1645                                peer_info.connected_at = Instant::now();
1646                            }
1647
1648                            // Broadcast connection event
1649                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1650                        }
1651                        ConnectionEvent::Lost { peer_id, reason } => {
1652                            let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1653                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1654
1655                            // Remove from active connections
1656                            active_connections.write().await.remove(&peer_id_str);
1657
1658                            // Update peer info status
1659                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1660                                peer_info.status = ConnectionStatus::Disconnected;
1661                                peer_info.last_seen = Instant::now();
1662                            }
1663
1664                            // Broadcast disconnection event
1665                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1666                        }
1667                        ConnectionEvent::Failed { peer_id, reason } => {
1668                            let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1669                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1670
1671                            // Remove from active connections
1672                            active_connections.write().await.remove(&peer_id_str);
1673
1674                            // Update peer info status
1675                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1676                                peer_info.status = ConnectionStatus::Failed(reason.clone());
1677                            }
1678
1679                            // Broadcast disconnection event
1680                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1681                        }
1682                    }
1683                }
1684                Err(broadcast::error::RecvError::Lagged(skipped)) => {
1685                    warn!("Connection event monitor lagged, skipped {} events", skipped);
1686                    continue;
1687                }
1688                Err(broadcast::error::RecvError::Closed) => {
1689                    info!("Connection event channel closed, stopping monitor");
1690                    break;
1691                }
1692            }
1693        }
1694
1695        info!("Connection lifecycle monitor stopped");
1696    }
1697
1698    /// Start connection monitor (called after node initialization)
1699    async fn start_connection_monitor(&self) {
1700        // The monitor task is already spawned in new() with a temporary peers map
1701        // This method is a placeholder for future enhancements where we might
1702        // need to restart the monitor or provide it with updated references
1703        debug!("Connection monitor already running from initialization");
1704    }
1705
1706    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1707    ///
1708    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1709    /// message every 15 seconds (half the timeout) to all active connections to prevent
1710    /// them from timing out during periods of inactivity.
1711    async fn keepalive_task(
1712        active_connections: Arc<RwLock<HashSet<String>>>,
1713        dual_node: Arc<DualStackNetworkNode>,
1714        shutdown: Arc<AtomicBool>,
1715    ) {
1716        use tokio::time::{interval, Duration};
1717
1718        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1719        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1720
1721        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1722        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1723
1724        info!("Keepalive task started (interval: {}s)", KEEPALIVE_INTERVAL_SECS);
1725
1726        loop {
1727            // Check shutdown flag first
1728            if shutdown.load(Ordering::Relaxed) {
1729                info!("Keepalive task shutting down");
1730                break;
1731            }
1732
1733            interval.tick().await;
1734
1735            // Get snapshot of active connections
1736            let peers: Vec<String> = {
1737                active_connections.read().await.iter().cloned().collect()
1738            };
1739
1740            if peers.is_empty() {
1741                trace!("Keepalive: no active connections");
1742                continue;
1743            }
1744
1745            debug!("Sending keepalive to {} active connections", peers.len());
1746
1747            // Send keepalive to each peer
1748            for peer_id in peers {
1749                match dual_node.send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD).await {
1750                    Ok(_) => {
1751                        trace!("Keepalive sent to peer: {}", peer_id);
1752                    }
1753                    Err(e) => {
1754                        debug!("Failed to send keepalive to peer {}: {} (connection may have closed)", peer_id, e);
1755                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1756                    }
1757                }
1758            }
1759        }
1760
1761        info!("Keepalive task stopped");
1762    }
1763
1764    /// Check system health
1765    pub async fn health_check(&self) -> Result<()> {
1766        if let Some(ref resource_manager) = self.resource_manager {
1767            resource_manager.health_check().await
1768        } else {
1769            // Basic health check without resource manager
1770            let peer_count = self.peer_count().await;
1771            if peer_count > self.config.max_connections {
1772                Err(P2PError::Network(
1773                    crate::error::NetworkError::ProtocolError(
1774                        format!("Too many connections: {peer_count}").into(),
1775                    ),
1776                ))
1777            } else {
1778                Ok(())
1779            }
1780        }
1781    }
1782
1783    /// Get production configuration (if enabled)
1784    pub fn production_config(&self) -> Option<&ProductionConfig> {
1785        self.config.production_config.as_ref()
1786    }
1787
1788    /// Check if production hardening is enabled
1789    pub fn is_production_mode(&self) -> bool {
1790        self.resource_manager.is_some()
1791    }
1792
1793    /// Get DHT reference
1794    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1795        self.dht.as_ref()
1796    }
1797
1798    /// Store a value in the DHT
1799    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1800        if let Some(ref dht) = self.dht {
1801            let mut dht_instance = dht.write().await;
1802            let dht_key = crate::dht::DhtKey::from_bytes(key);
1803            dht_instance
1804                .store(&dht_key, value.clone())
1805                .await
1806                .map_err(|e| {
1807                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1808                        format!("{:?}: {e}", key).into(),
1809                    ))
1810                })?;
1811
1812            Ok(())
1813        } else {
1814            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1815                "DHT not enabled".to_string().into(),
1816            )))
1817        }
1818    }
1819
1820    /// Retrieve a value from the DHT
1821    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1822        if let Some(ref dht) = self.dht {
1823            let dht_instance = dht.read().await;
1824            let dht_key = crate::dht::DhtKey::from_bytes(key);
1825            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1826                P2PError::Dht(crate::error::DhtError::StoreFailed(
1827                    format!("Retrieve failed: {e}").into(),
1828                ))
1829            })?;
1830
1831            Ok(record_result)
1832        } else {
1833            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1834                "DHT not enabled".to_string().into(),
1835            )))
1836        }
1837    }
1838
1839    /// Add a discovered peer to the bootstrap cache
1840    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1841        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1842            let mut manager = bootstrap_manager.write().await;
1843            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1844                .iter()
1845                .filter_map(|addr| addr.parse().ok())
1846                .collect();
1847            let contact = ContactEntry::new(peer_id, socket_addresses);
1848            manager.add_contact(contact).await.map_err(|e| {
1849                P2PError::Network(crate::error::NetworkError::ProtocolError(
1850                    format!("Failed to add peer to bootstrap cache: {e}").into(),
1851                ))
1852            })?;
1853        }
1854        Ok(())
1855    }
1856
1857    /// Update connection metrics for a peer in the bootstrap cache
1858    pub async fn update_peer_metrics(
1859        &self,
1860        peer_id: &PeerId,
1861        success: bool,
1862        latency_ms: Option<u64>,
1863        _error: Option<String>,
1864    ) -> Result<()> {
1865        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1866            let mut manager = bootstrap_manager.write().await;
1867
1868            // Create quality metrics based on the connection result
1869            let metrics = QualityMetrics {
1870                success_rate: if success { 1.0 } else { 0.0 },
1871                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1872                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
1873                last_connection_attempt: chrono::Utc::now(),
1874                last_successful_connection: if success {
1875                    chrono::Utc::now()
1876                } else {
1877                    chrono::Utc::now() - chrono::Duration::hours(1)
1878                },
1879                uptime_score: 0.5,
1880            };
1881
1882            manager
1883                .update_contact_metrics(peer_id, metrics)
1884                .await
1885                .map_err(|e| {
1886                    P2PError::Network(crate::error::NetworkError::ProtocolError(
1887                        format!("Failed to update peer metrics: {e}").into(),
1888                    ))
1889                })?;
1890        }
1891        Ok(())
1892    }
1893
1894    /// Get bootstrap cache statistics
1895    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1896        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1897            let manager = bootstrap_manager.read().await;
1898            let stats = manager.get_stats().await.map_err(|e| {
1899                P2PError::Network(crate::error::NetworkError::ProtocolError(
1900                    format!("Failed to get bootstrap stats: {e}").into(),
1901                ))
1902            })?;
1903            Ok(Some(stats))
1904        } else {
1905            Ok(None)
1906        }
1907    }
1908
1909    /// Get the number of cached bootstrap peers
1910    pub async fn cached_peer_count(&self) -> usize {
1911        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1912            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1913        {
1914            return stats.total_contacts;
1915        }
1916        0
1917    }
1918
1919    /// Connect to bootstrap peers
1920    async fn connect_bootstrap_peers(&self) -> Result<()> {
1921        let mut bootstrap_contacts = Vec::new();
1922        let mut used_cache = false;
1923
1924        // Try to get peers from bootstrap cache first
1925        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1926            let manager = bootstrap_manager.read().await;
1927            match manager.get_bootstrap_peers(20).await {
1928                // Try to get top 20 quality peers
1929                Ok(contacts) => {
1930                    if !contacts.is_empty() {
1931                        info!("Using {} cached bootstrap peers", contacts.len());
1932                        bootstrap_contacts = contacts;
1933                        used_cache = true;
1934                    }
1935                }
1936                Err(e) => {
1937                    warn!("Failed to get cached bootstrap peers: {}", e);
1938                }
1939            }
1940        }
1941
1942        // Fallback to configured bootstrap peers if no cache or cache is empty
1943        if bootstrap_contacts.is_empty() {
1944            let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1945                &self.config.bootstrap_peers_str
1946            } else {
1947                // Convert Multiaddr to strings for fallback
1948                &self
1949                    .config
1950                    .bootstrap_peers
1951                    .iter()
1952                    .map(|addr| addr.to_string())
1953                    .collect::<Vec<_>>()
1954            };
1955
1956            if bootstrap_peers.is_empty() {
1957                info!("No bootstrap peers configured and no cached peers available");
1958                return Ok(());
1959            }
1960
1961            info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1962
1963            for addr in bootstrap_peers {
1964                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1965                    let contact = ContactEntry::new(
1966                        format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1967                        vec![socket_addr],
1968                    );
1969                    bootstrap_contacts.push(contact);
1970                } else {
1971                    warn!("Invalid bootstrap address format: {}", addr);
1972                }
1973            }
1974        }
1975
1976        // Connect to bootstrap peers
1977        let mut successful_connections = 0;
1978        for contact in bootstrap_contacts {
1979            for addr in &contact.addresses {
1980                match self.connect_peer(&addr.to_string()).await {
1981                    Ok(peer_id) => {
1982                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1983                        successful_connections += 1;
1984
1985                        // Update bootstrap cache with successful connection
1986                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1987                            let mut manager = bootstrap_manager.write().await;
1988                            let mut updated_contact = contact.clone();
1989                            updated_contact.peer_id = peer_id.clone();
1990                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
1991
1992                            if let Err(e) = manager.add_contact(updated_contact).await {
1993                                warn!("Failed to update bootstrap cache: {}", e);
1994                            }
1995                        }
1996                        break; // Successfully connected, move to next contact
1997                    }
1998                    Err(e) => {
1999                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2000
2001                        // Update bootstrap cache with failed connection
2002                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2003                            let mut manager = bootstrap_manager.write().await;
2004                            let mut updated_contact = contact.clone();
2005                            updated_contact.update_connection_result(
2006                                false,
2007                                None,
2008                                Some(e.to_string()),
2009                            );
2010
2011                            if let Err(e) = manager.add_contact(updated_contact).await {
2012                                warn!("Failed to update bootstrap cache: {}", e);
2013                            }
2014                        }
2015                    }
2016                }
2017            }
2018        }
2019
2020        if successful_connections == 0 {
2021            if !used_cache {
2022                warn!("Failed to connect to any bootstrap peers");
2023            }
2024            return Err(P2PError::Network(NetworkError::ConnectionFailed {
2025                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
2026                reason: "Failed to connect to any bootstrap peers".into(),
2027            }));
2028        }
2029        info!(
2030            "Successfully connected to {} bootstrap peers",
2031            successful_connections
2032        );
2033
2034        Ok(())
2035    }
2036
2037    /// Disconnect from all peers
2038    async fn disconnect_all_peers(&self) -> Result<()> {
2039        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2040
2041        for peer_id in peer_ids {
2042            self.disconnect_peer(&peer_id).await?;
2043        }
2044
2045        Ok(())
2046    }
2047
2048    /// Perform periodic maintenance tasks
2049    async fn periodic_tasks(&self) -> Result<()> {
2050        // Update peer last seen timestamps
2051        // Remove stale connections
2052        // Perform DHT maintenance
2053        // This is a placeholder for now
2054
2055        Ok(())
2056    }
2057}
2058
2059/// Network sender trait for sending messages
2060#[async_trait::async_trait]
2061pub trait NetworkSender: Send + Sync {
2062    /// Send a message to a specific peer
2063    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2064
2065    /// Get our local peer ID
2066    fn local_peer_id(&self) -> &PeerId;
2067}
2068
2069/// Lightweight wrapper for P2PNode to implement NetworkSender
2070#[derive(Clone)]
2071pub struct P2PNetworkSender {
2072    peer_id: PeerId,
2073    // Use channels for async communication with the P2P node
2074    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2075}
2076
2077impl P2PNetworkSender {
2078    pub fn new(
2079        peer_id: PeerId,
2080        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2081    ) -> Self {
2082        Self { peer_id, send_tx }
2083    }
2084}
2085
2086/// Implementation of NetworkSender trait for P2PNetworkSender
2087#[async_trait::async_trait]
2088impl NetworkSender for P2PNetworkSender {
2089    /// Send a message to a specific peer via the P2P network
2090    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2091        self.send_tx
2092            .send((peer_id.clone(), protocol.to_string(), data))
2093            .map_err(|_| {
2094                P2PError::Network(crate::error::NetworkError::ProtocolError(
2095                    "Failed to send message via channel".to_string().into(),
2096                ))
2097            })?;
2098        Ok(())
2099    }
2100
2101    /// Get our local peer ID
2102    fn local_peer_id(&self) -> &PeerId {
2103        &self.peer_id
2104    }
2105}
2106
2107/// Builder pattern for creating P2P nodes
2108pub struct NodeBuilder {
2109    config: NodeConfig,
2110}
2111
2112impl Default for NodeBuilder {
2113    fn default() -> Self {
2114        Self::new()
2115    }
2116}
2117
2118impl NodeBuilder {
2119    /// Create a new node builder
2120    pub fn new() -> Self {
2121        Self {
2122            config: NodeConfig::default(),
2123        }
2124    }
2125
2126    /// Set the peer ID
2127    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2128        self.config.peer_id = Some(peer_id);
2129        self
2130    }
2131
2132    /// Add a listen address
2133    pub fn listen_on(mut self, addr: &str) -> Self {
2134        if let Ok(multiaddr) = addr.parse() {
2135            self.config.listen_addrs.push(multiaddr);
2136        }
2137        self
2138    }
2139
2140    /// Add a bootstrap peer
2141    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2142        if let Ok(multiaddr) = addr.parse() {
2143            self.config.bootstrap_peers.push(multiaddr);
2144        }
2145        self.config.bootstrap_peers_str.push(addr.to_string());
2146        self
2147    }
2148
2149    /// Enable IPv6 support
2150    pub fn with_ipv6(mut self, enable: bool) -> Self {
2151        self.config.enable_ipv6 = enable;
2152        self
2153    }
2154
2155    // MCP removed: builder methods deleted
2156
2157    /// Set connection timeout
2158    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2159        self.config.connection_timeout = timeout;
2160        self
2161    }
2162
2163    /// Set maximum connections
2164    pub fn with_max_connections(mut self, max: usize) -> Self {
2165        self.config.max_connections = max;
2166        self
2167    }
2168
2169    /// Enable production mode with default configuration
2170    pub fn with_production_mode(mut self) -> Self {
2171        self.config.production_config = Some(ProductionConfig::default());
2172        self
2173    }
2174
2175    /// Configure production settings
2176    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2177        self.config.production_config = Some(production_config);
2178        self
2179    }
2180
2181    /// Configure DHT settings
2182    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2183        self.config.dht_config = dht_config;
2184        self
2185    }
2186
2187    /// Enable DHT with default configuration
2188    pub fn with_default_dht(mut self) -> Self {
2189        self.config.dht_config = DHTConfig::default();
2190        self
2191    }
2192
2193    /// Build the P2P node
2194    pub async fn build(self) -> Result<P2PNode> {
2195        P2PNode::new(self.config).await
2196    }
2197}
2198
2199/// Standalone function to handle received messages without borrowing self
2200#[allow(dead_code)] // Deprecated during ant-quic migration
2201async fn handle_received_message_standalone(
2202    message_data: Vec<u8>,
2203    peer_id: &PeerId,
2204    _protocol: &str,
2205    event_tx: &broadcast::Sender<P2PEvent>,
2206) -> Result<()> {
2207    // Parse the message format
2208    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2209        Ok(message) => {
2210            if let (Some(protocol), Some(data), Some(from)) = (
2211                message.get("protocol").and_then(|v| v.as_str()),
2212                message.get("data").and_then(|v| v.as_array()),
2213                message.get("from").and_then(|v| v.as_str()),
2214            ) {
2215                // Convert data array back to bytes
2216                let data_bytes: Vec<u8> = data
2217                    .iter()
2218                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2219                    .collect();
2220
2221                // Generate message event
2222                let event = P2PEvent::Message {
2223                    topic: protocol.to_string(),
2224                    source: from.to_string(),
2225                    data: data_bytes,
2226                };
2227
2228                let _ = event_tx.send(event);
2229                debug!("Generated message event from peer: {}", peer_id);
2230            }
2231        }
2232        Err(e) => {
2233            warn!("Failed to parse received message from {}: {}", peer_id, e);
2234        }
2235    }
2236
2237    Ok(())
2238}
2239
2240// MCP removed: standalone MCP handler deleted
2241
2242/// Helper function to handle protocol message creation
2243#[allow(dead_code)]
2244fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2245    match create_protocol_message_static(protocol, data) {
2246        Ok(msg) => Some(msg),
2247        Err(e) => {
2248            warn!("Failed to create protocol message: {}", e);
2249            None
2250        }
2251    }
2252}
2253
2254/// Helper function to handle message send result
2255#[allow(dead_code)]
2256async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2257    match result {
2258        Ok(_) => {
2259            debug!("Message sent to peer {} via transport layer", peer_id);
2260        }
2261        Err(e) => {
2262            warn!("Failed to send message to peer {}: {}", peer_id, e);
2263        }
2264    }
2265}
2266
2267/// Helper function to check rate limit
2268#[allow(dead_code)] // Deprecated during ant-quic migration
2269fn check_rate_limit(
2270    rate_limiter: &RateLimiter,
2271    socket_addr: &std::net::SocketAddr,
2272    remote_addr: &NetworkAddress,
2273) -> Result<()> {
2274    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2275        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2276        e
2277    })
2278}
2279
2280/// Helper function to register a new peer
2281#[allow(dead_code)] // Deprecated during ant-quic migration
2282async fn register_new_peer(
2283    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2284    peer_id: &PeerId,
2285    remote_addr: &NetworkAddress,
2286) {
2287    let mut peers_guard = peers.write().await;
2288    let peer_info = PeerInfo {
2289        peer_id: peer_id.clone(),
2290        addresses: vec![remote_addr.to_string()],
2291        connected_at: tokio::time::Instant::now(),
2292        last_seen: tokio::time::Instant::now(),
2293        status: ConnectionStatus::Connected,
2294        protocols: vec!["p2p-chat/1.0.0".to_string()],
2295        heartbeat_count: 0,
2296    };
2297    peers_guard.insert(peer_id.clone(), peer_info);
2298}
2299
2300/// Helper function to spawn connection handler
2301#[allow(dead_code)] // Deprecated during ant-quic migration
2302fn spawn_connection_handler(
2303    connection: Box<dyn crate::transport::Connection>,
2304    peer_id: PeerId,
2305    event_tx: broadcast::Sender<P2PEvent>,
2306    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2307) {
2308    tokio::spawn(async move {
2309        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2310    });
2311}
2312
2313/// Helper function to handle peer connection
2314#[allow(dead_code)] // Deprecated during ant-quic migration
2315async fn handle_peer_connection(
2316    mut connection: Box<dyn crate::transport::Connection>,
2317    peer_id: PeerId,
2318    event_tx: broadcast::Sender<P2PEvent>,
2319    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2320) {
2321    loop {
2322        match connection.receive().await {
2323            Ok(message_data) => {
2324                debug!(
2325                    "Received {} bytes from peer: {}",
2326                    message_data.len(),
2327                    peer_id
2328                );
2329
2330                // Handle the received message
2331                if let Err(e) = handle_received_message_standalone(
2332                    message_data,
2333                    &peer_id,
2334                    "unknown", // TODO: Extract protocol from message
2335                    &event_tx,
2336                )
2337                .await
2338                {
2339                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2340                }
2341            }
2342            Err(e) => {
2343                warn!("Failed to receive message from {}: {}", peer_id, e);
2344
2345                // Check if connection is still alive
2346                if !connection.is_alive().await {
2347                    info!("Connection to {} is dead, removing peer", peer_id);
2348
2349                    // Remove dead peer
2350                    remove_peer(&peers, &peer_id).await;
2351
2352                    // Generate peer disconnected event
2353                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2354
2355                    break; // Exit the message receiving loop
2356                }
2357
2358                // Brief pause before retrying
2359                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2360            }
2361        }
2362    }
2363}
2364
2365/// Helper function to remove a peer
2366#[allow(dead_code)] // Deprecated during ant-quic migration
2367async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2368    let mut peers_guard = peers.write().await;
2369    peers_guard.remove(peer_id);
2370}
2371
2372/// Helper function to update peer heartbeat
2373#[allow(dead_code)]
2374async fn update_peer_heartbeat(
2375    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2376    peer_id: &PeerId,
2377) -> Result<()> {
2378    let mut peers_guard = peers.write().await;
2379    match peers_guard.get_mut(peer_id) {
2380        Some(peer_info) => {
2381            peer_info.last_seen = Instant::now();
2382            peer_info.heartbeat_count += 1;
2383            Ok(())
2384        }
2385        None => {
2386            warn!("Received heartbeat from unknown peer: {}", peer_id);
2387            Err(P2PError::Network(NetworkError::PeerNotFound(
2388                format!("Peer {} not found", peer_id).into(),
2389            )))
2390        }
2391    }
2392}
2393
2394/// Helper function to get resource metrics
2395#[allow(dead_code)]
2396async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2397    if let Some(manager) = resource_manager {
2398        let metrics = manager.get_metrics().await;
2399        (metrics.memory_used, metrics.cpu_usage)
2400    } else {
2401        (0, 0.0)
2402    }
2403}
2404
2405#[cfg(test)]
2406mod tests {
2407    use super::*;
2408    // MCP removed from tests
2409    use std::time::Duration;
2410    use tokio::time::timeout;
2411
2412    // Test tool handler for network tests
2413
2414    // MCP removed
2415
2416    /// Helper function to create a test node configuration
2417    fn create_test_node_config() -> NodeConfig {
2418        NodeConfig {
2419            peer_id: Some("test_peer_123".to_string()),
2420            listen_addrs: vec![
2421                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2422                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2423            ],
2424            listen_addr: std::net::SocketAddr::new(
2425                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2426                0,
2427            ),
2428            bootstrap_peers: vec![],
2429            bootstrap_peers_str: vec![],
2430            enable_ipv6: true,
2431
2432            connection_timeout: Duration::from_millis(300),
2433            keep_alive_interval: Duration::from_secs(30),
2434            max_connections: 100,
2435            max_incoming_connections: 50,
2436            dht_config: DHTConfig::default(),
2437            security_config: SecurityConfig::default(),
2438            production_config: None,
2439            bootstrap_cache_config: None,
2440            // identity_config: None,
2441        }
2442    }
2443
    // MCP removed: the helper that created a test tool was deleted.
2446
2447    #[tokio::test]
2448    async fn test_node_config_default() {
2449        let config = NodeConfig::default();
2450
2451        assert!(config.peer_id.is_none());
2452        assert_eq!(config.listen_addrs.len(), 2);
2453        assert!(config.enable_ipv6);
2454        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2455        assert_eq!(config.max_incoming_connections, 100);
2456        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2457    }
2458
2459    #[tokio::test]
2460    async fn test_dht_config_default() {
2461        let config = DHTConfig::default();
2462
2463        assert_eq!(config.k_value, 20);
2464        assert_eq!(config.alpha_value, 5);
2465        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2466        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2467    }
2468
2469    #[tokio::test]
2470    async fn test_security_config_default() {
2471        let config = SecurityConfig::default();
2472
2473        assert!(config.enable_noise);
2474        assert!(config.enable_tls);
2475        assert_eq!(config.trust_level, TrustLevel::Basic);
2476    }
2477
2478    #[test]
2479    fn test_trust_level_variants() {
2480        // Test that all trust level variants can be created
2481        let _none = TrustLevel::None;
2482        let _basic = TrustLevel::Basic;
2483        let _full = TrustLevel::Full;
2484
2485        // Test equality
2486        assert_eq!(TrustLevel::None, TrustLevel::None);
2487        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2488        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2489        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2490    }
2491
2492    #[test]
2493    fn test_connection_status_variants() {
2494        let connecting = ConnectionStatus::Connecting;
2495        let connected = ConnectionStatus::Connected;
2496        let disconnecting = ConnectionStatus::Disconnecting;
2497        let disconnected = ConnectionStatus::Disconnected;
2498        let failed = ConnectionStatus::Failed("test error".to_string());
2499
2500        assert_eq!(connecting, ConnectionStatus::Connecting);
2501        assert_eq!(connected, ConnectionStatus::Connected);
2502        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2503        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2504        assert_ne!(connecting, connected);
2505
2506        if let ConnectionStatus::Failed(msg) = failed {
2507            assert_eq!(msg, "test error");
2508        } else {
2509            panic!("Expected Failed status");
2510        }
2511    }
2512
2513    #[tokio::test]
2514    async fn test_node_creation() -> Result<()> {
2515        let config = create_test_node_config();
2516        let node = P2PNode::new(config).await?;
2517
2518        assert_eq!(node.peer_id(), "test_peer_123");
2519        assert!(!node.is_running().await);
2520        assert_eq!(node.peer_count().await, 0);
2521        assert!(node.connected_peers().await.is_empty());
2522
2523        Ok(())
2524    }
2525
2526    #[tokio::test]
2527    async fn test_node_creation_without_peer_id() -> Result<()> {
2528        let mut config = create_test_node_config();
2529        config.peer_id = None;
2530
2531        let node = P2PNode::new(config).await?;
2532
2533        // Should have generated a peer ID
2534        assert!(node.peer_id().starts_with("peer_"));
2535        assert!(!node.is_running().await);
2536
2537        Ok(())
2538    }
2539
2540    #[tokio::test]
2541    async fn test_node_lifecycle() -> Result<()> {
2542        let config = create_test_node_config();
2543        let node = P2PNode::new(config).await?;
2544
2545        // Initially not running
2546        assert!(!node.is_running().await);
2547
2548        // Start the node
2549        node.start().await?;
2550        assert!(node.is_running().await);
2551
2552        // Check listen addresses were set (at least one)
2553        let listen_addrs = node.listen_addrs().await;
2554        assert!(
2555            !listen_addrs.is_empty(),
2556            "Expected at least one listening address"
2557        );
2558
2559        // Stop the node
2560        node.stop().await?;
2561        assert!(!node.is_running().await);
2562
2563        Ok(())
2564    }
2565
2566    #[tokio::test]
2567    async fn test_peer_connection() -> Result<()> {
2568        let config = create_test_node_config();
2569        let node = P2PNode::new(config).await?;
2570
2571        let peer_addr = "127.0.0.1:0";
2572
2573        // Connect to a peer
2574        let peer_id = node.connect_peer(peer_addr).await?;
2575        assert!(peer_id.starts_with("peer_from_"));
2576
2577        // Check peer count
2578        assert_eq!(node.peer_count().await, 1);
2579
2580        // Check connected peers
2581        let connected_peers = node.connected_peers().await;
2582        assert_eq!(connected_peers.len(), 1);
2583        assert_eq!(connected_peers[0], peer_id);
2584
2585        // Get peer info
2586        let peer_info = node.peer_info(&peer_id).await;
2587        assert!(peer_info.is_some());
2588        let info = peer_info.expect("Peer info should exist after adding peer");
2589        assert_eq!(info.peer_id, peer_id);
2590        assert_eq!(info.status, ConnectionStatus::Connected);
2591        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2592
2593        // Disconnect from peer
2594        node.disconnect_peer(&peer_id).await?;
2595        assert_eq!(node.peer_count().await, 0);
2596
2597        Ok(())
2598    }
2599
2600    #[tokio::test]
2601    async fn test_event_subscription() -> Result<()> {
2602        let config = create_test_node_config();
2603        let node = P2PNode::new(config).await?;
2604
2605        let mut events = node.subscribe_events();
2606        let peer_addr = "127.0.0.1:0";
2607
2608        // Connect to a peer (this should emit an event)
2609        let peer_id = node.connect_peer(peer_addr).await?;
2610
2611        // Check for PeerConnected event
2612        let event = timeout(Duration::from_millis(100), events.recv()).await;
2613        assert!(event.is_ok());
2614
2615        let event_result = event
2616            .expect("Should receive event")
2617            .expect("Event should not be error");
2618        match event_result {
2619            P2PEvent::PeerConnected(event_peer_id) => {
2620                assert_eq!(event_peer_id, peer_id);
2621            }
2622            _ => panic!("Expected PeerConnected event"),
2623        }
2624
2625        // Disconnect from peer (this should emit another event)
2626        node.disconnect_peer(&peer_id).await?;
2627
2628        // Check for PeerDisconnected event
2629        let event = timeout(Duration::from_millis(100), events.recv()).await;
2630        assert!(event.is_ok());
2631
2632        let event_result = event
2633            .expect("Should receive event")
2634            .expect("Event should not be error");
2635        match event_result {
2636            P2PEvent::PeerDisconnected(event_peer_id) => {
2637                assert_eq!(event_peer_id, peer_id);
2638            }
2639            _ => panic!("Expected PeerDisconnected event"),
2640        }
2641
2642        Ok(())
2643    }
2644
2645    #[tokio::test]
2646    async fn test_message_sending() -> Result<()> {
2647        // Create two nodes
2648        let mut config1 = create_test_node_config();
2649        config1.listen_addr =
2650            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2651        let node1 = P2PNode::new(config1).await?;
2652        node1.start().await?;
2653
2654        let mut config2 = create_test_node_config();
2655        config2.listen_addr =
2656            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2657        let node2 = P2PNode::new(config2).await?;
2658        node2.start().await?;
2659
2660        // Wait a bit for nodes to start listening
2661        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2662
2663        // Get actual listening address of node2
2664        let node2_addr = node2.local_addr().ok_or_else(|| {
2665            P2PError::Network(crate::error::NetworkError::ProtocolError(
2666                "No listening address".to_string().into(),
2667            ))
2668        })?;
2669
2670        // Connect node1 to node2
2671        let peer_id =
2672            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2673                Ok(res) => res?,
2674                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2675            };
2676
2677        // Wait a bit for connection to establish
2678        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2679
2680        // Send a message
2681        let message_data = b"Hello, peer!".to_vec();
2682        let result = match timeout(
2683            Duration::from_millis(500),
2684            node1.send_message(&peer_id, "test-protocol", message_data),
2685        )
2686        .await
2687        {
2688            Ok(res) => res,
2689            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2690        };
2691        // For now, we'll just check that we don't get a "not connected" error
2692        // The actual send might fail due to no handler on the other side
2693        if let Err(e) = &result {
2694            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2695        }
2696
2697        // Try to send to non-existent peer
2698        let non_existent_peer = "non_existent_peer".to_string();
2699        let result = node1
2700            .send_message(&non_existent_peer, "test-protocol", vec![])
2701            .await;
2702        assert!(result.is_err(), "Sending to non-existent peer should fail");
2703
2704        Ok(())
2705    }
2706
2707    #[tokio::test]
2708    async fn test_remote_mcp_operations() -> Result<()> {
2709        let config = create_test_node_config();
2710        let node = P2PNode::new(config).await?;
2711
2712        // MCP removed; test reduced to simple start/stop
2713        node.start().await?;
2714        node.stop().await?;
2715        Ok(())
2716    }
2717
2718    #[tokio::test]
2719    async fn test_health_check() -> Result<()> {
2720        let config = create_test_node_config();
2721        let node = P2PNode::new(config).await?;
2722
2723        // Health check should pass with no connections
2724        let result = node.health_check().await;
2725        assert!(result.is_ok());
2726
2727        // Note: We're not actually connecting to real peers here
2728        // since that would require running bootstrap nodes.
2729        // The health check should still pass with no connections.
2730
2731        Ok(())
2732    }
2733
2734    #[tokio::test]
2735    async fn test_node_uptime() -> Result<()> {
2736        let config = create_test_node_config();
2737        let node = P2PNode::new(config).await?;
2738
2739        let uptime1 = node.uptime();
2740        assert!(uptime1 >= Duration::from_secs(0));
2741
2742        // Wait a bit
2743        tokio::time::sleep(Duration::from_millis(10)).await;
2744
2745        let uptime2 = node.uptime();
2746        assert!(uptime2 > uptime1);
2747
2748        Ok(())
2749    }
2750
2751    #[tokio::test]
2752    async fn test_node_config_access() -> Result<()> {
2753        let config = create_test_node_config();
2754        let expected_peer_id = config.peer_id.clone();
2755        let node = P2PNode::new(config).await?;
2756
2757        let node_config = node.config();
2758        assert_eq!(node_config.peer_id, expected_peer_id);
2759        assert_eq!(node_config.max_connections, 100);
2760        // MCP removed
2761
2762        Ok(())
2763    }
2764
2765    #[tokio::test]
2766    async fn test_mcp_server_access() -> Result<()> {
2767        let config = create_test_node_config();
2768        let _node = P2PNode::new(config).await?;
2769
2770        // MCP removed
2771        Ok(())
2772    }
2773
2774    #[tokio::test]
2775    async fn test_dht_access() -> Result<()> {
2776        let config = create_test_node_config();
2777        let node = P2PNode::new(config).await?;
2778
2779        // Should have DHT
2780        assert!(node.dht().is_some());
2781
2782        Ok(())
2783    }
2784
2785    #[tokio::test]
2786    async fn test_node_builder() -> Result<()> {
2787        // Create a config using the builder but don't actually build a real node
2788        let builder = P2PNode::builder()
2789            .with_peer_id("builder_test_peer".to_string())
2790            .listen_on("/ip4/127.0.0.1/tcp/0")
2791            .listen_on("/ip6/::1/tcp/0")
2792            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2793            .with_ipv6(true)
2794            .with_connection_timeout(Duration::from_secs(15))
2795            .with_max_connections(200);
2796
2797        // Test the configuration that was built
2798        let config = builder.config;
2799        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2800        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2801        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2802        assert!(config.enable_ipv6);
2803        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2804        assert_eq!(config.max_connections, 200);
2805
2806        Ok(())
2807    }
2808
2809    #[tokio::test]
2810    async fn test_bootstrap_peers() -> Result<()> {
2811        let mut config = create_test_node_config();
2812        config.bootstrap_peers = vec![
2813            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2814            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2815        ];
2816
2817        let node = P2PNode::new(config).await?;
2818
2819        // Start node (which attempts to connect to bootstrap peers)
2820        node.start().await?;
2821
2822        // In a test environment, bootstrap peers may not be available
2823        // The test verifies the node starts correctly with bootstrap configuration
2824        let peer_count = node.peer_count().await;
2825        assert!(
2826            peer_count <= 2,
2827            "Peer count should not exceed bootstrap peer count"
2828        );
2829
2830        node.stop().await?;
2831        Ok(())
2832    }
2833
2834    #[tokio::test]
2835    async fn test_production_mode_disabled() -> Result<()> {
2836        let config = create_test_node_config();
2837        let node = P2PNode::new(config).await?;
2838
2839        assert!(!node.is_production_mode());
2840        assert!(node.production_config().is_none());
2841
2842        // Resource metrics should fail when production mode is disabled
2843        let result = node.resource_metrics().await;
2844        assert!(result.is_err());
2845        assert!(result.unwrap_err().to_string().contains("not enabled"));
2846
2847        Ok(())
2848    }
2849
2850    #[tokio::test]
2851    async fn test_network_event_variants() {
2852        // Test that all network event variants can be created
2853        let peer_id = "test_peer".to_string();
2854        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2855
2856        let _peer_connected = NetworkEvent::PeerConnected {
2857            peer_id: peer_id.clone(),
2858            addresses: vec![address.clone()],
2859        };
2860
2861        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2862            peer_id: peer_id.clone(),
2863            reason: "test disconnect".to_string(),
2864        };
2865
2866        let _message_received = NetworkEvent::MessageReceived {
2867            peer_id: peer_id.clone(),
2868            protocol: "test-protocol".to_string(),
2869            data: vec![1, 2, 3],
2870        };
2871
2872        let _connection_failed = NetworkEvent::ConnectionFailed {
2873            peer_id: Some(peer_id.clone()),
2874            address: address.clone(),
2875            error: "connection refused".to_string(),
2876        };
2877
2878        let _dht_stored = NetworkEvent::DHTRecordStored {
2879            key: vec![1, 2, 3],
2880            value: vec![4, 5, 6],
2881        };
2882
2883        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2884            key: vec![1, 2, 3],
2885            value: Some(vec![4, 5, 6]),
2886        };
2887    }
2888
2889    #[tokio::test]
2890    async fn test_peer_info_structure() {
2891        let peer_info = PeerInfo {
2892            peer_id: "test_peer".to_string(),
2893            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2894            connected_at: Instant::now(),
2895            last_seen: Instant::now(),
2896            status: ConnectionStatus::Connected,
2897            protocols: vec!["test-protocol".to_string()],
2898            heartbeat_count: 0,
2899        };
2900
2901        assert_eq!(peer_info.peer_id, "test_peer");
2902        assert_eq!(peer_info.addresses.len(), 1);
2903        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2904        assert_eq!(peer_info.protocols.len(), 1);
2905    }
2906
2907    #[tokio::test]
2908    async fn test_serialization() -> Result<()> {
2909        // Test that configs can be serialized/deserialized
2910        let config = create_test_node_config();
2911        let serialized = serde_json::to_string(&config)?;
2912        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2913
2914        assert_eq!(config.peer_id, deserialized.peer_id);
2915        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2916        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2917
2918        Ok(())
2919    }
2920
2921    #[tokio::test]
2922    async fn test_get_peer_id_by_address_found() -> Result<()> {
2923        let config = create_test_node_config();
2924        let node = P2PNode::new(config).await?;
2925
2926        // Manually insert a peer for testing
2927        let test_peer_id = "peer_test_123".to_string();
2928        let test_address = "192.168.1.100:9000".to_string();
2929
2930        let peer_info = PeerInfo {
2931            peer_id: test_peer_id.clone(),
2932            addresses: vec![test_address.clone()],
2933            connected_at: Instant::now(),
2934            last_seen: Instant::now(),
2935            status: ConnectionStatus::Connected,
2936            protocols: vec!["test-protocol".to_string()],
2937            heartbeat_count: 0,
2938        };
2939
2940        node.peers
2941            .write()
2942            .await
2943            .insert(test_peer_id.clone(), peer_info);
2944
2945        // Test: Find peer by address
2946        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2947        assert_eq!(found_peer_id, Some(test_peer_id));
2948
2949        Ok(())
2950    }
2951
2952    #[tokio::test]
2953    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2954        let config = create_test_node_config();
2955        let node = P2PNode::new(config).await?;
2956
2957        // Test: Try to find a peer that doesn't exist
2958        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2959        assert_eq!(result, None);
2960
2961        Ok(())
2962    }
2963
2964    #[tokio::test]
2965    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2966        let config = create_test_node_config();
2967        let node = P2PNode::new(config).await?;
2968
2969        // Test: Invalid address format should return None
2970        let result = node.get_peer_id_by_address("invalid-address").await;
2971        assert_eq!(result, None);
2972
2973        Ok(())
2974    }
2975
2976    #[tokio::test]
2977    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
2978        let config = create_test_node_config();
2979        let node = P2PNode::new(config).await?;
2980
2981        // Add multiple peers with different addresses
2982        let peer1_id = "peer_1".to_string();
2983        let peer1_addr = "192.168.1.101:9001".to_string();
2984
2985        let peer2_id = "peer_2".to_string();
2986        let peer2_addr = "192.168.1.102:9002".to_string();
2987
2988        let peer1_info = PeerInfo {
2989            peer_id: peer1_id.clone(),
2990            addresses: vec![peer1_addr.clone()],
2991            connected_at: Instant::now(),
2992            last_seen: Instant::now(),
2993            status: ConnectionStatus::Connected,
2994            protocols: vec!["test-protocol".to_string()],
2995            heartbeat_count: 0,
2996        };
2997
2998        let peer2_info = PeerInfo {
2999            peer_id: peer2_id.clone(),
3000            addresses: vec![peer2_addr.clone()],
3001            connected_at: Instant::now(),
3002            last_seen: Instant::now(),
3003            status: ConnectionStatus::Connected,
3004            protocols: vec!["test-protocol".to_string()],
3005            heartbeat_count: 0,
3006        };
3007
3008        node.peers
3009            .write()
3010            .await
3011            .insert(peer1_id.clone(), peer1_info);
3012        node.peers
3013            .write()
3014            .await
3015            .insert(peer2_id.clone(), peer2_info);
3016
3017        // Test: Find each peer by their unique address
3018        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3019        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3020
3021        assert_eq!(found_peer1, Some(peer1_id));
3022        assert_eq!(found_peer2, Some(peer2_id));
3023
3024        Ok(())
3025    }
3026
3027    #[tokio::test]
3028    async fn test_list_active_connections_empty() -> Result<()> {
3029        let config = create_test_node_config();
3030        let node = P2PNode::new(config).await?;
3031
3032        // Test: No connections initially
3033        let connections = node.list_active_connections().await;
3034        assert!(connections.is_empty());
3035
3036        Ok(())
3037    }
3038
3039    #[tokio::test]
3040    async fn test_list_active_connections_with_peers() -> Result<()> {
3041        let config = create_test_node_config();
3042        let node = P2PNode::new(config).await?;
3043
3044        // Add multiple peers
3045        let peer1_id = "peer_1".to_string();
3046        let peer1_addrs = vec![
3047            "192.168.1.101:9001".to_string(),
3048            "192.168.1.101:9002".to_string(),
3049        ];
3050
3051        let peer2_id = "peer_2".to_string();
3052        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3053
3054        let peer1_info = PeerInfo {
3055            peer_id: peer1_id.clone(),
3056            addresses: peer1_addrs.clone(),
3057            connected_at: Instant::now(),
3058            last_seen: Instant::now(),
3059            status: ConnectionStatus::Connected,
3060            protocols: vec!["test-protocol".to_string()],
3061            heartbeat_count: 0,
3062        };
3063
3064        let peer2_info = PeerInfo {
3065            peer_id: peer2_id.clone(),
3066            addresses: peer2_addrs.clone(),
3067            connected_at: Instant::now(),
3068            last_seen: Instant::now(),
3069            status: ConnectionStatus::Connected,
3070            protocols: vec!["test-protocol".to_string()],
3071            heartbeat_count: 0,
3072        };
3073
3074        node.peers
3075            .write()
3076            .await
3077            .insert(peer1_id.clone(), peer1_info);
3078        node.peers
3079            .write()
3080            .await
3081            .insert(peer2_id.clone(), peer2_info);
3082
3083        // Test: List all active connections
3084        let connections = node.list_active_connections().await;
3085        assert_eq!(connections.len(), 2);
3086
3087        // Verify peer1 and peer2 are in the list
3088        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3089        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3090
3091        assert!(peer1_conn.is_some());
3092        assert!(peer2_conn.is_some());
3093
3094        // Verify addresses match
3095        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3096        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3097
3098        Ok(())
3099    }
3100
3101    #[tokio::test]
3102    async fn test_remove_peer_success() -> Result<()> {
3103        let config = create_test_node_config();
3104        let node = P2PNode::new(config).await?;
3105
3106        // Add a peer
3107        let peer_id = "peer_to_remove".to_string();
3108        let peer_info = PeerInfo {
3109            peer_id: peer_id.clone(),
3110            addresses: vec!["192.168.1.100:9000".to_string()],
3111            connected_at: Instant::now(),
3112            last_seen: Instant::now(),
3113            status: ConnectionStatus::Connected,
3114            protocols: vec!["test-protocol".to_string()],
3115            heartbeat_count: 0,
3116        };
3117
3118        node.peers.write().await.insert(peer_id.clone(), peer_info);
3119
3120        // Verify peer exists
3121        assert!(node.is_peer_connected(&peer_id).await);
3122
3123        // Remove the peer
3124        let removed = node.remove_peer(&peer_id).await;
3125        assert!(removed);
3126
3127        // Verify peer no longer exists
3128        assert!(!node.is_peer_connected(&peer_id).await);
3129
3130        Ok(())
3131    }
3132
3133    #[tokio::test]
3134    async fn test_remove_peer_nonexistent() -> Result<()> {
3135        let config = create_test_node_config();
3136        let node = P2PNode::new(config).await?;
3137
3138        // Try to remove a peer that doesn't exist
3139        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3140        assert!(!removed);
3141
3142        Ok(())
3143    }
3144
3145    #[tokio::test]
3146    async fn test_is_peer_connected() -> Result<()> {
3147        let config = create_test_node_config();
3148        let node = P2PNode::new(config).await?;
3149
3150        let peer_id = "test_peer".to_string();
3151
3152        // Initially not connected
3153        assert!(!node.is_peer_connected(&peer_id).await);
3154
3155        // Add peer
3156        let peer_info = PeerInfo {
3157            peer_id: peer_id.clone(),
3158            addresses: vec!["192.168.1.100:9000".to_string()],
3159            connected_at: Instant::now(),
3160            last_seen: Instant::now(),
3161            status: ConnectionStatus::Connected,
3162            protocols: vec!["test-protocol".to_string()],
3163            heartbeat_count: 0,
3164        };
3165
3166        node.peers.write().await.insert(peer_id.clone(), peer_info);
3167
3168        // Now connected
3169        assert!(node.is_peer_connected(&peer_id).await);
3170
3171        // Remove peer
3172        node.remove_peer(&peer_id).await;
3173
3174        // No longer connected
3175        assert!(!node.is_peer_connected(&peer_id).await);
3176
3177        Ok(())
3178    }
3179
3180    #[test]
3181    fn test_normalize_ipv6_wildcard() {
3182        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3183
3184        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3185        let normalized = normalize_wildcard_to_loopback(wildcard);
3186
3187        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3188        assert_eq!(normalized.port(), 8080);
3189    }
3190
3191    #[test]
3192    fn test_normalize_ipv4_wildcard() {
3193        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3194
3195        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3196        let normalized = normalize_wildcard_to_loopback(wildcard);
3197
3198        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3199        assert_eq!(normalized.port(), 9000);
3200    }
3201
3202    #[test]
3203    fn test_normalize_specific_address_unchanged() {
3204        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3205        let normalized = normalize_wildcard_to_loopback(specific);
3206
3207        assert_eq!(normalized, specific);
3208    }
3209
3210    #[test]
3211    fn test_normalize_loopback_unchanged() {
3212        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3213
3214        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3215        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3216        assert_eq!(normalized_v6, loopback_v6);
3217
3218        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3219        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3220        assert_eq!(normalized_v4, loopback_v4);
3221    }
3222}