saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] // Temporarily unused during migration
27use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::{HashMap, HashSet};
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, Ordering};
36use std::time::Duration;
37use tokio::sync::{RwLock, broadcast};
38use tokio::time::Instant;
39use tracing::{debug, info, trace, warn};
40
41/// Configuration for a P2P node
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct NodeConfig {
44    /// Local peer ID for this node
45    pub peer_id: Option<PeerId>,
46
47    /// Addresses to listen on for incoming connections
48    pub listen_addrs: Vec<std::net::SocketAddr>,
49
50    /// Primary listen address (for compatibility)
51    pub listen_addr: std::net::SocketAddr,
52
53    /// Bootstrap peers to connect to on startup (legacy)
54    pub bootstrap_peers: Vec<std::net::SocketAddr>,
55
56    /// Bootstrap peers as strings (for integration tests)
57    pub bootstrap_peers_str: Vec<String>,
58
59    /// Enable IPv6 support
60    pub enable_ipv6: bool,
61
62    // MCP removed; will be redesigned later
63    /// Connection timeout duration
64    pub connection_timeout: Duration,
65
66    /// Keep-alive interval for connections
67    pub keep_alive_interval: Duration,
68
69    /// Maximum number of concurrent connections
70    pub max_connections: usize,
71
72    /// Maximum number of incoming connections
73    pub max_incoming_connections: usize,
74
75    /// DHT configuration
76    pub dht_config: DHTConfig,
77
78    /// Security configuration
79    pub security_config: SecurityConfig,
80
81    /// Production hardening configuration
82    pub production_config: Option<ProductionConfig>,
83
84    /// Bootstrap cache configuration
85    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
86}
87
88/// DHT-specific configuration
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct DHTConfig {
91    /// Kademlia K parameter (bucket size)
92    pub k_value: usize,
93
94    /// Kademlia alpha parameter (parallelism)
95    pub alpha_value: usize,
96
97    /// DHT record TTL
98    pub record_ttl: Duration,
99
100    /// DHT refresh interval
101    pub refresh_interval: Duration,
102}
103
104/// Security configuration
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct SecurityConfig {
107    /// Enable noise protocol for encryption
108    pub enable_noise: bool,
109
110    /// Enable TLS for secure transport
111    pub enable_tls: bool,
112
113    /// Trust level for peer verification
114    pub trust_level: TrustLevel,
115}
116
117/// Trust level for peer verification
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub enum TrustLevel {
120    /// No verification required
121    None,
122    /// Basic peer ID verification
123    Basic,
124    /// Full cryptographic verification
125    Full,
126}
127
128impl NodeConfig {
129    /// Create a new NodeConfig with default values
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if default addresses cannot be parsed
134    pub fn new() -> Result<Self> {
135        // Load config and use its defaults
136        let config = Config::default();
137
138        // Parse the default listen address
139        let listen_addr = config.listen_socket_addr()?;
140
141        // Create listen addresses based on config
142        let mut listen_addrs = vec![];
143
144        // Add IPv6 address if enabled
145        if config.network.ipv6_enabled {
146            let ipv6_addr = std::net::SocketAddr::new(
147                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148                listen_addr.port(),
149            );
150            listen_addrs.push(ipv6_addr);
151        }
152
153        // Always add IPv4
154        let ipv4_addr = std::net::SocketAddr::new(
155            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
156            listen_addr.port(),
157        );
158        listen_addrs.push(ipv4_addr);
159
160        Ok(Self {
161            peer_id: None,
162            listen_addrs,
163            listen_addr,
164            bootstrap_peers: Vec::new(),
165            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
166            enable_ipv6: config.network.ipv6_enabled,
167
168            connection_timeout: Duration::from_secs(config.network.connection_timeout),
169            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
170            max_connections: config.network.max_connections,
171            max_incoming_connections: config.security.connection_limit as usize,
172            dht_config: DHTConfig::default(),
173            security_config: SecurityConfig::default(),
174            production_config: None,
175            bootstrap_cache_config: None,
176            // identity_config: None,
177        })
178    }
179}
180
181impl Default for NodeConfig {
182    fn default() -> Self {
183        // Use config defaults for network settings
184        let config = Config::default();
185
186        // Parse the default listen address - use safe fallback if parsing fails
187        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
188            std::net::SocketAddr::new(
189                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
190                9000,
191            )
192        });
193
194        Self {
195            peer_id: None,
196            listen_addrs: vec![
197                std::net::SocketAddr::new(
198                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
199                    listen_addr.port(),
200                ),
201                std::net::SocketAddr::new(
202                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
203                    listen_addr.port(),
204                ),
205            ],
206            listen_addr,
207            bootstrap_peers: Vec::new(),
208            bootstrap_peers_str: Vec::new(),
209            enable_ipv6: config.network.ipv6_enabled,
210
211            connection_timeout: Duration::from_secs(config.network.connection_timeout),
212            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213            max_connections: config.network.max_connections,
214            max_incoming_connections: config.security.connection_limit as usize,
215            dht_config: DHTConfig::default(),
216            security_config: SecurityConfig::default(),
217            production_config: None, // Use default production config if enabled
218            bootstrap_cache_config: None,
219            // identity_config: None, // Use default identity config if enabled
220        }
221    }
222}
223
224impl NodeConfig {
225    /// Create NodeConfig from Config
226    pub fn from_config(config: &Config) -> Result<Self> {
227        let listen_addr = config.listen_socket_addr()?;
228        let bootstrap_addrs = config.bootstrap_addrs()?;
229
230        let mut node_config = Self {
231            peer_id: None,
232            listen_addrs: vec![listen_addr],
233            listen_addr,
234            bootstrap_peers: bootstrap_addrs
235                .iter()
236                .map(|addr| addr.socket_addr())
237                .collect(),
238            bootstrap_peers_str: config
239                .network
240                .bootstrap_nodes
241                .iter()
242                .map(|addr| addr.to_string())
243                .collect(),
244            enable_ipv6: config.network.ipv6_enabled,
245
246            connection_timeout: Duration::from_secs(config.network.connection_timeout),
247            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
248            max_connections: config.network.max_connections,
249            max_incoming_connections: config.security.connection_limit as usize,
250            dht_config: DHTConfig {
251                k_value: 20,
252                alpha_value: 3,
253                record_ttl: Duration::from_secs(3600),
254                refresh_interval: Duration::from_secs(900),
255            },
256            security_config: SecurityConfig {
257                enable_noise: true,
258                enable_tls: true,
259                trust_level: TrustLevel::Basic,
260            },
261            production_config: Some(ProductionConfig {
262                max_connections: config.network.max_connections,
263                max_memory_bytes: 0,  // unlimited
264                max_bandwidth_bps: 0, // unlimited
265                connection_timeout: Duration::from_secs(config.network.connection_timeout),
266                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
267                health_check_interval: Duration::from_secs(30),
268                metrics_interval: Duration::from_secs(60),
269                enable_performance_tracking: true,
270                enable_auto_cleanup: true,
271                shutdown_timeout: Duration::from_secs(30),
272                rate_limits: crate::production::RateLimitConfig::default(),
273            }),
274            bootstrap_cache_config: None,
275            // identity_config: Some(IdentityManagerConfig {
276            //     cache_ttl: Duration::from_secs(3600),
277            //     challenge_timeout: Duration::from_secs(30),
278            // }),
279        };
280
281        // Add IPv6 listen address if enabled
282        if config.network.ipv6_enabled {
283            node_config.listen_addrs.push(std::net::SocketAddr::new(
284                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
285                listen_addr.port(),
286            ));
287        }
288
289        Ok(node_config)
290    }
291
292    /// Try to build a NodeConfig from a listen address string
293    pub fn with_listen_addr(addr: &str) -> Result<Self> {
294        let listen_addr: std::net::SocketAddr = addr
295            .parse()
296            .map_err(|e: std::net::AddrParseError| {
297                NetworkError::InvalidAddress(e.to_string().into())
298            })
299            .map_err(P2PError::Network)?;
300        let cfg = NodeConfig {
301            listen_addr,
302            listen_addrs: vec![listen_addr],
303            ..Default::default()
304        };
305        Ok(cfg)
306    }
307}
308
309impl Default for DHTConfig {
310    fn default() -> Self {
311        Self {
312            k_value: 20,
313            alpha_value: 5,
314            record_ttl: Duration::from_secs(3600), // 1 hour
315            refresh_interval: Duration::from_secs(600), // 10 minutes
316        }
317    }
318}
319
320impl Default for SecurityConfig {
321    fn default() -> Self {
322        Self {
323            enable_noise: true,
324            enable_tls: true,
325            trust_level: TrustLevel::Basic,
326        }
327    }
328}
329
330/// Information about a connected peer
331#[derive(Debug, Clone)]
332pub struct PeerInfo {
333    /// Peer identifier
334    pub peer_id: PeerId,
335
336    /// Peer's addresses
337    pub addresses: Vec<String>,
338
339    /// Connection timestamp
340    pub connected_at: Instant,
341
342    /// Last seen timestamp
343    pub last_seen: Instant,
344
345    /// Connection status
346    pub status: ConnectionStatus,
347
348    /// Supported protocols
349    pub protocols: Vec<String>,
350
351    /// Number of heartbeats received
352    pub heartbeat_count: u64,
353}
354
/// Connection status for a peer.
///
/// `Eq` is derived alongside `PartialEq`: every variant (including the
/// `String` payload of [`ConnectionStatus::Failed`]) has total equality, so
/// the type can be used wherever full equivalence is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established.
    Connecting,
    /// Connection is established and active.
    Connected,
    /// Connection is being closed.
    Disconnecting,
    /// Connection is closed.
    Disconnected,
    /// Connection failed; the payload describes the failure.
    Failed(String),
}
369
370/// Network events that can occur
371#[derive(Debug, Clone)]
372pub enum NetworkEvent {
373    /// A new peer has connected
374    PeerConnected {
375        /// The identifier of the newly connected peer
376        peer_id: PeerId,
377        /// The network addresses where the peer can be reached
378        addresses: Vec<String>,
379    },
380
381    /// A peer has disconnected
382    PeerDisconnected {
383        /// The identifier of the disconnected peer
384        peer_id: PeerId,
385        /// The reason for the disconnection
386        reason: String,
387    },
388
389    /// A message was received from a peer
390    MessageReceived {
391        /// The identifier of the sending peer
392        peer_id: PeerId,
393        /// The protocol used for the message
394        protocol: String,
395        /// The raw message data
396        data: Vec<u8>,
397    },
398
399    /// A connection attempt failed
400    ConnectionFailed {
401        /// The identifier of the peer (if known)
402        peer_id: Option<PeerId>,
403        /// The address where connection was attempted
404        address: String,
405        /// The error message describing the failure
406        error: String,
407    },
408
409    /// DHT record was stored
410    DHTRecordStored {
411        /// The DHT key where the record was stored
412        key: Vec<u8>,
413        /// The value that was stored
414        value: Vec<u8>,
415    },
416
417    /// DHT record was retrieved
418    DHTRecordRetrieved {
419        /// The DHT key that was queried
420        key: Vec<u8>,
421        /// The retrieved value, if found
422        value: Option<Vec<u8>>,
423    },
424}
425
426/// Network events that can occur in the P2P system
427///
428/// Events are broadcast to all listeners and provide real-time
429/// notifications of network state changes and message arrivals.
430#[derive(Debug, Clone)]
431pub enum P2PEvent {
432    /// Message received from a peer on a specific topic
433    Message {
434        /// Topic or channel the message was sent on
435        topic: String,
436        /// Peer ID of the message sender
437        source: PeerId,
438        /// Raw message data payload
439        data: Vec<u8>,
440    },
441    /// A new peer has connected to the network
442    PeerConnected(PeerId),
443    /// A peer has disconnected from the network
444    PeerDisconnected(PeerId),
445}
446
447/// Main P2P node structure
448/// Main P2P network node that manages connections, routing, and communication
449///
450/// This struct represents a complete P2P network participant that can:
451/// - Connect to other peers via QUIC transport
452/// - Participate in distributed hash table (DHT) operations
453/// - Send and receive messages through various protocols
454/// - Handle network events and peer lifecycle
455/// - Provide MCP (Model Context Protocol) services
456pub struct P2PNode {
457    /// Node configuration
458    config: NodeConfig,
459
460    /// Our peer ID
461    peer_id: PeerId,
462
463    /// Connected peers
464    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
465
466    /// Network event broadcaster
467    event_tx: broadcast::Sender<P2PEvent>,
468
469    /// Listen addresses
470    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
471
472    /// Node start time
473    start_time: Instant,
474
475    /// Running state
476    running: RwLock<bool>,
477
478    /// DHT instance (optional)
479    dht: Option<Arc<RwLock<DHT>>>,
480
481    /// Production resource manager (optional)
482    resource_manager: Option<Arc<ResourceManager>>,
483
484    /// Bootstrap cache manager for peer discovery
485    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
486
487    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
488    dual_node: Arc<DualStackNetworkNode>,
489
490    /// Rate limiter for connection and request throttling
491    #[allow(dead_code)]
492    rate_limiter: Arc<RateLimiter>,
493
494    /// Active connections (tracked by peer_id)
495    /// This set is synchronized with ant-quic's connection state via event monitoring
496    active_connections: Arc<RwLock<HashSet<PeerId>>>,
497
498    /// Connection lifecycle monitor task handle
499    #[allow(dead_code)]
500    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
501
502    /// Keepalive task handle
503    #[allow(dead_code)]
504    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
505
506    /// Shutdown flag for background tasks
507    #[allow(dead_code)]
508    shutdown: Arc<AtomicBool>,
509}
510
/// Map a wildcard ("unspecified") bind address to the matching loopback address.
///
/// Wildcard addresses (`0.0.0.0` and `[::]`) are only meaningful for binding;
/// they cannot be used as the target of a connection, and ant-quic rejects
/// them for remote connections. To reach a locally bound node, dial the
/// loopback address of the same family instead:
///
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - every other address is returned unchanged
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address with the same port that is suitable as a connection target
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopback = match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(ip) if ip.is_unspecified() => IpAddr::V6(Ipv6Addr::LOCALHOST),
        // Not a wildcard: already usable as a connection target.
        _ => return addr,
    };

    SocketAddr::new(loopback, addr.port())
}
541
542impl P2PNode {
543    /// Minimal constructor for tests that avoids real networking
544    pub fn new_for_tests() -> Result<Self> {
545        let (event_tx, _) = broadcast::channel(16);
546        Ok(Self {
547            config: NodeConfig::default(),
548            peer_id: "test_peer".to_string(),
549            peers: Arc::new(RwLock::new(HashMap::new())),
550            event_tx,
551            listen_addrs: RwLock::new(Vec::new()),
552            start_time: Instant::now(),
553            running: RwLock::new(false),
554            dht: None,
555            resource_manager: None,
556            bootstrap_manager: None,
557            dual_node: {
558                // Bind dual-stack nodes on ephemeral ports for tests
559                let v6: Option<std::net::SocketAddr> = "[::1]:0"
560                    .parse()
561                    .ok()
562                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
563                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
564                let handle = tokio::runtime::Handle::current();
565                let dual_attempt = handle.block_on(
566                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
567                );
568                let dual = match dual_attempt {
569                    Ok(d) => d,
570                    Err(_e1) => {
571                        // Fallback to IPv4-only ephemeral bind
572                        let fallback = handle.block_on(
573                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
574                                None,
575                                "127.0.0.1:0".parse().ok(),
576                            ),
577                        );
578                        match fallback {
579                            Ok(d) => d,
580                            Err(e2) => {
581                                return Err(P2PError::Network(NetworkError::BindError(
582                                    format!("Failed to create dual-stack network node: {}", e2)
583                                        .into(),
584                                )));
585                            }
586                        }
587                    }
588                };
589                Arc::new(dual)
590            },
591            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
592                max_requests: 100,
593                burst_size: 100,
594                window: std::time::Duration::from_secs(1),
595                ..Default::default()
596            })),
597            active_connections: Arc::new(RwLock::new(HashSet::new())),
598            connection_monitor_handle: Arc::new(RwLock::new(None)),
599            keepalive_handle: Arc::new(RwLock::new(None)),
600            shutdown: Arc::new(AtomicBool::new(false)),
601        })
602    }
603    /// Create a new P2P node with the given configuration
604    pub async fn new(config: NodeConfig) -> Result<Self> {
605        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
606            // Generate a random peer ID for now
607            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
608        });
609
610        let (event_tx, _) = broadcast::channel(1000);
611
612        // Initialize and register a TrustWeightedKademlia DHT for the global API
613        // Use a deterministic local NodeId derived from the peer_id
614        {
615            use blake3::Hasher;
616            let mut hasher = Hasher::new();
617            hasher.update(peer_id.as_bytes());
618            let digest = hasher.finalize();
619            let mut nid = [0u8; 32];
620            nid.copy_from_slice(digest.as_bytes());
621            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
622                crate::identity::node_identity::NodeId::from_bytes(nid),
623            ));
624            // TODO: Update to use new clean API
625            // let _ = crate::api::set_dht_instance(twdht);
626        }
627
628        // Initialize DHT if needed
629        let dht = if true {
630            // Always enable DHT for now
631            let _dht_config = crate::dht::DHTConfig {
632                replication_factor: config.dht_config.k_value,
633                bucket_size: config.dht_config.k_value,
634                alpha: config.dht_config.alpha_value,
635                record_ttl: config.dht_config.record_ttl,
636                bucket_refresh_interval: config.dht_config.refresh_interval,
637                republish_interval: config.dht_config.refresh_interval,
638                max_distance: 160, // 160 bits for SHA-256
639            };
640            // Convert peer_id String to NodeId
641            let peer_bytes = peer_id.as_bytes();
642            let mut node_id_bytes = [0u8; 32];
643            let len = peer_bytes.len().min(32);
644            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
645            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
646            let dht_instance = DHT::new(node_id).map_err(|e| {
647                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
648                    e.to_string().into(),
649                ))
650            })?;
651            Some(Arc::new(RwLock::new(dht_instance)))
652        } else {
653            None
654        };
655
656        // MCP removed
657
658        // Initialize production resource manager if configured
659        let resource_manager = config
660            .production_config
661            .clone()
662            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
663
664        // Initialize bootstrap cache manager
665        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
666            match BootstrapManager::with_config(cache_config.clone()).await {
667                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
668                Err(e) => {
669                    warn!(
670                        "Failed to initialize bootstrap manager: {}, continuing without cache",
671                        e
672                    );
673                    None
674                }
675            }
676        } else {
677            match BootstrapManager::new().await {
678                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
679                Err(e) => {
680                    warn!(
681                        "Failed to initialize bootstrap manager: {}, continuing without cache",
682                        e
683                    );
684                    None
685                }
686            }
687        };
688
689        // Initialize dual-stack ant-quic nodes
690        let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
691            let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
692            let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
693            (v6_addr, v4_addr)
694        } else {
695            // Defaults: always listen on IPv4; IPv6 if enabled
696            let v4_addr = Some(std::net::SocketAddr::new(
697                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
698                config.listen_addr.port(),
699            ));
700            let v6_addr = if config.enable_ipv6 {
701                Some(std::net::SocketAddr::new(
702                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
703                    config.listen_addr.port(),
704                ))
705            } else {
706                None
707            };
708            (v6_addr, v4_addr)
709        };
710
711        let dual_node = Arc::new(
712            DualStackNetworkNode::new(v6_opt, v4_opt)
713                .await
714                .map_err(|e| {
715                    P2PError::Transport(crate::error::TransportError::SetupFailed(
716                        format!("Failed to create dual-stack network nodes: {}", e).into(),
717                    ))
718                })?,
719        );
720
721        // Initialize rate limiter with default config
722        let rate_limiter = Arc::new(RateLimiter::new(
723            crate::validation::RateLimitConfig::default(),
724        ));
725
726        // Create active connections tracker
727        let active_connections = Arc::new(RwLock::new(HashSet::new()));
728
729        // Start connection lifecycle monitor
730        let connection_monitor_handle = {
731            let active_conns = Arc::clone(&active_connections);
732            let peers_map = Arc::new(RwLock::new(HashMap::new())); // Will be replaced with actual peers after node creation
733            let event_tx_clone = event_tx.clone();
734            let dual_node_clone = Arc::clone(&dual_node);
735
736            let handle = tokio::spawn(async move {
737                Self::connection_lifecycle_monitor(
738                    dual_node_clone,
739                    active_conns,
740                    peers_map,
741                    event_tx_clone,
742                )
743                .await;
744            });
745
746            Arc::new(RwLock::new(Some(handle)))
747        };
748
749        // Spawn keepalive task
750        let shutdown = Arc::new(AtomicBool::new(false));
751        let keepalive_handle = {
752            let active_conns = Arc::clone(&active_connections);
753            let dual_node_clone = Arc::clone(&dual_node);
754            let shutdown_clone = Arc::clone(&shutdown);
755
756            let handle = tokio::spawn(async move {
757                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
758            });
759
760            Arc::new(RwLock::new(Some(handle)))
761        };
762
763        let node = Self {
764            config,
765            peer_id,
766            peers: Arc::new(RwLock::new(HashMap::new())),
767            event_tx,
768            listen_addrs: RwLock::new(Vec::new()),
769            start_time: Instant::now(),
770            running: RwLock::new(false),
771            dht,
772            resource_manager,
773            bootstrap_manager,
774            dual_node,
775            rate_limiter,
776            active_connections,
777            connection_monitor_handle,
778            keepalive_handle,
779            shutdown,
780        };
781        info!("Created P2P node with peer ID: {}", node.peer_id);
782
783        // Start the network listeners to populate listen addresses
784        node.start_network_listeners().await?;
785
786        // Update the connection monitor with actual peers reference
787        node.start_connection_monitor().await;
788
789        Ok(node)
790    }
791
792    /// Create a new node builder
793    pub fn builder() -> NodeBuilder {
794        NodeBuilder::new()
795    }
796
797    /// Get the peer ID of this node
798    pub fn peer_id(&self) -> &PeerId {
799        &self.peer_id
800    }
801
802    pub fn local_addr(&self) -> Option<String> {
803        self.listen_addrs
804            .try_read()
805            .ok()
806            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
807    }
808
809    pub async fn subscribe(&self, topic: &str) -> Result<()> {
810        // In a real implementation, this would register the topic with the pubsub mechanism.
811        // For now, we just log it.
812        info!("Subscribed to topic: {}", topic);
813        Ok(())
814    }
815
816    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
817        info!(
818            "Publishing message to topic: {} ({} bytes)",
819            topic,
820            data.len()
821        );
822
823        // Get list of connected peers
824        let peer_list: Vec<PeerId> = {
825            let peers_guard = self.peers.read().await;
826            peers_guard.keys().cloned().collect()
827        };
828
829        if peer_list.is_empty() {
830            debug!("No peers connected, message will only be sent to local subscribers");
831        } else {
832            // Send message to all connected peers
833            let mut send_count = 0;
834            for peer_id in &peer_list {
835                match self.send_message(peer_id, topic, data.to_vec()).await {
836                    Ok(_) => {
837                        send_count += 1;
838                        debug!("Sent message to peer: {}", peer_id);
839                    }
840                    Err(e) => {
841                        warn!("Failed to send message to peer {}: {}", peer_id, e);
842                    }
843                }
844            }
845            info!(
846                "Published message to {}/{} connected peers",
847                send_count,
848                peer_list.len()
849            );
850        }
851
852        // Also send to local subscribers (for local echo and testing)
853        let event = P2PEvent::Message {
854            topic: topic.to_string(),
855            source: self.peer_id.clone(),
856            data: data.to_vec(),
857        };
858        let _ = self.event_tx.send(event);
859
860        Ok(())
861    }
862
863    /// Get the node configuration
864    pub fn config(&self) -> &NodeConfig {
865        &self.config
866    }
867
868    /// Start the P2P node
869    pub async fn start(&self) -> Result<()> {
870        info!("Starting P2P node...");
871
872        // Start production resource manager if configured
873        if let Some(ref resource_manager) = self.resource_manager {
874            resource_manager.start().await.map_err(|e| {
875                P2PError::Network(crate::error::NetworkError::ProtocolError(
876                    format!("Failed to start resource manager: {e}").into(),
877                ))
878            })?;
879            info!("Production resource manager started");
880        }
881
882        // Start bootstrap manager background tasks
883        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
884            let mut manager = bootstrap_manager.write().await;
885            manager.start_background_tasks().await.map_err(|e| {
886                P2PError::Network(crate::error::NetworkError::ProtocolError(
887                    format!("Failed to start bootstrap manager: {e}").into(),
888                ))
889            })?;
890            info!("Bootstrap cache manager started");
891        }
892
893        // Set running state
894        *self.running.write().await = true;
895
896        // Start listening on configured addresses using transport layer
897        self.start_network_listeners().await?;
898
899        // Log current listen addresses
900        let listen_addrs = self.listen_addrs.read().await;
901        info!("P2P node started on addresses: {:?}", *listen_addrs);
902
903        // MCP removed
904
905        // Start message receiving system
906        self.start_message_receiving_system().await?;
907
908        // Connect to bootstrap peers
909        self.connect_bootstrap_peers().await?;
910
911        Ok(())
912    }
913
914    /// Start network listeners on configured addresses
915    async fn start_network_listeners(&self) -> Result<()> {
916        info!("Starting dual-stack listeners (ant-quic)...");
917        // Update our listen_addrs from the dual node bindings
918        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
919            P2PError::Transport(crate::error::TransportError::SetupFailed(
920                format!("Failed to get local addresses: {}", e).into(),
921            ))
922        })?;
923        {
924            let mut la = self.listen_addrs.write().await;
925            *la = addrs.clone();
926        }
927
928        // Spawn a background accept loop that handles incoming connections from either stack
929        let event_tx = self.event_tx.clone();
930        let peers = self.peers.clone();
931        let rate_limiter = self.rate_limiter.clone();
932        let dual = self.dual_node.clone();
933        tokio::spawn(async move {
934            loop {
935                match dual.accept_any().await {
936                    Ok((ant_peer_id, remote_sock)) => {
937                        let peer_id =
938                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
939                        let remote_addr = NetworkAddress::from(remote_sock);
940                        // Optional: basic IP rate limiting
941                        let _ = rate_limiter.check_ip(&remote_sock.ip());
942                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
943                        register_new_peer(&peers, &peer_id, &remote_addr).await;
944                    }
945                    Err(e) => {
946                        warn!("Accept failed: {}", e);
947                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
948                    }
949                }
950            }
951        });
952
953        info!("Dual-stack listeners active on: {:?}", addrs);
954        Ok(())
955    }
956
957    /// Start a listener on a specific socket address
958    #[allow(dead_code)]
959    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
960        // use crate::transport::{Transport}; // Unused during migration
961
962        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
963        /*
964        // Try QUIC first (preferred transport)
965        match crate::transport::QuicTransport::new(Default::default()) {
966            Ok(quic_transport) => {
967                match quic_transport.listen(NetworkAddress::new(addr)).await {
968                    Ok(listen_addrs) => {
969                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
970
971                        // Store the actual listening addresses in the node
972                        {
973                            let mut node_listen_addrs = self.listen_addrs.write().await;
974                            // Don't clear - accumulate addresses from multiple listeners
975                            node_listen_addrs.push(listen_addrs.socket_addr());
976                        }
977
978                        // Start accepting connections in background
979                        self.start_connection_acceptor(
980                            Arc::new(quic_transport),
981                            addr,
982                            crate::transport::TransportType::QUIC
983                        ).await?;
984
985                        return Ok(());
986                    }
987                    Err(e) => {
988                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
989                    }
990                }
991            }
992            Err(e) => {
993                warn!("Failed to create QUIC transport for listening: {}", e);
994            }
995        }
996        */
997
998        warn!("QUIC transport temporarily disabled during ant-quic migration");
999        // No TCP fallback - QUIC only
1000        Err(crate::P2PError::Transport(
1001            crate::error::TransportError::SetupFailed(
1002                format!(
1003                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1004                )
1005                .into(),
1006            ),
1007        ))
1008    }
1009
1010    /// Start connection acceptor background task
1011    #[allow(dead_code)] // Deprecated during ant-quic migration
1012    async fn start_connection_acceptor(
1013        &self,
1014        transport: Arc<dyn crate::transport::Transport>,
1015        addr: std::net::SocketAddr,
1016        transport_type: crate::transport::TransportType,
1017    ) -> Result<()> {
1018        info!(
1019            "Starting connection acceptor for {:?} on {}",
1020            transport_type, addr
1021        );
1022
1023        // Clone necessary data for the background task
1024        let event_tx = self.event_tx.clone();
1025        let _peer_id = self.peer_id.clone();
1026        let peers = Arc::clone(&self.peers);
1027        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1028
1029        let rate_limiter = Arc::clone(&self.rate_limiter);
1030
1031        // Spawn background task to accept incoming connections
1032        tokio::spawn(async move {
1033            loop {
1034                match transport.accept().await {
1035                    Ok(connection) => {
1036                        let remote_addr = connection.remote_addr();
1037                        let connection_peer_id =
1038                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1039
1040                        // Apply rate limiting for incoming connections
1041                        let socket_addr = remote_addr.socket_addr();
1042                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1043                            // Connection dropped automatically when it goes out of scope
1044                            continue;
1045                        }
1046
1047                        info!(
1048                            "Accepted {:?} connection from {} (peer: {})",
1049                            transport_type, remote_addr, connection_peer_id
1050                        );
1051
1052                        // Generate peer connected event
1053                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1054
1055                        // Store the peer connection
1056                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1057
1058                        // Spawn task to handle this specific connection's messages
1059                        spawn_connection_handler(
1060                            connection,
1061                            connection_peer_id,
1062                            event_tx.clone(),
1063                            Arc::clone(&peers),
1064                        );
1065                    }
1066                    Err(e) => {
1067                        warn!(
1068                            "Failed to accept {:?} connection on {}: {}",
1069                            transport_type, addr, e
1070                        );
1071
1072                        // Brief pause before retrying to avoid busy loop
1073                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1074                    }
1075                }
1076            }
1077        });
1078
1079        info!(
1080            "Connection acceptor background task started for {:?} on {}",
1081            transport_type, addr
1082        );
1083        Ok(())
1084    }
1085
1086    /// Start the message receiving system with background tasks
1087    async fn start_message_receiving_system(&self) -> Result<()> {
1088        info!("Starting message receiving system");
1089        let dual = self.dual_node.clone();
1090        let event_tx = self.event_tx.clone();
1091
1092        tokio::spawn(async move {
1093            loop {
1094                match dual.receive_any().await {
1095                    Ok((_peer_id, bytes)) => {
1096                        // Expect the JSON message wrapper from create_protocol_message
1097                        #[allow(clippy::collapsible_if)]
1098                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1099                            if let (Some(protocol), Some(data), Some(from)) = (
1100                                value.get("protocol").and_then(|v| v.as_str()),
1101                                value.get("data").and_then(|v| v.as_array()),
1102                                value.get("from").and_then(|v| v.as_str()),
1103                            ) {
1104                                let payload: Vec<u8> = data
1105                                    .iter()
1106                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1107                                    .collect();
1108                                let _ = event_tx.send(P2PEvent::Message {
1109                                    topic: protocol.to_string(),
1110                                    source: from.to_string(),
1111                                    data: payload,
1112                                });
1113                            }
1114                        }
1115                    }
1116                    Err(e) => {
1117                        warn!("Receive error: {}", e);
1118                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1119                    }
1120                }
1121            }
1122        });
1123
1124        Ok(())
1125    }
1126
1127    /// Handle a received message and generate appropriate events
1128    #[allow(dead_code)]
1129    async fn handle_received_message(
1130        &self,
1131        message_data: Vec<u8>,
1132        peer_id: &PeerId,
1133        _protocol: &str,
1134        event_tx: &broadcast::Sender<P2PEvent>,
1135    ) -> Result<()> {
1136        // MCP removed: no special protocol handling
1137
1138        // Parse the message format we created in create_protocol_message
1139        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1140            Ok(message) => {
1141                if let (Some(protocol), Some(data), Some(from)) = (
1142                    message.get("protocol").and_then(|v| v.as_str()),
1143                    message.get("data").and_then(|v| v.as_array()),
1144                    message.get("from").and_then(|v| v.as_str()),
1145                ) {
1146                    // Convert data array back to bytes
1147                    let data_bytes: Vec<u8> = data
1148                        .iter()
1149                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1150                        .collect();
1151
1152                    // Generate message event
1153                    let event = P2PEvent::Message {
1154                        topic: protocol.to_string(),
1155                        source: from.to_string(),
1156                        data: data_bytes,
1157                    };
1158
1159                    let _ = event_tx.send(event);
1160                    debug!("Generated message event from peer: {}", peer_id);
1161                }
1162            }
1163            Err(e) => {
1164                warn!("Failed to parse received message from {}: {}", peer_id, e);
1165            }
1166        }
1167
1168        Ok(())
1169    }
1170
1171    // MCP removed
1172
1173    // MCP removed
1174
1175    /// Run the P2P node (blocks until shutdown)
1176    pub async fn run(&self) -> Result<()> {
1177        if !*self.running.read().await {
1178            self.start().await?;
1179        }
1180
1181        info!("P2P node running...");
1182
1183        // Main event loop
1184        loop {
1185            if !*self.running.read().await {
1186                break;
1187            }
1188
1189            // Perform periodic tasks
1190            self.periodic_tasks().await?;
1191
1192            // Sleep for a short interval
1193            tokio::time::sleep(Duration::from_millis(100)).await;
1194        }
1195
1196        info!("P2P node stopped");
1197        Ok(())
1198    }
1199
1200    /// Stop the P2P node
1201    pub async fn stop(&self) -> Result<()> {
1202        info!("Stopping P2P node...");
1203
1204        // Set running state to false
1205        *self.running.write().await = false;
1206
1207        // Disconnect all peers
1208        self.disconnect_all_peers().await?;
1209
1210        // Shutdown production resource manager if configured
1211        if let Some(ref resource_manager) = self.resource_manager {
1212            resource_manager.shutdown().await.map_err(|e| {
1213                P2PError::Network(crate::error::NetworkError::ProtocolError(
1214                    format!("Failed to shutdown resource manager: {e}").into(),
1215                ))
1216            })?;
1217            info!("Production resource manager stopped");
1218        }
1219
1220        info!("P2P node stopped");
1221        Ok(())
1222    }
1223
1224    /// Graceful shutdown alias for tests
1225    pub async fn shutdown(&self) -> Result<()> {
1226        self.stop().await
1227    }
1228
1229    /// Check if the node is running
1230    pub async fn is_running(&self) -> bool {
1231        *self.running.read().await
1232    }
1233
1234    /// Get the current listen addresses
1235    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1236        self.listen_addrs.read().await.clone()
1237    }
1238
1239    /// Get connected peers
1240    pub async fn connected_peers(&self) -> Vec<PeerId> {
1241        self.peers.read().await.keys().cloned().collect()
1242    }
1243
1244    /// Get peer count
1245    pub async fn peer_count(&self) -> usize {
1246        self.peers.read().await.len()
1247    }
1248
1249    /// Get peer info
1250    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1251        self.peers.read().await.get(peer_id).cloned()
1252    }
1253
1254    /// Get the peer ID for a given socket address, if connected
1255    ///
1256    /// This method searches through all connected peers to find one that has
1257    /// the specified address in its address list.
1258    ///
1259    /// # Arguments
1260    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1261    ///
1262    /// # Returns
1263    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1264    /// * `None` - If no peer with this address is currently connected
1265    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1266        // Parse the address to a SocketAddr for comparison
1267        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1268
1269        let peers = self.peers.read().await;
1270
1271        // Search through all connected peers
1272        for (peer_id, peer_info) in peers.iter() {
1273            // Check if this peer has a matching address
1274            for peer_addr in &peer_info.addresses {
1275                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1276                    && peer_socket == socket_addr
1277                {
1278                    return Some(peer_id.clone());
1279                }
1280            }
1281        }
1282
1283        None
1284    }
1285
1286    /// List all active connections with their peer IDs and addresses
1287    ///
1288    /// # Returns
1289    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1290    /// contains all known addresses for that peer.
1291    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1292        let peers = self.peers.read().await;
1293
1294        peers
1295            .iter()
1296            .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1297            .collect()
1298    }
1299
1300    /// Remove a peer from the peers map
1301    ///
1302    /// This method removes a peer from the internal peers map. It should be used
1303    /// when a connection is no longer valid (e.g., after detecting that the underlying
1304    /// ant-quic connection has closed).
1305    ///
1306    /// # Arguments
1307    /// * `peer_id` - The ID of the peer to remove
1308    ///
1309    /// # Returns
1310    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1311    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1312        // Remove from active connections tracking
1313        self.active_connections.write().await.remove(peer_id);
1314        // Remove from peers map and return whether it existed
1315        self.peers.write().await.remove(peer_id).is_some()
1316    }
1317
1318    /// Check if a peer is connected
1319    ///
1320    /// This method checks if the peer ID exists in the peers map. Note that this
1321    /// only verifies the peer is registered - it does not guarantee the underlying
1322    /// ant-quic connection is still active. For connection validation, use `send_message`
1323    /// which will fail if the connection is closed.
1324    ///
1325    /// # Arguments
1326    /// * `peer_id` - The ID of the peer to check
1327    ///
1328    /// # Returns
1329    /// `true` if the peer exists in the peers map, `false` otherwise
1330    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1331        self.peers.read().await.contains_key(peer_id)
1332    }
1333
1334    /// Connect to a peer
1335    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1336        info!("Connecting to peer at: {}", address);
1337
1338        // Check production limits if resource manager is enabled
1339        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1340            Some(resource_manager.acquire_connection().await?)
1341        } else {
1342            None
1343        };
1344
1345        // Parse the address to SocketAddr format
1346        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1347            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1348                format!("{}: {}", address, e).into(),
1349            ))
1350        })?;
1351
1352        // Normalize wildcard addresses to loopback for local connections
1353        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1354        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1355        if normalized_addr != socket_addr {
1356            info!(
1357                "Normalized wildcard address {} to loopback {}",
1358                socket_addr, normalized_addr
1359            );
1360        }
1361
1362        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1363        let addr_list = vec![normalized_addr];
1364        let peer_id = match tokio::time::timeout(
1365            self.config.connection_timeout,
1366            self.dual_node.connect_happy_eyeballs(&addr_list),
1367        )
1368        .await
1369        {
1370            Ok(Ok(peer)) => {
1371                let connected_peer_id =
1372                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1373                info!("Successfully connected to peer: {}", connected_peer_id);
1374                connected_peer_id
1375            }
1376            Ok(Err(e)) => {
1377                warn!("Failed to connect to peer at {}: {}", address, e);
1378                let sanitized_address = address.replace(['/', ':'], "_");
1379                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1380                warn!(
1381                    "Using demo peer ID: {} (transport connection failed)",
1382                    demo_peer_id
1383                );
1384                demo_peer_id
1385            }
1386            Err(_) => {
1387                warn!(
1388                    "Timed out connecting to peer at {} after {:?}",
1389                    address, self.config.connection_timeout
1390                );
1391                let sanitized_address = address.replace(['/', ':'], "_");
1392                let demo_peer_id = format!("peer_from_{}", sanitized_address);
1393                demo_peer_id
1394            }
1395        };
1396
1397        // Create peer info with connection details
1398        let peer_info = PeerInfo {
1399            peer_id: peer_id.clone(),
1400            addresses: vec![address.to_string()],
1401            connected_at: Instant::now(),
1402            last_seen: Instant::now(),
1403            status: ConnectionStatus::Connected,
1404            protocols: vec!["p2p-foundation/1.0".to_string()],
1405            heartbeat_count: 0,
1406        };
1407
1408        // Store peer information
1409        self.peers.write().await.insert(peer_id.clone(), peer_info);
1410
1411        // Add to active connections tracking
1412        // This is critical for is_connection_active() to work correctly
1413        self.active_connections
1414            .write()
1415            .await
1416            .insert(peer_id.clone());
1417
1418        // Record bandwidth usage if resource manager is enabled
1419        if let Some(ref resource_manager) = self.resource_manager {
1420            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1421        }
1422
1423        // Emit connection event
1424        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1425
1426        info!("Connected to peer: {}", peer_id);
1427        Ok(peer_id)
1428    }
1429
1430    /// Disconnect from a peer
1431    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1432        info!("Disconnecting from peer: {}", peer_id);
1433
1434        // Remove from active connections
1435        self.active_connections.write().await.remove(peer_id);
1436
1437        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1438            peer_info.status = ConnectionStatus::Disconnected;
1439
1440            // Emit event
1441            let _ = self
1442                .event_tx
1443                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1444
1445            info!("Disconnected from peer: {}", peer_id);
1446        }
1447
1448        Ok(())
1449    }
1450
1451    /// Check if a connection to a peer is actually active (ant-quic level)
1452    pub async fn is_connection_active(&self, peer_id: &PeerId) -> bool {
1453        self.active_connections.read().await.contains(peer_id)
1454    }
1455
1456    /// Send a message to a peer
1457    pub async fn send_message(
1458        &self,
1459        peer_id: &PeerId,
1460        protocol: &str,
1461        data: Vec<u8>,
1462    ) -> Result<()> {
1463        debug!(
1464            "Sending message to peer {} on protocol {}",
1465            peer_id, protocol
1466        );
1467
1468        // Check rate limits if resource manager is enabled
1469        if let Some(ref resource_manager) = self.resource_manager
1470            && !resource_manager
1471                .check_rate_limit(peer_id, "message")
1472                .await?
1473        {
1474            return Err(P2PError::ResourceExhausted(
1475                format!("Rate limit exceeded for peer {}", peer_id).into(),
1476            ));
1477        }
1478
1479        // Check if peer exists in peers map
1480        if !self.peers.read().await.contains_key(peer_id) {
1481            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1482                peer_id.to_string().into(),
1483            )));
1484        }
1485
1486        // **NEW**: Check if the ant-quic connection is actually active
1487        // This is the critical fix for the connection state synchronization issue
1488        if !self.is_connection_active(peer_id).await {
1489            debug!(
1490                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1491                peer_id
1492            );
1493
1494            // Clean up stale peer entry
1495            self.remove_peer(peer_id).await;
1496
1497            return Err(P2PError::Network(
1498                crate::error::NetworkError::ConnectionClosed {
1499                    peer_id: peer_id.to_string().into(),
1500                },
1501            ));
1502        }
1503
1504        // MCP removed: no special-case protocol validation
1505
1506        // Record bandwidth usage if resource manager is enabled
1507        if let Some(ref resource_manager) = self.resource_manager {
1508            resource_manager.record_bandwidth(data.len() as u64, 0);
1509        }
1510
1511        // Create protocol message wrapper
1512        let _message_data = self.create_protocol_message(protocol, data)?;
1513
1514        // Send via ant-quic dual-node
1515        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1516        tokio::time::timeout(self.config.connection_timeout, send_fut)
1517            .await
1518            .map_err(|_| {
1519                P2PError::Transport(crate::error::TransportError::StreamError(
1520                    "Timed out sending message".into(),
1521                ))
1522            })?
1523            .map_err(|e| {
1524                P2PError::Transport(crate::error::TransportError::StreamError(
1525                    e.to_string().into(),
1526                ))
1527            })
1528    }
1529
1530    /// Create a protocol message wrapper
1531    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1532        use serde_json::json;
1533
1534        let timestamp = std::time::SystemTime::now()
1535            .duration_since(std::time::UNIX_EPOCH)
1536            .map_err(|e| {
1537                P2PError::Network(NetworkError::ProtocolError(
1538                    format!("System time error: {}", e).into(),
1539                ))
1540            })?
1541            .as_secs();
1542
1543        // Create a simple message format for P2P communication
1544        let message = json!({
1545            "protocol": protocol,
1546            "data": data,
1547            "from": self.peer_id,
1548            "timestamp": timestamp
1549        });
1550
1551        serde_json::to_vec(&message).map_err(|e| {
1552            P2PError::Transport(crate::error::TransportError::StreamError(
1553                format!("Failed to serialize message: {e}").into(),
1554            ))
1555        })
1556    }
1557
1558    // Note: async listen_addrs() already exists above for fetching listen addresses
1559}
1560
1561/// Create a protocol message wrapper (static version for background tasks)
1562#[allow(dead_code)]
1563fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1564    use serde_json::json;
1565
1566    let timestamp = std::time::SystemTime::now()
1567        .duration_since(std::time::UNIX_EPOCH)
1568        .map_err(|e| {
1569            P2PError::Network(NetworkError::ProtocolError(
1570                format!("System time error: {}", e).into(),
1571            ))
1572        })?
1573        .as_secs();
1574
1575    // Create a simple message format for P2P communication
1576    let message = json!({
1577        "protocol": protocol,
1578        "data": data,
1579        "timestamp": timestamp
1580    });
1581
1582    serde_json::to_vec(&message).map_err(|e| {
1583        P2PError::Transport(crate::error::TransportError::StreamError(
1584            format!("Failed to serialize message: {e}").into(),
1585        ))
1586    })
1587}
1588
1589impl P2PNode {
1590    /// Subscribe to network events
1591    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1592        self.event_tx.subscribe()
1593    }
1594
1595    /// Backwards-compat event stream accessor for tests
1596    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1597        self.subscribe_events()
1598    }
1599
1600    /// Get node uptime
1601    pub fn uptime(&self) -> Duration {
1602        self.start_time.elapsed()
1603    }
1604
1605    // MCP removed: all MCP tool/service methods removed
1606
1607    // /// Handle MCP remote tool call with network integration
1608
1609    // /// List tools available on a specific remote peer
1610
1611    // /// Get MCP server statistics
1612
1613    /// Get production resource metrics
1614    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1615        if let Some(ref resource_manager) = self.resource_manager {
1616            Ok(resource_manager.get_metrics().await)
1617        } else {
1618            Err(P2PError::Network(
1619                crate::error::NetworkError::ProtocolError(
1620                    "Production resource manager not enabled".to_string().into(),
1621                ),
1622            ))
1623        }
1624    }
1625
1626    /// Connection lifecycle monitor task - processes ant-quic connection events
1627    /// and updates active_connections HashSet and peers map
1628    async fn connection_lifecycle_monitor(
1629        dual_node: Arc<DualStackNetworkNode>,
1630        active_connections: Arc<RwLock<HashSet<String>>>,
1631        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1632        event_tx: broadcast::Sender<P2PEvent>,
1633    ) {
1634        use crate::transport::ant_quic_adapter::ConnectionEvent;
1635
1636        let mut event_rx = dual_node.subscribe_connection_events();
1637
1638        info!("Connection lifecycle monitor started");
1639
1640        loop {
1641            match event_rx.recv().await {
1642                Ok(event) => {
1643                    match event {
1644                        ConnectionEvent::Established {
1645                            peer_id,
1646                            remote_address,
1647                        } => {
1648                            let peer_id_str =
1649                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1650                            debug!(
1651                                "Connection established: peer={}, addr={}",
1652                                peer_id_str, remote_address
1653                            );
1654
1655                            // Add to active connections
1656                            active_connections.write().await.insert(peer_id_str.clone());
1657
1658                            // Update peer info if exists
1659                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1660                                peer_info.status = ConnectionStatus::Connected;
1661                                peer_info.connected_at = Instant::now();
1662                            }
1663
1664                            // Broadcast connection event
1665                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1666                        }
1667                        ConnectionEvent::Lost { peer_id, reason } => {
1668                            let peer_id_str =
1669                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1670                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1671
1672                            // Remove from active connections
1673                            active_connections.write().await.remove(&peer_id_str);
1674
1675                            // Update peer info status
1676                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1677                                peer_info.status = ConnectionStatus::Disconnected;
1678                                peer_info.last_seen = Instant::now();
1679                            }
1680
1681                            // Broadcast disconnection event
1682                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1683                        }
1684                        ConnectionEvent::Failed { peer_id, reason } => {
1685                            let peer_id_str =
1686                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1687                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1688
1689                            // Remove from active connections
1690                            active_connections.write().await.remove(&peer_id_str);
1691
1692                            // Update peer info status
1693                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1694                                peer_info.status = ConnectionStatus::Failed(reason.clone());
1695                            }
1696
1697                            // Broadcast disconnection event
1698                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1699                        }
1700                    }
1701                }
1702                Err(broadcast::error::RecvError::Lagged(skipped)) => {
1703                    warn!(
1704                        "Connection event monitor lagged, skipped {} events",
1705                        skipped
1706                    );
1707                    continue;
1708                }
1709                Err(broadcast::error::RecvError::Closed) => {
1710                    info!("Connection event channel closed, stopping monitor");
1711                    break;
1712                }
1713            }
1714        }
1715
1716        info!("Connection lifecycle monitor stopped");
1717    }
1718
    /// Start connection monitor (called after node initialization).
    ///
    /// Currently a no-op apart from emitting a debug log line: nothing is
    /// spawned or restarted here.
    async fn start_connection_monitor(&self) {
        // The monitor task is already spawned in new() with a temporary peers map
        // (NOTE(review): new() is not visible from here - confirm this still holds).
        // This method is a placeholder for future enhancements where we might
        // need to restart the monitor or provide it with updated references
        debug!("Connection monitor already running from initialization");
    }
1726
1727    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
1728    ///
1729    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
1730    /// message every 15 seconds (half the timeout) to all active connections to prevent
1731    /// them from timing out during periods of inactivity.
1732    async fn keepalive_task(
1733        active_connections: Arc<RwLock<HashSet<String>>>,
1734        dual_node: Arc<DualStackNetworkNode>,
1735        shutdown: Arc<AtomicBool>,
1736    ) {
1737        use tokio::time::{Duration, interval};
1738
1739        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
1740        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
1741
1742        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1743        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1744
1745        info!(
1746            "Keepalive task started (interval: {}s)",
1747            KEEPALIVE_INTERVAL_SECS
1748        );
1749
1750        loop {
1751            // Check shutdown flag first
1752            if shutdown.load(Ordering::Relaxed) {
1753                info!("Keepalive task shutting down");
1754                break;
1755            }
1756
1757            interval.tick().await;
1758
1759            // Get snapshot of active connections
1760            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1761
1762            if peers.is_empty() {
1763                trace!("Keepalive: no active connections");
1764                continue;
1765            }
1766
1767            debug!("Sending keepalive to {} active connections", peers.len());
1768
1769            // Send keepalive to each peer
1770            for peer_id in peers {
1771                match dual_node
1772                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1773                    .await
1774                {
1775                    Ok(_) => {
1776                        trace!("Keepalive sent to peer: {}", peer_id);
1777                    }
1778                    Err(e) => {
1779                        debug!(
1780                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
1781                            peer_id, e
1782                        );
1783                        // Don't remove from active_connections here - let the lifecycle monitor handle it
1784                    }
1785                }
1786            }
1787        }
1788
1789        info!("Keepalive task stopped");
1790    }
1791
1792    /// Check system health
1793    pub async fn health_check(&self) -> Result<()> {
1794        if let Some(ref resource_manager) = self.resource_manager {
1795            resource_manager.health_check().await
1796        } else {
1797            // Basic health check without resource manager
1798            let peer_count = self.peer_count().await;
1799            if peer_count > self.config.max_connections {
1800                Err(P2PError::Network(
1801                    crate::error::NetworkError::ProtocolError(
1802                        format!("Too many connections: {peer_count}").into(),
1803                    ),
1804                ))
1805            } else {
1806                Ok(())
1807            }
1808        }
1809    }
1810
    /// Get production configuration (if enabled).
    ///
    /// Returns a borrow of `config.production_config`, or `None` when it is unset.
    pub fn production_config(&self) -> Option<&ProductionConfig> {
        self.config.production_config.as_ref()
    }
1815
    /// Check if production hardening is enabled.
    ///
    /// This is determined solely by whether a resource manager is present on the node.
    pub fn is_production_mode(&self) -> bool {
        self.resource_manager.is_some()
    }
1820
    /// Get DHT reference.
    ///
    /// Returns a borrow of the shared, lock-protected DHT handle, or `None`
    /// when this node has no DHT instance.
    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
        self.dht.as_ref()
    }
1825
1826    /// Store a value in the DHT
1827    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1828        if let Some(ref dht) = self.dht {
1829            let mut dht_instance = dht.write().await;
1830            let dht_key = crate::dht::DhtKey::from_bytes(key);
1831            dht_instance
1832                .store(&dht_key, value.clone())
1833                .await
1834                .map_err(|e| {
1835                    P2PError::Dht(crate::error::DhtError::StoreFailed(
1836                        format!("{:?}: {e}", key).into(),
1837                    ))
1838                })?;
1839
1840            Ok(())
1841        } else {
1842            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1843                "DHT not enabled".to_string().into(),
1844            )))
1845        }
1846    }
1847
1848    /// Retrieve a value from the DHT
1849    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1850        if let Some(ref dht) = self.dht {
1851            let dht_instance = dht.read().await;
1852            let dht_key = crate::dht::DhtKey::from_bytes(key);
1853            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1854                P2PError::Dht(crate::error::DhtError::StoreFailed(
1855                    format!("Retrieve failed: {e}").into(),
1856                ))
1857            })?;
1858
1859            Ok(record_result)
1860        } else {
1861            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1862                "DHT not enabled".to_string().into(),
1863            )))
1864        }
1865    }
1866
1867    /// Add a discovered peer to the bootstrap cache
1868    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1869        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1870            let mut manager = bootstrap_manager.write().await;
1871            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1872                .iter()
1873                .filter_map(|addr| addr.parse().ok())
1874                .collect();
1875            let contact = ContactEntry::new(peer_id, socket_addresses);
1876            manager.add_contact(contact).await.map_err(|e| {
1877                P2PError::Network(crate::error::NetworkError::ProtocolError(
1878                    format!("Failed to add peer to bootstrap cache: {e}").into(),
1879                ))
1880            })?;
1881        }
1882        Ok(())
1883    }
1884
1885    /// Update connection metrics for a peer in the bootstrap cache
1886    pub async fn update_peer_metrics(
1887        &self,
1888        peer_id: &PeerId,
1889        success: bool,
1890        latency_ms: Option<u64>,
1891        _error: Option<String>,
1892    ) -> Result<()> {
1893        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1894            let mut manager = bootstrap_manager.write().await;
1895
1896            // Create quality metrics based on the connection result
1897            let metrics = QualityMetrics {
1898                success_rate: if success { 1.0 } else { 0.0 },
1899                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1900                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
1901                last_connection_attempt: chrono::Utc::now(),
1902                last_successful_connection: if success {
1903                    chrono::Utc::now()
1904                } else {
1905                    chrono::Utc::now() - chrono::Duration::hours(1)
1906                },
1907                uptime_score: 0.5,
1908            };
1909
1910            manager
1911                .update_contact_metrics(peer_id, metrics)
1912                .await
1913                .map_err(|e| {
1914                    P2PError::Network(crate::error::NetworkError::ProtocolError(
1915                        format!("Failed to update peer metrics: {e}").into(),
1916                    ))
1917                })?;
1918        }
1919        Ok(())
1920    }
1921
1922    /// Get bootstrap cache statistics
1923    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1924        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1925            let manager = bootstrap_manager.read().await;
1926            let stats = manager.get_stats().await.map_err(|e| {
1927                P2PError::Network(crate::error::NetworkError::ProtocolError(
1928                    format!("Failed to get bootstrap stats: {e}").into(),
1929                ))
1930            })?;
1931            Ok(Some(stats))
1932        } else {
1933            Ok(None)
1934        }
1935    }
1936
1937    /// Get the number of cached bootstrap peers
1938    pub async fn cached_peer_count(&self) -> usize {
1939        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1940            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1941        {
1942            return stats.total_contacts;
1943        }
1944        0
1945    }
1946
    /// Connect to bootstrap peers.
    ///
    /// Candidate contacts come from the bootstrap cache (up to 20 peers) when a
    /// bootstrap manager is configured and returns a non-empty list; otherwise
    /// they are built from `config.bootstrap_peers_str`, or from
    /// `config.bootstrap_peers` when the string list is empty.
    ///
    /// For each contact the addresses are tried in order until one connects.
    /// Successful connections are written back to the bootstrap cache; failed
    /// attempts are written back only when the contact originated from the cache.
    ///
    /// Returns `Ok(())` without connecting when no peers are configured or cached.
    ///
    /// # Errors
    ///
    /// Returns `NetworkError::ConnectionFailed` (with a placeholder address) when
    /// candidates existed but none of them could be connected.
    async fn connect_bootstrap_peers(&self) -> Result<()> {
        let mut bootstrap_contacts = Vec::new();
        // Tracks whether the candidates came from the cache; this decides whether
        // failures are recorded and whether the final warning is emitted.
        let mut used_cache = false;

        // Try to get peers from bootstrap cache first
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let manager = bootstrap_manager.read().await;
            match manager.get_bootstrap_peers(20).await {
                // Try to get top 20 quality peers
                Ok(contacts) => {
                    if !contacts.is_empty() {
                        info!("Using {} cached bootstrap peers", contacts.len());
                        bootstrap_contacts = contacts;
                        used_cache = true;
                    }
                }
                Err(e) => {
                    warn!("Failed to get cached bootstrap peers: {}", e);
                }
            }
        }

        // Fallback to configured bootstrap peers if no cache or cache is empty
        // NOTE(review): this fallback is skipped whenever the cache returned any
        // contact, so configured peers are not tried if every cached peer fails.
        if bootstrap_contacts.is_empty() {
            let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
                &self.config.bootstrap_peers_str
            } else {
                // Convert Multiaddr to strings for fallback
                &self
                    .config
                    .bootstrap_peers
                    .iter()
                    .map(|addr| addr.to_string())
                    .collect::<Vec<_>>()
            };

            if bootstrap_peers.is_empty() {
                info!("No bootstrap peers configured and no cached peers available");
                return Ok(());
            }

            info!("Using {} configured bootstrap peers", bootstrap_peers.len());

            for addr in bootstrap_peers {
                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                    // The real peer ID is unknown until a connection succeeds, so a
                    // placeholder derived from the first 8 characters of the address
                    // is used; it is replaced with the real ID on success below.
                    let contact = ContactEntry::new(
                        format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
                        vec![socket_addr],
                    );
                    bootstrap_contacts.push(contact);
                } else {
                    warn!("Invalid bootstrap address format: {}", addr);
                }
            }
        }

        // Connect to bootstrap peers
        let mut successful_connections = 0;
        for contact in bootstrap_contacts {
            // Try the contact's addresses in order; stop at the first that connects.
            for addr in &contact.addresses {
                match self.connect_peer(&addr.to_string()).await {
                    Ok(peer_id) => {
                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                        successful_connections += 1;

                        // Update bootstrap cache with successful connection
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            // Record the peer ID reported by the connection.
                            updated_contact.peer_id = peer_id.clone();
                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now

                            // A cache write failure is logged but does not fail the bootstrap.
                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                        break; // Successfully connected, move to next contact
                    }
                    Err(e) => {
                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                        // Update bootstrap cache with failed connection
                        // (only for contacts that came from the cache in the first place)
                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(
                                false,
                                None,
                                Some(e.to_string()),
                            );

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }

        if successful_connections == 0 {
            // The warning is emitted only for configured (non-cached) peers.
            if !used_cache {
                warn!("Failed to connect to any bootstrap peers");
            }
            return Err(P2PError::Network(NetworkError::ConnectionFailed {
                addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), // Placeholder for bootstrap ensemble
                reason: "Failed to connect to any bootstrap peers".into(),
            }));
        }
        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        Ok(())
    }
2064
2065    /// Disconnect from all peers
2066    async fn disconnect_all_peers(&self) -> Result<()> {
2067        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2068
2069        for peer_id in peer_ids {
2070            self.disconnect_peer(&peer_id).await?;
2071        }
2072
2073        Ok(())
2074    }
2075
    /// Perform periodic maintenance tasks.
    ///
    /// Placeholder: the body currently performs no work and always returns `Ok(())`.
    async fn periodic_tasks(&self) -> Result<()> {
        // Planned work (not implemented yet):
        // Update peer last seen timestamps
        // Remove stale connections
        // Perform DHT maintenance
        // This is a placeholder for now

        Ok(())
    }
2085}
2086
/// Network sender trait for sending messages.
///
/// Implementors must be `Send + Sync` so they can be shared across tasks.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send a message to a specific peer.
    ///
    /// `protocol` names the protocol the payload belongs to and `data` is the
    /// raw payload. Returns an error if the message could not be handed off.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID
    fn local_peer_id(&self) -> &PeerId;
}
2096
/// Lightweight wrapper for P2PNode to implement NetworkSender.
///
/// Holds the local peer ID plus the sending half of an unbounded channel;
/// outbound messages are queued on that channel as
/// `(destination peer, protocol, payload)` tuples.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Local peer ID reported by `local_peer_id`.
    peer_id: PeerId,
    // Use channels for async communication with the P2P node
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2104
impl P2PNetworkSender {
    /// Create a sender for the given local `peer_id` that queues outbound
    /// messages on `send_tx`.
    pub fn new(
        peer_id: PeerId,
        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
    ) -> Self {
        Self { peer_id, send_tx }
    }
}
2113
2114/// Implementation of NetworkSender trait for P2PNetworkSender
2115#[async_trait::async_trait]
2116impl NetworkSender for P2PNetworkSender {
2117    /// Send a message to a specific peer via the P2P network
2118    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2119        self.send_tx
2120            .send((peer_id.clone(), protocol.to_string(), data))
2121            .map_err(|_| {
2122                P2PError::Network(crate::error::NetworkError::ProtocolError(
2123                    "Failed to send message via channel".to_string().into(),
2124                ))
2125            })?;
2126        Ok(())
2127    }
2128
2129    /// Get our local peer ID
2130    fn local_peer_id(&self) -> &PeerId {
2131        &self.peer_id
2132    }
2133}
2134
/// Builder pattern for creating P2P nodes.
///
/// Accumulates settings in a [`NodeConfig`] that is handed to `P2PNode::new`
/// by `build`.
pub struct NodeBuilder {
    /// Configuration being assembled by the builder methods.
    config: NodeConfig,
}
2139
impl Default for NodeBuilder {
    /// Equivalent to [`NodeBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
2145
2146impl NodeBuilder {
2147    /// Create a new node builder
2148    pub fn new() -> Self {
2149        Self {
2150            config: NodeConfig::default(),
2151        }
2152    }
2153
2154    /// Set the peer ID
2155    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2156        self.config.peer_id = Some(peer_id);
2157        self
2158    }
2159
2160    /// Add a listen address
2161    pub fn listen_on(mut self, addr: &str) -> Self {
2162        if let Ok(multiaddr) = addr.parse() {
2163            self.config.listen_addrs.push(multiaddr);
2164        }
2165        self
2166    }
2167
2168    /// Add a bootstrap peer
2169    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2170        if let Ok(multiaddr) = addr.parse() {
2171            self.config.bootstrap_peers.push(multiaddr);
2172        }
2173        self.config.bootstrap_peers_str.push(addr.to_string());
2174        self
2175    }
2176
2177    /// Enable IPv6 support
2178    pub fn with_ipv6(mut self, enable: bool) -> Self {
2179        self.config.enable_ipv6 = enable;
2180        self
2181    }
2182
2183    // MCP removed: builder methods deleted
2184
2185    /// Set connection timeout
2186    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2187        self.config.connection_timeout = timeout;
2188        self
2189    }
2190
2191    /// Set maximum connections
2192    pub fn with_max_connections(mut self, max: usize) -> Self {
2193        self.config.max_connections = max;
2194        self
2195    }
2196
2197    /// Enable production mode with default configuration
2198    pub fn with_production_mode(mut self) -> Self {
2199        self.config.production_config = Some(ProductionConfig::default());
2200        self
2201    }
2202
2203    /// Configure production settings
2204    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2205        self.config.production_config = Some(production_config);
2206        self
2207    }
2208
2209    /// Configure DHT settings
2210    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2211        self.config.dht_config = dht_config;
2212        self
2213    }
2214
2215    /// Enable DHT with default configuration
2216    pub fn with_default_dht(mut self) -> Self {
2217        self.config.dht_config = DHTConfig::default();
2218        self
2219    }
2220
2221    /// Build the P2P node
2222    pub async fn build(self) -> Result<P2PNode> {
2223        P2PNode::new(self.config).await
2224    }
2225}
2226
2227/// Standalone function to handle received messages without borrowing self
2228#[allow(dead_code)] // Deprecated during ant-quic migration
2229async fn handle_received_message_standalone(
2230    message_data: Vec<u8>,
2231    peer_id: &PeerId,
2232    _protocol: &str,
2233    event_tx: &broadcast::Sender<P2PEvent>,
2234) -> Result<()> {
2235    // Parse the message format
2236    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2237        Ok(message) => {
2238            if let (Some(protocol), Some(data), Some(from)) = (
2239                message.get("protocol").and_then(|v| v.as_str()),
2240                message.get("data").and_then(|v| v.as_array()),
2241                message.get("from").and_then(|v| v.as_str()),
2242            ) {
2243                // Convert data array back to bytes
2244                let data_bytes: Vec<u8> = data
2245                    .iter()
2246                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2247                    .collect();
2248
2249                // Generate message event
2250                let event = P2PEvent::Message {
2251                    topic: protocol.to_string(),
2252                    source: from.to_string(),
2253                    data: data_bytes,
2254                };
2255
2256                let _ = event_tx.send(event);
2257                debug!("Generated message event from peer: {}", peer_id);
2258            }
2259        }
2260        Err(e) => {
2261            warn!("Failed to parse received message from {}: {}", peer_id, e);
2262        }
2263    }
2264
2265    Ok(())
2266}
2267
2268// MCP removed: standalone MCP handler deleted
2269
2270/// Helper function to handle protocol message creation
2271#[allow(dead_code)]
2272fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2273    match create_protocol_message_static(protocol, data) {
2274        Ok(msg) => Some(msg),
2275        Err(e) => {
2276            warn!("Failed to create protocol message: {}", e);
2277            None
2278        }
2279    }
2280}
2281
2282/// Helper function to handle message send result
2283#[allow(dead_code)]
2284async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2285    match result {
2286        Ok(_) => {
2287            debug!("Message sent to peer {} via transport layer", peer_id);
2288        }
2289        Err(e) => {
2290            warn!("Failed to send message to peer {}: {}", peer_id, e);
2291        }
2292    }
2293}
2294
2295/// Helper function to check rate limit
2296#[allow(dead_code)] // Deprecated during ant-quic migration
2297fn check_rate_limit(
2298    rate_limiter: &RateLimiter,
2299    socket_addr: &std::net::SocketAddr,
2300    remote_addr: &NetworkAddress,
2301) -> Result<()> {
2302    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2303        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2304        e
2305    })
2306}
2307
2308/// Helper function to register a new peer
2309#[allow(dead_code)] // Deprecated during ant-quic migration
2310async fn register_new_peer(
2311    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2312    peer_id: &PeerId,
2313    remote_addr: &NetworkAddress,
2314) {
2315    let mut peers_guard = peers.write().await;
2316    let peer_info = PeerInfo {
2317        peer_id: peer_id.clone(),
2318        addresses: vec![remote_addr.to_string()],
2319        connected_at: tokio::time::Instant::now(),
2320        last_seen: tokio::time::Instant::now(),
2321        status: ConnectionStatus::Connected,
2322        protocols: vec!["p2p-chat/1.0.0".to_string()],
2323        heartbeat_count: 0,
2324    };
2325    peers_guard.insert(peer_id.clone(), peer_info);
2326}
2327
2328/// Helper function to spawn connection handler
2329#[allow(dead_code)] // Deprecated during ant-quic migration
2330fn spawn_connection_handler(
2331    connection: Box<dyn crate::transport::Connection>,
2332    peer_id: PeerId,
2333    event_tx: broadcast::Sender<P2PEvent>,
2334    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2335) {
2336    tokio::spawn(async move {
2337        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2338    });
2339}
2340
2341/// Helper function to handle peer connection
2342#[allow(dead_code)] // Deprecated during ant-quic migration
2343async fn handle_peer_connection(
2344    mut connection: Box<dyn crate::transport::Connection>,
2345    peer_id: PeerId,
2346    event_tx: broadcast::Sender<P2PEvent>,
2347    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2348) {
2349    loop {
2350        match connection.receive().await {
2351            Ok(message_data) => {
2352                debug!(
2353                    "Received {} bytes from peer: {}",
2354                    message_data.len(),
2355                    peer_id
2356                );
2357
2358                // Handle the received message
2359                if let Err(e) = handle_received_message_standalone(
2360                    message_data,
2361                    &peer_id,
2362                    "unknown", // TODO: Extract protocol from message
2363                    &event_tx,
2364                )
2365                .await
2366                {
2367                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2368                }
2369            }
2370            Err(e) => {
2371                warn!("Failed to receive message from {}: {}", peer_id, e);
2372
2373                // Check if connection is still alive
2374                if !connection.is_alive().await {
2375                    info!("Connection to {} is dead, removing peer", peer_id);
2376
2377                    // Remove dead peer
2378                    remove_peer(&peers, &peer_id).await;
2379
2380                    // Generate peer disconnected event
2381                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2382
2383                    break; // Exit the message receiving loop
2384                }
2385
2386                // Brief pause before retrying
2387                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2388            }
2389        }
2390    }
2391}
2392
2393/// Helper function to remove a peer
2394#[allow(dead_code)] // Deprecated during ant-quic migration
2395async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2396    let mut peers_guard = peers.write().await;
2397    peers_guard.remove(peer_id);
2398}
2399
2400/// Helper function to update peer heartbeat
2401#[allow(dead_code)]
2402async fn update_peer_heartbeat(
2403    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2404    peer_id: &PeerId,
2405) -> Result<()> {
2406    let mut peers_guard = peers.write().await;
2407    match peers_guard.get_mut(peer_id) {
2408        Some(peer_info) => {
2409            peer_info.last_seen = Instant::now();
2410            peer_info.heartbeat_count += 1;
2411            Ok(())
2412        }
2413        None => {
2414            warn!("Received heartbeat from unknown peer: {}", peer_id);
2415            Err(P2PError::Network(NetworkError::PeerNotFound(
2416                format!("Peer {} not found", peer_id).into(),
2417            )))
2418        }
2419    }
2420}
2421
2422/// Helper function to get resource metrics
2423#[allow(dead_code)]
2424async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2425    if let Some(manager) = resource_manager {
2426        let metrics = manager.get_metrics().await;
2427        (metrics.memory_used, metrics.cpu_usage)
2428    } else {
2429        (0, 0.0)
2430    }
2431}
2432
2433#[cfg(test)]
2434mod tests {
2435    use super::*;
2436    // MCP removed from tests
2437    use std::time::Duration;
2438    use tokio::time::timeout;
2439
    // The test tool handler formerly used by the network tests was removed
    // together with MCP.
2443
2444    /// Helper function to create a test node configuration
2445    fn create_test_node_config() -> NodeConfig {
2446        NodeConfig {
2447            peer_id: Some("test_peer_123".to_string()),
2448            listen_addrs: vec![
2449                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2450                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2451            ],
2452            listen_addr: std::net::SocketAddr::new(
2453                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2454                0,
2455            ),
2456            bootstrap_peers: vec![],
2457            bootstrap_peers_str: vec![],
2458            enable_ipv6: true,
2459
2460            connection_timeout: Duration::from_millis(300),
2461            keep_alive_interval: Duration::from_secs(30),
2462            max_connections: 100,
2463            max_incoming_connections: 50,
2464            dht_config: DHTConfig::default(),
2465            security_config: SecurityConfig::default(),
2466            production_config: None,
2467            bootstrap_cache_config: None,
2468            // identity_config: None,
2469        }
2470    }
2471
2472    /// Helper function to create a test tool
2473    // MCP removed: test tool helper deleted
2474
2475    #[tokio::test]
2476    async fn test_node_config_default() {
2477        let config = NodeConfig::default();
2478
2479        assert!(config.peer_id.is_none());
2480        assert_eq!(config.listen_addrs.len(), 2);
2481        assert!(config.enable_ipv6);
2482        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2483        assert_eq!(config.max_incoming_connections, 100);
2484        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2485    }
2486
2487    #[tokio::test]
2488    async fn test_dht_config_default() {
2489        let config = DHTConfig::default();
2490
2491        assert_eq!(config.k_value, 20);
2492        assert_eq!(config.alpha_value, 5);
2493        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2494        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2495    }
2496
2497    #[tokio::test]
2498    async fn test_security_config_default() {
2499        let config = SecurityConfig::default();
2500
2501        assert!(config.enable_noise);
2502        assert!(config.enable_tls);
2503        assert_eq!(config.trust_level, TrustLevel::Basic);
2504    }
2505
2506    #[test]
2507    fn test_trust_level_variants() {
2508        // Test that all trust level variants can be created
2509        let _none = TrustLevel::None;
2510        let _basic = TrustLevel::Basic;
2511        let _full = TrustLevel::Full;
2512
2513        // Test equality
2514        assert_eq!(TrustLevel::None, TrustLevel::None);
2515        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2516        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2517        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2518    }
2519
2520    #[test]
2521    fn test_connection_status_variants() {
2522        let connecting = ConnectionStatus::Connecting;
2523        let connected = ConnectionStatus::Connected;
2524        let disconnecting = ConnectionStatus::Disconnecting;
2525        let disconnected = ConnectionStatus::Disconnected;
2526        let failed = ConnectionStatus::Failed("test error".to_string());
2527
2528        assert_eq!(connecting, ConnectionStatus::Connecting);
2529        assert_eq!(connected, ConnectionStatus::Connected);
2530        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2531        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2532        assert_ne!(connecting, connected);
2533
2534        if let ConnectionStatus::Failed(msg) = failed {
2535            assert_eq!(msg, "test error");
2536        } else {
2537            panic!("Expected Failed status");
2538        }
2539    }
2540
2541    #[tokio::test]
2542    async fn test_node_creation() -> Result<()> {
2543        let config = create_test_node_config();
2544        let node = P2PNode::new(config).await?;
2545
2546        assert_eq!(node.peer_id(), "test_peer_123");
2547        assert!(!node.is_running().await);
2548        assert_eq!(node.peer_count().await, 0);
2549        assert!(node.connected_peers().await.is_empty());
2550
2551        Ok(())
2552    }
2553
2554    #[tokio::test]
2555    async fn test_node_creation_without_peer_id() -> Result<()> {
2556        let mut config = create_test_node_config();
2557        config.peer_id = None;
2558
2559        let node = P2PNode::new(config).await?;
2560
2561        // Should have generated a peer ID
2562        assert!(node.peer_id().starts_with("peer_"));
2563        assert!(!node.is_running().await);
2564
2565        Ok(())
2566    }
2567
2568    #[tokio::test]
2569    async fn test_node_lifecycle() -> Result<()> {
2570        let config = create_test_node_config();
2571        let node = P2PNode::new(config).await?;
2572
2573        // Initially not running
2574        assert!(!node.is_running().await);
2575
2576        // Start the node
2577        node.start().await?;
2578        assert!(node.is_running().await);
2579
2580        // Check listen addresses were set (at least one)
2581        let listen_addrs = node.listen_addrs().await;
2582        assert!(
2583            !listen_addrs.is_empty(),
2584            "Expected at least one listening address"
2585        );
2586
2587        // Stop the node
2588        node.stop().await?;
2589        assert!(!node.is_running().await);
2590
2591        Ok(())
2592    }
2593
2594    #[tokio::test]
2595    async fn test_peer_connection() -> Result<()> {
2596        let config = create_test_node_config();
2597        let node = P2PNode::new(config).await?;
2598
2599        let peer_addr = "127.0.0.1:0";
2600
2601        // Connect to a peer
2602        let peer_id = node.connect_peer(peer_addr).await?;
2603        assert!(peer_id.starts_with("peer_from_"));
2604
2605        // Check peer count
2606        assert_eq!(node.peer_count().await, 1);
2607
2608        // Check connected peers
2609        let connected_peers = node.connected_peers().await;
2610        assert_eq!(connected_peers.len(), 1);
2611        assert_eq!(connected_peers[0], peer_id);
2612
2613        // Get peer info
2614        let peer_info = node.peer_info(&peer_id).await;
2615        assert!(peer_info.is_some());
2616        let info = peer_info.expect("Peer info should exist after adding peer");
2617        assert_eq!(info.peer_id, peer_id);
2618        assert_eq!(info.status, ConnectionStatus::Connected);
2619        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2620
2621        // Disconnect from peer
2622        node.disconnect_peer(&peer_id).await?;
2623        assert_eq!(node.peer_count().await, 0);
2624
2625        Ok(())
2626    }
2627
2628    #[tokio::test]
2629    async fn test_event_subscription() -> Result<()> {
2630        let config = create_test_node_config();
2631        let node = P2PNode::new(config).await?;
2632
2633        let mut events = node.subscribe_events();
2634        let peer_addr = "127.0.0.1:0";
2635
2636        // Connect to a peer (this should emit an event)
2637        let peer_id = node.connect_peer(peer_addr).await?;
2638
2639        // Check for PeerConnected event
2640        let event = timeout(Duration::from_millis(100), events.recv()).await;
2641        assert!(event.is_ok());
2642
2643        let event_result = event
2644            .expect("Should receive event")
2645            .expect("Event should not be error");
2646        match event_result {
2647            P2PEvent::PeerConnected(event_peer_id) => {
2648                assert_eq!(event_peer_id, peer_id);
2649            }
2650            _ => panic!("Expected PeerConnected event"),
2651        }
2652
2653        // Disconnect from peer (this should emit another event)
2654        node.disconnect_peer(&peer_id).await?;
2655
2656        // Check for PeerDisconnected event
2657        let event = timeout(Duration::from_millis(100), events.recv()).await;
2658        assert!(event.is_ok());
2659
2660        let event_result = event
2661            .expect("Should receive event")
2662            .expect("Event should not be error");
2663        match event_result {
2664            P2PEvent::PeerDisconnected(event_peer_id) => {
2665                assert_eq!(event_peer_id, peer_id);
2666            }
2667            _ => panic!("Expected PeerDisconnected event"),
2668        }
2669
2670        Ok(())
2671    }
2672
2673    #[tokio::test]
2674    async fn test_message_sending() -> Result<()> {
2675        // Create two nodes
2676        let mut config1 = create_test_node_config();
2677        config1.listen_addr =
2678            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2679        let node1 = P2PNode::new(config1).await?;
2680        node1.start().await?;
2681
2682        let mut config2 = create_test_node_config();
2683        config2.listen_addr =
2684            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2685        let node2 = P2PNode::new(config2).await?;
2686        node2.start().await?;
2687
2688        // Wait a bit for nodes to start listening
2689        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2690
2691        // Get actual listening address of node2
2692        let node2_addr = node2.local_addr().ok_or_else(|| {
2693            P2PError::Network(crate::error::NetworkError::ProtocolError(
2694                "No listening address".to_string().into(),
2695            ))
2696        })?;
2697
2698        // Connect node1 to node2
2699        let peer_id =
2700            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2701                Ok(res) => res?,
2702                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2703            };
2704
2705        // Wait a bit for connection to establish
2706        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2707
2708        // Send a message
2709        let message_data = b"Hello, peer!".to_vec();
2710        let result = match timeout(
2711            Duration::from_millis(500),
2712            node1.send_message(&peer_id, "test-protocol", message_data),
2713        )
2714        .await
2715        {
2716            Ok(res) => res,
2717            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2718        };
2719        // For now, we'll just check that we don't get a "not connected" error
2720        // The actual send might fail due to no handler on the other side
2721        if let Err(e) = &result {
2722            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2723        }
2724
2725        // Try to send to non-existent peer
2726        let non_existent_peer = "non_existent_peer".to_string();
2727        let result = node1
2728            .send_message(&non_existent_peer, "test-protocol", vec![])
2729            .await;
2730        assert!(result.is_err(), "Sending to non-existent peer should fail");
2731
2732        Ok(())
2733    }
2734
2735    #[tokio::test]
2736    async fn test_remote_mcp_operations() -> Result<()> {
2737        let config = create_test_node_config();
2738        let node = P2PNode::new(config).await?;
2739
2740        // MCP removed; test reduced to simple start/stop
2741        node.start().await?;
2742        node.stop().await?;
2743        Ok(())
2744    }
2745
2746    #[tokio::test]
2747    async fn test_health_check() -> Result<()> {
2748        let config = create_test_node_config();
2749        let node = P2PNode::new(config).await?;
2750
2751        // Health check should pass with no connections
2752        let result = node.health_check().await;
2753        assert!(result.is_ok());
2754
2755        // Note: We're not actually connecting to real peers here
2756        // since that would require running bootstrap nodes.
2757        // The health check should still pass with no connections.
2758
2759        Ok(())
2760    }
2761
2762    #[tokio::test]
2763    async fn test_node_uptime() -> Result<()> {
2764        let config = create_test_node_config();
2765        let node = P2PNode::new(config).await?;
2766
2767        let uptime1 = node.uptime();
2768        assert!(uptime1 >= Duration::from_secs(0));
2769
2770        // Wait a bit
2771        tokio::time::sleep(Duration::from_millis(10)).await;
2772
2773        let uptime2 = node.uptime();
2774        assert!(uptime2 > uptime1);
2775
2776        Ok(())
2777    }
2778
2779    #[tokio::test]
2780    async fn test_node_config_access() -> Result<()> {
2781        let config = create_test_node_config();
2782        let expected_peer_id = config.peer_id.clone();
2783        let node = P2PNode::new(config).await?;
2784
2785        let node_config = node.config();
2786        assert_eq!(node_config.peer_id, expected_peer_id);
2787        assert_eq!(node_config.max_connections, 100);
2788        // MCP removed
2789
2790        Ok(())
2791    }
2792
2793    #[tokio::test]
2794    async fn test_mcp_server_access() -> Result<()> {
2795        let config = create_test_node_config();
2796        let _node = P2PNode::new(config).await?;
2797
2798        // MCP removed
2799        Ok(())
2800    }
2801
2802    #[tokio::test]
2803    async fn test_dht_access() -> Result<()> {
2804        let config = create_test_node_config();
2805        let node = P2PNode::new(config).await?;
2806
2807        // Should have DHT
2808        assert!(node.dht().is_some());
2809
2810        Ok(())
2811    }
2812
2813    #[tokio::test]
2814    async fn test_node_builder() -> Result<()> {
2815        // Create a config using the builder but don't actually build a real node
2816        let builder = P2PNode::builder()
2817            .with_peer_id("builder_test_peer".to_string())
2818            .listen_on("/ip4/127.0.0.1/tcp/0")
2819            .listen_on("/ip6/::1/tcp/0")
2820            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2821            .with_ipv6(true)
2822            .with_connection_timeout(Duration::from_secs(15))
2823            .with_max_connections(200);
2824
2825        // Test the configuration that was built
2826        let config = builder.config;
2827        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2828        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2829        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2830        assert!(config.enable_ipv6);
2831        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2832        assert_eq!(config.max_connections, 200);
2833
2834        Ok(())
2835    }
2836
2837    #[tokio::test]
2838    async fn test_bootstrap_peers() -> Result<()> {
2839        let mut config = create_test_node_config();
2840        config.bootstrap_peers = vec![
2841            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2842            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2843        ];
2844
2845        let node = P2PNode::new(config).await?;
2846
2847        // Start node (which attempts to connect to bootstrap peers)
2848        node.start().await?;
2849
2850        // In a test environment, bootstrap peers may not be available
2851        // The test verifies the node starts correctly with bootstrap configuration
2852        let peer_count = node.peer_count().await;
2853        assert!(
2854            peer_count <= 2,
2855            "Peer count should not exceed bootstrap peer count"
2856        );
2857
2858        node.stop().await?;
2859        Ok(())
2860    }
2861
2862    #[tokio::test]
2863    async fn test_production_mode_disabled() -> Result<()> {
2864        let config = create_test_node_config();
2865        let node = P2PNode::new(config).await?;
2866
2867        assert!(!node.is_production_mode());
2868        assert!(node.production_config().is_none());
2869
2870        // Resource metrics should fail when production mode is disabled
2871        let result = node.resource_metrics().await;
2872        assert!(result.is_err());
2873        assert!(result.unwrap_err().to_string().contains("not enabled"));
2874
2875        Ok(())
2876    }
2877
2878    #[tokio::test]
2879    async fn test_network_event_variants() {
2880        // Test that all network event variants can be created
2881        let peer_id = "test_peer".to_string();
2882        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2883
2884        let _peer_connected = NetworkEvent::PeerConnected {
2885            peer_id: peer_id.clone(),
2886            addresses: vec![address.clone()],
2887        };
2888
2889        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2890            peer_id: peer_id.clone(),
2891            reason: "test disconnect".to_string(),
2892        };
2893
2894        let _message_received = NetworkEvent::MessageReceived {
2895            peer_id: peer_id.clone(),
2896            protocol: "test-protocol".to_string(),
2897            data: vec![1, 2, 3],
2898        };
2899
2900        let _connection_failed = NetworkEvent::ConnectionFailed {
2901            peer_id: Some(peer_id.clone()),
2902            address: address.clone(),
2903            error: "connection refused".to_string(),
2904        };
2905
2906        let _dht_stored = NetworkEvent::DHTRecordStored {
2907            key: vec![1, 2, 3],
2908            value: vec![4, 5, 6],
2909        };
2910
2911        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2912            key: vec![1, 2, 3],
2913            value: Some(vec![4, 5, 6]),
2914        };
2915    }
2916
2917    #[tokio::test]
2918    async fn test_peer_info_structure() {
2919        let peer_info = PeerInfo {
2920            peer_id: "test_peer".to_string(),
2921            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2922            connected_at: Instant::now(),
2923            last_seen: Instant::now(),
2924            status: ConnectionStatus::Connected,
2925            protocols: vec!["test-protocol".to_string()],
2926            heartbeat_count: 0,
2927        };
2928
2929        assert_eq!(peer_info.peer_id, "test_peer");
2930        assert_eq!(peer_info.addresses.len(), 1);
2931        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2932        assert_eq!(peer_info.protocols.len(), 1);
2933    }
2934
2935    #[tokio::test]
2936    async fn test_serialization() -> Result<()> {
2937        // Test that configs can be serialized/deserialized
2938        let config = create_test_node_config();
2939        let serialized = serde_json::to_string(&config)?;
2940        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2941
2942        assert_eq!(config.peer_id, deserialized.peer_id);
2943        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2944        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2945
2946        Ok(())
2947    }
2948
2949    #[tokio::test]
2950    async fn test_get_peer_id_by_address_found() -> Result<()> {
2951        let config = create_test_node_config();
2952        let node = P2PNode::new(config).await?;
2953
2954        // Manually insert a peer for testing
2955        let test_peer_id = "peer_test_123".to_string();
2956        let test_address = "192.168.1.100:9000".to_string();
2957
2958        let peer_info = PeerInfo {
2959            peer_id: test_peer_id.clone(),
2960            addresses: vec![test_address.clone()],
2961            connected_at: Instant::now(),
2962            last_seen: Instant::now(),
2963            status: ConnectionStatus::Connected,
2964            protocols: vec!["test-protocol".to_string()],
2965            heartbeat_count: 0,
2966        };
2967
2968        node.peers
2969            .write()
2970            .await
2971            .insert(test_peer_id.clone(), peer_info);
2972
2973        // Test: Find peer by address
2974        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2975        assert_eq!(found_peer_id, Some(test_peer_id));
2976
2977        Ok(())
2978    }
2979
2980    #[tokio::test]
2981    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2982        let config = create_test_node_config();
2983        let node = P2PNode::new(config).await?;
2984
2985        // Test: Try to find a peer that doesn't exist
2986        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2987        assert_eq!(result, None);
2988
2989        Ok(())
2990    }
2991
2992    #[tokio::test]
2993    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2994        let config = create_test_node_config();
2995        let node = P2PNode::new(config).await?;
2996
2997        // Test: Invalid address format should return None
2998        let result = node.get_peer_id_by_address("invalid-address").await;
2999        assert_eq!(result, None);
3000
3001        Ok(())
3002    }
3003
3004    #[tokio::test]
3005    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3006        let config = create_test_node_config();
3007        let node = P2PNode::new(config).await?;
3008
3009        // Add multiple peers with different addresses
3010        let peer1_id = "peer_1".to_string();
3011        let peer1_addr = "192.168.1.101:9001".to_string();
3012
3013        let peer2_id = "peer_2".to_string();
3014        let peer2_addr = "192.168.1.102:9002".to_string();
3015
3016        let peer1_info = PeerInfo {
3017            peer_id: peer1_id.clone(),
3018            addresses: vec![peer1_addr.clone()],
3019            connected_at: Instant::now(),
3020            last_seen: Instant::now(),
3021            status: ConnectionStatus::Connected,
3022            protocols: vec!["test-protocol".to_string()],
3023            heartbeat_count: 0,
3024        };
3025
3026        let peer2_info = PeerInfo {
3027            peer_id: peer2_id.clone(),
3028            addresses: vec![peer2_addr.clone()],
3029            connected_at: Instant::now(),
3030            last_seen: Instant::now(),
3031            status: ConnectionStatus::Connected,
3032            protocols: vec!["test-protocol".to_string()],
3033            heartbeat_count: 0,
3034        };
3035
3036        node.peers
3037            .write()
3038            .await
3039            .insert(peer1_id.clone(), peer1_info);
3040        node.peers
3041            .write()
3042            .await
3043            .insert(peer2_id.clone(), peer2_info);
3044
3045        // Test: Find each peer by their unique address
3046        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3047        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3048
3049        assert_eq!(found_peer1, Some(peer1_id));
3050        assert_eq!(found_peer2, Some(peer2_id));
3051
3052        Ok(())
3053    }
3054
3055    #[tokio::test]
3056    async fn test_list_active_connections_empty() -> Result<()> {
3057        let config = create_test_node_config();
3058        let node = P2PNode::new(config).await?;
3059
3060        // Test: No connections initially
3061        let connections = node.list_active_connections().await;
3062        assert!(connections.is_empty());
3063
3064        Ok(())
3065    }
3066
3067    #[tokio::test]
3068    async fn test_list_active_connections_with_peers() -> Result<()> {
3069        let config = create_test_node_config();
3070        let node = P2PNode::new(config).await?;
3071
3072        // Add multiple peers
3073        let peer1_id = "peer_1".to_string();
3074        let peer1_addrs = vec![
3075            "192.168.1.101:9001".to_string(),
3076            "192.168.1.101:9002".to_string(),
3077        ];
3078
3079        let peer2_id = "peer_2".to_string();
3080        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3081
3082        let peer1_info = PeerInfo {
3083            peer_id: peer1_id.clone(),
3084            addresses: peer1_addrs.clone(),
3085            connected_at: Instant::now(),
3086            last_seen: Instant::now(),
3087            status: ConnectionStatus::Connected,
3088            protocols: vec!["test-protocol".to_string()],
3089            heartbeat_count: 0,
3090        };
3091
3092        let peer2_info = PeerInfo {
3093            peer_id: peer2_id.clone(),
3094            addresses: peer2_addrs.clone(),
3095            connected_at: Instant::now(),
3096            last_seen: Instant::now(),
3097            status: ConnectionStatus::Connected,
3098            protocols: vec!["test-protocol".to_string()],
3099            heartbeat_count: 0,
3100        };
3101
3102        node.peers
3103            .write()
3104            .await
3105            .insert(peer1_id.clone(), peer1_info);
3106        node.peers
3107            .write()
3108            .await
3109            .insert(peer2_id.clone(), peer2_info);
3110
3111        // Test: List all active connections
3112        let connections = node.list_active_connections().await;
3113        assert_eq!(connections.len(), 2);
3114
3115        // Verify peer1 and peer2 are in the list
3116        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3117        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3118
3119        assert!(peer1_conn.is_some());
3120        assert!(peer2_conn.is_some());
3121
3122        // Verify addresses match
3123        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3124        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3125
3126        Ok(())
3127    }
3128
3129    #[tokio::test]
3130    async fn test_remove_peer_success() -> Result<()> {
3131        let config = create_test_node_config();
3132        let node = P2PNode::new(config).await?;
3133
3134        // Add a peer
3135        let peer_id = "peer_to_remove".to_string();
3136        let peer_info = PeerInfo {
3137            peer_id: peer_id.clone(),
3138            addresses: vec!["192.168.1.100:9000".to_string()],
3139            connected_at: Instant::now(),
3140            last_seen: Instant::now(),
3141            status: ConnectionStatus::Connected,
3142            protocols: vec!["test-protocol".to_string()],
3143            heartbeat_count: 0,
3144        };
3145
3146        node.peers.write().await.insert(peer_id.clone(), peer_info);
3147
3148        // Verify peer exists
3149        assert!(node.is_peer_connected(&peer_id).await);
3150
3151        // Remove the peer
3152        let removed = node.remove_peer(&peer_id).await;
3153        assert!(removed);
3154
3155        // Verify peer no longer exists
3156        assert!(!node.is_peer_connected(&peer_id).await);
3157
3158        Ok(())
3159    }
3160
3161    #[tokio::test]
3162    async fn test_remove_peer_nonexistent() -> Result<()> {
3163        let config = create_test_node_config();
3164        let node = P2PNode::new(config).await?;
3165
3166        // Try to remove a peer that doesn't exist
3167        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3168        assert!(!removed);
3169
3170        Ok(())
3171    }
3172
3173    #[tokio::test]
3174    async fn test_is_peer_connected() -> Result<()> {
3175        let config = create_test_node_config();
3176        let node = P2PNode::new(config).await?;
3177
3178        let peer_id = "test_peer".to_string();
3179
3180        // Initially not connected
3181        assert!(!node.is_peer_connected(&peer_id).await);
3182
3183        // Add peer
3184        let peer_info = PeerInfo {
3185            peer_id: peer_id.clone(),
3186            addresses: vec!["192.168.1.100:9000".to_string()],
3187            connected_at: Instant::now(),
3188            last_seen: Instant::now(),
3189            status: ConnectionStatus::Connected,
3190            protocols: vec!["test-protocol".to_string()],
3191            heartbeat_count: 0,
3192        };
3193
3194        node.peers.write().await.insert(peer_id.clone(), peer_info);
3195
3196        // Now connected
3197        assert!(node.is_peer_connected(&peer_id).await);
3198
3199        // Remove peer
3200        node.remove_peer(&peer_id).await;
3201
3202        // No longer connected
3203        assert!(!node.is_peer_connected(&peer_id).await);
3204
3205        Ok(())
3206    }
3207
3208    #[test]
3209    fn test_normalize_ipv6_wildcard() {
3210        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3211
3212        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3213        let normalized = normalize_wildcard_to_loopback(wildcard);
3214
3215        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3216        assert_eq!(normalized.port(), 8080);
3217    }
3218
3219    #[test]
3220    fn test_normalize_ipv4_wildcard() {
3221        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3222
3223        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3224        let normalized = normalize_wildcard_to_loopback(wildcard);
3225
3226        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3227        assert_eq!(normalized.port(), 9000);
3228    }
3229
3230    #[test]
3231    fn test_normalize_specific_address_unchanged() {
3232        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3233        let normalized = normalize_wildcard_to_loopback(specific);
3234
3235        assert_eq!(normalized, specific);
3236    }
3237
3238    #[test]
3239    fn test_normalize_loopback_unchanged() {
3240        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3241
3242        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3243        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3244        assert_eq!(normalized_v6, loopback_v6);
3245
3246        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3247        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3248        assert_eq!(normalized_v4, loopback_v4);
3249    }
3250}